Hey Dong,

I appreciate your thoughtful responses. Let's do one more round :-)


> Here are my current concern with the three alternatives you described
> earlier:
> - The first alternative requires support from input system which is
> currently not available. It will limit the usage of partition expansion to
> only systems that support such interface. And it is not guaranteed that we
> can persuade the developer of the input system to add this interface. This
> is not desirable for Samza in the long term.

Agreed. It is wishful thinking to expect every supported system to
provide such a contract.


>
> - I can not comment on the second alternative because I don't understand
> how it reshuffles all existing changelog data. We can discuss more if there
> is more specific detail. My gut feel is that this will be complex and
> carries performance overhead.

After giving this more thought, I agree. There is no clear way to migrate a
changelog without knowing the original key->partition mapping. Which leads
us to alternative 3...


>
> - The third alternative requires performance overhead. Given that user can
> already use this solution to enable partition expansion, maybe Samza
> developers can provide more input as to why we are not doing it by default.
> My gut feel is that it carries considerable performance overhead and
> increases the cost-to-serve Samza job (e.g. disk usage), which may make it
> undesirable in the long term.

I think the only performance overhead would be the mandatory repartitioning
stage for stateful jobs. But a repartitioner is usually much faster than
the downstream stateful job, so it seems to be only a cost-to-serve issue.

As for why we aren't already doing this, I would posit that before the
introduction of the high-level API, which trivializes repartitioning, it
was unreasonable to expect each job owner to do the mandatory
repartitioning. With the high-level API, I would argue this is much more
doable.
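
To make the idea concrete, here is a minimal sketch of what the mandatory
repartitioning stage buys us. It deliberately does not use the Samza API;
the class, the method names and the INTERNAL_PARTITIONS value are all
hypothetical and only simulate the routing. The assumption is that the job
owns an internal stream whose partition count never changes, so the
stateful stage is keyed off that stream and not off the input partitions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of "repartition first, then do stateful work".
// Nothing here is Samza API; it only illustrates the routing idea.
final class RepartitionSketch {
    // Assumed: partition count of the job-owned internal stream, fixed for
    // the life of the job regardless of how the input stream is expanded.
    static final int INTERNAL_PARTITIONS = 8;

    // One local store per stateful task, one task per internal partition.
    private final Map<Integer, Map<String, Integer>> stateByTask = new HashMap<>();

    // Repartition stage: routes purely by key, ignoring the input partition.
    static int internalPartition(String key) {
        return Math.floorMod(key.hashCode(), INTERNAL_PARTITIONS);
    }

    // Stateful stage: a per-key counter. Returns the updated count.
    int process(int inputPartition, String key) {
        int task = internalPartition(key); // inputPartition plays no role
        return stateByTask
            .computeIfAbsent(task, t -> new HashMap<>())
            .merge(key, 1, Integer::sum);
    }

    // Number of tasks whose local store contains the key.
    int tasksHoldingKey(String key) {
        int count = 0;
        for (Map<String, Integer> store : stateByTask.values()) {
            if (store.containsKey(key)) {
                count++;
            }
        }
        return count;
    }
}
```

In this sketch a key that shows up in input partition 1 before the
expansion and in input partition 3 afterwards still lands in the same
stateful task, so the counter resumes instead of restarting from 0.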


> I am not sure it is true that "any future feature that utilizes this
> mapping without accounting for the assumptions of this SEP is likely to
> malfunction". Suppose we allow user to specify new-to-old-partition
> mapping, then we can use the partition-to-task mapping correctly without
> relying on the assumption in this SEP, right?

Right, but my point was that the partition->task mapping is not sufficient
by itself. So adding it by itself is potentially misleading.
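
As a rough illustration of that point, the sketch below compares two ways
a broker might assign keys to partitions when the partition count doubles.
All names are hypothetical, and the key-range scheme is a toy stand-in for
the kind of broker I mentioned earlier, not any real system's algorithm.
The rule "new partition p continues the state of old partition p % oldCount"
is the assumption under test.

```java
// Hypothetical sketch: the same partition-to-task rule is right or wrong
// depending on how the broker assigns keys to partitions.
final class PartitionMappingSketch {
    // hash+modulo assignment, as assumed by the SEP.
    static int hashModulo(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Toy key-range assignment: splits the 7-bit range of the first
    // character evenly across the partitions.
    static int keyRange(String key, int partitionCount) {
        int first = key.isEmpty() ? 0 : Math.min(key.charAt(0), 127);
        return first * partitionCount / 128;
    }

    // Rule under test: which old partition (and hence which task's state)
    // a new partition is assumed to continue after doubling.
    static int assumedOldPartition(int newPartition, int oldCount) {
        return newPartition % oldCount;
    }

    // True if the rule points at the partition that held the key before.
    static boolean hashModuloFindsOldState(String key, int oldCount) {
        int newPartition = hashModulo(key, 2 * oldCount);
        return assumedOldPartition(newPartition, oldCount) == hashModulo(key, oldCount);
    }

    static boolean keyRangeFindsOldState(String key, int oldCount) {
        int newPartition = keyRange(key, 2 * oldCount);
        return assumedOldPartition(newPartition, oldCount) == keyRange(key, oldCount);
    }
}
```

With hash+modulo the rule always finds the old state when the count
doubles. With the toy key-range scheme it does not: the key "m" moves from
partition 3 of 4 to partition 6 of 8, and 6 % 4 points at partition 2. The
stored partition-to-task mapping is identical in both cases, which is why
I think it is misleading without the key assignment alongside it.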

On Mon, Jun 12, 2017 at 8:34 PM, Dong Lin <lindon...@gmail.com> wrote:

> Thanks for the reply Jacob. Please see my comment inline.
>
> On Mon, Jun 12, 2017 at 7:51 PM, Jacob Maes <jacob.m...@gmail.com> wrote:
>
> > >
> > > - For users that need partition expansion of the input streams for
> > stateful
> > > job, they have a really big headache in the sense that Samza does not
> > allow
> > > partition expansion for stateful job. SEP-5 addresses this headache for
> > > them.
> > > You are right that SEP-5 requires user to understand and enforce
> > > limitations across organizations. But it is still much better than not
> > > allowing user to expansion partition for stateful jobs at all, right?
> > Did I
> > > miss something here?
> >
> > I guess this one is a matter of perspective.
> >
> > One argument is that if the system supports one case, it's better than
> none
> > because there is one less scenario in which the system does the wrong
> > thing.
> >
> > The counter argument is for uniform and consistent behavior, which is
> easy
> > for users to understand and properly leverage.
> >
> > Specifically, I'd argue that the current rule is very simple: "you cannot
> > repartition inputs on a stateful job, so you must over-partition the
> > initial implementation". To me, while that rule is not ideal, its
> > simplicity is better that introducing a new solution that has a bunch of
> > caveats, any one of which could be missed. If any one of the assumptions
> in
> > this SEP design are violated, the job would behave incorrectly. That
> puts a
> > lot more burden on the users than the simpler rule.
> >
>
> I agree that we have different perspective here. It is true that user would
> mess up their job if they used this feature in a wrong way, i.e. violate
> the assumption made in SEP-5. On the other hand, I think there is always a
> way for user to mess up their job if they configure the Samza job
> incorrectly. I also think the assumption made in this SEP is not
> particularly harder to understand than other existing configs in Samza.
>
> The answer to this can be subjective. I would love to hear perspective from
> other developers on this issue.
>
>
> >
> > That's why I mentioned a few alternatives that, while more complex to
> > implement, would provide a more consistent behavior with simple rules for
> > the users.
> >
>
> I am open to discuss alternative solutions that can address the problem
> in a better manner. I am not opposed to complexity as long as it gives us
> good long term benefits.
>
> Here are my current concern with the three alternatives you described
> earlier:
>
> - The first alternative requires support from input system which is
> currently not available. It will limit the usage of partition expansion to
> only systems that support such interface. And it is not guaranteed that we
> can persuade the developer of the input system to add this interface. This
> is not desirable for Samza in the long term.
>
> - I can not comment on the second alternative because I don't understand
> how it reshuffles all existing changelog data. We can discuss more if there
> is more specific detail. My gut feel is that this will be complex and
> carries performance overhead.
>
> - The third alternative requires performance overhead. Given that user can
> already use this solution to enable partition expansion, maybe Samza
> developers can provide more input as to why we are not doing it by default.
> My gut feel is that it carries considerable performance overhead and
> increases the cost-to-serve Samze job (e.g. disk usage), which may make it
> undesirable in the long term.
>
>
>
> >
> > > Yes, we need a similar check for
> > > GroupBySystemStreamPartitionWithFixedTaskNum
> > > as well. If there is more grouper classes needed in the future, we can
> > > solve this problem cleanly without new config. Given the
> > > previousGrouperClass and newGrouperClass, KafkaCheckpointLogKey will
> > throw
> > > exception if and only if newGrouperClass is an instance of
> > > previousGrouperClass.
> > > GroupBySystemStreamPartitionWithFixedTaskNum should extend
> > > GroupBySystemStreamPartition
> > > and GroupByPartitionWithFixedTaskNum should extend GroupByPartition.
> > Does
> > > this address your concern?
> >
> > Sounds workable, thanks.
> >
> > >
> > > Can
> > > you be more specific why Partition-to-task mapping is not meaningful
> > > without
> > > some definition of the key-to-partition assignments and why it is
> > > incomplete and misleading?
> >
> >  A partition is (in my naive interpretation) an independent queue for
> > messages of a particular key set. It is not the *identity* of the
> partition
> > that determine the contents of the associated task's local state. Rather
> it
> > is the *contents* of the partition that affect the task's state. A
> > partition-to-task mapping only captures an identity relationship:
> > partition1->task1. Without the assumptions of this SEP, this is
> > insufficient to determine the assignment of keys to tasks, which is what
> > really matters. Therefore, any future feature that utilizes this mapping
> > without accounting for the assumptions of this SEP is likely to
> > malfunction.
> >
> >
> I am not sure it is true that "any future feature that utilizes this
> mapping without accounting for the assumptions of this SEP is likely to
> malfunction". Suppose we allow user to specify new-to-old-partition
> mapping, then we can use the partition-to-task mapping correctly without
> relying on the assumption in this SEP, right?
>
>
> >
> > On Mon, Jun 12, 2017 at 5:09 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Jacob,
> > >
> > > Thanks for the explanation. It seems that your biggest concern is with
> > the
> > > generality of the proposal. Let me try to address this and other
> comments
> > > below.
> > >
> > > 1) ... it will cause headaches for Samza users ...
> > >
> > > I am not sure I understand why this proposal causes headache for Samza
> > > users. Here is the impact of the SEP-5 on users:
> > >
> > > - For users that do not need partition expansion of the input stream,
> > they
> > > can use Samza without any change in code/binary/config. Thus there
> is
> > no
> > > headache for them.
> > >
> > > - For users that need partition expansion of the input streams for
> > > stateless job, they currently need to manually reboot their Samza job
> in
> > > order to let Samza consume the new partitions created for the stream.
> > SEP-5
> > > actually reduced their headache by allowing Samza to automatically
> detect
> > > and consume new partitions.
> > >
> > > - For users that need partition expansion of the input streams for
> > stateful
> > > job, they have a really big headache in the sense that Samza does not
> > allow
> > > partition expansion for stateful job. SEP-5 addresses this headache for
> > > them.
> > >
> > > You are right that SEP-5 requires user to understand and enforce
> > > limitations across organizations. But it is still much better than not
> > > allowing user to expansion partition for stateful jobs at all, right?
> > Did I
> > > miss something here?
> > >
> > > 2) ... Separate orgs are often difficult to coordinate and a system
> which
> > > depends on such significant process/coordination is too fragile for my
> > > taste ..
> > >
> > > This is true. Ideally we want a system that is fully self-serving. I
> > think
> > > this is a long term goal for Samza. Still, for the reasons described
> > above,
> > > I think something is better than nothing. I am open to alternative
> design
> > > that can support partition expansion for stateful jobs without
> requiring
> > > coordination.
> > >
> > > 3) There is currently no supported way of sharing state among the tasks
> > of
> > > a container.  Each task has its own isolated store and that logical
> > > isolation is the primary thing that enables Samza jobs to scale with a
> > > simple container count change. My feeling is that we should not change
> > > this without
> > > good reason.
> > >
> > > I see your point. I will remove this sentence from the motivation
> > section.
> > > This won't have any impact on the design of the SEP-5. Does this
> address
> > > the problem?
> > >
> > > 4) With the current proposal, we'd also need a similar check for
> > > GroupBySystemStreamPartitionWithFixedTaskNum as well. And if any other
> > > groupers
> > > were later added with both these modes, we'd probably need to add those
> > > too. It might be easier and cleaner to add a config to ignore that
> check
> > > temporarily. Down side is that it further complicates the Samza config,
> > > which is already huge. Thoughts?
> > >
> > > Yes, we need a similar check for
> > > GroupBySystemStreamPartitionWithFixedTaskNum
> > > as well. If there is more grouper classes needed in the future, we can
> > > solve this problem cleanly without new config. Given the
> > > previousGrouperClass and newGrouperClass, KafkaCheckpointLogKey will
> > throw
> > > exception if and only if newGrouperClass is an instance of
> > > previousGrouperClass.
> > > GroupBySystemStreamPartitionWithFixedTaskNum should extend
> > > GroupBySystemStreamPartition
> > > and GroupByPartitionWithFixedTaskNum should extend GroupByPartition.
> > Does
> > > this address your concern?
> > >
> > > 5) The task-to-container and container-to-host mappings are both
> > meaningful
> > > in context of the JobModel. Partition-to-task mapping is not meaningful
> > > without
> > > some definition of the key-to-partition assignments. It's incomplete
> > > information and therefore misleading. I think it only makes sense to
> use
> > > this mapping if we adopt a solution wherein Samza also knows the
> > partition
> > > key assignment.
> > >
> > > Partition-to-task is currently explicitly passed from job coordinator
> to
> > > each task as part of the job model to tell tasks which partitions to
> > > consume from. I think we can store some definition of the
> > key-to-partition
> > > assignments if Samza decides to get and use this information in the
> > > future. Can
> > > you be more specific why Partition-to-task mapping is not meaningful
> > > without
> > > some definition of the key-to-partition assignments and why it is
> > > incomplete and misleading?
> > >
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Mon, Jun 12, 2017 at 3:54 PM, Jacob Maes <jacob.m...@gmail.com>
> > wrote:
> > >
> > > > Hey Dong,
> > > >
> > > > I'm opposed (or a +0, at best) to this limited, Kafka-specific
> > solution.
> > > I
> > > > understand that the proposal is relatively simple to implement, but I
> > > think
> > > > it will cause headaches for Samza users. They will not only have to
> > > > understand all the limitations (increase only, double partitions
> only,
> > > > partition using hash+modulo, etc) of this approach, but enforcing
> these
> > > > limitations can be a major problem, especially when the Samza jobs
> and
> > > > message brokers are managed by separate orgs in a company. Separate
> > orgs
> > > > are often difficult to coordinate and a system which depends on such
> > > > significant process/coordination is too fragile for my taste.
> > > >
> > > > That said, I realize that my opinion is just one of many in the
> broader
> > > > community which may feel differently, so let me respond to some of
> the
> > > > other items in the discussion so we can clear them up:
> > > >
> > > > The task-to-container assignment matters because if the correlated
> > tasks
> > > > > (i.e. tasks that consume messages with the same key) needs to be in
> > the
> > > > > same container so that they can share the same key/value local
> store
> > on
> > > > the
> > > > > same physical machine.
> > > >
> > > > There is currently no supported way of sharing state among the tasks
> > of a
> > > > container.  Each task has its own isolated store and that logical
> > > isolation
> > > > is the primary thing that enables Samza jobs to scale with a simple
> > > > container count change. My feeling is that we should not change this
> > > > without good reason.
> > > >
> > > > I think we can hardcode new logic in KafkaCheckpointLogKey.scala such
> > > that
> > > > > exception will not be thrown if new grouper is
> > > > > GroupByPartitionWithFixedTaskNum and old grouper is
> > GroupByPartition.
> > > > Does
> > > > > this look reasonable?
> > > >
> > > > With the current proposal, we'd also need a similar check for
> > > > GroupBySystemStreamPartitionWithFixedTaskNum as well. And if any
> other
> > > > groupers were later added with both these modes, we'd probably need
> to
> > > add
> > > > those too. It might be easier and cleaner to add a config to ignore
> > that
> > > > check temporarily. Down side is that it further complicates the Samza
> > > > config, which is already huge. Thoughts?
> > > >
> > > > I think storing the previous task-to-partition mapping is more
> general
> > > than
> > > > > storing the partition count of all topics for the following
> reasons:
> > > > > - Samza already stores the task-to-container mapping and
> > > > container-to-host
> > > > > mapping in the coordinator stream. It seems consistent to also
> store
> > > the
> > > > > partition-to-task mapping. And this information may be useful for
> > other
> > > > > use-case such as debugging.
> > > > > - By having the new interface take the previous task-to-partition
> > > > > assignment instead of a topic-to-partition-count mapping as new
> > > > parameter,
> > > > > we can potentially have grouper implementation to support other
> types
> > > of
> > > > > input systems.
> > > > > - It is slightly simpler to store the task-to-partition assignment
> > > because
> > > > > we don't need to know whether this is the first time a job is
> started
> > > or
> > > > > not. On the other hand, you can write topic-to-partition-count
> > mapping
> > > to
> > > > > the coordinator stream only if this is the first time the job is
> run
> > > >
> > > > The task-to-container and container-to-host mappings are both
> > meaningful
> > > in
> > > > context of the JobModel. Partition-to-task mapping is not meaningful
> > > > without some definition of the key-to-partition assignments. It's
> > > > incomplete information and therefore misleading. I think it only
> makes
> > > > sense to use this mapping if we adopt a solution wherein Samza also
> > knows
> > > > the partition key assignment.
> > > >
> > > > -Jake
> > > >
> > > > On Tue, Jun 6, 2017 at 11:06 PM, Dong Lin <lindon...@gmail.com>
> wrote:
> > > >
> > > > > Hey Jacob,
> > > > >
> > > > > Thanks for taking time to review the SEP.
> > > > >
> > > > > I agree with you and Navina that the current SEP doesn't provide
> > > support
> > > > to
> > > > > arbitrary input systems and it doesn't support partition shrink. I
> > > think
> > > > > the scope of this SEP is to support partition expansion for Kafka
> > (the
> > > > most
> > > > > widely used input system of Samza) and keep the door open for
> > partition
> > > > > support of various input systems. The current design can support
> any
> > > > system
> > > > > that meets the two operational requirement specified in the doc.
> > > > >
> > > > > While it is possible to support more types of input systems, it
> will
> > > > likely
> > > > > add more complexity to the design. For example, the first
> alternative
> > > > > solution from you requires broker-side support to negotiate hash
> > > > algorithm.
> > > > > The second alternative solution requires changelog partition
> > reshuffle
> > > > > which carries its own design complexity and performance overhead.
> > There
> > > > is
> > > > > tradeoff between the generality and the complexity among these
> > > choices. I
> > > > > like the current design because it is simple and addresses a big
> > usage
> > > > > scenario for us. We can add more complexity to generalize the
> design
> > if
> > > > it
> > > > > enables important use-case. Does this sound reasonable?
> > > > >
> > > > > Note that the "Rejected Alternative" section also mentions the
> > > > possibility
> > > > > of supporting a wider range of input systems by allowing user to
> > > specify
> > > > > the new-partition to old-partition mapping. We are not doing it
> > because
> > > > 1)
> > > > > we may have better understanding of the design after we have a
> > specific
> > > > > second input system to support 2) the current design can be
> extended
> > to
> > > > > support general input systems. I think similar argument can be
> > applied
> > > > > explain why we don't have to support general input systems using
> the
> > > > > potentially-good alternatives you mentioned.
> > > > >
> > > > > I hope SEP-5 can be an important first-step towards supporting
> > > partition
> > > > > expansion of any input system.
> > > > >
> > > > > To answer your questions about the current proposal:
> > > > >
> > > > > >1. "An alternative solution is to allow task number to increase
> > after
> > > > > >partition expansion and uses a proper task-to-container assignment
> > to
> > > > make
> > > > > >sure the Samza output is correct." What does the container have to
> > do
> > > > with
> > > > > >stateful processing or output in general?
> > > > >
> > > > > The task-to-container assignment matters because if the correlated
> > > tasks
> > > > > (i.e. tasks that consume messages with the same key) needs to be in
> > the
> > > > > same container so that they can share the same key/value local
> store
> > on
> > > > the
> > > > > same physical machine.
> > > > >
> > > > > >2. When you use "Join" as an example, you basically mean multiple
> > > > > >co-partitioned streams, right? This is opposed to multiple,
> > > > > >independently-partitioned streams or a single stream. Would be
> nice
> > to
> > > > > >formulate the proposal in these more general terms.
> > > > >
> > > > > I thought "join" is commonly used to refer to the join operation
> > with
> > > > > co-partitioned stream but I may be wrong. I have updated the wiki
> to
> > > > > explicitly mention "co-partitioned stream". Does this look better
> > now?
> > > > >
> > > > > >3. When switching SSP groupers, how will the users avoid the
> > > > > >org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues
> > > > > >exception?
> > > > >
> > > > > I think we can hardcode new logic in KafkaCheckpointLogKey.scala
> such
> > > > that
> > > > > exception will not be thrown if new grouper is
> > > > > GroupByPartitionWithFixedTaskNum and old grouper is
> > GroupByPartition.
> > > > Does
> > > > > this look reasonable?
> > > > >
> > > > > >4. Partition to task assignment is meaningless without key to
> > > partition
> > > > > >mapping. The real semantics are captured in the external
> requirement
> > > for
> > > > > >partitioning via hash+modulo. But in that case, iiuc, only the
> > > partition
> > > > > >count matters. So why not just store the original partition count
> > > rather
> > > > > >than the whole mapping?
> > > > >
> > > > > I think storing the previous task-to-partition mapping is more
> > general
> > > > than
> > > > > storing the partition count of all topics for the following
> reasons:
> > > > >
> > > > > - Samza already stores the task-to-container mapping and
> > > > container-to-host
> > > > > mapping in the coordinator stream. It seems consistent to also
> store
> > > the
> > > > > partition-to-task mapping. And this information may be useful for
> > other
> > > > > use-case such as debugging.
> > > > >
> > > > > - By having the new interface take the previous task-to-partition
> > > > > assignment instead of a topic-to-partition-count mapping as new
> > > > parameter,
> > > > > we can potentially have grouper implementation to support other
> types
> > > of
> > > > > input systems.
> > > > >
> > > > > - It is slightly simpler to store the task-to-partition assignment
> > > because
> > > > > we don't need to know whether this is the first time a job is
> started
> > > or
> > > > > not. On the other hand, you can write topic-to-partition-count
> > mapping
> > > to
> > > > > the coordinator stream only if this is the first time the job is
> run
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > > On Mon, Jun 5, 2017 at 11:17 AM, Jacob Maes <jacob.m...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hey Dong,
> > > > > >
> > > > > > Thanks for the SEP. Supporting partition changes is critically
> > > > important
> > > > > > for stateful Samza jobs, so it's great to see some ideas on that
> > > front!
> > > > > >
> > > > > > Sorry for the late feedback, but I have a few thoughts to
> > contribute.
> > > > > >
> > > > > > Big +1 on Navina's comment:
> > > > > >
> > > > > > > My biggest gripe with this SEP is that it seems like a
> > tailor-made
> > > > > > > solution
> > > > > > > that relies on the semantics of the Kafka system and yet, we
> are
> > > > trying
> > > > > > to
> > > > > > > masquerade that as operational requirements for other systems
> > > > > interacting
> > > > > > > with Samza. (Not to say that this is the first time such a
> choice
> > > is
> > > > > > being
> > > > > > > made in the Samza design). I am not seeing how this can be a
> > "general"
> > > > > > > solution for all input systems. That's my two cents. I would
> like
> > > to
> > > > > hear
> > > > > > > alternative points of view for this from other devs.
> > > > > >
> > > > > >
> > > > > > Two examples of this:
> > > > > > 1. This is mostly a hypothetical, but some message brokers may
> use
> > > key
> > > > > > range assignment rather than hash+modulo.
> > > > > > 2. Kafka can't reduce the number of partitions, but it can happen
> > on
> > > > > other
> > > > > > systems. For example, it may be cheaper to reduce the number of
> > > > > partitions
> > > > > > on a hosted service where the cost model depends on the number of
> > > > > > partitions/shards.
> > > > > >
> > > > > > It seems to me that a solution which doesn't depend on partition
> > key
> > > > > > assignment in the message broker. Here are a few alternatives
> that
> > > > > weren't
> > > > > > discussed and I think should be considered:
> > > > > >
> > > > > > Alternatives in order of increasing preference:
> > > > > > 1. Samza manages the partition hash (via some new contract with
> the
> > > > > > brokers) and guarantees correct routing of keys among the new
> > > > partitions.
> > > > > > 2. Samza detects a task count change, creates a new changelog
> with
> > > > > correct
> > > > > > partitions, and *somehow* reshuffles all existing changelog data
> > into
> > > > the
> > > > > > new topic and then uses the new topic from then on. (doesn't work
> > > > without
> > > > > > changelog, but in that case durability isn't paramount, so we can
> > > just
> > > > > > wipe)
> > > > > > 3. Use RPC in between stages and samza fully manages key
> assignment
> > > > among
> > > > > > tasks. No on-disk topic data to clean up. Mandatory
> repartitioning
> > in
> > > > the
> > > > > > first stage to pre-scaled tasks in next stage.
> > > > > > 4. Combined 2-3 solution
> > > > > >
> > > > > > Finally, some questions about the current proposal:
> > > > > > 1. "An alternative solution is to allow task number to increase
> > after
> > > > > > partition expansion and uses a proper task-to-container
> assignment
> > to
> > > > > make
> > > > > > sure the Samza output is correct." What does the container have
> to
> > do
> > > > > with
> > > > > > stateful processing or output in general?
> > > > > > 2. When you use "Join" as an example, you basically mean multiple
> > > > > > co-partitioned streams, right? This is opposed to multiple,
> > > > > > independently-partitioned streams or a single stream. Would be
> nice
> > > to
> > > > > > formulate the proposal in these more general terms.
> > > > > > 3. When switching SSP groupers, how will the users avoid the
> > > > > > org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues
> > > > > > exception?
> > > > > > 4. Partition to task assignment is meaningless without key to
> > > partition
> > > > > > mapping. The real semantics are captured in the external
> > requirement
> > > > for
> > > > > > partitioning via hash+modulo. But in that case, iiuc, only the
> > > > partition
> > > > > > count matters. So why not just store the original partition count
> > > > rather
> > > > > > than the whole mapping?
> > > > > >
> > > > > > -Jake
> > > > > >
> > > > > > On Sun, Jun 4, 2017 at 12:32 PM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Hey Yi, Navina,
> > > > > > >
> > > > > > > I have updated the SEP-5 document based on our discussion. The
> > > > > difference
> > > > > > > can be found here
> > > > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=70255476&selectedPageVersions=14&selectedPageVersions=15>.
> > > > > > > Here is the summary of changes:
> > > > > > >
> > > > > > > - Add new interface that extends the existing interface
> > > > > > > SystemStreamPartitionGrouper. Newly-added grouper class should
> > > > > implement
> > > > > > > this interface.
> > > > > > > - Explained in the Rejected Alternative Section why we don't
> add
> > > new
> > > > > > method
> > > > > > > in the existing interface
> > > > > > > - Explained in the Rejected Alternative Section why we don't
> > > > > config/class
> > > > > > > for user to specify new-partition to old-partition mapping.
> > > > > > >
> > > > > > > Can you take another look at the proposal and let me know if
> > there
> > > is
> > > > > any
> > > > > > > concern?
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Dong
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Jun 1, 2017 at 12:58 AM, Dong Lin <lindon...@gmail.com
> >
> > > > wrote:
> > > > > > >
> > > > > > > > Hey Yi,
> > > > > > > >
> > > > > > > > Thanks much for the comment. I have updated the doc to
> address
> > > all
> > > > > your
> > > > > > > > comments except the one related to the interface. I am not
> > sure I
> > > > > > > > understand your suggestion of the new interface. Will discuss
> > > > > tomorrow.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Dong
> > > > > > > >
> > > > > > > > On Wed, May 31, 2017 at 4:29 PM, Yi Pan <nickpa...@gmail.com
> >
> > > > wrote:
> > > > > > > >
> > > > > > > >> Hi, Don,
> > > > > > > >>
> > > > > > > >> Thanks for the detailed design doc for a long-waited feature
> > in
> > > > > Samza!
> > > > > > > >> Really appreciate it! I did a quick pass and have the
> > following
> > > > > > > comments:
> > > > > > > >>
> > > > > > > >> - minor: "limit the maximum size of partition" ==> "limit
> the
> > > > > maximum
> > > > > > > size
> > > > > > > >> of each partition"
> > > > > > > >> - "However, Samza currently is not able to handle partition
> > > > > expansion
> > > > > > of
> > > > > > > >> the input streams"==>better point out "for stateful jobs".
> For
> > > > > > stateless
> > > > > > > >> jobs, simply bouncing the job now can pick up the new
> > > partitions.
> > > > > > > >> - "it is possible (e.g. with Kafka) that messages with a
> given
> > > key
> > > > > > > exists
> > > > > > > > >> in both partition 1 and 3. Because GroupByPartition will
> assign
> > > > > > > partition 1
> > > > > > > >> and 3 to different tasks, messages with the same key may be
> > > > handled
> > > > > by
> > > > > > > >> different task/container/process and their state will be
> > stored
> > > in
> > > > > > > >> different changelog partition." The problem statement is not
> > > super
> > > > > > clear
> > > > > > > >> here. The issues with stateful jobs is: after
> GroupByPartition
> > > > > assign
> > > > > > > >> partition 1 and 3 to different tasks, the new task handling
> > > > > partition
> > > > > > 3
> > > > > > > >> does not have the previous state to resume the work. e.g. a
> > > > page-key
> > > > > > > based
> > > > > > > >> counter would start from 0 in the new task for a specific
> key,
> > > > > instead
> > > > > > > of
> > > > > > > >> resuming the previous count 50 held by task 1.
> > > > > > > >> - minor rewording: "the first solution in this doc" ==> "the
> > > > > solution
> > > > > > > >> proposed in this doc"
> > > > > > > >> - "Thus additional development work is needed in Kafka to
> meet
> > > > this
> > > > > > > >> requirement" It would be good to link to a KIP if and when
> it
> > > > exists
> > > > > > > >> - Instead of touching/deprecating the interface
> > > > > > > >> SystemStreamPartitionGrouper, I would recommend to have a
> > > > different
> > > > > > > >> implementation class of the interface, which in the
> > constructor
> > > of
> > > > > the
> > > > > > > >> grouper, takes two parameters: a) the previous task number
> > read
> > > > from
> > > > > > the
> > > > > > > >> coordinator stream; b) the configured new-partition to
> > > > old-partition
> > > > > > > >> mapping policy. Then, the grouper's interface method stays
> the
> > > > same
> > > > > > and
> > > > > > > >> the
> > > > > > > >> behavior of the grouper is more configurable which is good
> to
> > > > > support
> > > > > > a
> > > > > > > >> broader set of use cases in addition to Kafka's built-in
> > > partition
> > > > > > > >> expansion policies.
> > > > > > > >> - Minor renaming suggestion to the new grouper class names:
> > > > > > > >> GroupByPartitionWithFixedTaskNum
> > > > > > > >> and GroupBySystemStreamPartitionWithFixedTaskNum
> > > > > > > >>
> > > > > > > >> Thanks!
> > > > > > > >>
> > > > > > > >> - Yi
> > > > > > > >>
> > > > > > > >> On Wed, May 31, 2017 at 10:33 AM, Dong Lin <
> > lindon...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > >>
> > > > > > > >> > Hey Navina,
> > > > > > > >> >
> > > > > > > >> > Thanks much for the comment. Please see my response below.
> > > > > > > >> >
> > > > > > > >> > Regarding your biggest gripe with the SEP, I personally
> > think
> > > > the
> > > > > > > >> > operational requirement proposed in the KIP are pretty
> > general
> > > > and
> > > > > > > >> could be
> > > > > > > >> > easily enforced by other systems. The reason is that the
> > > modulo
> > > > > > > >> operation
> > > > > > > >> > is pretty standard and the default option when we choose
> > > > > partition.
> > > > > > > And
> > > > > > > >> > usually the underlying system allows user to select
> > arbitrary
> > > > > > > partition
> > > > > > > >> > number if it supports partition expansion. Do you know any
> > > > system
> > > > > > that
> > > > > > > >> does
> > > > > > > > >> > not meet these two requirements?
> > > > > > > >> >
> > > > > > > >> > Regarding your comment of the Motivation section, I
> renamed
> > > the
> > > > > > first
> > > > > > > >> > section as "Problem and Goal" and specified that "*The
> goal
> > of
> > > > > this
> > > > > > > >> > proposal is to enable partition expansion of the input
> > > > > streams*.". I
> > > > > > > >> also
> > > > > > > >> > put a sentence at the end of the Motivation section that
> > "*The
> > > > > > feature
> > > > > > > >> of
> > > > > > > >> > task expansion is out of the scope of this proposal and
> will
> > > be
> > > > > > > >> addressed
> > > > > > > >> > in a future SEP*". The second paragraph in the Motivation
> > > > section
> > > > > is
> > > > > > > >> mainly
> > > > > > > >> > used to explain the thinking process that we have gone
> > > through,
> > > > > what
> > > > > > > >> other
> > > > > > > > >> > alternatives we have considered, and what we plan to do in Samza
> > in
> > > > the
> > > > > > > next
> > > > > > > >> step.
> > > > > > > >> >
> > > > > > > >> > To answer your question why increasing the partition
> number
> > > will
> > > > > > > >> increase
> > > > > > > >> > the throughput of the kafka consumer in the container,
> Kafka
> > > > > > consumer
> > > > > > > >> can
> > > > > > > >> > potentially fetch more data in one FetchResponse with more
> > > > > > partitions
> > > > > > > in
> > > > > > > >> > the FetchRequest. This is because we limit the maximum
> > amount
> > > of
> > > > > > data
> > > > > > > >> that
> > > > > > > > >> > can be fetched for a given partition in the FetchResponse.
> > This
> > > by
> > > > > > > >> default is
> > > > > > > >> > set to 1 MB. And there is reason that we can not
> arbitrarily
> > > > bump
> > > > > up
> > > > > > > >> this
> > > > > > > >> > limit.
> > > > > > > >> >
> > > > > > > >> > To answer your question how partition expansion in Kafka
> > > impacts
> > > > > the
> > > > > > > >> > clients, Kafka consumer is able to automatically detect
> new
> > > > > > partition
> > > > > > > of
> > > > > > > >> > the topic and reassign all (both old and new) partitions
> > > across
> > > > > > > >> consumers
> > > > > > > >> > in the consumer group IF you tell consumer the topic to be
> > > > > > subscribed.
> > > > > > > >> But
> > > > > > > >> > consumer in Samza's container uses another way of
> > > subscription.
> > > > > > > Instead
> > > > > > > >> of
> > > > > > > >> > subscribing to the topic, the consumer in Samza's
> container
> > > > > > subscribes
> > > > > > > >> to
> > > > > > > >> > the specific partitions of the topic. In this case, if new
> > > > > > partitions
> > > > > > > >> have
> > > > > > > >> > been added, Samza will need to explicitly subscribe to the
> > new
> > > > > > > >> partitions
> > > > > > > >> > of the topic. The "Handle partition expansion while tasks
> > are
> > > > > > running"
> > > > > > > >> > section in the SEP addresses this issue in Samza -- it
> > > > > recalculates
> > > > > > > the
> > > > > > > >> job
> > > > > > > >> > model and restart container so that consumer can subscribe
> > to
> > > > the
> > > > > > new
> > > > > > > >> > partitions.
> > > > > > > >> >
> > > > > > > >> > I will ask other dev to take a look at the proposal. I
> will
> > > > start
> > > > > > the
> > > > > > > >> > voting thread tomorrow if there is no further concern with
> > the
> > > > > SEP.
> > > > > > > >> >
> > > > > > > >> > Thanks!
> > > > > > > >> > Dong
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh (Apache) <
> > > > > > > >> > nav...@apache.org>
> > > > > > > >> > wrote:
> > > > > > > >> >
> > > > > > > >> > > Hey Dong,
> > > > > > > >> > >
> > > > > > > >> > > >  I have updated the motivation section to clarify
> this.
> > > > > > > >> > >
> > > > > > > >> > > Thanks for updating the motivation. Couple of notes
> here:
> > > > > > > >> > >
> > > > > > > >> > > 1.
> > > > > > > >> > > > "The motivation of increasing partition number of
> Kafka
> > > > topic
> > > > > > > >> includes
> > > > > > > >> > 1)
> > > > > > > >> > > limit the maximum size of a partition in order to
> improve
> > > > broker
> > > > > > > >> > > performance and 2) increase throughput of Kafka consumer
> > in
> > > > the
> > > > > > > Samza
> > > > > > > >> > > container."
> > > > > > > >> > >
> > > > > > > >> > > It's unclear to me how increasing the partition number
> > will
> > > > > > increase
> > > > > > > >> the
> > > > > > > >> > > throughput of the kafka consumer in the container?
> > > > > Theoretically,
> > > > > > > you
> > > > > > > >> > will
> > > > > > > >> > > still be consuming the same amount of data in the
> > container,
> > > > > > > >> irrespective
> > > > > > > >> > > of whether it is coming from one partition or more than
> > one
> > > > > > expanded
> > > > > > > >> > > partitions. Can you please explain it for me here, what
> > you
> > > > mean
> > > > > > by
> > > > > > > >> that?
> > > > > > > >> > >
> > > > > > > >> > > 2. I believe the second paragraph under motivation is
> > simply
> > > > > > talking
> > > > > > > >> > about
> > > > > > > >> > > the scope of the current SEP. It will be easier to read
> if
> > > > what
> > > > > > > >> solution
> > > > > > > >> > is
> > > > > > > >> > > included in this SEP and what is left out as not in
> scope.
> > > > (for
> > > > > > > >> example,
> > > > > > > >> > > expansions for stateful jobs is supported or not).
> > > > > > > >> > >
> > > > > > > >> > > > We need to persist the task-to-sspList mapping in the
> > > > > > > >> > > coordinator stream so that the job can derive the
> original
> > > > > number
> > > > > > of
> > > > > > > >> > > partitions of each input stream regardless of how many
> > times
> > > > the
> > > > > > > >> > partition
> > > > > > > >> > > has expanded. Does this make sense?
> > > > > > > >> > >
> > > > > > > >> > > Yes. It does!
> > > > > > > >> > >
> > > > > > > >> > > > I am not sure how this is related to the locality
> > though.
> > > > Can
> > > > > > you
> > > > > > > >> > clarify
> > > > > > > >> > > your question if I haven't answered your question?
> > > > > > > >> > >
> > > > > > > >> > > It's not related. I just meant to give an example of yet
> > > > another
> > > > > > > >> > > coordinator message that is persisted. Your ssp-to-task
> > > > mapping
> > > > > is
> > > > > > > >> > > following a similar pattern for persisting. Just wanted
> to
> > > > > clarify
> > > > > > > >> that.
> > > > > > > >> > >
> > > > > > > >> > > > Can you let me know if this, together with the answers
> > in
> > > > the
> > > > > > > >> previous
> > > > > > > >> > > email, addresses all your questions?
> > > > > > > >> > >
> > > > > > > >> > > Yes. I believe you have addressed most of my questions.
> > > Thanks
> > > > > for
> > > > > > > >> taking
> > > > > > > >> > > time to do that.
> > > > > > > >> > >
> > > > > > > >> > > > Is there specific question you have regarding
> partition
> > > > > > > >> > > expansion in Kafka?
> > > > > > > >> > >
> > > > > > > >> > > I guess my questions are on how partition expansion in
> > Kafka
> > > > > > impacts
> > > > > > > >> the
> > > > > > > >> > > clients. Iiuc, partition expansions are done manually in
> > > Kafka
> > > > > > based
> > > > > > > >> on
> > > > > > > >> > the
> > > > > > > >> > > bytes-in rate of the partition. Do the existing kafka
> > > clients
> > > > > > handle
> > > > > > > >> this
> > > > > > > >> > > expansion automatically? if yes, how does it work? If
> not,
> > > are
> > > > > > there
> > > > > > > >> > plans
> > > > > > > >> > > to support it in the future?
> > > > > > > >> > >
> > > > > > > >> > > > Thus user's job should not need to bootstrap key/value
> > > store
> > > > > > from
> > > > > > > >> the
> > > > > > > >> > > changelog topic.
> > > > > > > >> > >
> > > > > > > >> > > Why is this discussion relevant here? Key/value store /
> > > > > changelog
> > > > > > > >> topic
> > > > > > > >> > > partition is scoped with the context of a task. Since we
> > are
> > > > not
> > > > > > > >> changing
> > > > > > > >> > > the number of tasks, I don't think it is required to
> > mention
> > > > it
> > > > > > > here.
> > > > > > > >> > >
> > > > > > > >> > > > The new method takes the SystemStreamPartition-to-Task
> > > > > > assignment
> > > > > > > >> from
> > > > > > > >> > > the previous job model which can be read from the
> > > coordinator
> > > > > > > stream.
> > > > > > > >> > >
> > > > > > > >> > > Jobmodel is currently not persisted to coordinator
> stream.
> > > In
> > > > > your
> > > > > > > >> > design,
> > > > > > > >> > > you talk about writing separate coordinator messages for
> > > > > > ssp-to-task
> > > > > > > >> > > assignments. Hence, please correct this statement. It is
> > > kind
> > > > of
> > > > > > > >> > misleading
> > > > > > > >> > > to the reader.
> > > > > > > >> > >
> > > > > > > >> > > My biggest gripe with this SEP is that it seems like a
> > > > > tailor-made
> > > > > > > >> > solution
> > > > > > > >> > > that relies on the semantics of the Kafka system and
> yet,
> > we
> > > > are
> > > > > > > >> trying
> > > > > > > >> > to
> > > > > > > >> > > masquerade that as operational requirements for other
> > > systems
> > > > > > > >> interacting
> > > > > > > >> > > with Samza. (Not to say that this is the first time
> such a
> > > > > choice
> > > > > > is
> > > > > > > >> > being
> > > > > > > > >> > > made in the Samza design). I am not seeing how this can be
> a
> > > > > > "general"
> > > > > > > >> > > solution for all input systems. That's my two cents. I
> > would
> > > > > like
> > > > > > to
> > > > > > > >> hear
> > > > > > > >> > > alternative points of view for this from other devs.
> > > > > > > >> > >
> > > > > > > >> > > Please make sure you have enough eyes on this SEP. If
> you
> > > do,
> > > > > > please
> > > > > > > >> > start
> > > > > > > >> > > a VOTE thread to approve this SEP.
> > > > > > > >> > >
> > > > > > > >> > > Thanks!
> > > > > > > >> > > Navina
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > > On Mon, May 29, 2017 at 12:32 AM, Dong Lin <
> > > > lindon...@gmail.com
> > > > > >
> > > > > > > >> wrote:
> > > > > > > >> > >
> > > > > > > >> > > > Hey Navina,
> > > > > > > >> > > >
> > > > > > > >> > > > I have updated the wiki based on your suggestion. More
> > > > > > > >> specifically, I
> > > > > > > >> > > have
> > > > > > > >> > > > made the following changes:
> > > > > > > >> > > >
> > > > > > > >> > > > - Improved Problem section and Motivation section to
> > > > describe
> > > > > > why
> > > > > > > we
> > > > > > > >> > use
> > > > > > > >> > > > the solution in this proposal instead of tackling the
> > > > problem
> > > > > of
> > > > > > > >> task
> > > > > > > >> > > > expansion directly.
> > > > > > > >> > > >
> > > > > > > >> > > > - Illustrate the design in a way that doesn't bind to
> > > Kafka.
> > > > > > Kafka
> > > > > > > >> is
> > > > > > > >> > > only
> > > > > > > >> > > > used as example to illustrate why we want to expand
> > > > partition
> > > > > > > >> expansion
> > > > > > > >> > > and
> > > > > > > >> > > > whether the operational requirement can be supported
> > when
> > > > > Kafka
> > > > > > is
> > > > > > > >> used
> > > > > > > >> > > as
> > > > > > > >> > > > the input system. Note that the proposed solution
> should
> > > > work
> > > > > > for
> > > > > > > >> any
> > > > > > > >> > > input
> > > > > > > >> > > > system that meets the operational requirement
> described
> > in
> > > > the
> > > > > > > wiki.
> > > > > > > >> > > >
> > > > > > > >> > > > - Fixed the problem in the figure.
> > > > > > > >> > > >
> > > > > > > > >> > > > - Added a new class GroupBySystemStreamPartitionFixedTaskNum
> > > > > to
> > > > > > > the
> > > > > > > >> > > wiki.
> > > > > > > >> > > > Together with GroupByPartitionFixedTaskNum, it should
> > > ensure
> > > > > > that
> > > > > > > we
> > > > > > > >> > > have a
> > > > > > > >> > > > solution to enable partition expansion for all users
> > that
> > > > are
> > > > > > > using
> > > > > > > >> > > > pre-defined grouper in Samza. Note that those users
> who
> > > use
> > > > > > custom
> > > > > > > >> > > grouper
> > > > > > > >> > > > would need to update their implementation.
> > > > > > > >> > > >
> > > > > > > >> > > > Can you let me know if this, together with the answers
> > in
> > > > the
> > > > > > > >> previous
> > > > > > > >> > > > email, addresses all your questions? Thanks for taking
> > > time
> > > > to
> > > > > > > >> review
> > > > > > > >> > the
> > > > > > > >> > > > proposal.
> > > > > > > >> > > >
> > > > > > > >> > > > Regards,
> > > > > > > >> > > > Dong
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > > On Wed, May 24, 2017 at 11:15 PM, Dong Lin <
> > > > > lindon...@gmail.com
> > > > > > >
> > > > > > > >> > wrote:
> > > > > > > >> > > >
> > > > > > > >> > > > > Hey Navina,
> > > > > > > >> > > > >
> > > > > > > >> > > > > Thanks much for your comments. Please see my reply
> > > inline.
> > > > > > > >> > > > >
> > > > > > > >> > > > > On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh
> > > (Apache) <
> > > > > > > >> > > > > nav...@apache.org> wrote:
> > > > > > > >> > > > >
> > > > > > > >> > > > >> Thanks for the SEP, Dong. I have a couple of
> > questions
> > > to
> > > > > > > >> understand
> > > > > > > >> > > > your
> > > > > > > >> > > > >> proposal better:
> > > > > > > >> > > > >>
> > > > > > > >> > > > >> * Under motivation, you mention that "_We expect
> this
> > > > > > solution
> > > > > > > to
> > > > > > > >> > work
> > > > > > > >> > > > >> similarly with other input system as well._", yet I
> > > don't
> > > > > see
> > > > > > > any
> > > > > > > >> > > > >> discussion on how it will work with other input
> > > systems.
> > > > > That
> > > > > > > is,
> > > > > > > >> > what
> > > > > > > >> > > > >> kind
> > > > > > > >> > > > >> of contract does samza expect from other input
> > systems
> > > ?
> > > > If
> > > > > > we
> > > > > > > >> are
> > > > > > > >> > not
> > > > > > > >> > > > >> planning to provide a generic solution, it might be
> > > worth
> > > > > > > >> calling it
> > > > > > > >> > > out
> > > > > > > >> > > > >> in
> > > > > > > >> > > > >> the SEP.
> > > > > > > >> > > > >>
> > > > > > > >> > > > >
> > > > > > > >> > > > > I think the contract we expect from other systems
> are
> > > > > exactly
> > > > > > > the
> > > > > > > >> > > > > operational requirement mentioned in the SEP, i.e.
> > > > > partitions
> > > > > > > >> should
> > > > > > > >> > > > always
> > > > > > > > >> > > > > be doubled and the hash algorithm should modulo the
> > > number
> > > > > of
> > > > > > > >> > > partitions.
> > > > > > > >> > > > > SEP-5 should also allow partition expansion of all
> > input
> > > > > > systems
> > > > > > > >> that
> > > > > > > >> > > > meet
> > > > > > > >> > > > > these two requirements. I have updated the
> motivation
> > > > > section
> > > > > > to
> > > > > > > >> > > clarify
> > > > > > > >> > > > > this.
> > > > > > > >> > > > >
> > > > > > > >> > > > >
> > > > > > > >> > > > >>
> > > > > > > >> > > > >> * I understand the partition mapping logic you have
> > > > > proposed.
> > > > > > > >> But I
> > > > > > > >> > > > think
> > > > > > > >> > > > >> the example explanation doesn't match the diagram.
> In
> > > the
> > > > > > > >> diagram,
> > > > > > > >> > > after
> > > > > > > > >> > > > >> expansion, partition-0 and partition-1 are pointing
> to
> > > > > bucket
> > > > > > 0
> > > > > > > >> and
> > > > > > > >> > > > >> partition-3 and partition-4 are pointing to bucket
> > 1. I
> > > > > think
> > > > > > > the
> > > > > > > >> > > former
> > > > > > > >> > > > >> has to be partition-0 and partition-2 and the
> latter,
> > > is
> > > > > > > >> partition-1
> > > > > > > >> > > and
> > > > > > > >> > > > >> partition-3. If I am wrong, please help me
> understand
> > > the
> > > > > > logic
> > > > > > > >> :)
> > > > > > > >> > > > >>
> > > > > > > >> > > > >
> > > > > > > >> > > > > Good catch. I will update the figure to fix this
> > > problem.
> > > > > > > >> > > > >
> > > > > > > >> > > > >
> > > > > > > >> > > > >>
> > > > > > > >> > > > >> * I don't know how partition expansion in Kafka
> > works.
> > > I
> > > > am
> > > > > > > >> familiar
> > > > > > > >> > > > with
> > > > > > > >> > > > >> how shard splitting happens in Kinesis - there is
> > > > > > hierarchical
> > > > > > > >> > > relation
> > > > > > > >> > > > >> between the parent and child shards. This way, it
> > will
> > > > also
> > > > > > > allow
> > > > > > > >> > the
> > > > > > > >> > > > >> shards to be merged back. Iiuc, Kafka only supports
> > > > > partition
> > > > > > > >> > > > "expansion",
> > > > > > > >> > > > >> as opposed to "splits". Can you provide some
> context
> > or
> > > > > link
> > > > > > > >> related
> > > > > > > >> > > to
> > > > > > > >> > > > >> how
> > > > > > > >> > > > >> partition expansion works in Kafka?
> > > > > > > >> > > > >>
> > > > > > > >> > > > >
> > > > > > > >> > > > > I couldn't find any wiki on partition expansion in
> > > Kafka.
> > > > > The
> > > > > > > >> > partition
> > > > > > > > >> > > > > expansion logic in Kafka is very simple -- it simply
> > > adds
> > > > > new
> > > > > > > >> > partition
> > > > > > > >> > > > to
> > > > > > > >> > > > > the existing topic. Is there specific question you
> > have
> > > > > > > regarding
> > > > > > > >> > > > partition
> > > > > > > >> > > > > expansion in Kafka?
> > > > > > > >> > > > >
> > > > > > > >> > > > >
> > > > > > > >> > > > >>
> > > > > > > >> > > > >> * Are you only recommending that expansion can be
> > > > supported
> > > > > > for
> > > > > > > >> > samza
> > > > > > > >> > > > jobs
> > > > > > > >> > > > >> that use Kafka as input systems **and** configure
> the
> > > > > > > SSPGrouper
> > > > > > > >> as
> > > > > > > >> > > > >> GroupByPartitionFixedTaskNum? Sounds to me like
> this
> > > only
> > > > > > > applies
> > > > > > > >> > for
> > > > > > > >> > > > >> GroupByPartition. Please correct me if I am wrong.
> > What
> > > > is
> > > > > > the
> > > > > > > >> > > > expectation
> > > > > > > >> > > > >> for custom SSP Groupers?
> > > > > > > >> > > > >>
> > > > > > > >> > > > >
> > > > > > > >> > > > > The expansion can be supported for Samza jobs if the
> > > input
> > > > > > > system
> > > > > > > >> > meets
> > > > > > > >> > > > > the operational requirement mentioned above. It
> > doesn't
> > > > have
> > > > > > to
> > > > > > > >> use
> > > > > > > >> > > Kafka
> > > > > > > >> > > > > as input system.
> > > > > > > >> > > > >
> > > > > > > >> > > > > The current proposal provided solution for jobs that
> > > > > currently
> > > > > > > use
> > > > > > > >> > > > > GroupByPartition. The proposal can be extended to
> > > support
> > > > > jobs
> > > > > > > >> that
> > > > > > > >> > use
> > > > > > > >> > > > > other grouper that are pre-defined in Samza. The
> > custom
> > > > SSP
> > > > > > > >> grouper
> > > > > > > >> > > needs
> > > > > > > >> > > > > to handle partition expansion similar to how
> > > > > > > >> > > GroupByPartitionFixedTaskNum
> > > > > > > >> > > > > handles it and it is users' responsibility to update
> > > their
> > > > > > > custom
> > > > > > > >> > > grouper
> > > > > > > >> > > > > implementation.
> > > > > > > >> > > > >
> > > > > > > >> > > > >
> > > > > > > >> > > > >>
> > > > > > > >> > > > >> * Regarding storing SSP-to-Task assignment to
> > > coordinator
> > > > > > > stream:
> > > > > > > >> > > Today,
> > > > > > > >> > > > >> the JobModel encapsulates the data model in samza
> > which
> > > > > also
> > > > > > > >> > includes
> > > > > > > >> > > > >> **TaskModels**. TaskModel, typically shows the
> > > > > > task-to-sspList
> > > > > > > >> > > mapping.
> > > > > > > >> > > > >> What is the reason for using a separate coordinator
> > > > stream
> > > > > > > >> message
> > > > > > > >> > > > >> *SetSSPTaskMapping*? Is it because the JobModel
> > itself
> > > is
> > > > > not
> > > > > > > >> > > persisted
> > > > > > > >> > > > in
> > > > > > > >> > > > >> the coordinator stream today?  The reason locality
> > > exists
> > > > > > > >> outside of
> > > > > > > >> > > the
> > > > > > > >> > > > >> jobmodel is because *locality* information is
> written
> > > by
> > > > > each
> > > > > > > >> > > container,
> > > > > > > >> > > > >> where as it is consumed only by the leader
> > > > > jobcoordinator/AM.
> > > > > > > In
> > > > > > > >> > this
> > > > > > > >> > > > >> case,
> > > > > > > >> > > > >> the writer of the mapping information and the
> reader
> > is
> > > > > still
> > > > > > > the
> > > > > > > >> > > leader
> > > > > > > >> > > > >> jobcoordinator/AM. So, I want to understand the
> > > > motivation
> > > > > > for
> > > > > > > >> this
> > > > > > > >> > > > >> choice.
> > > > > > > >> > > > >>
> > > > > > > >> > > > >
> > > > > > > > >> > > > > Yes, the reason for using a separate coordinator
> stream
> > > > > message
> > > > > > > is
> > > > > > > >> > > because
> > > > > > > >> > > > > the task-to-sspList mapping is not currently
> persisted
> > > in
> > > > > the
> > > > > > > >> > > coordinator
> > > > > > > >> > > > > stream. We wouldn't need to create this new stream
> > > message
> > > > > if
> > > > > > > >> > JobModel
> > > > > > > >> > > is
> > > > > > > >> > > > > persisted. We need to persist the task-to-sspList
> > > mapping
> > > > in
> > > > > > the
> > > > > > > >> > > > > coordinator stream so that the job can derive the
> > > original
> > > > > > > number
> > > > > > > >> of
> > > > > > > >> > > > > partitions of each input stream regardless of how
> many
> > > > times
> > > > > > the
> > > > > > > >> > > > partition
> > > > > > > >> > > > > has expanded. Does this make sense?
> > > > > > > >> > > > >
> > > > > > > >> > > > > I am not sure how this is related to the locality
> > > though.
> > > > > Can
> > > > > > > you
> > > > > > > >> > > clarify
> > > > > > > >> > > > > your question if I haven't answered your question?
> > > > > > > >> > > > >
> > > > > > > >> > > > > Thanks!
> > > > > > > >> > > > > Dong
> > > > > > > >> > > > >
> > > > > > > >> > > > >
> > > > > > > >> > > > >>
> > > > > > > >> > > > >> Cheers!
> > > > > > > >> > > > >> Navina
> > > > > > > >> > > > >>
> > > > > > > >> > > > >> On Tue, May 23, 2017 at 5:45 PM, Dong Lin <
> > > > > > lindon...@gmail.com
> > > > > > > >
> > > > > > > >> > > wrote:
> > > > > > > >> > > > >>
> > > > > > > >> > > > >> > Hi all,
> > > > > > > >> > > > >> >
> > > > > > > >> > > > >> > We created SEP-5: Enable partition expansion of
> > input
> > > > > > > streams.
> > > > > > > >> > > Please
> > > > > > > >> > > > >> find
> > > > > > > >> > > > >> > the SEP wiki in the link
> > > > > > > > >> > > > >> > https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams
> > > > > > > >> > > > >> > .
> > > > > > > >> > > > >> >
> > > > > > > > >> > > > >> > Your feedback is appreciated!
> > > > > > > >> > > > >> >
> > > > > > > >> > > > >> > Thanks,
> > > > > > > >> > > > >> > Dong
> > > > > > > >> > > > >> >
> > > > > > > >> > > > >>
> > > > > > > >> > > > >
> > > > > > > >> > > > >
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
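
For anyone skimming the quoted thread above, here is a rough sketch of why
the two operational requirements (the partition count is always doubled,
and the producer picks a partition as hash(key) modulo the partition count)
let a fixed set of tasks keep their state. It is only an illustration, not
the SEP-5 implementation: the function names are made up, and it assumes the
new-partition to old-partition mapping is "partition modulo the original
partition count", which matches the corrected figure (partition-0 and
partition-2 go to bucket 0, partition-1 and partition-3 go to bucket 1).

```python
# Illustrative sketch only; these names are hypothetical and not Samza APIs.

def partition_for(key_hash: int, num_partitions: int) -> int:
    """Producer side: choose a partition as hash(key) modulo partition count."""
    return key_hash % num_partitions


def task_for_partition(partition: int, original_partitions: int) -> int:
    """Grouper side (assumed policy): map a partition back to the task that
    owned it before expansion, using the original partition count."""
    return partition % original_partitions


def group_partitions(current_partitions: int, original_partitions: int) -> dict:
    """Group all current partitions by task, keeping the task count fixed."""
    groups = {task: [] for task in range(original_partitions)}
    for partition in range(current_partitions):
        groups[task_for_partition(partition, original_partitions)].append(partition)
    return groups


if __name__ == "__main__":
    # A topic expanded from 2 to 4 partitions, with the task count fixed at 2.
    print(group_partitions(4, 2))

    # Every key stays with the task that already holds its state, because
    # (h % 4) % 2 == h % 2 whenever the new count is a multiple of the old one.
    for key_hash in range(1000):
        old_task = partition_for(key_hash, 2)
        new_partition = partition_for(key_hash, 4)
        assert task_for_partition(new_partition, 2) == old_task
```

With this mapping, the page-key counter from Yi's example would resume from
50 in task 1 instead of restarting from 0 in a new task, since the key's new
partition is still assigned to the task that holds its previous state.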
