>
> - For users that need partition expansion of the input streams for stateful
> job, they have a really big headache in the sense that Samza does not allow
> partition expansion for stateful job. SEP-5 addresses this headache for
> them.
> You are right that SEP-5 requires user to understand and enforce
> limitations across organizations. But it is still much better than not
> allowing user to expand partitions for stateful jobs at all, right? Did I
> miss something here?

I guess this one is a matter of perspective.

One argument is that if the system supports one case, it's better than none
because there is one less scenario in which the system does the wrong
thing.

The counter argument is for uniform and consistent behavior, which is easy
for users to understand and properly leverage.

Specifically, I'd argue that the current rule is very simple: "you cannot
repartition inputs on a stateful job, so you must over-partition the
initial implementation". To me, while that rule is not ideal, its
simplicity is better than introducing a new solution that has a bunch of
caveats, any one of which could be missed. If any one of the assumptions in
this SEP design is violated, the job would behave incorrectly. That puts a
lot more burden on the users than the simpler rule.

That's why I mentioned a few alternatives that, while more complex to
implement, would provide a more consistent behavior with simple rules for
the users.

> Yes, we need a similar check for GroupBySystemStreamPartitionWithFixedTaskNum
> as well. If more grouper classes are needed in the future, we can
> solve this problem cleanly without new config. Given the
> previousGrouperClass and newGrouperClass, KafkaCheckpointLogKey will throw
> exception if and only if newGrouperClass is an instance of
> previousGrouperClass.
> GroupBySystemStreamPartitionWithFixedTaskNum should extend
> GroupBySystemStreamPartition
> and GroupByPartitionWithFixedTaskNum should extend GroupByPartition. Does
> this address your concern?

Sounds workable, thanks.
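
For concreteness, a rough sketch of how I read that check. It is in Python
with empty placeholder classes standing in for the Samza grouper classes, and
the function and exception names are made up for illustration. I'm reading the
rule as: accept the change when the new grouper class is the same as, or
extends, the previous one, and throw otherwise (consistent with the earlier
suggestion of not throwing for GroupByPartition ->
GroupByPartitionWithFixedTaskNum).

```python
# Placeholder classes mirroring the inheritance proposed in the thread.
class GroupByPartition:
    pass


class GroupByPartitionWithFixedTaskNum(GroupByPartition):
    pass


class GroupBySystemStreamPartition:
    pass


class GroupBySystemStreamPartitionWithFixedTaskNum(GroupBySystemStreamPartition):
    pass


class GrouperChangeError(Exception):
    """Hypothetical stand-in for the differing-grouper exception."""


def check_grouper_change(previous_cls, new_cls):
    """Raise unless new_cls is previous_cls itself or a subclass of it."""
    if not issubclass(new_cls, previous_cls):
        raise GrouperChangeError(
            f"cannot switch grouper from {previous_cls.__name__} "
            f"to {new_cls.__name__}"
        )


# Allowed: the fixed-task-num grouper extends the previous grouper.
check_grouper_change(GroupByPartition, GroupByPartitionWithFixedTaskNum)

# Rejected: unrelated grouper families.
try:
    check_grouper_change(
        GroupByPartition, GroupBySystemStreamPartitionWithFixedTaskNum
    )
except GrouperChangeError as err:
    print(err)
```

One consequence of a subclass-based check is that switching back from a
WithFixedTaskNum grouper to its parent would be rejected, which seems like the
behavior we'd want anyway.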

>
> Can you be more specific why Partition-to-task mapping is not meaningful
> without some definition of the key-to-partition assignments and why it is
> incomplete and misleading?

A partition is (in my naive interpretation) an independent queue for
messages of a particular key set. It is not the *identity* of the partition
that determines the contents of the associated task's local state. Rather it
is the *contents* of the partition that affect the task's state. A
partition-to-task mapping only captures an identity relationship:
partition1->task1. Without the assumptions of this SEP, this is
insufficient to determine the assignment of keys to tasks, which is what
really matters. Therefore, any future feature that utilizes this mapping
without accounting for the assumptions of this SEP is likely to malfunction.
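
To make that concrete, here is a small sketch in Python. None of these names
are Samza APIs. Key hashes are given directly as integers to keep it
deterministic, and folding a new partition p back onto task
(p % original partition count) is my assumption about how a fixed-task-count
grouper would behave. It shows that the same partition-to-task rule keeps keys
on their original task only when the key-to-partition assignment is
hash+modulo and the new partition count is a multiple of the old one.

```python
def partition_for(key_hash: int, num_partitions: int) -> int:
    """Broker-side assumption: partition = hash(key) mod partition count."""
    return key_hash % num_partitions


def task_for(partition: int, original_partitions: int) -> int:
    """Assumed fixed-task-count grouping: fold partitions onto original tasks."""
    return partition % original_partitions


def keys_stay_on_task(key_hashes, old_n: int, new_n: int) -> bool:
    """True if every key reaches the same task before and after expansion."""
    for h in key_hashes:
        before = task_for(partition_for(h, old_n), old_n)
        after = task_for(partition_for(h, new_n), old_n)
        if before != after:
            return False
    return True


key_hashes = range(1000)

# Doubling 4 -> 8: (h % 8) % 4 == h % 4 for every h, so state stays valid.
print(keys_stay_on_task(key_hashes, 4, 8))  # True

# Expanding 4 -> 6: hash 6 moves from task 2 to task 0, so state is wrong.
print(keys_stay_on_task(key_hashes, 4, 6))  # False
```

The partition-to-task mapping is identical in both runs. Only the
key-to-partition behavior differs, and that is the part the mapping does not
record.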


On Mon, Jun 12, 2017 at 5:09 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jacob,
>
> Thanks for the explanation. It seems that your biggest concern is with the
> generality of the proposal. Let me try to address this and other comments
> below.
>
> 1) ... it will cause headaches for Samza users ...
>
> I am not sure I understand why this proposal causes headache for Samza
> users. Here is the impact of the SEP-5 on users:
>
> - For users that do not need partition expansion of the input stream, they
> can use Samza without any change in code/binary/config. Thus there is no
> headache for them.
>
> - For users that need partition expansion of the input streams for
> stateless job, they currently need to manually reboot their Samza job in
> order to let Samza consume the new partitions created for the stream. SEP-5
> actually reduced their headache by allowing Samza to automatically detect
> and consume new partitions.
>
> - For users that need partition expansion of the input streams for stateful
> job, they have a really big headache in the sense that Samza does not allow
> partition expansion for stateful job. SEP-5 addresses this headache for
> them.
>
> You are right that SEP-5 requires user to understand and enforce
> limitations across organizations. But it is still much better than not
> allowing user to expand partitions for stateful jobs at all, right? Did I
> miss something here?
>
> 2) ... Separate orgs are often difficult to coordinate and a system which
> depends on such significant process/coordination is too fragile for my
> taste ..
>
> This is true. Ideally we want a system that is fully self-serving. I think
> this is a long term goal for Samza. Still, for the reasons described above,
> I think something is better than nothing. I am open to alternative design
> that can support partition expansion for stateful jobs without requiring
> coordination.
>
> 3) There is currently no supported way of sharing state among the tasks of
> a container.  Each task has its own isolated store and that logical
> isolation is the primary thing that enables Samza jobs to scale with a
> simple container count change. My feeling is that we should not change
> this without good reason.
>
> I see your point. I will remove this sentence from the motivation section.
> This won't have any impact on the design of the SEP-5. Does this address
> the problem?
>
> 4) With the current proposal, we'd also need a similar check for
> GroupBySystemStreamPartitionWithFixedTaskNum as well. And if any other
> groupers were later added with both these modes, we'd probably need to add
> those too. It might be easier and cleaner to add a config to ignore that
> check temporarily. Down side is that it further complicates the Samza
> config, which is already huge. Thoughts?
>
> Yes, we need a similar check for GroupBySystemStreamPartitionWithFixedTaskNum
> as well. If more grouper classes are needed in the future, we can
> solve this problem cleanly without new config. Given the
> previousGrouperClass and newGrouperClass, KafkaCheckpointLogKey will throw
> exception if and only if newGrouperClass is an instance of
> previousGrouperClass.
> GroupBySystemStreamPartitionWithFixedTaskNum should extend
> GroupBySystemStreamPartition
> and GroupByPartitionWithFixedTaskNum should extend GroupByPartition. Does
> this address your concern?
>
> 5) The task-to-container and container-to-host mappings are both meaningful
> in context of the JobModel. Partition-to-task mapping is not meaningful
> without some definition of the key-to-partition assignments. It's incomplete
> information and therefore misleading. I think it only makes sense to use
> this mapping if we adopt a solution wherein Samza also knows the partition
> key assignment.
>
> Partition-to-task is currently explicitly passed from job coordinator to
> each task as part of the job model to tell tasks which partitions to
> consume from. I think we can store some definition of the key-to-partition
> assignments if Samza decides to get and use this information in the
> future. Can you be more specific why Partition-to-task mapping is not
> meaningful without some definition of the key-to-partition assignments and
> why it is incomplete and misleading?
>
>
> Thanks,
> Dong
>
> On Mon, Jun 12, 2017 at 3:54 PM, Jacob Maes <jacob.m...@gmail.com> wrote:
>
> > Hey Dong,
> >
> > I'm opposed (or a +0, at best) to this limited, Kafka-specific solution.
> I
> > understand that the proposal is relatively simple to implement, but I
> think
> > it will cause headaches for Samza users. They will not only have to
> > understand all the limitations (increase only, double partitions only,
> > partition using hash+modulo, etc) of this approach, but enforcing these
> > limitations can be a major problem, especially when the Samza jobs and
> > message brokers are managed by separate orgs in a company. Separate orgs
> > are often difficult to coordinate and a system which depends on such
> > significant process/coordination is too fragile for my taste.
> >
> > That said, I realize that my opinion is just one of many in the broader
> > community which may feel differently, so let me respond to some of the
> > other items in the discussion so we can clear them up:
> >
> > The task-to-container assignment matters because if the correlated tasks
> > > (i.e. tasks that consume messages with the same key) needs to be in the
> > > same container so that they can share the same key/value local store on
> > the
> > > same physical machine.
> >
> > There is currently no supported way of sharing state among the tasks of a
> > container.  Each task has its own isolated store and that logical
> isolation
> > is the primary thing that enables Samza jobs to scale with a simple
> > container count change. My feeling is that we should not change this
> > without good reason.
> >
> > I think we can hardcode new logic in KafkaCheckpointLogKey.scala such
> that
> > > exception will not be thrown if new grouper is
> > > GroupByPartitionWithFixedTaskNum and old grouper is GroupByPartition.
> > Does
> > > this look reasonable?
> >
> > With the current proposal, we'd also need a similar check for
> > GroupBySystemStreamPartitionWithFixedTaskNum as well. And if any other
> > groupers were later added with both these modes, we'd probably need to
> add
> > those too. It might be easier and cleaner to add a config to ignore that
> > check temporarily. Down side is that it further complicates the Samza
> > config, which is already huge. Thoughts?
> >
> > I think storing the previous task-to-partition mapping is more general
> than
> > > storing the partition count of all topics for the following reasons:
> > > - Samza already stores the task-to-container mapping and
> > container-to-host
> > > mapping in the coordinator stream. It seems consistent to also store
> the
> > > partition-to-task mapping. And this information may be useful for other
> > > use-case such as debugging.
> > > - By having the new interface take the previous task-to-partition
> > > assignment instead of a topic-to-partition-count mapping as new
> > parameter,
> > > we can potentially have grouper implementation to support other types
> of
> > > input systems.
> > > - It is sightly simpler to store the task-to-partition assignment
> because
> > > we don't need to know whether this is the first time a job is started
> or
> > > not. On the other hand, you can write topic-to-partition-count mapping
> to
> > > the coordinator stream only if this is the first time the job is run
> >
> > The task-to-container and container-to-host mappings are both meaningful
> in
> > context of the JobModel. Partition-to-task mapping is not meaningful
> > without some definition of the key-to-partition assignments. It's
> > incomplete information and therefore misleading. I think it only makes
> > sense to use this mapping if we adopt a solution wherein Samza also knows
> > the partition key assignment.
> >
> > -Jake
> >
> > On Tue, Jun 6, 2017 at 11:06 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Jacob,
> > >
> > > Thanks for taking time to review the SEP.
> > >
> > > I agree with you and Navina that the current SEP doesn't provide
> support
> > to
> > > arbitrary input systems and it doesn't support partition shrink. I
> think
> > > the scope of this SEP is to support partition expansion for Kafka (the
> > most
> > > widely used input system of Samza) and keep the door open for partition
> > > support of various input systems. The current design can support any
> > system
> > > that meets the two operational requirement specified in the doc.
> > >
> > > While it is possible to support more types of input systems, it will
> > likely
> > > add more complexity to the design. For example, the first alternative
> > > solution from you requires broker-side support to negotiate hash
> > algorithm.
> > > The second alternative solution requires changelog partition reshuffle
> > > which carries its own design complexity and performance overhead. There
> > is
> > > tradeoff between the generality and the complexity among these
> choices. I
> > > like the current design because it is simple and addresses a big usage
> > > scenario for us. We can add more complexity to generalize the design if
> > it
> > > enables important use-case. Does this sound reasonable?
> > >
> > > Note that the "Rejected Alternative" section also mentions the
> > possibility
> > > of supporting a wider range of input systems by allowing user to
> specify
> > > the new-partition to old-partition mapping. We are not doing it because
> > 1)
> > > we may have better understanding of the design after we have a specific
> > > second input system to support 2) the current design can be extended to
> > > support general input systems. I think similar argument can be applied
> > > explain why we don't have to support general input systems using the
> > > potentially-good alternatives you mentioned.
> > >
> > > I hope SEP-5 can be an important first-step towards supporting
> partition
> > > expansion of any input system.
> > >
> > > To answer your questions about the current proposal:
> > >
> > > >1. "An alternative solution is to allow task number to increase after
> > > >partition expansion and uses a proper task-to-container assignment to
> > make
> > > >sure the Samza output is correct." What does the container have to do
> > with
> > > >stateful processing or output in general?
> > >
> > > The task-to-container assignment matters because if the correlated
> tasks
> > > (i.e. tasks that consume messages with the same key) needs to be in the
> > > same container so that they can share the same key/value local store on
> > the
> > > same physical machine.
> > >
> > > >2. When you use "Join" as an example, you basically mean multiple
> > > >co-partitioned streams, right? This is opposed to multiple,
> > > >independently-partitioned streams or a single stream. Would be nice to
> > > >formulate the proposal in these more general terms.
> > >
> > > I thought "join" is a commonly used to refer to the join opeartion with
> > > co-partitioned stream but I may be wrong. I have updated the wiki to
> > > explicitly mention "co-partitioned stream". Does this look better now?
> > >
> > > >3. When switching SSP groupers, how will the users avoid the
> > > >org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues
> > > >exception?
> > >
> > > I think we can hardcode new logic in KafkaCheckpointLogKey.scala such
> > that
> > > exception will not be thrown if new grouper is
> > > GroupByPartitionWithFixedTaskNum and old grouper is GroupByPartition.
> > Does
> > > this look reasonable?
> > >
> > > >4. Partition to task assignment is meaningless without key to
> partition
> > > >mapping. The real semantics are captured in the external requirement
> for
> > > >partitioning via hash+modulo. But in that case, iiuc, only the
> partition
> > > >count matters. So why not just store the original partition count
> rather
> > > >than the whole mapping?
> > >
> > > I think storing the previous task-to-partition mapping is more general
> > than
> > > storing the partition count of all topics for the following reasons:
> > >
> > > - Samza already stores the task-to-container mapping and
> > container-to-host
> > > mapping in the coordinator stream. It seems consistent to also store
> the
> > > partition-to-task mapping. And this information may be useful for other
> > > use-case such as debugging.
> > >
> > > - By having the new interface take the previous task-to-partition
> > > assignment instead of a topic-to-partition-count mapping as new
> > parameter,
> > > we can potentially have grouper implementation to support other types
> of
> > > input systems.
> > >
> > > - It is sightly simpler to store the task-to-partition assignment
> because
> > > we don't need to know whether this is the first time a job is started
> or
> > > not. On the other hand, you can write topic-to-partition-count mapping
> to
> > > the coordinator stream only if this is the first time the job is run
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Mon, Jun 5, 2017 at 11:17 AM, Jacob Maes <jacob.m...@gmail.com>
> > wrote:
> > >
> > > > Hey Dong,
> > > >
> > > > Thanks for the SEP. Supporting partition changes is critically
> > important
> > > > for stateful Samza jobs, so it's great to see some ideas on that
> front!
> > > >
> > > > Sorry for the late feedback, but I have a few thoughts to contribute.
> > > >
> > > > Big +1 on Navina's comment:
> > > >
> > > > > My biggest gripe with this SEP is that it seems like a tailor-made
> > > > > solution
> > > > > that relies on the semantics of the Kafka system and yet, we are
> > trying
> > > > to
> > > > > masquerade that as operational requirements for other systems
> > > interacting
> > > > > with Samza. (Not to say that this is the first time such a choice
> is
> > > > being
> > > > > made in the Samza design). I am not seeing how this can a "general"
> > > > > solution for all input systems. That's my two cents. I would like
> to
> > > hear
> > > > > alternative points of view for this from other devs.
> > > >
> > > >
> > > > Two examples of this:
> > > > 1. This is mostly a hypothetical, but some message brokers may use
> key
> > > > range assignment rather than hash+modulo.
> > > > 2. Kafka can't reduce the number of partitions, but it can happen on
> > > other
> > > > systems. For example, it may be cheaper to reduce the number of
> > > partitions
> > > > on a hosted service where the cost model depends on the number of
> > > > partitions/shards.
> > > >
> > > > It seems to me that a solution which doesn't depend on partition key
> > > > assignment in the message broker. Here are a few alternatives that
> > > weren't
> > > > discussed and I think should be considered:
> > > >
> > > > Alternatives in order of increasing preference:
> > > > 1. Samza manages the partition hash (via some new contract with the
> > > > brokers) and guarantees correct routing of keys among the new
> > partitions.
> > > > 2. Samza detects a task count change, creates a new changelog with
> > > correct
> > > > partitions, and *somehow* reshuffles all existing changelog data into
> > the
> > > > new topic and then uses the new topic from then on. (doesn't work
> > without
> > > > changelog, but in that case durability isn't paramount, so we can
> just
> > > > wipe)
> > > > 3. Use RPC in between stages and samza fully manages key assignment
> > among
> > > > tasks. No on-disk topic data to clean up. Mandatory repartitioning in
> > the
> > > > first stage to pre-scaled tasks in next stage.
> > > > 4. Combined 2-3 solution
> > > >
> > > > Finally, some questions about the current proposal:
> > > > 1. "An alternative solution is to allow task number to increase after
> > > > partition expansion and uses a proper task-to-container assignment to
> > > make
> > > > sure the Samza output is correct." What does the container have to do
> > > with
> > > > stateful processing or output in general?
> > > > 2. When you use "Join" as an example, you basically mean multiple
> > > > co-partitioned streams, right? This is opposed to multiple,
> > > > independently-partitioned streams or a single stream. Would be nice
> to
> > > > formulate the proposal in these more general terms.
> > > > 3. When switching SSP groupers, how will the users avoid the
> > > > > org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues
> > > > exception?
> > > > 4. Partition to task assignment is meaningless without key to
> partition
> > > > mapping. The real semantics are captured in the external requirement
> > for
> > > > partitioning via hash+modulo. But in that case, iiuc, only the
> > partition
> > > > count matters. So why not just store the original partition count
> > rather
> > > > than the whole mapping?
> > > >
> > > > -Jake
> > > >
> > > > On Sun, Jun 4, 2017 at 12:32 PM, Dong Lin <lindon...@gmail.com>
> wrote:
> > > >
> > > > > Hey Yi, Navina,
> > > > >
> > > > > I have updated the SEP-5 document based on our discussion. The
> > > difference
> > > > > can be found here
> > > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=70255476&selectedPageVersions=14&selectedPageVersions=15>.
> > > > > Here is the summary of changes:
> > > > >
> > > > > - Add new interface that extends the existing interface
> > > > > SystemStreamPartitionGrouper. Newly-added grouper class should
> > > implement
> > > > > this interface.
> > > > > - Explained in the Rejected Alternative Section why we don't add
> new
> > > > method
> > > > > in the existing interface
> > > > > - Explained in the Rejected Alternative Section why we don't
> > > config/class
> > > > > for user to specify new-partition to old-partition mapping.
> > > > >
> > > > > Can you take another look at the proposal and let me know if there
> is
> > > any
> > > > > concern?
> > > > >
> > > > > Cheers,
> > > > > Dong
> > > > >
> > > > >
> > > > > On Thu, Jun 1, 2017 at 12:58 AM, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hey Yi,
> > > > > >
> > > > > > Thanks much for the comment. I have updated the doc to address
> all
> > > your
> > > > > > comments except the one related to the interface. I am not sure I
> > > > > > understand your suggestion of the new interface. Will discuss
> > > tomorrow.
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > > > On Wed, May 31, 2017 at 4:29 PM, Yi Pan <nickpa...@gmail.com>
> > wrote:
> > > > > >
> > > > > >> Hi, Don,
> > > > > >>
> > > > > >> Thanks for the detailed design doc for a long-waited feature in
> > > Samza!
> > > > > >> Really appreciate it! I did a quick pass and have the following
> > > > > comments:
> > > > > >>
> > > > > >> - minor: "limit the maximum size of partition" ==> "limit the
> > > maximum
> > > > > size
> > > > > >> of each partition"
> > > > > >> - "However, Samza currently is not able to handle partition
> > > expansion
> > > > of
> > > > > >> the input streams"==>better point out "for stateful jobs". For
> > > > stateless
> > > > > >> jobs, simply bouncing the job now can pick up the new
> partitions.
> > > > > >> - "it is possible (e.g. with Kafka) that messages with a given
> key
> > > > > exists
> > > > > >> in both partition 1 an 3. Because GroupByPartition will assign
> > > > > partition 1
> > > > > >> and 3 to different tasks, messages with the same key may be
> > handled
> > > by
> > > > > >> different task/container/process and their state will be stored
> in
> > > > > >> different changelog partition." The problem statement is not
> super
> > > > clear
> > > > > >> here. The issues with stateful jobs is: after GroupByPartition
> > > assign
> > > > > >> partition 1 and 3 to different tasks, the new task handling
> > > partition
> > > > 3
> > > > > >> does not have the previous state to resume the work. e.g. a
> > page-key
> > > > > based
> > > > > >> counter would start from 0 in the new task for a specific key,
> > > instead
> > > > > of
> > > > > >> resuming the previous count 50 held by task 1.
> > > > > >> - minor rewording: "the first solution in this doc" ==> "the
> > > solution
> > > > > >> proposed in this doc"
> > > > > >> - "Thus additional development work is needed in Kafka to meet
> > this
> > > > > >> requirement" It would be good to link to a KIP if and when it
> > exists
> > > > > >> - Instead of touching/deprecating the interface
> > > > > >> SystemStreamPartitionGrouper, I would recommend to have a
> > different
> > > > > >> implementation class of the interface, which in the constructor
> of
> > > the
> > > > > >> grouper, takes two parameters: a) the previous task number read
> > from
> > > > the
> > > > > >> coordinator stream; b) the configured new-partition to
> > old-partition
> > > > > >> mapping policy. Then, the grouper's interface method stays the
> > same
> > > > and
> > > > > >> the
> > > > > >> behavior of the grouper is more configurable which is good to
> > > support
> > > > a
> > > > > >> broader set of use cases in addition to Kafka's built-in
> partition
> > > > > >> expansion policies.
> > > > > >> - Minor renaming suggestion to the new grouper class names:
> > > > > >> GroupByPartitionWithFixedTaskNum
> > > > > >> and GroupBySystemStreamPartitionWithFixedTaskNum
> > > > > >>
> > > > > >> Thanks!
> > > > > >>
> > > > > >> - Yi
> > > > > >>
> > > > > >> On Wed, May 31, 2017 at 10:33 AM, Dong Lin <lindon...@gmail.com
> >
> > > > wrote:
> > > > > >>
> > > > > >> > Hey Navina,
> > > > > >> >
> > > > > >> > Thanks much for the comment. Please see my response below.
> > > > > >> >
> > > > > >> > Regarding your biggest gripe with the SEP, I personally think
> > the
> > > > > >> > operational requirement proposed in the KIP are pretty general
> > and
> > > > > >> could be
> > > > > >> > easily enforced by other systems. The reason is that the
> module
> > > > > >> operation
> > > > > >> > is pretty standard and the default option when we choose
> > > partition.
> > > > > And
> > > > > >> > usually the underlying system allows user to select arbitrary
> > > > > partition
> > > > > >> > number if it supports partition expansion. Do you know any
> > system
> > > > that
> > > > > >> does
> > > > > >> > not meet these two requirement?
> > > > > >> >
> > > > > >> > Regarding your comment of the Motivation section, I renamed
> the
> > > > first
> > > > > >> > section as "Problem and Goal" and specified that "*The goal of
> > > this
> > > > > >> > proposal is to enable partition expansion of the input
> > > streams*.". I
> > > > > >> also
> > > > > >> > put a sentence at the end of the Motivation section that "*The
> > > > feature
> > > > > >> of
> > > > > >> > task expansion is out of the scope of this proposal and will
> be
> > > > > >> addressed
> > > > > >> > in a future SEP*". The second paragraph in the Motivation
> > section
> > > is
> > > > > >> mainly
> > > > > >> > used to explain the thinking process that we have gone
> through,
> > > what
> > > > > >> other
> > > > > >> > alternative we have considered, and we plan to do in Samza in
> > the
> > > > nex
> > > > > >> step.
> > > > > >> >
> > > > > >> > To answer your question why increasing the partition number
> will
> > > > > >> increase
> > > > > >> > the throughput of the kafka consumer in the container, Kafka
> > > > consumer
> > > > > >> can
> > > > > >> > potentially fetch more data in one FetchResponse with more
> > > > partitions
> > > > > in
> > > > > >> > the FetchRequest. This is because we limit the maximum amount
> of
> > > > data
> > > > > >> that
> > > > > >> > can be fetch for a given partition in the FetchResponse. This
> by
> > > > > >> default is
> > > > > >> > set to 1 MB. And there is reason that we can not arbitrarily
> > bump
> > > up
> > > > > >> this
> > > > > >> > limit.
> > > > > >> >
> > > > > >> > To answer your question how partition expansion in Kafka
> impacts
> > > the
> > > > > >> > clients, Kafka consumer is able to automatically detect new
> > > > partition
> > > > > of
> > > > > >> > the topic and reassign all (both old and new) partitions
> across
> > > > > >> consumers
> > > > > >> > in the consumer group IF you tell consumer the topic to be
> > > > subscribed.
> > > > > >> But
> > > > > >> > consumer in Samza's container uses another way of
> subscription.
> > > > > Instead
> > > > > >> of
> > > > > >> > subscribing to the topic, the consumer in Samza's container
> > > > subscribes
> > > > > >> to
> > > > > >> > the specific partitions of the topic. In this case, if new
> > > > partitions
> > > > > >> have
> > > > > >> > been added, Samza will need to explicitly subscribe to the new
> > > > > >> partitions
> > > > > >> > of the topic. The "Handle partition expansion while tasks are
> > > > running"
> > > > > >> > section in the SEP addresses this issue in Samza -- it
> > > recalculates
> > > > > the
> > > > > >> job
> > > > > >> > model and restart container so that consumer can subscribe to
> > the
> > > > new
> > > > > >> > partitions.
> > > > > >> >
> > > > > >> > I will ask other dev to take a look at the proposal. I will
> > start
> > > > the
> > > > > >> > voting thread tomorrow if there is no further concern with the
> > > SEP.
> > > > > >> >
> > > > > >> > Thanks!
> > > > > >> > Dong
> > > > > >> >
> > > > > >> >
> > > > > >> > On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh (Apache) <
> > > > > >> > nav...@apache.org>
> > > > > >> > wrote:
> > > > > >> >
> > > > > >> > > Hey Dong,
> > > > > >> > >
> > > > > >> > > >  I have updated the motivation section to clarify this.
> > > > > >> > >
> > > > > >> > > Thanks for updating the motivation. Couple of notes here:
> > > > > >> > >
> > > > > >> > > 1.
> > > > > >> > > > "The motivation of increasing partition number of Kafka
> > topic
> > > > > >> includes
> > > > > >> > 1)
> > > > > >> > > limit the maximum size of a partition in order to improve
> > broker
> > > > > >> > > performance and 2) increase throughput of Kafka consumer in
> > the
> > > > > Samza
> > > > > >> > > container."
> > > > > >> > >
> > > > > >> > > It's unclear to me how increasing the partition number will
> > > > increase
> > > > > >> the
> > > > > >> > > throughput of the kafka consumer in the container?
> > > Theoretically,
> > > > > you
> > > > > >> > will
> > > > > >> > > still be consuming the same amount of data in the container,
> > > > > >> irrespective
> > > > > >> > > of whether it is coming from one partition or more than one
> > > > expanded
> > > > > >> > > partitions. Can you please explain it for me here, what you
> > mean
> > > > by
> > > > > >> that?
> > > > > >> > >
> > > > > >> > > 2. I believe the second paragraph under motivation is simply
> > > > talking
> > > > > >> > about
> > > > > >> > > the scope of the current SEP. It will be easier to read if
> > what
> > > > > >> solution
> > > > > >> > is
> > > > > >> > > included in this SEP and what is left out as not in scope.
> > (for
> > > > > >> example,
> > > > > >> > > expansions for stateful jobs is supported or not).
> > > > > >> > >
> > > > > >> > > > We need to persist the task-to-sspList mapping in the
> > > > > >> > > coordinator stream so that the job can derive the original
> > > number
> > > > of
> > > > > >> > > partitions of each input stream regardless of how many times
> > the
> > > > > >> > partition
> > > > > >> > > has expanded. Does this make sense?
> > > > > >> > >
> > > > > >> > > Yes. It does!
> > > > > >> > >
> > > > > >> > > > I am not sure how this is related to the locality though.
> > Can
> > > > you
> > > > > >> > clarify
> > > > > >> > > your question if I haven't answered your question?
> > > > > >> > >
> > > > > >> > > It's not related. I just meant to give an example of yet
> > another
> > > > > >> > > coordinator message that is persisted. Your ssp-to-task
> > mapping
> > > is
> > > > > >> > > following a similar pattern for persisting. Just wanted to
> > > clarify
> > > > > >> that.
> > > > > >> > >
> > > > > >> > > > Can you let me know if this, together with the answers in
> > the
> > > > > >> previous
> > > > > >> > > email, addresses all your questions?
> > > > > >> > >
> > > > > >> > > Yes. I believe you have addressed most of my questions.
> Thanks
> > > for
> > > > > >> taking
> > > > > >> > > time to do that.
> > > > > >> > >
> > > > > >> > > > Is there specific question you have regarding partition
> > > > > >> > > expansion in Kafka?
> > > > > >> > >
> > > > > >> > > I guess my questions are on how partition expansion in Kafka
> > > > impacts
> > > > > >> the
> > > > > >> > > clients. Iiuc, partition expansions are done manually in
> Kafka
> > > > based
> > > > > >> on
> > > > > >> > the
> > > > > >> > > bytes-in rate of the partition. Do the existing kafka
> clients
> > > > handle
> > > > > >> this
> > > > > >> > > expansion automatically? if yes, how does it work? If not,
> are
> > > > there
> > > > > >> > plans
> > > > > >> > > to support it in the future?
> > > > > >> > >
> > > > > >> > > > Thus user's job should not need to bootstrap key/value
> store
> > > > from
> > > > > >> the
> > > > > >> > > changelog topic.
> > > > > >> > >
> > > > > >> > > Why is this discussion relevant here? Key/value store /
> > > changelog
> > > > > >> topic
> > > > > >> > > partition is scoped with the context of a task. Since we are
> > not
> > > > > >> changing
> > > > > >> > > the number of tasks, I don't think it is required to mention
> > it
> > > > > here.
> > > > > >> > >
> > > > > >> > > > The new method takes the SystemStreamPartition-to-Task
> > > > assignment
> > > > > >> from
> > > > > >> > > the previous job model which can be read from the
> coordinator
> > > > > stream.
> > > > > >> > >
> > > > > >> > > Jobmodel is currently not persisted to coordinator stream.
> In
> > > your
> > > > > >> > design,
> > > > > >> > > you talk about writing separate coordinator messages for
> > > > ssp-to-task
> > > > > >> > > assignments. Hence, please correct this statement. It is
> kind
> > of
> > > > > >> > misleading
> > > > > >> > > to the reader.
> > > > > >> > >
> > > > > >> > > My biggest gripe with this SEP is that it seems like a
> > > tailor-made
> > > > > >> > solution
> > > > > >> > > that relies on the semantics of the Kafka system and yet, we
> > are
> > > > > >> trying
> > > > > >> > to
> > > > > >> > > masquerade that as operational requirements for other
> systems
> > > > > >> interacting
> > > > > >> > > with Samza. (Not to say that this is the first time such a
> > > choice
> > > > is
> > > > > >> > being
> > > > > >> > > made in the Samza design). I am not seeing how this can be a
> > > > "general"
> > > > > >> > > solution for all input systems. That's my two cents. I would
> > > like
> > > > to
> > > > > >> hear
> > > > > >> > > alternative points of view for this from other devs.
> > > > > >> > >
> > > > > >> > > Please make sure you have enough eyes on this SEP. If you
> do,
> > > > please
> > > > > >> > start
> > > > > >> > > a VOTE thread to approve this SEP.
> > > > > >> > >
> > > > > >> > > Thanks!
> > > > > >> > > Navina
> > > > > >> > >
> > > > > >> > >
> > > > > >> > > On Mon, May 29, 2017 at 12:32 AM, Dong Lin <
> > lindon...@gmail.com
> > > >
> > > > > >> wrote:
> > > > > >> > >
> > > > > >> > > > Hey Navina,
> > > > > >> > > >
> > > > > >> > > > I have updated the wiki based on your suggestion. More
> > > > > >> specifically, I
> > > > > >> > > have
> > > > > >> > > > made the following changes:
> > > > > >> > > >
> > > > > >> > > > - Improved Problem section and Motivation section to
> > describe
> > > > why
> > > > > we
> > > > > >> > use
> > > > > >> > > > the solution in this proposal instead of tackling the
> > problem
> > > of
> > > > > >> task
> > > > > >> > > > expansion directly.
> > > > > >> > > >
> > > > > >> > > > - Illustrated the design in a way that doesn't bind to
> Kafka.
> > > > Kafka
> > > > > >> is
> > > > > >> > > only
> > > > > >> > > > used as an example to illustrate why we want to support
> > partition
> > > > > >> expansion
> > > > > >> > > and
> > > > > >> > > > whether the operational requirement can be supported when
> > > Kafka
> > > > is
> > > > > >> used
> > > > > >> > > as
> > > > > >> > > > the input system. Note that the proposed solution should
> > work
> > > > for
> > > > > >> any
> > > > > >> > > input
> > > > > >> > > > system that meets the operational requirement described in
> > the
> > > > > wiki.
> > > > > >> > > >
> > > > > >> > > > - Fixed the problem in the figure.
> > > > > >> > > >
> > > > > >> > > > - Added a new class GroupBySystemStreamPartitionFixedTaskNum
> > > to
> > > > > the
> > > > > >> > > wiki.
> > > > > >> > > > Together with GroupByPartitionFixedTaskNum, it should
> ensure
> > > > that
> > > > > we
> > > > > >> > > have a
> > > > > >> > > > solution to enable partition expansion for all users that
> > are
> > > > > using
> > > > > >> > > > pre-defined grouper in Samza. Note that those users who
> use
> > > > custom
> > > > > >> > > grouper
> > > > > >> > > > would need to update their implementation.
> > > > > >> > > >
> > > > > >> > > > Can you let me know if this, together with the answers in
> > the
> > > > > >> previous
> > > > > >> > > > email, addresses all your questions? Thanks for taking
> time
> > to
> > > > > >> review
> > > > > >> > the
> > > > > >> > > > proposal.
> > > > > >> > > >
> > > > > >> > > > Regards,
> > > > > >> > > > Dong
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > On Wed, May 24, 2017 at 11:15 PM, Dong Lin <
> > > lindon...@gmail.com
> > > > >
> > > > > >> > wrote:
> > > > > >> > > >
> > > > > >> > > > > Hey Navina,
> > > > > >> > > > >
> > > > > >> > > > > Thanks much for your comments. Please see my reply
> inline.
> > > > > >> > > > >
> > > > > >> > > > > On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh
> (Apache) <
> > > > > >> > > > > nav...@apache.org> wrote:
> > > > > >> > > > >
> > > > > >> > > > >> Thanks for the SEP, Dong. I have a couple of questions
> to
> > > > > >> understand
> > > > > >> > > > your
> > > > > >> > > > >> proposal better:
> > > > > >> > > > >>
> > > > > >> > > > >> * Under motivation, you mention that "_We expect this
> > > > solution
> > > > > to
> > > > > >> > work
> > > > > >> > > > >> similarly with other input system as well._", yet I
> don't
> > > see
> > > > > any
> > > > > >> > > > >> discussion on how it will work with other input
> systems.
> > > That
> > > > > is,
> > > > > >> > what
> > > > > >> > > > >> kind
> > > > > >> > > > >> of contract does samza expect from other input systems
> ?
> > If
> > > > we
> > > > > >> are
> > > > > >> > not
> > > > > >> > > > >> planning to provide a generic solution, it might be
> worth
> > > > > >> calling it
> > > > > >> > > out
> > > > > >> > > > >> in
> > > > > >> > > > >> the SEP.
> > > > > >> > > > >>
> > > > > >> > > > >
> > > > > >> > > > > I think the contract we expect from other systems is
> > > exactly
> > > > > the
> > > > > >> > > > > operational requirement mentioned in the SEP, i.e.
> > > partitions
> > > > > >> should
> > > > > >> > > > always
> > > > > >> > > > > be doubled and the hash algorithm should take the hash modulo the
> number
> > > of
> > > > > >> > > partitions.
> > > > > >> > > > > SEP-5 should also allow partition expansion of all input
> > > > systems
> > > > > >> that
> > > > > >> > > > meet
> > > > > >> > > > > these two requirements. I have updated the motivation
> > > section
> > > > to
> > > > > >> > > clarify
> > > > > >> > > > > this.
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > >>
> > > > > >> > > > >> * I understand the partition mapping logic you have
> > > proposed.
> > > > > >> But I
> > > > > >> > > > think
> > > > > >> > > > >> the example explanation doesn't match the diagram. In
> the
> > > > > >> diagram,
> > > > > >> > > after
> > > > > >> > > > >> expansion, partition-0 and partition-1 are pointing to
> > > bucket
> > > > 0
> > > > > >> and
> > > > > >> > > > >> partition-3 and partition-4 are pointing to bucket 1. I
> > > think
> > > > > the
> > > > > >> > > former
> > > > > >> > > > >> has to be partition-0 and partition-2, and the latter
> is
> > > > > >> partition-1
> > > > > >> > > and
> > > > > >> > > > >> partition-3. If I am wrong, please help me understand
> the
> > > > logic
> > > > > >> :)
> > > > > >> > > > >>
> > > > > >> > > > >
> > > > > >> > > > > Good catch. I will update the figure to fix this
> problem.
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > >>
> > > > > >> > > > >> * I don't know how partition expansion in Kafka works.
> I
> > am
> > > > > >> familiar
> > > > > >> > > > with
> > > > > >> > > > >> how shard splitting happens in Kinesis - there is
> > > > hierarchical
> > > > > >> > > relation
> > > > > >> > > > >> between the parent and child shards. This way, it will
> > also
> > > > > allow
> > > > > >> > the
> > > > > >> > > > >> shards to be merged back. Iiuc, Kafka only supports
> > > partition
> > > > > >> > > > "expansion",
> > > > > >> > > > >> as opposed to "splits". Can you provide some context or
> > > link
> > > > > >> related
> > > > > >> > > to
> > > > > >> > > > >> how
> > > > > >> > > > >> partition expansion works in Kafka?
> > > > > >> > > > >>
> > > > > >> > > > >
> > > > > >> > > > > I couldn't find any wiki on partition expansion in
> Kafka.
> > > The
> > > > > >> > partition
> > > > > >> > > > > expansion logic in Kafka is very simple -- it simply
> adds
> > > new
> > > > > >> > partition
> > > > > >> > > > to
> > > > > >> > > > > the existing topic. Is there a specific question you have
> > > > > regarding
> > > > > >> > > > partition
> > > > > >> > > > > expansion in Kafka?
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > >>
> > > > > >> > > > >> * Are you only recommending that expansion can be
> > supported
> > > > for
> > > > > >> > samza
> > > > > >> > > > jobs
> > > > > >> > > > >> that use Kafka as input systems **and** configure the
> > > > > SSPGrouper
> > > > > >> as
> > > > > >> > > > >> GroupByPartitionFixedTaskNum? Sounds to me like this
> only
> > > > > applies
> > > > > >> > for
> > > > > >> > > > >> GroupByPartition. Please correct me if I am wrong. What
> > is
> > > > the
> > > > > >> > > > expectation
> > > > > >> > > > >> for custom SSP Groupers?
> > > > > >> > > > >>
> > > > > >> > > > >
> > > > > >> > > > > The expansion can be supported for Samza jobs if the
> input
> > > > > system
> > > > > >> > meets
> > > > > >> > > > > the operational requirement mentioned above. It doesn't
> > have
> > > > to
> > > > > >> use
> > > > > >> > > Kafka
> > > > > >> > > > > as input system.
> > > > > >> > > > >
> > > > > >> > > > > The current proposal provides a solution for jobs that
> > > currently
> > > > > use
> > > > > >> > > > > GroupByPartition. The proposal can be extended to
> support
> > > jobs
> > > > > >> that
> > > > > >> > use
> > > > > >> > > > > other groupers that are pre-defined in Samza. The custom
> > SSP
> > > > > >> grouper
> > > > > >> > > needs
> > > > > >> > > > > to handle partition expansion similar to how
> > > > > >> > > GroupByPartitionFixedTaskNum
> > > > > >> > > > > handles it and it is users' responsibility to update
> their
> > > > > custom
> > > > > >> > > grouper
> > > > > >> > > > > implementation.
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > >>
> > > > > >> > > > >> * Regarding storing SSP-to-Task assignment to
> coordinator
> > > > > stream:
> > > > > >> > > Today,
> > > > > >> > > > >> the JobModel encapsulates the data model in samza which
> > > also
> > > > > >> > includes
> > > > > >> > > > >> **TaskModels**. TaskModel, typically shows the
> > > > task-to-sspList
> > > > > >> > > mapping.
> > > > > >> > > > >> What is the reason for using a separate coordinator
> > stream
> > > > > >> message
> > > > > >> > > > >> *SetSSPTaskMapping*? Is it because the JobModel itself
> is
> > > not
> > > > > >> > > persisted
> > > > > >> > > > in
> > > > > >> > > > >> the coordinator stream today?  The reason locality
> exists
> > > > > >> outside of
> > > > > >> > > the
> > > > > >> > > > >> jobmodel is because *locality* information is written
> by
> > > each
> > > > > >> > > container,
> > > > > >> > > > >> whereas it is consumed only by the leader
> > > jobcoordinator/AM.
> > > > > In
> > > > > >> > this
> > > > > >> > > > >> case,
> > > > > >> > > > >> the writer of the mapping information and the reader is
> > > still
> > > > > the
> > > > > >> > > leader
> > > > > >> > > > >> jobcoordinator/AM. So, I want to understand the
> > motivation
> > > > for
> > > > > >> this
> > > > > >> > > > >> choice.
> > > > > >> > > > >>
> > > > > >> > > > >
> > > > > >> > > > > Yes, the reason for using a separate coordinator stream
> > > message
> > > > > is
> > > > > >> > > because
> > > > > >> > > > > the task-to-sspList mapping is not currently persisted
> in
> > > the
> > > > > >> > > coordinator
> > > > > >> > > > > stream. We wouldn't need to create this new stream
> message
> > > if
> > > > > >> > JobModel
> > > > > >> > > is
> > > > > >> > > > > persisted. We need to persist the task-to-sspList
> mapping
> > in
> > > > the
> > > > > >> > > > > coordinator stream so that the job can derive the
> original
> > > > > number
> > > > > >> of
> > > > > >> > > > > partitions of each input stream regardless of how many
> > times
> > > > the
> > > > > >> > > > partition
> > > > > >> > > > > has expanded. Does this make sense?
> > > > > >> > > > >
> > > > > >> > > > > I am not sure how this is related to the locality
> though.
> > > Can
> > > > > you
> > > > > >> > > clarify
> > > > > >> > > > > your question if I haven't answered your question?
> > > > > >> > > > >
> > > > > >> > > > > Thanks!
> > > > > >> > > > > Dong
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > >>
> > > > > >> > > > >> Cheers!
> > > > > >> > > > >> Navina
> > > > > >> > > > >>
> > > > > >> > > > >> On Tue, May 23, 2017 at 5:45 PM, Dong Lin <
> > > > lindon...@gmail.com
> > > > > >
> > > > > >> > > wrote:
> > > > > >> > > > >>
> > > > > >> > > > >> > Hi all,
> > > > > >> > > > >> >
> > > > > >> > > > >> > We created SEP-5: Enable partition expansion of input
> > > > > streams.
> > > > > >> > > Please
> > > > > >> > > > >> find
> > > > > >> > > > >> > the SEP wiki in the link
> > > > > >> > > > >> > https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams
> > > > > >> > > > >> > .
> > > > > >> > > > >> >
> > > > > >> > > > >> > Your feedback is appreciated!
> > > > > >> > > > >> >
> > > > > >> > > > >> > Thanks,
> > > > > >> > > > >> > Dong
> > > > > >> > > > >> >
> > > > > >> > > > >>
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
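
The mapping rule discussed in the quoted thread (the partition count only ever doubles, the producer picks a partition by taking the key hash modulo the partition count, and the number of tasks stays fixed) can be sketched in a few lines. This is only an illustrative Python sketch under those assumptions, not the SEP-5 implementation. The names partition_for_key and task_for_partition are hypothetical, and the sketch assumes one task per original partition, as with GroupByPartition.

```python
def partition_for_key(key_hash: int, num_partitions: int) -> int:
    """Producer-side rule assumed in the thread: hash modulo partition count."""
    return key_hash % num_partitions


def task_for_partition(partition: int, original_partitions: int) -> int:
    """Map a partition to the task that owned its keys before any expansion."""
    return partition % original_partitions


original = 2  # partition count when the job first started (one task each)
expanded = 4  # partition count after a single doubling

# Group the expanded partitions by the task that should consume them.
groups = {}
for p in range(expanded):
    groups.setdefault(task_for_partition(p, original), []).append(p)

print(groups)  # {0: [0, 2], 1: [1, 3]}
```

Under these assumptions, partition-0 and partition-2 land on task 0 while partition-1 and partition-3 land on task 1, which matches the correction to the figure raised in the thread. It also shows why the original partition count has to be remembered: a key's task is derived from the count at job start, not from the current count.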
