Yi,
Thanks for summarizing. I think we should deal with further code related
changes/discussions in the PR directly since this SEP has been open for a
while. Let's try to wrap up the discussions by today.

@Dong: Thanks for updating the SEP. I think the TestPlan section is TBD
right now. You can update it whenever you get to it. Thanks a bunch for
your patience!

Cheers!
Navina

On Thu, Jun 22, 2017 at 3:36 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Yi,
>
> Thanks for the detailed comment and the summary!
>
> To address your comments:
>
> 1) The current names are GroupByPartitionWithFixedTaskNum and
> GroupBySystemStreamPartitionWithFixedTaskNum. Instead of
> FixedTasksGroupByPartition
> and FixedTasksGroupBySystemStreamPartition, how about GroupByPartition
> FixedTasks and GroupBySystemStreamPartitionFixedTasks? The new names are
> equally long as the names you suggested. It seems a bit more intuitive
> because they would be prefixed with the grouper class name of their
> no-fixed-tasks counterpart. I have updated wiki with the new names. Can you
> let me know if it is OK?
>
> 2) Initially I want to design that config and interface later when we have
> more use-case so that we can have higher confidence in the interface
> design. But it seems that one common concern with the proposal is about its
> limitation assumption in the the old-partition-to-new-partition mapping. I
> have updated the wiki to illustrate the design of this interface and the
> new (and more general) assumption for the input system to use this
> partition expansion. Can you take a look and see if it is reasonable?
>
> 3) Yeah previously Jacob has raised the same concern and the solution is
> exactly the same as you suggested.
>
> Hey everyone,
>
> I have made non-trivial change to the wiki to illustrate the use of new
> config and interface for user to specify new-partition-to-old-partition
> mapping. Can you please help review it?
>
> Thanks,
> Dong
>
>
> On Thu, Jun 22, 2017 at 2:25 AM, Yi Pan <nickpa...@gmail.com> wrote:
>
> > Hi, Dong and everyone,
> >
> > Thanks for the detailed discussion on SEP-5! Really appreciate the
> thorough
> > consideration on this issue. I also noticed that Dong has updated the
> SEP-5
> > wiki to clarify:
> > 1) SEP-5 provides a solution to retain the same number of task/state w/o
> > re-partitioning (as illustrated in the stateful join example)
> > 2) Future work to expand number of tasks need to work together with
> > flexible re-partitioning to provide a complete solution
> >
> > Due to the cost to be paid in task number expansion:
> > 1) additional network I/O and latency in re-partitioning
> > 2) shuffling of the states among tasks
> > The current form of SEP-5 provides an alternative when partition
> expansion
> > in the messaging system is not due to increase of total input rate.
> >
> > The concern on the added complexity in grouper logic is valid. However,
> the
> > grouper-based solution is not completely unreasonable:
> > 1) Grouper is a public interface and we are already open to customized
> > implementation of groupers, although not being a main use case
> > 2) Deprecation of existing config-driven grouper needs longer time effort
> > to wait for fluent API has a better planner to automatically figuring out
> > the grouper to be used and stateful task expansion is automated. Hence,
> for
> > a foreseeable long time, grouper is still configured by the user.
> >
> > So, in general, I am in favor of the proposed SEP-5, given that it
> provides
> > a least-resistance to address some pain points for Samza users, w/o
> > breaking any existing use cases in opt-in mode.
> >
> > Some minor suggestions:
> > 1) The class names are too long. Can we change them to
> > FixedTasksGroupByPartition and FixedTasksGroupBySystemStreamPartition?
> > 2) I am still in favor of configurable partition expansion (i.e.
> new<->old
> > partition mapping) policy, since it makes this solution more general and
> > not fixed for Kafka. I am OK with default to power-of-2 expansion policy
> > and not introducing new config variable now.
> > 3) In the checkpoint/coordinator topic validation, KafkaCheckpointLogKey
> > class validates the current grouper factory class == the previous grouper
> > factory class in previous checkpoint. We need to make sure that we allow
> > the compatible change from GroupByPartition to
> FixedTasksGroupByPartition,
> > etc. Since FixedTasksGroupByPartition is a derived interface of
> > GroupByPartition, one possible solution is to check assignable (if
> current
> > grouper factory class is assignable to the previous grouper factory
> class)
> >
> > Thanks a lot!
> >
> > On Wed, Jun 21, 2017 at 5:11 PM, Navina Ramesh (Apache) <
> nav...@apache.org
> > >
> > wrote:
> >
> > > > But IMO it is the best available solution towards the support of
> > > partition expansion in comparison to alternative, no?
> > >
> > > At this time, relative to the other alternatives you have listed, this
> > is a
> > > path of least effort to solving this problem. I agree to that. :)
> > >
> > > > I can merge those two sections or update the statement if the current
> > > statement
> > > has not clearly explained the reason of partition expansion in Kafka.
> > >
> > > Given the significance of what you are actually trying to solve, I
> think
> > it
> > > will be better to have it in points. Let me come find you and we can
> > update
> > > it.
> > >
> > > > I have updated wiki and added the task expansion to the Future Work
> > > section.
> > > On the other hand I still keep it in the Rejected Alternative section
> to
> > > explain why this future work does not replace the existing proposal in
> > > SEP-5. Does this sound reasonable?
> > >
> > > It is very confusing to me how the same point can be under "Future
> Work"
> > > and "Rejected Alternative". There is no question about the future work
> > > *replacing* SEP-5. Iiuc, this SEP is a subset for the partition
> expansion
> > > solution. So, I don't think increasing task count should be a rejected
> > > alternative.
> > >
> > > > I am also not sure why a feature needs to be "utmost priority" in
> order
> > > to be accepted. Can you explain a bit on that?
> > >
> > > I don't think I ever claimed that the feature needs to be of "utmost
> > > priority" to be accepted. I was just stating my opinion.
> > >
> > >
> > > Thanks!
> > > Navina
> > >
> > > On Wed, Jun 21, 2017 at 3:52 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Thanks much for the reply Navina. Please see my reply inline.
> > > >
> > > > On Wed, Jun 21, 2017 at 2:57 PM, Navina Ramesh (Apache) <
> > > nav...@apache.org
> > > > >
> > > > wrote:
> > > >
> > > > > Thanks to Jake, Dong and Kartik for keeping the discussion going.
> > > > >
> > > > > > Here are the pros and cons of the extra re-partitioning stage in
> > > > > comparison
> > > > > to SEP-5.
> > > > >
> > > > > I think that is good summarization of pros/cons for the
> > repartitioning
> > > > > stage based solution. Can you please include it in your SEP? It
> seems
> > > > like
> > > > > you already have access. If you are still unable to access the wiki
> > > page,
> > > > > feel free to walk over to Samza area and find me!
> > > > >
> > > >
> > > > Sure. I have added this summary to the Alternative Section.
> > > >
> > > >
> > > > >
> > > > > > I think there is always a way for user to mess up their job if
> they
> > > > > configure the Samza job incorrectly.
> > > > >
> > > > > I don't think Jake or anyone is arguing about an "incorrectly"
> > > configured
> > > > > Samza job. The question was towards how easy/difficult it is for
> > users
> > > to
> > > > > *not mess* up their job with incorrect configurations.
> > > > >
> > > > > > I also think the assumption made in this SEP is not particularly
> > > harder
> > > > > to understand than other existing configs in Samza.
> > > > >
> > > > > I disagree here. Other configs don't require you understand more
> than
> > > one
> > > > > assumption.
> > > > >
> > > > > There is already an overload of configs in Samza and I think we are
> > > > trying
> > > > > to shield it as much as possible from the users (esp. with fluent
> > api).
> > > > > More specifically, we don't want the user to know about the
> internals
> > > of
> > > > > Samza such ssp grouper, taskname grouper etc. Since the proposed
> > > solution
> > > > > makes the configuration more complex to understand, it *is a*
> burden
> > on
> > > > the
> > > > > user.
> > > > >
> > > > > Just because configs are the way it is, it doesn't mean we increase
> > the
> > > > > complexity of it and push the burden on users to manage it
> correctly.
> > > My
> > > > > two cents.
> > > > >
> > > >
> > > > Sure, I agree the proposal requires user to understand the assumption
> > in
> > > > order to expand the partition of the topic. But it is very subjective
> > as
> > > to
> > > > whether the added complexity is acceptable or not. If there is better
> > way
> > > > to allow user to expand partition of the input stream without making
> > > > assumption, then we can just do that. The current solution is not
> > > perfect.
> > > > But IMO it is the best available solution towards the support of
> > > partition
> > > > expansion in comparison to alternative, no?
> > > >
> > > >
> > > > > Here are a few things that I believe are needed for wrapping up the
> > > SEP:
> > > > >
> > > > > 1. For the longest time, I thought partition expansion happens in
> > Kafka
> > > > > only when the volume of messages across partitions is too high.
> Based
> > > on
> > > > > this assumption, I would only assume that re-mapping expanded
> > > partitions
> > > > to
> > > > > the same task will have adverse effect on the throughput/resource
> > > > > utilization of the processor/container in Samza (for example, disk
> > > > > utilization may increase significantly. With disk quota throttling,
> > it
> > > > > could cause the processor to drop.). However, after speaking with
> > > Xinyu,
> > > > it
> > > > > turns out that partition expansion also happens when there is a
> > > > > per-partition data retention limit imposed by Kafka (not sure if it
> > is
> > > > only
> > > > > in LinkedIn or in Kafka open-source as well). Imo, this is the
> > primary
> > > > > use-case that we are trying to solve for in Samza and it is not
> very
> > > > > obvious from the SEP.
> > > > > @Dong, can you please explain *the circumstances under which
> > partition
> > > > > expansion can happen*, under "Motivation" section?  I disagree to
> the
> > > > > current motivation described as -> "This design doc provides a
> > solution
> > > > to
> > > > > increase partition number of the input streams of a stateful Samza
> > job
> > > > > while still ensuring the correctness of Samze job output. "
> > > > > This is a solution, albeit not fully done through this SEP alone.
> > > > >
> > > >
> > > > This is actually already described in the Problem and Goal section,
> > i.e.
> > > > "For example, Kafka generally needs to limit the maximum size of each
> > > > partition to scale up its performance. Thus the number of partitions
> > of a
> > > > Kafka topic needs to be expanded to reduce the partition size if the
> > > > average byte-in-rate or retention time of the Kafka topic has
> > doubled". I
> > > > can merge those two sections or update the statement if the current
> > > > statement has not clearly explained the reason of partition expansion
> > in
> > > > Kafka.
> > > >
> > > >
> > > > >
> > > > > 2. I think we are in consensus about the fact that increasing the
> > task
> > > > > number and handling the state correctly is a good solution for
> Samza
> > in
> > > > the
> > > > > long-run. In your rejected alternatives, you mention "However, this
> > > > feature
> > > > > alone does not solve the problem of allowing partition expansion.".
> > > What
> > > > > else is required to allow partition expansion? Can you please
> > elaborate
> > > > on
> > > > > that in point #1 of the rejected alternatives? If there is still
> more
> > > > work
> > > > > to be done to support partition expansion in Samza, it is
> worthwhile
> > to
> > > > > mention it under *Future Work*, instead of under "Rejected
> > > Alternatives".
> > > > > Perhaps you were waiting for edit permissions to the wiki. Please
> > make
> > > > this
> > > > > change so it is well-tracked.
> > > > >
> > > >
> > > > I thought this is already explained in the rejected alternative
> > section.
> > > > More specifically, it is said that "However, this feature alone does
> > not
> > > > solve the problem of allowing partition expansion. For example, say
> we
> > > have
> > > > a job that joins two streams both of which have 3 partitions. If
> > > partition
> > > > number of one stream increases from 3 to 6, we would still want the
> > task
> > > > number to remain 3 to make sure that messages with the same key from
> > both
> > > > streams will be handled by the same task. This needs to be done with
> > the
> > > > new grouper classes proposed in this doc."
> > > >
> > > > Does this explanation make sense?
> > > >
> > > > I have updated wiki and added the task expansion to the Future Work
> > > > section. On the other hand I still keep it in the Rejected
> Alternative
> > > > section to explain why this future work does not replace the existing
> > > > proposal in SEP-5. Does this sound reasonable?
> > > >
> > > >
> > > > > I am still not totally crazy about the proposed solution because it
> > is
> > > > not
> > > > > clear for open-source, who or which use-cases stand to benefit. I
> am
> > > not
> > > > > convinced that this problem is of utmost priority for the Samza
> > > community
> > > > > *at this point of time*.
> > > > >
> > > >
> > > > I think the Problem and Goal section and the Motivation section have
> > > > illustrated the use-case for this feature. Let me answer your
> questions
> > > > more specifically:
> > > >
> > > > *Who will benefit from this feature:* any Samza user who runs
> stateful
> > > job
> > > > with input from Kafka and needs to expand partition of the input
> stream
> > > so
> > > > that the single partition size doesn't exceed a threshold.
> > > >
> > > > *Which use-case stand to benefit:* this SEP-5 is useful if user runs
> > > > stateful job with input from Kafka and needs to expand partition of
> the
> > > > input stream so that the single partition size doesn't exceed a
> > > threshold.
> > > >
> > > > *Why it is a important feature:* a user needs this feature if he runs
> > > > stateful job with input from Kafka and the partition size of Kafka
> has
> > > > become too large due to increase in throughput or increase in
> retention
> > > > time.
> > > >
> > > > I am not sure what kind of feature can be classified at "utmost
> > > priority".
> > > > I am also not sure why a feature needs to be "utmost priority" in
> order
> > > to
> > > > be accepted. Can you explain a bit on that? I think we should develop
> > > > feature that has a valid use-case.
> > > >
> > > >
> > > > > I am on the same page as Jake on this one. Not a +1, just a 0 (if
> > that
> > > > even
> > > > > matters).
> > > > >
> > > > > Thanks!
> > > > > Navina
> > > > >
> > > > > On Sun, Jun 18, 2017 at 12:04 AM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > >
> > > > > > BTW, I will update the SEP-5 wiki with our latest discussion
> after
> > I
> > > > have
> > > > > > got the wiki edit access.
> > > > > >
> > > > > > On Sat, Jun 17, 2017 at 11:36 PM, Dong Lin <lindon...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Thanks everyone for the comment!
> > > > > > >
> > > > > > > I am currently leaning towards the current approach. I think
> > Kartik
> > > > > > raised
> > > > > > > a good point that the extra repartitoning stage will also incur
> > > > > > additional
> > > > > > > throughput on Kafka in addition to the potential storage cost.
> > Any
> > > > > other
> > > > > > > Samza developers also chime in and provide your opinions on
> this
> > > > > > proposal?
> > > > > > >
> > > > > > > Since this discussion thread has been open for three weeks, I
> > will
> > > > > > > initiate voting thread on Monday if there is no major revision
> > > > > > suggestion.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Dong
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Jun 15, 2017 at 6:32 PM, Kartik Paramasivam <
> > > > > > > kparamasi...@linkedin.com.invalid> wrote:
> > > > > > >
> > > > > > >> Great discussion !
> > > > > > >>
> > > > > > >> Here are some more thoughts
> > > > > > >>
> > > > > > >> The point that repartitioning is a more general purpose
> solution
> > > is
> > > > > > surely
> > > > > > >> spot on.  For many source systems (Kinesis, Google Pub-Sub,
> any
> > of
> > > > the
> > > > > > >> older queuing systems (rabbitMQ etc. etc.), repartitioning is
> > > > anyways
> > > > > > >> functionally required to do even simple keyed aggregations.
> >  But
> > > in
> > > > > > most
> > > > > > >> of these systems, the concept of repartitioning either does
> not
> > > > exist
> > > > > or
> > > > > > >> exists in a way which is very unique (e.g. Kinesis).
> > > > > > >>
> > > > > > >> I think this feature is really only interesting for source
> > systems
> > > > > like
> > > > > > >> Kafka and EventHub.  EventHub (last I checked) didn't support
> > > > > > >> repartitioning. So this is probably not super-interesting
> (yet)
> > > for
> > > > > > >> EventHub.
> > > > > > >>
> > > > > > >> So Kafka is clearly the main use case here.
> > > > > > >>
> > > > > > >> For Kafka, I think it is pretty rare for people to customize
> the
> > > > > hashing
> > > > > > >> algorithm for sending messages.  I would argue that less than
> 5%
> > > of
> > > > > the
> > > > > > >> population (i am being generous ;)) would do that.   The
> current
> > > > > > proposal
> > > > > > >> works with the default hashing scheme for Kafka.  So
> > organizations
> > > > > will
> > > > > > >> typically never have to coordinate.
> > > > > > >>
> > > > > > >> If the proposed alternative (always repartition) was
> side-effect
> > > > free,
> > > > > > >> then
> > > > > > >> it would make sense to use an alternative design that would
> work
> > > for
> > > > > > 100%
> > > > > > >> of the population.    Repartitioning all input would however
> not
> > > be
> > > > a
> > > > > > >> feasible solution (atleast at LinkedIn) as it would double the
> > > kafka
> > > > > > >> workload.    If many samza jobs read from kafka topics, then
> the
> > > > > > increase
> > > > > > >> would be a function of the number of samza jobs.
> > > > > > >>
> > > > > > >> For low throughput kafka topics, surely explicit
> repartitioning
> > > > using
> > > > > > >> fluent api is feasible.
> > > > > > >>
> > > > > > >> If the proposal was to make this new policy the default then
> > that
> > > > > would
> > > > > > >> clearly not make much sense.
> > > > > > >>
> > > > > > >> But it is an opt in policy.  If it is not applicable, people
> > don't
> > > > > have
> > > > > > to
> > > > > > >> use it.
> > > > > > >>
> > > > > > >> I do have some questions about the implementation. I will try
> to
> > > > > respond
> > > > > > >> back after spending some more time on this.
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> On Thu, Jun 15, 2017 at 7:53 AM, Jacob Maes <
> > jacob.m...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >>
> > > > > > >> > Thanks, Dong.
> > > > > > >> >
> > > > > > >> > The summary looks accurate.
> > > > > > >> >
> > > > > > >> > I'll let the others chime in, as I believe my perspective
> has
> > > been
> > > > > > >> > adequately captured in this thread.
> > > > > > >> >
> > > > > > >> > -Jake
> > > > > > >> >
> > > > > > >> > On Wed, Jun 14, 2017 at 12:12 PM, Dong Lin <
> > lindon...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >> >
> > > > > > >> > > Hey Jacob,
> > > > > > >> > >
> > > > > > >> > > Thank you for taking so much time to discuss with me! I
> > > > appreciate
> > > > > > the
> > > > > > >> > > discussion and the insight. I will summarize our
> discussion
> > > > below.
> > > > > > >> > >
> > > > > > >> > > 1) Whether it is reasonable to store partition-to-task
> > > mapping.
> > > > > > >> > >
> > > > > > >> > > We agree that this partition-to-task mapping will be
> > > reasonable
> > > > if
> > > > > > we
> > > > > > >> > allow
> > > > > > >> > > user to specify either the new-partition-to-old-partition
> > > > mapping
> > > > > or
> > > > > > >> > > key-to-partition mapping in the future. SEP-5 doesn't
> > > currently
> > > > > > >> provide a
> > > > > > >> > > way for user to specify new-partition-to-old partition
> > mapping
> > > > > > >> because we
> > > > > > >> > > don't have a good idea about that interface until we try
> to
> > > > enable
> > > > > > >> > > partition expansion for input system other than Kafka in
> the
> > > > > future.
> > > > > > >> This
> > > > > > >> > > is currently specified as the third future work in SEP-5.
> > > > > > >> > >
> > > > > > >> > > And if we decide to implement SEP-5, I will include a
> > warning
> > > > > > message
> > > > > > >> > > regarding the use of partition-to-task, i.e. "this does
> not
> > > > > specify
> > > > > > >> the
> > > > > > >> > > key-to-task mapping". We agree that this could address the
> > > > concern
> > > > > > >> here.
> > > > > > >> > >
> > > > > > >> > > 2) Whether we should follow the approach in SEP-5 or use
> an
> > > > extra
> > > > > > >> > > re-partitioning stage in the stateful Samza job to enable
> > > > > partition
> > > > > > >> > > expansion.
> > > > > > >> > >
> > > > > > >> > > Here are the pros and cons of the extra re-partitioning
> > stage
> > > in
> > > > > > >> > comparison
> > > > > > >> > > to SEP-5.
> > > > > > >> > >
> > > > > > >> > > Pros:
> > > > > > >> > > - It doesn't require owner of the Samza job to know the
> > > > > partitioning
> > > > > > >> > > algorithm of used for the input stream. If the owner of
> the
> > > > Samza
> > > > > > job
> > > > > > >> is
> > > > > > >> > in
> > > > > > >> > > a different organization than the producer of the input
> > > stream,
> > > > > this
> > > > > > >> > > solution frees different organizations from having to
> > > coordinate
> > > > > > with
> > > > > > >> > each
> > > > > > >> > > other.
> > > > > > >> > > - It doesn't require owner of the Samza job to specify the
> > > > > > >> partitioning
> > > > > > >> > > algorithm of used for the input stream. Thus less config.
> > > > > > >> > >
> > > > > > >> > > Cons:
> > > > > > >> > > - User has to make code change on their side to use the
> new
> > > > fluent
> > > > > > >> API.
> > > > > > >> > > - The extra partitioning stage would potentially increases
> > > > > latency.
> > > > > > >> > > - The extra partitioning stage would incur additional cost
> > due
> > > > to
> > > > > > the
> > > > > > >> > extra
> > > > > > >> > > internal topic. The cost is probably not that much with
> the
> > > new
> > > > > > trim()
> > > > > > >> > API
> > > > > > >> > > in Kafka if Samza uses Kafka to store the internal topic.
> > But
> > > > the
> > > > > > cost
> > > > > > >> > may
> > > > > > >> > > be doubled if Samza uses another input system that doesn't
> > > > provide
> > > > > > >> trim()
> > > > > > >> > > API to delete data on demand.
> > > > > > >> > >
> > > > > > >> > > My recommendation is to adopt a hybrid solution, i.e. we
> > still
> > > > > > >> implement
> > > > > > >> > > the current proposal in SEP-5 so that we enable partition
> > > > > expansion
> > > > > > >> > without
> > > > > > >> > > incurring extra latency/cost and without requiring users
> to
> > > > change
> > > > > > >> their
> > > > > > >> > > code. And we can recommend user to use the extra
> > partitioning
> > > > > stage
> > > > > > if
> > > > > > >> > the
> > > > > > >> > > coordination among different organization is indeed a
> > concern.
> > > > > > >> > >
> > > > > > >> > > Can other developers also provide feedback regarding your
> > > > > preference
> > > > > > >> > > between the two?
> > > > > > >> > >
> > > > > > >> > > Thanks,
> > > > > > >> > > Dong
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > On Tue, Jun 13, 2017 at 9:30 AM, Jacob Maes <
> > > > jacob.m...@gmail.com
> > > > > >
> > > > > > >> > wrote:
> > > > > > >> > >
> > > > > > >> > > > Hey Dong,
> > > > > > >> > > >
> > > > > > >> > > > I appreciate your thoughtful responses. Let's do one
> more
> > > > round
> > > > > > :-)
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > > Here are my current concern with the three
> alternatives
> > > you
> > > > > > >> described
> > > > > > >> > > > > earlier:
> > > > > > >> > > > > - The first alternative requires support from input
> > system
> > > > > which
> > > > > > >> is
> > > > > > >> > > > > currently not available. It will limit the usage of
> > > > partition
> > > > > > >> > expansion
> > > > > > >> > > > to
> > > > > > >> > > > > only systems that support such interface. And it is
> not
> > > > > > guaranteed
> > > > > > >> > that
> > > > > > >> > > > we
> > > > > > >> > > > > can persuade the developer of the input system to add
> > this
> > > > > > >> interface.
> > > > > > >> > > > This
> > > > > > >> > > > > is not desirable for Samza in the long term.
> > > > > > >> > > >
> > > > > > >> > > > Agreed. It is very wishful thinking that each supported
> > > system
> > > > > > would
> > > > > > >> > > > provide such a contract.
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > >
> > > > > > >> > > > > - I can not comment on the second alternative because
> I
> > > > don't
> > > > > > >> > > understand
> > > > > > >> > > > > how it reshuffles all existing changelog data. We can
> > > > discuss
> > > > > > >> more if
> > > > > > >> > > > there
> > > > > > >> > > > > is more specific detail. My gut feel is that this will
> > be
> > > > > > complex
> > > > > > >> and
> > > > > > >> > > > > carries performance overhead.
> > > > > > >> > > >
> > > > > > >> > > > After giving this more thought, I agree. There is no
> clear
> > > way
> > > > > to
> > > > > > >> > > migrate a
> > > > > > >> > > > changelog without knowing the original key->partition
> > > mapping.
> > > > > > Which
> > > > > > >> > > leads
> > > > > > >> > > > us to alternative 3...
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > >
> > > > > > >> > > > > - The third alternative requires performance overhead.
> > > Given
> > > > > > that
> > > > > > >> > user
> > > > > > >> > > > can
> > > > > > >> > > > > already use this solution to enable partition
> expansion,
> > > > maybe
> > > > > > >> Samza
> > > > > > >> > > > > developers can provide more input as to why we are not
> > > doing
> > > > > it
> > > > > > by
> > > > > > >> > > > default.
> > > > > > >> > > > > My gut feel is that it carries considerable
> performance
> > > > > overhead
> > > > > > >> and
> > > > > > >> > > > > increases the cost-to-serve Samze job (e.g. disk
> usage),
> > > > which
> > > > > > may
> > > > > > >> > make
> > > > > > >> > > > it
> > > > > > >> > > > > undesirable in the long term.
> > > > > > >> > > >
> > > > > > >> > > > I think the only performance overhead would be the
> > mandatory
> > > > > > >> > > repartitioning
> > > > > > >> > > > stage for stateful jobs. But a repartitioner is usually
> > much
> > > > > > faster
> > > > > > >> > than
> > > > > > >> > > > the downstream stateful job, so it only seems a cost to
> > > serve
> > > > > > issue.
> > > > > > >> > > >
> > > > > > >> > > > As for why we aren't already doing this, I would posit
> > that
> > > > > before
> > > > > > >> the
> > > > > > >> > > > introduction of the high level API, which trivializes
> > > > > > >> repartitioning,
> > > > > > >> > it
> > > > > > >> > > > was unreasonable to expect each job owner to do the
> > > mandatory
> > > > > > >> > > > repartitioning. With the high level API, I would argue
> > this
> > > is
> > > > > > much
> > > > > > >> > more
> > > > > > >> > > > doable.
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > I am not sure it is true that "any future feature that
> > > > utilizes
> > > > > > this
> > > > > > >> > > > > mapping without accounting for the assumptions of this
> > SEP
> > > > is
> > > > > > >> likely
> > > > > > >> > to
> > > > > > >> > > > > malfunction". Suppose we allow user to specify
> > > > > > >> new-to-old-partition
> > > > > > >> > > > > mapping, then we can use the partition-to-task mapping
> > > > > correctly
> > > > > > >> > > without
> > > > > > >> > > > > replying on the assumption in this SEP, right?
> > > > > > >> > > >
> > > > > > >> > > > Right, but my point was that the partition->task mapping
> > is
> > > > not
> > > > > > >> > > sufficient
> > > > > > >> > > > by itself. So adding it by itself is potentially
> > misleading.
> > > > > > >> > > >
> > > > > > >> > > > On Mon, Jun 12, 2017 at 8:34 PM, Dong Lin <
> > > > lindon...@gmail.com>
> > > > > > >> wrote:
> > > > > > >> > > >
> > > > > > >> > > > > Thanks for the reply Jacob. Please see my comment
> > inline.
> > > > > > >> > > > >
> > > > > > >> > > > > On Mon, Jun 12, 2017 at 7:51 PM, Jacob Maes <
> > > > > > jacob.m...@gmail.com
> > > > > > >> >
> > > > > > >> > > > wrote:
> > > > > > >> > > > >
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > - For users that need partition expansion of the
> > input
> > > > > > streams
> > > > > > >> > for
> > > > > > >> > > > > > stateful
> > > > > > >> > > > > > > job, they have a really big headache in the sense
> > that
> > > > > Samza
> > > > > > >> does
> > > > > > >> > > not
> > > > > > >> > > > > > allow
> > > > > > >> > > > > > > partition expansion for stateful job. SEP-5
> > addresses
> > > > this
> > > > > > >> > headache
> > > > > > >> > > > for
> > > > > > >> > > > > > > them.
> > > > > > >> > > > > > > You are right that SEP-5 requires user to
> understand
> > > and
> > > > > > >> enforce
> > > > > > >> > > > > > > limitations across organizations. But it is still
> > much
> > > > > > better
> > > > > > >> > than
> > > > > > >> > > > not
> > > > > > >> > > > > > > allowing user to expansion partition for stateful
> > jobs
> > > > at
> > > > > > all,
> > > > > > >> > > right?
> > > > > > >> > > > > > Did I
> > > > > > >> > > > > > > miss something here?
> > > > > > >> > > > > >
> > > > > > >> > > > > > I guess this one is a matter of perspective.
> > > > > > >> > > > > >
> > > > > > >> > > > > > One argument is that if the system supports one
> case,
> > > it's
> > > > > > >> better
> > > > > > >> > > than
> > > > > > >> > > > > none
> > > > > > >> > > > > > because there is one less scenario in which the
> system
> > > > does
> > > > > > the
> > > > > > >> > wrong
> > > > > > >> > > > > > thing.
> > > > > > >> > > > > >
> > > > > > >> > > > > > The counter argument is for uniform and consistent
> > > > behavior,
> > > > > > >> which
> > > > > > >> > is
> > > > > > >> > > > > easy
> > > > > > >> > > > > > for users to understand and properly leverage.
> > > > > > >> > > > > >
> > > > > > >> > > > > > Specifically, I'd argue that the current rule is
> very
> > > > > simple:
> > > > > > >> "you
> > > > > > >> > > > cannot
> > > > > > >> > > > > > repartition inputs on a stateful job, so you must
> > > > > > over-partition
> > > > > > >> > the
> > > > > > >> > > > > > initial implementation". To me, while that rule is
> not
> > > > > ideal,
> > > > > > >> its
> > > > > > >> > > > > > simplicity is better that introducing a new solution
> > > that
> > > > > has
> > > > > > a
> > > > > > >> > bunch
> > > > > > >> > > > of
> > > > > > >> > > > > > caveats, any one of which could be missed. If any
> one
> > of
> > > > the
> > > > > > >> > > > assumptions
> > > > > > >> > > > > in
> > > > > > >> > > > > > this SEP design are violated, the job would behave
> > > > > > incorrectly.
> > > > > > >> > That
> > > > > > >> > > > > puts a
> > > > > > >> > > > > > lot more burden on the users than the simpler rule.
> > > > > > >> > > > > >
> > > > > > >> > > > >
> > > > > > >> > > > > I agree that we have different perspective here. It is
> > > true
> > > > > that
> > > > > > >> user
> > > > > > >> > > > would
> > > > > > >> > > > > mess up their job if they used this feature in a wrong
> > > way,
> > > > > i.e.
> > > > > > >> > > violate
> > > > > > >> > > > > the assumption made in SEP-5. On the other hand, I
> think
> > > > there
> > > > > > is
> > > > > > >> > > always
> > > > > > >> > > > a
> > > > > > >> > > > > way for user to mess up their job if they configure
> the
> > > > Samza
> > > > > > job
> > > > > > >> > > > > incorrectly. I also think the assumption made in this
> > SEP
> > > is
> > > > > not
> > > > > > >> > > > > particularly harder to understand than other existing
> > > > configs
> > > > > in
> > > > > > >> > Samza.
> > > > > > >> > > > >
> > > > > > >> > > > > The answer to this can be subjective. I would love to
> > hear
> > > > > > >> > perspective
> > > > > > >> > > > from
> > > > > > >> > > > > other developers on this issue.
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > > > That's why I mentioned a few alternatives that,
> while
> > > more
> > > > > > >> complex
> > > > > > >> > to
> > > > > > >> > > > > > implement, would provide a more consistent behavior
> > with
> > > > > > simple
> > > > > > >> > rules
> > > > > > >> > > > for
> > > > > > >> > > > > > the users.
> > > > > > >> > > > > >
> > > > > > >> > > > >
> > > > > > >> > > > > I am open to discuss alternative solutions that can
> > > address
> > > > > the
> > > > > > >> the
> > > > > > >> > > > problem
> > > > > > >> > > > > in a better manner. I am not opposed to complexity as
> > long
> > > > as
> > > > > it
> > > > > > >> > gives
> > > > > > >> > > us
> > > > > > >> > > > > good long term benefits.
> > > > > > >> > > > >
> > > > > > >> > > > > Here are my current concern with the three
> alternatives
> > > you
> > > > > > >> described
> > > > > > >> > > > > earlier:
> > > > > > >> > > > >
> > > > > > >> > > > > - The first alternative requires support from input
> > system
> > > > > which
> > > > > > >> is
> > > > > > >> > > > > currently not available. It will limit the usage of
> > > > partition
> > > > > > >> > expansion
> > > > > > >> > > > to
> > > > > > >> > > > > only systems that support such interface. And it is
> not
> > > > > > guaranteed
> > > > > > >> > that
> > > > > > >> > > > we
> > > > > > >> > > > > can persuade the developer of the input system to add
> > this
> > > > > > >> interface.
> > > > > > >> > > > This
> > > > > > >> > > > > is not desirable for Samza in the long term.
> > > > > > >> > > > >
> > > > > > >> > > > > - I can not comment on the second alternative because
> I
> > > > don't
> > > > > > >> > > understand
> > > > > > >> > > > > how it reshuffles all existing changelog data. We can
> > > > discuss
> > > > > > >> more if
> > > > > > >> > > > there
> > > > > > >> > > > > is more specific detail. My gut feel is that this will
> > be
> > > > > > complex
> > > > > > >> and
> > > > > > >> > > > > carries performance overhead.
> > > > > > >> > > > >
> > > > > > >> > > > > - The third alternative requires performance overhead.
> > > Given
> > > > > > that
> > > > > > >> > user
> > > > > > >> > > > can
> > > > > > >> > > > > already use this solution to enable partition
> expansion,
> > > > maybe
> > > > > > >> Samza
> > > > > > >> > > > > developers can provide more input as to why we are not
> > > doing
> > > > > it
> > > > > > by
> > > > > > >> > > > default.
> > > > > > >> > > > > My gut feel is that it carries considerable
> performance
> > > > > overhead
> > > > > > >> and
> > > > > > >> > > > > increases the cost-to-serve Samze job (e.g. disk
> usage),
> > > > which
> > > > > > may
> > > > > > >> > make
> > > > > > >> > > > it
> > > > > > >> > > > > undesirable in the long term.
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > > > Yes, we need a similar check for
> > > > > > GroupBySystemStreamPartitionWi
> > > > > > >> > > > > > > thFixedTaskNum
> > > > > > >> > > > > > > as well. If there is more grouper classes needed
> in
> > > the
> > > > > > >> future,
> > > > > > >> > we
> > > > > > >> > > > can
> > > > > > >> > > > > > > solve this problem cleanly without new config.
> Given
> > > the
> > > > > > >> > > > > > > previousGrouperClass and newGrouperClass,
> > > > > > >> KafkaCheckpointLogKey
> > > > > > >> > > will
> > > > > > >> > > > > > throw
> > > > > > >> > > > > > > exception if and only if newGrouperClass is an
> > > instance
> > > > of
> > > > > > >> > > > > > > previousGrouperClass.
> > > > > > >> > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum
> should
> > > > > extend
> > > > > > >> > > > > > > GroupBySystemStreamPartition
> > > > > > >> > > > > > > and GroupByPartitionWithFixedTaskNum should
> extend
> > > > > > >> > > GroupByPartition.
> > > > > > >> > > > > > Does
> > > > > > >> > > > > > > this address your concern?
> > > > > > >> > > > > >
> > > > > > >> > > > > > Sounds workable, thanks.
> > > > > > >> > > > > >
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Can
> > > > > > >> > > > > > > you be more specific why Partition-to-task mapping
> > is
> > > > not
> > > > > > >> > > meaningful
> > > > > > >> > > > > > > without
> > > > > > >> > > > > > > some definition of the key-to-partition
> assignments
> > > and
> > > > > why
> > > > > > >> it is
> > > > > > >> > > > > > > incomplete and misleading?
> > > > > > >> > > > > >
> > > > > > >> > > > > >  A partition is (in my naive interpretation) an
> > > > independent
> > > > > > >> queue
> > > > > > >> > for
> > > > > > >> > > > > > messages of a particular key set. It is not the
> > > *identity*
> > > > > of
> > > > > > >> the
> > > > > > >> > > > > partition
> > > > > > >> > > > > > that determine the contents of the associated task's
> > > local
> > > > > > >> state.
> > > > > > >> > > > Rather
> > > > > > >> > > > > it
> > > > > > >> > > > > > is the *contents* of the partition that affect the
> > > task's
> > > > > > >> state. A
> > > > > > >> > > > > > partiton-to-task mapping only captures an identity
> > > > > > relationship:
> > > > > > >> > > > > > partition1->task1. Without the assumptions of this
> > SEP,
> > > > this
> > > > > > is
> > > > > > >> > > > > > insufficient to determine the assignment of keys to
> > > tasks,
> > > > > > >> which is
> > > > > > >> > > > what
> > > > > > >> > > > > > really matters. Therefore, any future feature that
> > > > utilizes
> > > > > > this
> > > > > > >> > > > mapping
> > > > > > >> > > > > > without accounting for the assumptions of this SEP
> is
> > > > likely
> > > > > > to
> > > > > > >> > > > > > malfunction.
> > > > > > >> > > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > > I am not sure it is true that "any future feature that
> > > > > utilizes
> > > > > > >> this
> > > > > > >> > > > > mapping without accounting for the assumptions of this
> > SEP
> > > > is
> > > > > > >> likely
> > > > > > >> > to
> > > > > > >> > > > > malfunction". Suppose we allow user to specify
> > > > > > >> new-to-old-partition
> > > > > > >> > > > > mapping, then we can use the partition-to-task mapping
> > > > > correctly
> > > > > > >> > > without
> > > > > > >> > > > > replying on the assumption in this SEP, right?
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > > > On Mon, Jun 12, 2017 at 5:09 PM, Dong Lin <
> > > > > > lindon...@gmail.com>
> > > > > > >> > > wrote:
> > > > > > >> > > > > >
> > > > > > >> > > > > > > Hey Jacob,
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Thanks for the explanation. It seems that your
> > biggest
> > > > > > >> concern is
> > > > > > >> > > > with
> > > > > > >> > > > > > the
> > > > > > >> > > > > > > generality of the proposal. Let me try to address
> > this
> > > > and
> > > > > > >> other
> > > > > > >> > > > > comments
> > > > > > >> > > > > > > below.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > 1) ... it will cause headaches for Samza users ...
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > I am not sure I understand why this proposal
> causes
> > > > > headache
> > > > > > >> for
> > > > > > >> > > > Samza
> > > > > > >> > > > > > > users. Here is the impact of the SEP-5 on users:
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > - For users that do not need partition expansion
> of
> > > the
> > > > > > input
> > > > > > >> > > stream,
> > > > > > >> > > > > > they
> > > > > > >> > > > > > > can use Samza without change change in
> > > > code/binary/config.
> > > > > > >> Thus
> > > > > > >> > > there
> > > > > > >> > > > > is
> > > > > > >> > > > > > no
> > > > > > >> > > > > > > headache for them.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > - For users that need partition expansion of the
> > input
> > > > > > streams
> > > > > > >> > for
> > > > > > >> > > > > > > stateless job, they currently need to manually
> > reboot
> > > > > their
> > > > > > >> Samza
> > > > > > >> > > job
> > > > > > >> > > > > in
> > > > > > >> > > > > > > order to let Samza consume the new partitions
> > created
> > > > for
> > > > > > the
> > > > > > >> > > stream.
> > > > > > >> > > > > > SEP-5
> > > > > > >> > > > > > > actually reduced their headache by allowing Samza
> to
> > > > > > >> > automatically
> > > > > > >> > > > > detect
> > > > > > >> > > > > > > and consume new partitions.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > - For users that need partition expansion of the
> > input
> > > > > > streams
> > > > > > >> > for
> > > > > > >> > > > > > stateful
> > > > > > >> > > > > > > job, they have a really big headache in the sense
> > that
> > > > > Samza
> > > > > > >> does
> > > > > > >> > > not
> > > > > > >> > > > > > allow
> > > > > > >> > > > > > > partition expansion for stateful job. SEP-5
> > addresses
> > > > this
> > > > > > >> > headache
> > > > > > >> > > > for
> > > > > > >> > > > > > > them.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > You are right that SEP-5 requires user to
> understand
> > > and
> > > > > > >> enforce
> > > > > > >> > > > > > > limitations across organizations. But it is still
> > much
> > > > > > better
> > > > > > >> > than
> > > > > > >> > > > not
> > > > > > >> > > > > > > allowing user to expansion partition for stateful
> > jobs
> > > > at
> > > > > > all,
> > > > > > >> > > right?
> > > > > > >> > > > > > Did I
> > > > > > >> > > > > > > miss something here?
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > 2) ... Separate orgs are often difficult to
> > coordinate
> > > > > and a
> > > > > > >> > system
> > > > > > >> > > > > which
> > > > > > >> > > > > > > depends on such significant process/coordination
> is
> > > too
> > > > > > >> fragile
> > > > > > >> > for
> > > > > > >> > > > my
> > > > > > >> > > > > > > taste ..
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > This is true. Ideally we want a system that is
> fully
> > > > > > >> > self-serving.
> > > > > > >> > > I
> > > > > > >> > > > > > think
> > > > > > >> > > > > > > this is a long term goal for Samza. Still, for the
> > > > reasons
> > > > > > >> > > described
> > > > > > >> > > > > > above,
> > > > > > >> > > > > > > I think something is better than nothing. I am
> open
> > to
> > > > > > >> > alternative
> > > > > > >> > > > > design
> > > > > > >> > > > > > > that can support partition expansion for stateful
> > jobs
> > > > > > without
> > > > > > >> > > > > requiring
> > > > > > >> > > > > > > coordination.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > 3) There is currently no supported way of sharing
> > > state
> > > > > > among
> > > > > > >> the
> > > > > > >> > > > tasks
> > > > > > >> > > > > > of
> > > > > > >> > > > > > > a container.  Each task has its own isolated store
> > and
> > > > > that
> > > > > > >> > logical
> > > > > > >> > > > > > > isolation is the primary thing that enables Samza
> > jobs
> > > > to
> > > > > > >> scale
> > > > > > >> > > with
> > > > > > >> > > > a
> > > > > > >> > > > > > > simple container count change. My feeling is that
> we
> > > > > should
> > > > > > >> not
> > > > > > >> > > > change
> > > > > > >> > > > > > > this without
> > > > > > >> > > > > > > good reason.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > I see your point. I will remove this sentence from
> > the
> > > > > > >> motivation
> > > > > > >> > > > > > section.
> > > > > > >> > > > > > > This won't have any impact on the design of the
> > SEP-5.
> > > > > Does
> > > > > > >> this
> > > > > > >> > > > > address
> > > > > > >> > > > > > > the problem?
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > 4) With the current proposal, we'd also need a
> > similar
> > > > > check
> > > > > > >> for
> > > > > > >> > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum as
> > well.
> > > > And
> > > > > > if
> > > > > > >> any
> > > > > > >> > > > other
> > > > > > >> > > > > > > groupers
> > > > > > >> > > > > > > were later added with both these modes, we'd
> > probably
> > > > need
> > > > > > to
> > > > > > >> add
> > > > > > >> > > > those
> > > > > > >> > > > > > > too. It might be easier and cleaner to add a
> config
> > to
> > > > > > ignore
> > > > > > >> > that
> > > > > > >> > > > > check
> > > > > > >> > > > > > > temporarily. Down side is that it further
> > complicates
> > > > the
> > > > > > >> Samza
> > > > > > >> > > > config,
> > > > > > >> > > > > > > which is already huge. Thoughts?
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Yes, we need a similar check for
> > > > > > >> GroupBySystemStreamPartitionWi
> > > > > > >> > > > > > > thFixedTaskNum
> > > > > > >> > > > > > > as well. If there is more grouper classes needed
> in
> > > the
> > > > > > >> future,
> > > > > > >> > we
> > > > > > >> > > > can
> > > > > > >> > > > > > > solve this problem cleanly without new config.
> Given
> > > the
> > > > > > >> > > > > > > previousGrouperClass and newGrouperClass,
> > > > > > >> KafkaCheckpointLogKey
> > > > > > >> > > will
> > > > > > >> > > > > > throw
> > > > > > >> > > > > > > exception if and only if newGrouperClass is an
> > > instance
> > > > of
> > > > > > >> > > > > > > previousGrouperClass.
> > > > > > >> > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum
> should
> > > > > extend
> > > > > > >> > > > > > > GroupBySystemStreamPartition
> > > > > > >> > > > > > > and GroupByPartitionWithFixedTaskNum should
> extend
> > > > > > >> > > GroupByPartition.
> > > > > > >> > > > > > Does
> > > > > > >> > > > > > > this address your concern?
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > 5) The task-to-container and container-to-host
> > > mappings
> > > > > are
> > > > > > >> both
> > > > > > >> > > > > > meaningful
> > > > > > >> > > > > > > in context of the JobModel. Partition-to-task
> > mapping
> > > is
> > > > > not
> > > > > > >> > > > meaningful
> > > > > > >> > > > > > > without
> > > > > > >> > > > > > > some definition of the key-to-partition
> assignments.
> > > > It's
> > > > > > >> > > incomplete
> > > > > > >> > > > > > > information and therefore misleading. I think it
> > only
> > > > > makes
> > > > > > >> sense
> > > > > > >> > > to
> > > > > > >> > > > > use
> > > > > > >> > > > > > > this mapping if we adopt a solution wherein Samza
> > also
> > > > > knows
> > > > > > >> the
> > > > > > >> > > > > > partition
> > > > > > >> > > > > > > key assignment.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Partition-to-task is currently explicitly passed
> > from
> > > > job
> > > > > > >> > > coordinator
> > > > > > >> > > > > to
> > > > > > >> > > > > > > each task as part of the job model to tell tasks
> > which
> > > > > > >> partitions
> > > > > > >> > > to
> > > > > > >> > > > > > > consume from. I think we can store some definition
> > of
> > > > the
> > > > > > >> > > > > > key-to-partition
> > > > > > >> > > > > > > assignments if Samza decides to get and use this
> > > > > information
> > > > > > >> in
> > > > > > >> > the
> > > > > > >> > > > > > > future. Can
> > > > > > >> > > > > > > you be more specific why Partition-to-task mapping
> > is
> > > > not
> > > > > > >> > > meaningful
> > > > > > >> > > > > > > without
> > > > > > >> > > > > > > some definition of the key-to-partition
> assignments
> > > and
> > > > > why
> > > > > > >> it is
> > > > > > >> > > > > > > incomplete and misleading?
> > > > > > >> > > > > > >
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Thanks,
> > > > > > >> > > > > > > Dong
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > On Mon, Jun 12, 2017 at 3:54 PM, Jacob Maes <
> > > > > > >> > jacob.m...@gmail.com>
> > > > > > >> > > > > > wrote:
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > > Hey Dong,
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > I'm opposed (or a +0, at best) to this limited,
> > > > > > >> Kafka-specific
> > > > > > >> > > > > > solution.
> > > > > > >> > > > > > > I
> > > > > > >> > > > > > > > understand that the proposal is relatively
> simple
> > to
> > > > > > >> implement,
> > > > > > >> > > > but I
> > > > > > >> > > > > > > think
> > > > > > >> > > > > > > > it will cause headaches for Samza users. They
> will
> > > not
> > > > > > only
> > > > > > >> > have
> > > > > > >> > > to
> > > > > > >> > > > > > > > understand all the limitations (increase only,
> > > double
> > > > > > >> > partitions
> > > > > > >> > > > > only,
> > > > > > >> > > > > > > > partition using hash+modulo, etc) of this
> > approach,
> > > > but
> > > > > > >> > enforcing
> > > > > > >> > > > > these
> > > > > > >> > > > > > > > limitations can be a major problem, especially
> > when
> > > > the
> > > > > > >> Samza
> > > > > > >> > > jobs
> > > > > > >> > > > > and
> > > > > > >> > > > > > > > message brokers are managed by separate orgs in
> a
> > > > > company.
> > > > > > >> > > Separate
> > > > > > >> > > > > > orgs
> > > > > > >> > > > > > > > are often difficult to coordinate and a system
> > which
> > > > > > >> depends on
> > > > > > >> > > > such
> > > > > > >> > > > > > > > significant process/coordination is too fragile
> > for
> > > my
> > > > > > >> taste.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > That said, I realize that my opinion is just one
> > of
> > > > many
> > > > > > in
> > > > > > >> the
> > > > > > >> > > > > broader
> > > > > > >> > > > > > > > community which may feel differently, so let me
> > > > respond
> > > > > to
> > > > > > >> some
> > > > > > >> > > of
> > > > > > >> > > > > the
> > > > > > >> > > > > > > > other items in the discussion so we can clear
> them
> > > up:
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > The task-to-container assignment matters because
> > if
> > > > the
> > > > > > >> > > correlated
> > > > > > >> > > > > > tasks
> > > > > > >> > > > > > > > > (i.e. tasks that consume messages with the
> same
> > > key)
> > > > > > >> needs to
> > > > > > >> > > be
> > > > > > >> > > > in
> > > > > > >> > > > > > the
> > > > > > >> > > > > > > > > same container so that they can share the same
> > > > > key/value
> > > > > > >> > local
> > > > > > >> > > > > store
> > > > > > >> > > > > > on
> > > > > > >> > > > > > > > the
> > > > > > >> > > > > > > > > same physical machine.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > There is currently no supported way of sharing
> > state
> > > > > among
> > > > > > >> the
> > > > > > >> > > > tasks
> > > > > > >> > > > > > of a
> > > > > > >> > > > > > > > container.  Each task has its own isolated store
> > and
> > > > > that
> > > > > > >> > logical
> > > > > > >> > > > > > > isolation
> > > > > > >> > > > > > > > is the primary thing that enables Samza jobs to
> > > scale
> > > > > > with a
> > > > > > >> > > simple
> > > > > > >> > > > > > > > container count change. My feeling is that we
> > should
> > > > not
> > > > > > >> change
> > > > > > >> > > > this
> > > > > > >> > > > > > > > without good reason.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > I think we can hardcode new logic in
> > > > > > >> > KafkaCheckpointLogKey.scala
> > > > > > >> > > > such
> > > > > > >> > > > > > > that
> > > > > > >> > > > > > > > > exception will not be thrown if new grouper is
> > > > > > >> > > > > > > > > GroupByPartitionWithFixedTaskNum and old
> > grouper
> > > is
> > > > > > >> > > > > > GroupByPartition.
> > > > > > >> > > > > > > > Does
> > > > > > >> > > > > > > > > this look reasonable?
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > With the current proposal, we'd also need a
> > similar
> > > > > check
> > > > > > >> for
> > > > > > >> > > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum as
> > > well.
> > > > > And
> > > > > > >> if
> > > > > > >> > any
> > > > > > >> > > > > other
> > > > > > >> > > > > > > > groupers were later added with both these modes,
> > > we'd
> > > > > > >> probably
> > > > > > >> > > need
> > > > > > >> > > > > to
> > > > > > >> > > > > > > add
> > > > > > >> > > > > > > > those too. It might be easier and cleaner to
> add a
> > > > > config
> > > > > > to
> > > > > > >> > > ignore
> > > > > > >> > > > > > that
> > > > > > >> > > > > > > > check temporarily. Down side is that it further
> > > > > > complicates
> > > > > > >> the
> > > > > > >> > > > Samza
> > > > > > >> > > > > > > > config, which is already huge. Thoughts?
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > I think storing the previous task-to-partition
> > > mapping
> > > > > is
> > > > > > >> more
> > > > > > >> > > > > general
> > > > > > >> > > > > > > than
> > > > > > >> > > > > > > > > storing the partition count of all topics for
> > the
> > > > > > >> following
> > > > > > >> > > > > reasons:
> > > > > > >> > > > > > > > > - Samza already stores the task-to-container
> > > mapping
> > > > > and
> > > > > > >> > > > > > > > container-to-host
> > > > > > >> > > > > > > > > mapping in the coordinator stream. It seems
> > > > consistent
> > > > > > to
> > > > > > >> > also
> > > > > > >> > > > > store
> > > > > > >> > > > > > > the
> > > > > > >> > > > > > > > > partition-to-task mapping. And this
> information
> > > may
> > > > be
> > > > > > >> useful
> > > > > > >> > > for
> > > > > > >> > > > > > other
> > > > > > >> > > > > > > > > use-case such as debugging.
> > > > > > >> > > > > > > > > - By having the new interface take the
> previous
> > > > > > >> > > task-to-partition
> > > > > > >> > > > > > > > > assignment instead of a
> topic-to-partition-count
> > > > > mapping
> > > > > > >> as
> > > > > > >> > new
> > > > > > >> > > > > > > > parameter,
> > > > > > >> > > > > > > > > we can potentially have grouper implementation
> > to
> > > > > > support
> > > > > > >> > other
> > > > > > >> > > > > types
> > > > > > >> > > > > > > of
> > > > > > >> > > > > > > > > input systems.
> > > > > > >> > > > > > > > > - It is sightly simpler to store the
> > > > task-to-partition
> > > > > > >> > > assignment
> > > > > > >> > > > > > > because
> > > > > > >> > > > > > > > > we don't need to know whether this is the
> first
> > > > time a
> > > > > > >> job is
> > > > > > >> > > > > started
> > > > > > >> > > > > > > or
> > > > > > >> > > > > > > > > not. On the other hand, you can write
> > > > > > >> > topic-to-partition-count
> > > > > > >> > > > > > mapping
> > > > > > >> > > > > > > to
> > > > > > >> > > > > > > > > the coordinator stream only if this is the
> first
> > > > time
> > > > > > the
> > > > > > >> job
> > > > > > >> > > is
> > > > > > >> > > > > run
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > The task-to-container and container-to-host
> > mappings
> > > > are
> > > > > > >> both
> > > > > > >> > > > > > meaningful
> > > > > > >> > > > > > > in
> > > > > > >> > > > > > > > context of the JobModel. Partition-to-task
> mapping
> > > is
> > > > > not
> > > > > > >> > > > meaningful
> > > > > > >> > > > > > > > without some definition of the key-to-partition
> > > > > > assignments.
> > > > > > >> > It's
> > > > > > >> > > > > > > > incomplete information and therefore
> misleading. I
> > > > think
> > > > > > it
> > > > > > >> > only
> > > > > > >> > > > > makes
> > > > > > >> > > > > > > > sense to use this mapping if we adopt a solution
> > > > wherein
> > > > > > >> Samza
> > > > > > >> > > also
> > > > > > >> > > > > > knows
> > > > > > >> > > > > > > > the partition key assignment.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > -Jake
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > On Tue, Jun 6, 2017 at 11:06 PM, Dong Lin <
> > > > > > >> lindon...@gmail.com
> > > > > > >> > >
> > > > > > >> > > > > wrote:
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > > Hey Jacob,
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > Thanks for taking time to review the SEP.
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > I agree with you and Navina that the current
> SEP
> > > > > doesn't
> > > > > > >> > > provide
> > > > > > >> > > > > > > support
> > > > > > >> > > > > > > > to
> > > > > > >> > > > > > > > > arbitrary input systems and it doesn't support
> > > > > partition
> > > > > > >> > > shrink.
> > > > > > >> > > > I
> > > > > > >> > > > > > > think
> > > > > > >> > > > > > > > > the scope of this SEP is to support partition
> > > > > expansion
> > > > > > >> for
> > > > > > >> > > Kafka
> > > > > > >> > > > > > (the
> > > > > > >> > > > > > > > most
> > > > > > >> > > > > > > > > widely used input system of Samza) and keep
> the
> > > door
> > > > > > open
> > > > > > >> for
> > > > > > >> > > > > > partition
> > > > > > >> > > > > > > > > support of various input systems. The current
> > > design
> > > > > can
> > > > > > >> > > support
> > > > > > >> > > > > any
> > > > > > >> > > > > > > > system
> > > > > > >> > > > > > > > > that meets the two operational requirement
> > > specified
> > > > > in
> > > > > > >> the
> > > > > > >> > > doc.
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > While it is possible to support more types of
> > > input
> > > > > > >> systems,
> > > > > > >> > it
> > > > > > >> > > > > will
> > > > > > >> > > > > > > > likely
> > > > > > >> > > > > > > > > add more complexity to the design. For
> example,
> > > the
> > > > > > first
> > > > > > >> > > > > alternative
> > > > > > >> > > > > > > > > solution from you requires broker-side support
> > to
> > > > > > >> negotiate
> > > > > > >> > > hash
> > > > > > >> > > > > > > > algorithm.
> > > > > > >> > > > > > > > > The second alternative solution requires
> > changelog
> > > > > > >> partition
> > > > > > >> > > > > > reshuffle
> > > > > > >> > > > > > > > > which carries its own design complexity and
> > > > > performance
> > > > > > >> > > overhead.
> > > > > > >> > > > > > There
> > > > > > >> > > > > > > > is
> > > > > > >> > > > > > > > > tradeoff between the generality and the
> > complexity
> > > > > among
> > > > > > >> > these
> > > > > > >> > > > > > > choices. I
> > > > > > >> > > > > > > > > like the current design because it is simple
> and
> > > > > > >> addresses a
> > > > > > >> > > big
> > > > > > >> > > > > > usage
> > > > > > >> > > > > > > > > scenario for us. We can add more complexity to
> > > > > > generalize
> > > > > > >> the
> > > > > > >> > > > > design
> > > > > > >> > > > > > if
> > > > > > >> > > > > > > > it
> > > > > > >> > > > > > > > > enables important use-case. Does this sound
> > > > > reasonable?
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > Note that the "Rejected Alternative" section
> > also
> > > > > > mentions
> > > > > > >> > the
> > > > > > >> > > > > > > > possibility
> > > > > > >> > > > > > > > > of supporting a wider range of input systems
> by
> > > > > allowing
> > > > > > >> user
> > > > > > >> > > to
> > > > > > >> > > > > > > specify
> > > > > > >> > > > > > > > > the new-partition to old-partition mapping. We
> > are
> > > > not
> > > > > > >> doing
> > > > > > >> > it
> > > > > > >> > > > > > because
> > > > > > >> > > > > > > > 1)
> > > > > > >> > > > > > > > > we may have better understanding of the design
> > > after
> > > > > we
> > > > > > >> have
> > > > > > >> > a
> > > > > > >> > > > > > specific
> > > > > > >> > > > > > > > > second input system to support 2) the current
> > > design
> > > > > can
> > > > > > >> be
> > > > > > >> > > > > extended
> > > > > > >> > > > > > to
> > > > > > >> > > > > > > > > support general input systems. I think similar
> > > > > argument
> > > > > > >> can
> > > > > > >> > be
> > > > > > >> > > > > > applied
> > > > > > >> > > > > > > > > explain why we don't have to support general
> > input
> > > > > > systems
> > > > > > >> > > using
> > > > > > >> > > > > the
> > > > > > >> > > > > > > > > potentially-good alternatives you mentioned.
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > I hope SEP-5 can be an important first-step
> > > towards
> > > > > > >> > supporting
> > > > > > >> > > > > > > partition
> > > > > > >> > > > > > > > > expansion of any input system.
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > To answer your questions about the current
> > > proposal:
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > >1. "An alternative solution is to allow task
> > > number
> > > > > to
> > > > > > >> > > increase
> > > > > > >> > > > > > after
> > > > > > >> > > > > > > > > >partition expansion and uses a proper
> > > > > task-to-container
> > > > > > >> > > > assignment
> > > > > > >> > > > > > to
> > > > > > >> > > > > > > > make
> > > > > > >> > > > > > > > > >sure the Samza output is correct." What does
> > the
> > > > > > >> container
> > > > > > >> > > have
> > > > > > >> > > > to
> > > > > > >> > > > > > do
> > > > > > >> > > > > > > > with
> > > > > > >> > > > > > > > > >stateful processing or output in general?
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > The task-to-container assignment matters
> because
> > > if
> > > > > the
> > > > > > >> > > > correlated
> > > > > > >> > > > > > > tasks
> > > > > > >> > > > > > > > > (i.e. tasks that consume messages with the
> same
> > > key)
> > > > > > >> needs to
> > > > > > >> > > be
> > > > > > >> > > > in
> > > > > > >> > > > > > the
> > > > > > >> > > > > > > > > same container so that they can share the same
> > > > > key/value
> > > > > > >> > local
> > > > > > >> > > > > store
> > > > > > >> > > > > > on
> > > > > > >> > > > > > > > the
> > > > > > >> > > > > > > > > same physical machine.
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > >2. When you use "Join" as an example, you
> > > basically
> > > > > > mean
> > > > > > >> > > > multiple
> > > > > > >> > > > > > > > > >co-partitioned streams, right? This is
> opposed
> > to
> > > > > > >> multiple,
> > > > > > >> > > > > > > > > >independently-partitioned streams or a single
> > > > stream.
> > > > > > >> Would
> > > > > > >> > be
> > > > > > >> > > > > nice
> > > > > > >> > > > > > to
> > > > > > >> > > > > > > > > >formulate the proposal in these more general
> > > terms.
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > I thought "join" is a commonly used to refer
> to
> > > the
> > > > > join
> > > > > > >> > > > opeartion
> > > > > > >> > > > > > with
> > > > > > >> > > > > > > > > co-partitioned stream but I may be wrong. I
> have
> > > > > updated
> > > > > > >> the
> > > > > > >> > > wiki
> > > > > > >> > > > > to
> > > > > > >> > > > > > > > > explicitly mention "co-partitioned stream".
> Does
> > > > this
> > > > > > look
> > > > > > >> > > better
> > > > > > >> > > > > > now?
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > >3. When switching SSP groupers, how will the
> > > users
> > > > > > avoid
> > > > > > >> the
> > > > > > >> > > > > > > > > >org.apache.samza.checkpoint.kafka.
> > > > > > >> > > > DifferingSystemStreamPartition
> > > > > > >> > > > > > > > > GrouperFactoryValues
> > > > > > >> > > > > > > > > >exception?
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > I think we can hardcode new logic in
> > > > > > >> > > KafkaCheckpointLogKey.scala
> > > > > > >> > > > > such
> > > > > > >> > > > > > > > that
> > > > > > >> > > > > > > > > exception will not be thrown if new grouper is
> > > > > > >> > > > > > > > > GroupByPartitionWithFixedTaskNum and old
> > grouper
> > > is
> > > > > > >> > > > > > GroupByPartition.
> > > > > > >> > > > > > > > Does
> > > > > > >> > > > > > > > > this look reasonable?
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > >4. Partition to task assignment is
> meaningless
> > > > > without
> > > > > > >> key
> > > > > > >> > to
> > > > > > >> > > > > > > partition
> > > > > > >> > > > > > > > > >mapping. The real semantics are captured in
> the
> > > > > > external
> > > > > > >> > > > > requirement
> > > > > > >> > > > > > > for
> > > > > > >> > > > > > > > > >partitioning via hash+modulo. But in that
> case,
> > > > iiuc,
> > > > > > >> only
> > > > > > >> > the
> > > > > > >> > > > > > > partition
> > > > > > >> > > > > > > > > >count matters. So why not just store the
> > original
> > > > > > >> partition
> > > > > > >> > > > count
> > > > > > >> > > > > > > rather
> > > > > > >> > > > > > > > > >than the whole mapping?
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > I think storing the previous task-to-partition
> > > > mapping
> > > > > > is
> > > > > > >> > more
> > > > > > >> > > > > > general
> > > > > > >> > > > > > > > than
> > > > > > >> > > > > > > > > storing the partition count of all topics for
> > the
> > > > > > >> following
> > > > > > >> > > > > reasons:
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > - Samza already stores the task-to-container
> > > mapping
> > > > > and
> > > > > > >> > > > > > > > container-to-host
> > > > > > >> > > > > > > > > mapping in the coordinator stream. It seems
> > > > consistent
> > > > > > to
> > > > > > >> > also
> > > > > > >> > > > > store
> > > > > > >> > > > > > > the
> > > > > > >> > > > > > > > > partition-to-task mapping. And this
> information
> > > may
> > > > be
> > > > > > >> useful
> > > > > > >> > > for
> > > > > > >> > > > > > other
> > > > > > >> > > > > > > > > use-case such as debugging.
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > - By having the new interface take the
> previous
> > > > > > >> > > task-to-partition
> > > > > > >> > > > > > > > > assignment instead of a
> topic-to-partition-count
> > > > > mapping
> > > > > > >> as
> > > > > > >> > new
> > > > > > >> > > > > > > > parameter,
> > > > > > >> > > > > > > > > we can potentially have grouper implementation
> > to
> > > > > > support
> > > > > > >> > other
> > > > > > >> > > > > types
> > > > > > >> > > > > > > of
> > > > > > >> > > > > > > > > input systems.
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > - It is sightly simpler to store the
> > > > task-to-partition
> > > > > > >> > > assignment
> > > > > > >> > > > > > > because
> > > > > > >> > > > > > > > > we don't need to know whether this is the
> first
> > > > time a
> > > > > > >> job is
> > > > > > >> > > > > started
> > > > > > >> > > > > > > or
> > > > > > >> > > > > > > > > not. On the other hand, you can write
> > > > > > >> > topic-to-partition-count
> > > > > > >> > > > > > mapping
> > > > > > >> > > > > > > to
> > > > > > >> > > > > > > > > the coordinator stream only if this is the
> first
> > > > time
> > > > > > the
> > > > > > >> job
> > > > > > >> > > is
> > > > > > >> > > > > run
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > Thanks,
> > > > > > >> > > > > > > > > Dong
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > On Mon, Jun 5, 2017 at 11:17 AM, Jacob Maes <
> > > > > > >> > > > jacob.m...@gmail.com>
> > > > > > >> > > > > > > > wrote:
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > > Hey Dong,
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > Thanks for the SEP. Supporting partition
> > changes
> > > > is
> > > > > > >> > > critically
> > > > > > >> > > > > > > > important
> > > > > > >> > > > > > > > > > for stateful Samza jobs, so it's great to
> see
> > > some
> > > > > > >> ideas on
> > > > > > >> > > > that
> > > > > > >> > > > > > > front!
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > Sorry for the late feedback, but I have a
> few
> > > > > thoughts
> > > > > > >> to
> > > > > > >> > > > > > contribute.
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > Big +1 on Navina's comment:
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > > My biggest gripe with this SEP is that it
> > > seems
> > > > > > like a
> > > > > > >> > > > > > tailor-made
> > > > > > >> > > > > > > > > > > solution
> > > > > > >> > > > > > > > > > > that relies on the semantics of the Kafka
> > > system
> > > > > and
> > > > > > >> yet,
> > > > > > >> > > we
> > > > > > >> > > > > are
> > > > > > >> > > > > > > > trying
> > > > > > >> > > > > > > > > > to
> > > > > > >> > > > > > > > > > > masquerade that as operational
> requirements
> > > for
> > > > > > other
> > > > > > >> > > systems
> > > > > > >> > > > > > > > > interacting
> > > > > > >> > > > > > > > > > > with Samza. (Not to say that this is the
> > first
> > > > > time
> > > > > > >> such
> > > > > > >> > a
> > > > > > >> > > > > choice
> > > > > > >> > > > > > > is
> > > > > > >> > > > > > > > > > being
> > > > > > >> > > > > > > > > > > made in the Samza design). I am not seeing
> > how
> > > > > this
> > > > > > >> can a
> > > > > > >> > > > > > "general"
> > > > > > >> > > > > > > > > > > solution for all input systems. That's my
> > two
> > > > > > cents. I
> > > > > > >> > > would
> > > > > > >> > > > > like
> > > > > > >> > > > > > > to
> > > > > > >> > > > > > > > > hear
> > > > > > >> > > > > > > > > > > alternative points of view for this from
> > other
> > > > > devs.
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > Two examples of this:
> > > > > > >> > > > > > > > > > 1. This is mostly a hypothetical, but some
> > > message
> > > > > > >> brokers
> > > > > > >> > > may
> > > > > > >> > > > > use
> > > > > > >> > > > > > > key
> > > > > > >> > > > > > > > > > range assignment rather than hash+modulo.
> > > > > > >> > > > > > > > > > 2. Kafka can't reduce the number of
> > partitions,
> > > > but
> > > > > it
> > > > > > >> can
> > > > > > >> > > > happen
> > > > > > >> > > > > > on
> > > > > > >> > > > > > > > > other
> > > > > > >> > > > > > > > > > systems. For example, it may be cheaper to
> > > reduce
> > > > > the
> > > > > > >> > number
> > > > > > >> > > of
> > > > > > >> > > > > > > > > partitions
> > > > > > >> > > > > > > > > > on a hosted service where the cost model
> > depends
> > > > on
> > > > > > the
> > > > > > >> > > number
> > > > > > >> > > > of
> > > > > > >> > > > > > > > > > partitions/shards.
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > It seems to me that a solution which doesn't
> > > > depend
> > > > > on
> > > > > > >> > > > partition
> > > > > > >> > > > > > key
> > > > > > >> > > > > > > > > > assignment in the message broker. Here are a
> > few
> > > > > > >> > alternatives
> > > > > > >> > > > > that
> > > > > > >> > > > > > > > > weren't
> > > > > > >> > > > > > > > > > discussed and I think should be considered:
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > Alternatives in order of increasing
> > preference:
> > > > > > >> > > > > > > > > > 1. Samza manages the partition hash (via
> some
> > > new
> > > > > > >> contract
> > > > > > >> > > with
> > > > > > >> > > > > the
> > > > > > >> > > > > > > > > > brokers) and guarantees correct routing of
> > keys
> > > > > among
> > > > > > >> the
> > > > > > >> > new
> > > > > > >> > > > > > > > partitions.
> > > > > > >> > > > > > > > > > 2. Samza detects a task count change,
> creates
> > a
> > > > new
> > > > > > >> > changelog
> > > > > > >> > > > > with
> > > > > > >> > > > > > > > > correct
> > > > > > >> > > > > > > > > > partitions, and *somehow* reshuffles all
> > > existing
> > > > > > >> changelog
> > > > > > >> > > > data
> > > > > > >> > > > > > into
> > > > > > >> > > > > > > > the
> > > > > > >> > > > > > > > > > new topic and then uses the new topic from
> > then
> > > > on.
> > > > > > >> > (doesn't
> > > > > > >> > > > work
> > > > > > >> > > > > > > > without
> > > > > > >> > > > > > > > > > changelog, but in that case durability isn't
> > > > > > paramount,
> > > > > > >> so
> > > > > > >> > we
> > > > > > >> > > > can
> > > > > > >> > > > > > > just
> > > > > > >> > > > > > > > > > wipe)
> > > > > > >> > > > > > > > > > 3. Use RPC in between stages and samza fully
> > > > manages
> > > > > > key
> > > > > > >> > > > > assignment
> > > > > > >> > > > > > > > among
> > > > > > >> > > > > > > > > > tasks. No on-disk topic data to clean up.
> > > > Mandatory
> > > > > > >> > > > > repartitioning
> > > > > > >> > > > > > in
> > > > > > >> > > > > > > > the
> > > > > > >> > > > > > > > > > first stage to pre-scaled tasks in next
> stage.
> > > > > > >> > > > > > > > > > 4. Combined 2-3 solution
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > Finally, some questions about the current
> > > > proposal:
> > > > > > >> > > > > > > > > > 1. "An alternative solution is to allow task
> > > > number
> > > > > to
> > > > > > >> > > increase
> > > > > > >> > > > > > after
> > > > > > >> > > > > > > > > > partition expansion and uses a proper
> > > > > > task-to-container
> > > > > > >> > > > > assignment
> > > > > > >> > > > > > to
> > > > > > >> > > > > > > > > make
> > > > > > >> > > > > > > > > > sure the Samza output is correct." What does
> > the
> > > > > > >> container
> > > > > > >> > > have
> > > > > > >> > > > > to
> > > > > > >> > > > > > do
> > > > > > >> > > > > > > > > with
> > > > > > >> > > > > > > > > > stateful processing or output in general?
> > > > > > >> > > > > > > > > > 2. When you use "Join" as an example, you
> > > > basically
> > > > > > mean
> > > > > > >> > > > multiple
> > > > > > >> > > > > > > > > > co-partitioned streams, right? This is
> opposed
> > > to
> > > > > > >> multiple,
> > > > > > >> > > > > > > > > > independently-partitioned streams or a
> single
> > > > > stream.
> > > > > > >> Would
> > > > > > >> > > be
> > > > > > >> > > > > nice
> > > > > > >> > > > > > > to
> > > > > > >> > > > > > > > > > formulate the proposal in these more general
> > > > terms.
> > > > > > >> > > > > > > > > > 3. When switching SSP groupers, how will the
> > > users
> > > > > > avoid
> > > > > > >> > the
> > > > > > >> > > > > > > > > > org.apache.samza.checkpoint.kafka.
> > > > > > >> > DifferingSystemStreamParti
> > > > > > >> > > > > > > > > > tionGrouperFactoryValues
> > > > > > >> > > > > > > > > > exception?
> > > > > > >> > > > > > > > > > 4. Partition to task assignment is
> meaningless
> > > > > without
> > > > > > >> key
> > > > > > >> > to
> > > > > > >> > > > > > > partition
> > > > > > >> > > > > > > > > > mapping. The real semantics are captured in
> > the
> > > > > > external
> > > > > > >> > > > > > requirement
> > > > > > >> > > > > > > > for
> > > > > > >> > > > > > > > > > partitioning via hash+modulo. But in that
> > case,
> > > > > iiuc,
> > > > > > >> only
> > > > > > >> > > the
> > > > > > >> > > > > > > > partition
> > > > > > >> > > > > > > > > > count matters. So why not just store the
> > > original
> > > > > > >> partition
> > > > > > >> > > > count
> > > > > > >> > > > > > > > rather
> > > > > > >> > > > > > > > > > than the whole mapping?
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > -Jake
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > On Sun, Jun 4, 2017 at 12:32 PM, Dong Lin <
> > > > > > >> > > lindon...@gmail.com
> > > > > > >> > > > >
> > > > > > >> > > > > > > wrote:
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > > Hey Yi, Navina,
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > I have updated the SEP-5 document based on
> > our
> > > > > > >> > discussion.
> > > > > > >> > > > The
> > > > > > >> > > > > > > > > difference
> > > > > > >> > > > > > > > > > > can be found here
> > > > > > >> > > > > > > > > > > <https://cwiki.apache.org/
> confluence/pages/
> > > > > > >> > > > > > > diffpagesbyversion.action
> > > > > > >> > > > > > > > ?
> > > > > > >> > > > > > > > > > > pageId=70255476&selectedPageVersions=14&
> > > > > > >> > > selectedPageVersions
> > > > > > >> > > > > > =15>.
> > > > > > >> > > > > > > > > > > Here is the summary of changes:
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > - Add new interface that extends the
> > existing
> > > > > > >> interface
> > > > > > >> > > > > > > > > > > SystemStreamPartitionGrouper. Newly-added
> > > > grouper
> > > > > > >> class
> > > > > > >> > > > should
> > > > > > >> > > > > > > > > implement
> > > > > > >> > > > > > > > > > > this interface.
> > > > > > >> > > > > > > > > > > - Explained in the Rejected Alternative
> > > Section
> > > > > why
> > > > > > we
> > > > > > >> > > don't
> > > > > > >> > > > > add
> > > > > > >> > > > > > > new
> > > > > > >> > > > > > > > > > method
> > > > > > >> > > > > > > > > > > in the existing interface
> > > > > > >> > > > > > > > > > > - Explained in the Rejected Alternative
> > > Section
> > > > > why
> > > > > > we
> > > > > > >> > > don't
> > > > > > >> > > > > > > > > config/class
> > > > > > >> > > > > > > > > > > for user to specify new-partition to
> > > > old-partition
> > > > > > >> > mapping.
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > Can you take another look at the proposal
> > and
> > > > let
> > > > > me
> > > > > > >> know
> > > > > > >> > > if
> > > > > > >> > > > > > there
> > > > > > >> > > > > > > is
> > > > > > >> > > > > > > > > any
> > > > > > >> > > > > > > > > > > concern?
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > Cheers,
> > > > > > >> > > > > > > > > > > Dong
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > On Thu, Jun 1, 2017 at 12:58 AM, Dong Lin
> <
> > > > > > >> > > > lindon...@gmail.com
> > > > > > >> > > > > >
> > > > > > >> > > > > > > > wrote:
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > Hey Yi,
> > > > > > >> > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > Thanks much for the comment. I have
> > updated
> > > > the
> > > > > > doc
> > > > > > >> to
> > > > > > >> > > > > address
> > > > > > >> > > > > > > all
> > > > > > >> > > > > > > > > your
> > > > > > >> > > > > > > > > > > > comments except the one related to the
> > > > > interface.
> > > > > > I
> > > > > > >> am
> > > > > > >> > > not
> > > > > > >> > > > > > sure I
> > > > > > >> > > > > > > > > > > > understand your suggestion of the new
> > > > interface.
> > > > > > >> Will
> > > > > > >> > > > discuss
> > > > > > >> > > > > > > > > tomorrow.
> > > > > > >> > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > Thanks,
> > > > > > >> > > > > > > > > > > > Dong
> > > > > > >> > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > On Wed, May 31, 2017 at 4:29 PM, Yi Pan
> <
> > > > > > >> > > > nickpa...@gmail.com
> > > > > > >> > > > > >
> > > > > > >> > > > > > > > wrote:
> > > > > > >> > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > >> Hi, Don,
> > > > > > >> > > > > > > > > > > >>
> > > > > > >> > > > > > > > > > > >> Thanks for the detailed design doc for
> a
> > > > > > >> long-waited
> > > > > > >> > > > feature
> > > > > > >> > > > > > in
> > > > > > >> > > > > > > > > Samza!
> > > > > > >> > > > > > > > > > > >> Really appreciate it! I did a quick
> pass
> > > and
> > > > > have
> > > > > > >> the
> > > > > > >> > > > > > following
> > > > > > >> > > > > > > > > > > comments:
> > > > > > >> > > > > > > > > > > >>
> > > > > > >> > > > > > > > > > > >> - minor: "limit the maximum size of
> > > > partition"
> > > > > > ==>
> > > > > > >> > > "limit
> > > > > > >> > > > > the
> > > > > > >> > > > > > > > > maximum
> > > > > > >> > > > > > > > > > > size
> > > > > > >> > > > > > > > > > > >> of each partition"
> > > > > > >> > > > > > > > > > > >> - "However, Samza currently is not able
> > to
> > > > > handle
> > > > > > >> > > > partition
> > > > > > >> > > > > > > > > expansion
> > > > > > >> > > > > > > > > > of
> > > > > > >> > > > > > > > > > > >> the input streams"==>better point out
> > "for
> > > > > > stateful
> > > > > > >> > > jobs".
> > > > > > >> > > > > For
> > > > > > >> > > > > > > > > > stateless
> > > > > > >> > > > > > > > > > > >> jobs, simply bouncing the job now can
> > pick
> > > up
> > > > > the
> > > > > > >> new
> > > > > > >> > > > > > > partitions.
> > > > > > >> > > > > > > > > > > >> - "it is possible (e.g. with Kafka)
> that
> > > > > messages
> > > > > > >> > with a
> > > > > > >> > > > > given
> > > > > > >> > > > > > > key
> > > > > > >> > > > > > > > > > > exists
> > > > > > >> > > > > > > > > > > >> in both partition 1 an 3. Because
> > > > > > GroupByPartition
> > > > > > >> > will
> > > > > > >> > > > > assign
> > > > > > >> > > > > > > > > > > partition 1
> > > > > > >> > > > > > > > > > > >> and 3 to different tasks, messages with
> > the
> > > > > same
> > > > > > >> key
> > > > > > >> > may
> > > > > > >> > > > be
> > > > > > >> > > > > > > > handled
> > > > > > >> > > > > > > > > by
> > > > > > >> > > > > > > > > > > >> different task/container/process and
> > their
> > > > > state
> > > > > > >> will
> > > > > > >> > be
> > > > > > >> > > > > > stored
> > > > > > >> > > > > > > in
> > > > > > >> > > > > > > > > > > >> different changelog partition." The
> > problem
> > > > > > >> statement
> > > > > > >> > is
> > > > > > >> > > > not
> > > > > > >> > > > > > > super
> > > > > > >> > > > > > > > > > clear
> > > > > > >> > > > > > > > > > > >> here. The issues with stateful jobs is:
> > > after
> > > > > > >> > > > > GroupByPartition
> > > > > > >> > > > > > > > > assign
> > > > > > >> > > > > > > > > > > >> partition 1 and 3 to different tasks,
> the
> > > new
> > > > > > task
> > > > > > >> > > > handling
> > > > > > >> > > > > > > > > partition
> > > > > > >> > > > > > > > > > 3
> > > > > > >> > > > > > > > > > > >> does not have the previous state to
> > resume
> > > > the
> > > > > > >> work.
> > > > > > >> > > e.g.
> > > > > > >> > > > a
> > > > > > >> > > > > > > > page-key
> > > > > > >> > > > > > > > > > > based
> > > > > > >> > > > > > > > > > > >> counter would start from 0 in the new
> > task
> > > > for
> > > > > a
> > > > > > >> > > specific
> > > > > > >> > > > > key,
> > > > > > >> > > > > > > > > instead
> > > > > > >> > > > > > > > > > > of
> > > > > > >> > > > > > > > > > > >> resuming the previous count 50 held by
> > task
> > > > 1.
> > > > > > >> > > > > > > > > > > >> - minor rewording: "the first solution
> in
> > > > this
> > > > > > doc"
> > > > > > >> > ==>
> > > > > > >> > > > "the
> > > > > > >> > > > > > > > > solution
> > > > > > >> > > > > > > > > > > >> proposed in this doc"
> > > > > > >> > > > > > > > > > > >> - "Thus additional development work is
> > > needed
> > > > > in
> > > > > > >> Kafka
> > > > > > >> > > to
> > > > > > >> > > > > meet
> > > > > > >> > > > > > > > this
> > > > > > >> > > > > > > > > > > >> requirement" It would be good to link
> to
> > a
> > > > KIP
> > > > > if
> > > > > > >> and
> > > > > > >> > > when
> > > > > > >> > > > > it
> > > > > > >> > > > > > > > exists
> > > > > > >> > > > > > > > > > > >> - Instead of touching/deprecating the
> > > > interface
> > > > > > >> > > > > > > > > > > >> SystemStreamPartitionGrouper, I would
> > > > recommend
> > > > > > to
> > > > > > >> > have
> > > > > > >> > > a
> > > > > > >> > > > > > > > different
> > > > > > >> > > > > > > > > > > >> implementation class of the interface,
> > > which
> > > > in
> > > > > > the
> > > > > > >> > > > > > constructor
> > > > > > >> > > > > > > of
> > > > > > >> > > > > > > > > the
> > > > > > >> > > > > > > > > > > >> grouper, takes two parameters: a) the
> > > > previous
> > > > > > task
> > > > > > >> > > number
> > > > > > >> > > > > > read
> > > > > > >> > > > > > > > from
> > > > > > >> > > > > > > > > > the
> > > > > > >> > > > > > > > > > > >> coordinator stream; b) the configured
> > > > > > >> new-partition to
> > > > > > >> > > > > > > > old-partition
> > > > > > >> > > > > > > > > > > >> mapping policy. Then, the grouper's
> > > interface
> > > > > > >> method
> > > > > > >> > > stays
> > > > > > >> > > > > the
> > > > > > >> > > > > > > > same
> > > > > > >> > > > > > > > > > and
> > > > > > >> > > > > > > > > > > >> the
> > > > > > >> > > > > > > > > > > >> behavior of the grouper is more
> > > configurable
> > > > > > which
> > > > > > >> is
> > > > > > >> > > good
> > > > > > >> > > > > to
> > > > > > >> > > > > > > > > support
> > > > > > >> > > > > > > > > > a
> > > > > > >> > > > > > > > > > > >> broader set of use cases in addition to
> > > > Kafka's
> > > > > > >> > built-in
> > > > > > >> > > > > > > partition
> > > > > > >> > > > > > > > > > > >> expansion policies.
> > > > > > >> > > > > > > > > > > >> - Minor renaming suggestion to the new
> > > > grouper
> > > > > > >> class
> > > > > > >> > > > names:
> > > > > > >> > > > > > > > > > > >> GroupByPartitionWithFixedTaskNum
> > > > > > >> > > > > > > > > > > >> and GroupBySystemStreamPartitionWi
> > > > > thFixedTaskNum
> > > > > > >> > > > > > > > > > > >>
> > > > > > >> > > > > > > > > > > >> Thanks!
> > > > > > >> > > > > > > > > > > >>
> > > > > > >> > > > > > > > > > > >> - Yi
> > > > > > >> > > > > > > > > > > >>
> > > > > > >> > > > > > > > > > > >> On Wed, May 31, 2017 at 10:33 AM, Dong
> > Lin
> > > <
> > > > > > >> > > > > > lindon...@gmail.com
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > > > wrote:
> > > > > > >> > > > > > > > > > > >>
> > > > > > >> > > > > > > > > > > >> > Hey Navina,
> > > > > > >> > > > > > > > > > > >> >
> > > > > > >> > > > > > > > > > > >> > Thanks much for the comment. Please
> see
> > > my
> > > > > > >> response
> > > > > > >> > > > below.
> > > > > > >> > > > > > > > > > > >> >
> > > > > > >> > > > > > > > > > > >> > Regarding your biggest gripe with the
> > > SEP,
> > > > I
> > > > > > >> > > personally
> > > > > > >> > > > > > think
> > > > > > >> > > > > > > > the
> > > > > > >> > > > > > > > > > > >> > operational requirement proposed in
> the
> > > KIP
> > > > > are
> > > > > > >> > pretty
> > > > > > >> > > > > > general
> > > > > > >> > > > > > > > and
> > > > > > >> > > > > > > > > > > >> could be
> > > > > > >> > > > > > > > > > > >> > easily enforced by other systems. The
> > > > reason
> > > > > is
> > > > > > >> that
> > > > > > >> > > the
> > > > > > >> > > > > > > module
> > > > > > >> > > > > > > > > > > >> operation
> > > > > > >> > > > > > > > > > > >> > is pretty standard and the default
> > option
> > > > > when
> > > > > > we
> > > > > > >> > > choose
> > > > > > >> > > > > > > > > partition.
> > > > > > >> > > > > > > > > > > And
> > > > > > >> > > > > > > > > > > >> > usually the underlying system allows
> > user
> > > > to
> > > > > > >> select
> > > > > > >> > > > > > arbitrary
> > > > > > >> > > > > > > > > > > partition
> > > > > > >> > > > > > > > > > > >> > number if it supports partition
> > > expansion.
> > > > Do
> > > > > > you
> > > > > > >> > know
> > > > > > >> > > > any
> > > > > > >> > > > > > > > system
> > > > > > >> > > > > > > > > > that
> > > > > > >> > > > > > > > > > > >> does
> > > > > > >> > > > > > > > > > > >> > not meet these two requirement?
> > > > > > >> > > > > > > > > > > >> >
> > > > > > >> > > > > > > > > > > >> > Regarding your comment of the
> > Motivation
> > > > > > >> section, I
> > > > > > >> > > > > renamed
> > > > > > >> > > > > > > the
> > > > > > >> > > > > > > > > > first
> > > > > > >> > > > > > > > > > > >> > section as "Problem and Goal" and
> > > specified
> > > > > > that
> > > > > > >> > "*The
> > > > > > >> > > > > goal
> > > > > > >> > > > > > of
> > > > > > >> > > > > > > > > this
> > > > > > >> > > > > > > > > > > >> > proposal is to enable partition
> > expansion
> > > > of
> > > > > > the
> > > > > > >> > input
> > > > > > >> > > > > > > > > streams*.". I
> > > > > > >> > > > > > > > > > > >> also
> > > > > > >> > > > > > > > > > > >> > put a sentence at the end of the
> > > Motivation
> > > > > > >> section
> > > > > > >> > > that
> > > > > > >> > > > > > "*The
> > > > > > >> > > > > > > > > > feature
> > > > > > >> > > > > > > > > > > >> of
> > > > > > >> > > > > > > > > > > >> > task expansion is out of the scope of
> > > this
> > > > > > >> proposal
> > > > > > >> > > and
> > > > > > >> > > > > will
> > > > > > >> > > > > > > be
> > > > > > >> > > > > > > > > > > >> addressed
> > > > > > >> > > > > > > > > > > >> > in a future SEP*". The second
> paragraph
> > > in
> > > > > the
> > > > > > >> > > > Motivation
> > > > > > >> > > > > > > > section
> > > > > > >> > > > > > > > > is
> > > > > > >> > > > > > > > > > > >> mainly
> > > > > > >> > > > > > > > > > > >> > used to explain the thinking process
> > that
> > > > we
> > > > > > have
> > > > > > >> > gone
> > > > > > >> > > > > > > through,
> > > > > > >> > > > > > > > > what
> > > > > > >> > > > > > > > > > > >> other
> > > > > > >> > > > > > > > > > > >> > alternative we have considered, and
> we
> > > plan
> > > > > to
> > > > > > >> do in
> > > > > > >> > > > Samza
> > > > > > >> > > > > > in
> > > > > > >> > > > > > > > the
> > > > > > >> > > > > > > > > > nex
> > > > > > >> > > > > > > > > > > >> step.
> > > > > > >> > > > > > > > > > > >> >
> > > > > > >> > > > > > > > > > > >> > To answer your question why
> increasing
> > > the
> > > > > > >> partition
> > > > > > >> > > > > number
> > > > > > >> > > > > > > will
> > > > > > >> > > > > > > > > > > >> increase
> > > > > > >> > > > > > > > > > > >> > the throughput of the kafka consumer
> in
> > > the
> > > > > > >> > container,
> > > > > > >> > > > > Kafka
> > > > > > >> > > > > > > > > > consumer
> > > > > > >> > > > > > > > > > > >> can
> > > > > > >> > > > > > > > > > > >> > potentially fetch more data in one
> > > > > > FetchResponse
> > > > > > >> > with
> > > > > > >> > > > more
> > > > > > >> > > > > > > > > > partitions
> > > > > > >> > > > > > > > > > > in
> > > > > > >> > > > > > > > > > > >> > the FetchRequest. This is because we
> > > limit
> > > > > the
> > > > > > >> > maximum
> > > > > > >> > > > > > amount
> > > > > > >> > > > > > > of
> > > > > > >> > > > > > > > > > data
> > > > > > >> > > > > > > > > > > >> that
> > > > > > >> > > > > > > > > > > >> > can be fetch for a given partition in
> > the
> > > > > > >> > > FetchResponse.
> > > > > > >> > > > > > This
> > > > > > >> > > > > > > by
> > > > > > >> > > > > > > > > > > >> default is
> > > > > > >> > > > > > > > > > > >> > set to 1 MB. And there is reason that
> > we
> > > > can
> > > > > > not
> > > > > > >> > > > > arbitrarily
> > > > > > >> > > > > > > > bump
> > > > > > >> > > > > > > > > up
> > > > > > >> > > > > > > > > > > >> this
> > > > > > >> > > > > > > > > > > >> > limit.
> > > > > > >> > > > > > > > > > > >> >
> > > > > > >> > > > > > > > > > > >> > To answer your question how partition
> > > > > expansion
> > > > > > >> in
> > > > > > >> > > Kafka
> > > > > > >> > > > > > > impacts
> > > > > > >> > > > > > > > > the
> > > > > > >> > > > > > > > > > > >> > clients, Kafka consumer is able to
> > > > > > automatically
> > > > > > >> > > detect
> > > > > > >> > > > > new
> > > > > > >> > > > > > > > > > partition
> > > > > > >> > > > > > > > > > > of
> > > > > > >> > > > > > > > > > > >> > the topic and reassign all (both old
> > and
> > > > new)
> > > > > > >> > > partitions
> > > > > > >> > > > > > > across
> > > > > > >> > > > > > > > > > > >> consumers
> > > > > > >> > > > > > > > > > > >> > in the consumer group IF you tell
> > > consumer
> > > > > the
> > > > > > >> topic
> > > > > > >> > > to
> > > > > > >> > > > be
> > > > > > >> > > > > > > > > > subscribed.
> > > > > > >> > > > > > > > > > > >> But
> > > > > > >> > > > > > > > > > > >> > consumer in Samza's container uses
> > > another
> > > > > way
> > > > > > of
> > > > > > >> > > > > > > subscription.
> > > > > > >> > > > > > > > > > > Instead
> > > > > > >> > > > > > > > > > > >> of
> > > > > > >> > > > > > > > > > > >> > subscribing to the topic, the
> consumer
> > in
> > > > > > Samza's
> > > > > > >> > > > > container
> > > > > > >> > > > > > > > > > subscribes
> > > > > > >> > > > > > > > > > > >> to
> > > > > > >> > > > > > > > > > > >> > the specific partitions of the topic.
> > In
> > > > this
> > > > > > >> case,
> > > > > > >> > if
> > > > > > >> > > > new
> > > > > > >> > > > > > > > > > partitions
> > > > > > >> > > > > > > > > > > >> have
> > > > > > >> > > > > > > > > > > >> > been added, Samza will need to
> > explicitly
> > > > > > >> subscribe
> > > > > > >> > to
> > > > > > >> > > > the
> > > > > > >> > > > > > new
> > > > > > >> > > > > > > > > > > >> partitions
> > > > > > >> > > > > > > > > > > >> > of the topic. The "Handle partition
> > > > expansion
> > > > > > >> while
> > > > > > >> > > > tasks
> > > > > > >> > > > > > are
> > > > > > >> > > > > > > > > > running"
> > > > > > >> > > > > > > > > > > >> > section in the SEP addresses this
> issue
> > > in
> > > > > > Samza
> > > > > > >> --
> > > > > > >> > it
> > > > > > >> > > > > > > > > recalculates
> > > > > > >> > > > > > > > > > > the
> > > > > > >> > > > > > > > > > > >> job
> > > > > > >> > > > > > > > > > > >> > model and restart container so that
> > > > consumer
> > > > > > can
> > > > > > >> > > > subscribe
> > > > > > >> > > > > > to
> > > > > > >> > > > > > > > the
> > > > > > >> > > > > > > > > > new
> > > > > > >> > > > > > > > > > > >> > partitions.
> > > > > > >> > > > > > > > > > > >> >
> > > > > > >> > > > > > > > > > > >> > I will ask other dev to take a look
> at
> > > the
> > > > > > >> > proposal. I
> > > > > > >> > > > > will
> > > > > > >> > > > > > > > start
> > > > > > >> > > > > > > > > > the
> > > > > > >> > > > > > > > > > > >> > voting thread tomorrow if there is no
> > > > further
> > > > > > >> > concern
> > > > > > >> > > > with
> > > > > > >> > > > > > the
> > > > > > >> > > > > > > > > SEP.
> > > > > > >> > > > > > > > > > > >> >
> > > > > > >> > > > > > > > > > > >> > Thanks!
> > > > > > >> > > > > > > > > > > >> > Dong
> > > > > > >> > > > > > > > > > > >> >
> > > > > > >> > > > > > > > > > > >> >
> > > > > > >> > > > > > > > > > > >> > On Wed, May 31, 2017 at 12:01 AM,
> > Navina
> > > > > Ramesh
> > > > > > >> > > > (Apache) <
> > > > > > >> > > > > > > > > > > >> > nav...@apache.org>
> > > > > > >> > > > > > > > > > > >> > wrote:
> > > > > > >> > > > > > > > > > > >> >
> > > > > > >> > > > > > > > > > > >> > > Hey Dong,
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > >  I have updated the motivation
> > > section
> > > > to
> > > > > > >> > clarify
> > > > > > >> > > > > this.
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > Thanks for updating the motivation.
> > > > Couple
> > > > > of
> > > > > > >> > notes
> > > > > > >> > > > > here:
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > 1.
> > > > > > >> > > > > > > > > > > >> > > > "The motivation of increasing
> > > partition
> > > > > > >> number
> > > > > > >> > of
> > > > > > >> > > > > Kafka
> > > > > > >> > > > > > > > topic
> > > > > > >> > > > > > > > > > > >> includes
> > > > > > >> > > > > > > > > > > >> > 1)
> > > > > > >> > > > > > > > > > > >> > > limit the maximum size of a
> partition
> > > in
> > > > > > order
> > > > > > >> to
> > > > > > >> > > > > improve
> > > > > > >> > > > > > > > broker
> > > > > > >> > > > > > > > > > > >> > > performance and 2) increase
> > throughput
> > > of
> > > > > > Kafka
> > > > > > >> > > > consumer
> > > > > > >> > > > > > in
> > > > > > >> > > > > > > > the
> > > > > > >> > > > > > > > > > > Samza
> > > > > > >> > > > > > > > > > > >> > > container."
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > It's unclear to me how increasing
> the
> > > > > > partition
> > > > > > >> > > number
> > > > > > >> > > > > > will
> > > > > > >> > > > > > > > > > increase
> > > > > > >> > > > > > > > > > > >> the
> > > > > > >> > > > > > > > > > > >> > > throughput of the kafka consumer in
> > the
> > > > > > >> container?
> > > > > > >> > > > > > > > > Theoretically,
> > > > > > >> > > > > > > > > > > you
> > > > > > >> > > > > > > > > > > >> > will
> > > > > > >> > > > > > > > > > > >> > > still be consuming the same amount
> of
> > > > data
> > > > > in
> > > > > > >> the
> > > > > > >> > > > > > container,
> > > > > > >> > > > > > > > > > > >> irrespective
> > > > > > >> > > > > > > > > > > >> > > of whether it is coming from one
> > > > partition
> > > > > or
> > > > > > >> more
> > > > > > >> > > > than
> > > > > > >> > > > > > one
> > > > > > >> > > > > > > > > > expanded
> > > > > > >> > > > > > > > > > > >> > > partitions. Can you please explain
> it
> > > for
> > > > > me
> > > > > > >> here,
> > > > > > >> > > > what
> > > > > > >> > > > > > you
> > > > > > >> > > > > > > > mean
> > > > > > >> > > > > > > > > > by
> > > > > > >> > > > > > > > > > > >> that?
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > 2. I believe the second paragraph
> > under
> > > > > > >> motivation
> > > > > > >> > > is
> > > > > > >> > > > > > simply
> > > > > > >> > > > > > > > > > talking
> > > > > > >> > > > > > > > > > > >> > about
> > > > > > >> > > > > > > > > > > >> > > the scope of the current SEP. It
> will
> > > be
> > > > > > >> easier to
> > > > > > >> > > > read
> > > > > > >> > > > > if
> > > > > > >> > > > > > > > what
> > > > > > >> > > > > > > > > > > >> solution
> > > > > > >> > > > > > > > > > > >> > is
> > > > > > >> > > > > > > > > > > >> > > included in this SEP and what is
> left
> > > out
> > > > > as
> > > > > > >> not
> > > > > > >> > in
> > > > > > >> > > > > scope.
> > > > > > >> > > > > > > > (for
> > > > > > >> > > > > > > > > > > >> example,
> > > > > > >> > > > > > > > > > > >> > > expansions for stateful jobs is
> > > supported
> > > > > or
> > > > > > >> not).
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > > We need to persist the
> > > task-to-sspList
> > > > > > >> mapping
> > > > > > >> > in
> > > > > > >> > > > the
> > > > > > >> > > > > > > > > > > >> > > coordinator stream so that the job
> > can
> > > > > derive
> > > > > > >> the
> > > > > > >> > > > > original
> > > > > > >> > > > > > > > > number
> > > > > > >> > > > > > > > > > of
> > > > > > >> > > > > > > > > > > >> > > partitions of each input stream
> > > > regardless
> > > > > of
> > > > > > >> how
> > > > > > >> > > many
> > > > > > >> > > > > > times
> > > > > > >> > > > > > > > the
> > > > > > >> > > > > > > > > > > >> > partition
> > > > > > >> > > > > > > > > > > >> > > has expanded. Does this make sense?
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > Yes. It does!
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > > I am not sure how this is related
> > to
> > > > the
> > > > > > >> > locality
> > > > > > >> > > > > > though.
> > > > > > >> > > > > > > > Can
> > > > > > >> > > > > > > > > > you
> > > > > > >> > > > > > > > > > > >> > clarify
> > > > > > >> > > > > > > > > > > >> > > your question if I haven't answered
> > > your
> > > > > > >> question?
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > It's not related. I just meant to
> > give
> > > an
> > > > > > >> example
> > > > > > >> > of
> > > > > > >> > > > yet
> > > > > > >> > > > > > > > another
> > > > > > >> > > > > > > > > > > >> > > coordinator message that is
> > persisted.
> > > > Your
> > > > > > >> > > > ssp-to-task
> > > > > > >> > > > > > > > mapping
> > > > > > >> > > > > > > > > is
> > > > > > >> > > > > > > > > > > >> > > following a similar pattern for
> > > > persisting.
> > > > > > >> Just
> > > > > > >> > > > wanted
> > > > > > >> > > > > to
> > > > > > >> > > > > > > > > clarify
> > > > > > >> > > > > > > > > > > >> that.
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > > Can you let me know if this,
> > together
> > > > > with
> > > > > > >> the
> > > > > > >> > > > answers
> > > > > > >> > > > > > in
> > > > > > >> > > > > > > > the
> > > > > > >> > > > > > > > > > > >> previous
> > > > > > >> > > > > > > > > > > >> > > email, addresses all your
> questions?
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > Yes. I believe you have addressed
> > most
> > > of
> > > > > my
> > > > > > >> > > > questions.
> > > > > > >> > > > > > > Thanks
> > > > > > >> > > > > > > > > for
> > > > > > >> > > > > > > > > > > >> taking
> > > > > > >> > > > > > > > > > > >> > > time to do that.
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > > Is there specific question you
> have
> > > > > > regarding
> > > > > > >> > > > > partition
> > > > > > >> > > > > > > > > > > >> > > expansion in Kafka?
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > I guess my questions are on how
> > > partition
> > > > > > >> > expansion
> > > > > > >> > > in
> > > > > > >> > > > > > Kafka
> > > > > > >> > > > > > > > > > impacts
> > > > > > >> > > > > > > > > > > >> the
> > > > > > >> > > > > > > > > > > >> > > clients. Iiuc, partition expansions
> > are
> > > > > done
> > > > > > >> > > manually
> > > > > > >> > > > in
> > > > > > >> > > > > > > Kafka
> > > > > > >> > > > > > > > > > based
> > > > > > >> > > > > > > > > > > >> on
> > > > > > >> > > > > > > > > > > >> > the
> > > > > > >> > > > > > > > > > > >> > > bytes-in rate of the partition. Do
> > the
> > > > > > existing
> > > > > > >> > > kafka
> > > > > > >> > > > > > > clients
> > > > > > >> > > > > > > > > > handle
> > > > > > >> > > > > > > > > > > >> this
> > > > > > >> > > > > > > > > > > >> > > expansion automatically? if yes,
> how
> > > does
> > > > > it
> > > > > > >> work?
> > > > > > >> > > If
> > > > > > >> > > > > not,
> > > > > > >> > > > > > > are
> > > > > > >> > > > > > > > > > there
> > > > > > >> > > > > > > > > > > >> > plans
> > > > > > >> > > > > > > > > > > >> > > to support it in the future?
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > > Thus user's job should not need
> to
> > > > > > bootstrap
> > > > > > >> > > > key/value
> > > > > > >> > > > > > > store
> > > > > > >> > > > > > > > > > from
> > > > > > >> > > > > > > > > > > >> the
> > > > > > >> > > > > > > > > > > >> > > changelog topic.
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > Why is this discussion relevant
> here?
> > > > > > Key/value
> > > > > > >> > > store
> > > > > > >> > > > /
> > > > > > >> > > > > > > > > changelog
> > > > > > >> > > > > > > > > > > >> topic
> > > > > > >> > > > > > > > > > > >> > > partition is scoped with the
> context
> > > of a
> > > > > > task.
> > > > > > >> > > Since
> > > > > > >> > > > we
> > > > > > >> > > > > > are
> > > > > > >> > > > > > > > not
> > > > > > >> > > > > > > > > > > >> changing
> > > > > > >> > > > > > > > > > > >> > > the number of tasks, I don't think
> it
> > > is
> > > > > > >> required
> > > > > > >> > to
> > > > > > >> > > > > > mention
> > > > > > >> > > > > > > > it
> > > > > > >> > > > > > > > > > > here.
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > > The new method takes the
> > > > > > >> > > > SystemStreamPartition-to-Task
> > > > > > >> > > > > > > > > > assignment
> > > > > > >> > > > > > > > > > > >> from
> > > > > > >> > > > > > > > > > > >> > > the previous job model which can be
> > > read
> > > > > from
> > > > > > >> the
> > > > > > >> > > > > > > coordinator
> > > > > > >> > > > > > > > > > > stream.
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > Jobmodel is currently not persisted
> > to
> > > > > > >> coordinator
> > > > > > >> > > > > stream.
> > > > > > >> > > > > > > In
> > > > > > >> > > > > > > > > your
> > > > > > >> > > > > > > > > > > >> > design,
> > > > > > >> > > > > > > > > > > >> > > you talk about writing separate
> > > > coordinator
> > > > > > >> > messages
> > > > > > >> > > > for
> > > > > > >> > > > > > > > > > ssp-to-task
> > > > > > >> > > > > > > > > > > >> > > assignments. Hence, please correct
> > this
> > > > > > >> statement.
> > > > > > >> > > It
> > > > > > >> > > > is
> > > > > > >> > > > > > > kind
> > > > > > >> > > > > > > > of
> > > > > > >> > > > > > > > > > > >> > misleading
> > > > > > >> > > > > > > > > > > >> > > to the reader.
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > My biggest gripe with this SEP is
> > that
> > > it
> > > > > > seems
> > > > > > >> > > like a
> > > > > > >> > > > > > > > > tailor-made
> > > > > > >> > > > > > > > > > > >> > solution
> > > > > > >> > > > > > > > > > > >> > > that relies on the semantics of the
> > > Kafka
> > > > > > >> system
> > > > > > >> > and
> > > > > > >> > > > > yet,
> > > > > > >> > > > > > we
> > > > > > >> > > > > > > > are
> > > > > > >> > > > > > > > > > > >> trying
> > > > > > >> > > > > > > > > > > >> > to
> > > > > > >> > > > > > > > > > > >> > > masquerade that as operational
> > > > requirements
> > > > > > for
> > > > > > >> > > other
> > > > > > >> > > > > > > systems
> > > > > > >> > > > > > > > > > > >> interacting
> > > > > > >> > > > > > > > > > > >> > > with Samza. (Not to say that this
> is
> > > the
> > > > > > first
> > > > > > >> > time
> > > > > > >> > > > > such a
> > > > > > >> > > > > > > > > choice
> > > > > > >> > >
> ...
>
> [Message clipped]




-- 
Navina R.

Reply via email to