Hey Dong,

I'm opposed (or a +0, at best) to this limited, Kafka-specific solution. I
understand that the proposal is relatively simple to implement, but I think
it will cause headaches for Samza users. Not only will they have to
understand all the limitations of this approach (increase only, double
partitions only, partition using hash+modulo, etc.), but enforcing those
limitations can also be a major problem, especially when the Samza jobs and
message brokers are managed by separate orgs in a company. Separate orgs
are often difficult to coordinate, and a system that depends on such
significant process/coordination is too fragile for my taste.
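
To make the fragility concrete, here is a rough sketch of the arithmetic the
proposal leans on. It is illustrative only: the function names are made up,
integer values stand in for key hashes, and I'm assuming the broker partitions
strictly by hash modulo partition count.

```python
def partition_for(key_hash: int, partition_count: int) -> int:
    """Hash+modulo partitioning, the scheme the proposal assumes the broker uses."""
    return key_hash % partition_count


def stays_with_original_task(old_count: int, new_count: int, key_hashes) -> bool:
    """True if every key's new partition folds back onto its old partition
    via new_partition % old_count, so a fixed set of tasks keeps its keys."""
    return all(
        partition_for(h, new_count) % old_count == partition_for(h, old_count)
        for h in key_hashes
    )


key_hashes = range(1000)  # hypothetical key hashes
doubling_ok = stays_with_original_task(2, 4, key_hashes)      # 2 -> 4 partitions
non_doubling_ok = stays_with_original_task(2, 3, key_hashes)  # 2 -> 3 partitions
print(doubling_ok, non_doubling_ok)
```

Doubling keeps every key with its original task, but a 2 -> 3 expansion does
not, and nothing on the Samza side can stop another org from doing the latter.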

That said, I realize that my opinion is just one of many in the broader
community which may feel differently, so let me respond to some of the
other items in the discussion so we can clear them up:

> The task-to-container assignment matters because if the correlated tasks
> (i.e. tasks that consume messages with the same key) needs to be in the
> same container so that they can share the same key/value local store on the
> same physical machine.

There is currently no supported way of sharing state among the tasks of a
container.  Each task has its own isolated store and that logical isolation
is the primary thing that enables Samza jobs to scale with a simple
container count change. My feeling is that we should not change this
without good reason.

> I think we can hardcode new logic in KafkaCheckpointLogKey.scala such that
> exception will not be thrown if new grouper is
> GroupByPartitionWithFixedTaskNum and old grouper is GroupByPartition. Does
> this look reasonable?

With the current proposal, we'd also need a similar check for
GroupBySystemStreamPartitionWithFixedTaskNum. And if any other
groupers were later added with both these modes, we'd probably need to add
those too. It might be easier and cleaner to add a config to ignore that
check temporarily. The downside is that it further complicates the Samza
config, which is already huge. Thoughts?
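
A minimal sketch of the two options side by side, in Python rather than the
actual Scala. Everything here is hypothetical: `GrouperMismatchError` stands in
for the real exception, `ignore_change` stands in for whatever the new config
would be called, and the grouper names are plain strings rather than factory
classes.

```python
class GrouperMismatchError(Exception):
    """Stand-in for the exception thrown when the grouper changes."""


# Option A (hardcoded): every allowed (old, new) grouper pair must be listed,
# and the list grows each time a new grouper pair is added.
ALLOWED_TRANSITIONS = {
    ("GroupByPartition", "GroupByPartitionWithFixedTaskNum"),
    ("GroupBySystemStreamPartition",
     "GroupBySystemStreamPartitionWithFixedTaskNum"),
}


def check_grouper(checkpointed: str, configured: str,
                  ignore_change: bool = False) -> None:
    """Raise unless the grouper is unchanged, the change is whitelisted
    (option A), or the user opted out of the check (option B)."""
    if checkpointed == configured:
        return
    if (checkpointed, configured) in ALLOWED_TRANSITIONS:
        return
    if ignore_change:  # Option B: hypothetical temporary config override
        return
    raise GrouperMismatchError(
        f"grouper changed from {checkpointed} to {configured}"
    )
```

With option B alone, the allow-list could be dropped entirely, at the cost of
one more config for users to know about.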

> I think storing the previous task-to-partition mapping is more general than
> storing the partition count of all topics for the following reasons:
> - Samza already stores the task-to-container mapping and container-to-host
> mapping in the coordinator stream. It seems consistent to also store the
> partition-to-task mapping. And this information may be useful for other
> use-case such as debugging.
> - By having the new interface take the previous task-to-partition
> assignment instead of a topic-to-partition-count mapping as new parameter,
> we can potentially have grouper implementation to support other types of
> input systems.
> - It is slightly simpler to store the task-to-partition assignment because
> we don't need to know whether this is the first time a job is started or
> not. On the other hand, you can write topic-to-partition-count mapping to
> the coordinator stream only if this is the first time the job is run

The task-to-container and container-to-host mappings are both meaningful in
the context of the JobModel. A partition-to-task mapping is not meaningful
without some definition of the key-to-partition assignments. It's
incomplete information and therefore misleading. I think it only makes
sense to use this mapping if we adopt a solution wherein Samza also knows
the partition key assignment.
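
A small sketch of what I mean by "incomplete", with made-up values. The stored
partition-to-task mapping is identical in both cases; only the broker's
key-to-partition scheme differs (hash+modulo versus a hypothetical key-range
assignment over keys 0-99), and the same key ends up at a different task.

```python
# The mapping Samza would store in the coordinator stream (illustrative values).
partition_to_task = {0: "task-0", 1: "task-1", 2: "task-0", 3: "task-1"}


def hash_modulo_partition(key: int, partition_count: int) -> int:
    """Broker assigns partitions by key modulo partition count."""
    return key % partition_count


def key_range_partition(key: int, partition_count: int,
                        key_space: int = 100) -> int:
    """Broker assigns contiguous key ranges to partitions (hypothetical)."""
    return key * partition_count // key_space


key = 30
task_under_hash = partition_to_task[hash_modulo_partition(key, 4)]
task_under_range = partition_to_task[key_range_partition(key, 4)]
print(task_under_hash, task_under_range)
```

Reading the stored mapping alone, there is no way to tell which of the two
tasks actually holds the state for key 30.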

-Jake

On Tue, Jun 6, 2017 at 11:06 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jacob,
>
> Thanks for taking time to review the SEP.
>
> I agree with you and Navina that the current SEP doesn't provide support to
> arbitrary input systems and it doesn't support partition shrink. I think
> the scope of this SEP is to support partition expansion for Kafka (the most
> widely used input system of Samza) and keep the door open for partition
> support of various input systems. The current design can support any system
> that meets the two operational requirement specified in the doc.
>
> While it is possible to support more types of input systems, it will likely
> add more complexity to the design. For example, the first alternative
> solution from you requires broker-side support to negotiate hash algorithm.
> The second alternative solution requires changelog partition reshuffle
> which carries its own design complexity and performance overhead. There is
> tradeoff between the generality and the complexity among these choices. I
> like the current design because it is simple and addresses a big usage
> scenario for us. We can add more complexity to generalize the design if it
> enables important use-case. Does this sound reasonable?
>
> Note that the "Rejected Alternative" section also mentions the possibility
> of supporting a wider range of input systems by allowing user to specify
> the new-partition to old-partition mapping. We are not doing it because 1)
> we may have better understanding of the design after we have a specific
> second input system to support 2) the current design can be extended to
> support general input systems. I think similar argument can be applied
> explain why we don't have to support general input systems using the
> potentially-good alternatives you mentioned.
>
> I hope SEP-5 can be an important first-step towards supporting partition
> expansion of any input system.
>
> To answer your questions about the current proposal:
>
> >1. "An alternative solution is to allow task number to increase after
> >partition expansion and uses a proper task-to-container assignment to make
> >sure the Samza output is correct." What does the container have to do with
> >stateful processing or output in general?
>
> The task-to-container assignment matters because if the correlated tasks
> (i.e. tasks that consume messages with the same key) needs to be in the
> same container so that they can share the same key/value local store on the
> same physical machine.
>
> >2. When you use "Join" as an example, you basically mean multiple
> >co-partitioned streams, right? This is opposed to multiple,
> >independently-partitioned streams or a single stream. Would be nice to
> >formulate the proposal in these more general terms.
>
> I thought "join" is commonly used to refer to the join operation with
> co-partitioned stream but I may be wrong. I have updated the wiki to
> explicitly mention "co-partitioned stream". Does this look better now?
>
> >3. When switching SSP groupers, how will the users avoid the
> >org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues
> >exception?
>
> I think we can hardcode new logic in KafkaCheckpointLogKey.scala such that
> exception will not be thrown if new grouper is
> GroupByPartitionWithFixedTaskNum and old grouper is GroupByPartition. Does
> this look reasonable?
>
> >4. Partition to task assignment is meaningless without key to partition
> >mapping. The real semantics are captured in the external requirement for
> >partitioning via hash+modulo. But in that case, iiuc, only the partition
> >count matters. So why not just store the original partition count rather
> >than the whole mapping?
>
> I think storing the previous task-to-partition mapping is more general than
> storing the partition count of all topics for the following reasons:
>
> - Samza already stores the task-to-container mapping and container-to-host
> mapping in the coordinator stream. It seems consistent to also store the
> partition-to-task mapping. And this information may be useful for other
> use-case such as debugging.
>
> - By having the new interface take the previous task-to-partition
> assignment instead of a topic-to-partition-count mapping as new parameter,
> we can potentially have grouper implementation to support other types of
> input systems.
>
> - It is slightly simpler to store the task-to-partition assignment because
> we don't need to know whether this is the first time a job is started or
> not. On the other hand, you can write topic-to-partition-count mapping to
> the coordinator stream only if this is the first time the job is run
>
> Thanks,
> Dong
>
> On Mon, Jun 5, 2017 at 11:17 AM, Jacob Maes <jacob.m...@gmail.com> wrote:
>
> > Hey Dong,
> >
> > Thanks for the SEP. Supporting partition changes is critically important
> > for stateful Samza jobs, so it's great to see some ideas on that front!
> >
> > Sorry for the late feedback, but I have a few thoughts to contribute.
> >
> > Big +1 on Navina's comment:
> >
> > > My biggest gripe with this SEP is that it seems like a tailor-made
> > > solution that relies on the semantics of the Kafka system and yet, we
> > > are trying to masquerade that as operational requirements for other
> > > systems interacting with Samza. (Not to say that this is the first time
> > > such a choice is being made in the Samza design). I am not seeing how
> > > this can a "general" solution for all input systems. That's my two
> > > cents. I would like to hear alternative points of view for this from
> > > other devs.
> >
> >
> > Two examples of this:
> > 1. This is mostly a hypothetical, but some message brokers may use key
> > range assignment rather than hash+modulo.
> > 2. Kafka can't reduce the number of partitions, but it can happen on other
> > systems. For example, it may be cheaper to reduce the number of partitions
> > on a hosted service where the cost model depends on the number of
> > partitions/shards.
> >
> > It seems to me that a solution which doesn't depend on partition key
> > assignment in the message broker. Here are a few alternatives that weren't
> > discussed and I think should be considered:
> >
> > Alternatives in order of increasing preference:
> > 1. Samza manages the partition hash (via some new contract with the
> > brokers) and guarantees correct routing of keys among the new partitions.
> > 2. Samza detects a task count change, creates a new changelog with correct
> > partitions, and *somehow* reshuffles all existing changelog data into the
> > new topic and then uses the new topic from then on. (doesn't work without
> > changelog, but in that case durability isn't paramount, so we can just
> > wipe)
> > 3. Use RPC in between stages and samza fully manages key assignment among
> > tasks. No on-disk topic data to clean up. Mandatory repartitioning in the
> > first stage to pre-scaled tasks in next stage.
> > 4. Combined 2-3 solution
> >
> > Finally, some questions about the current proposal:
> > 1. "An alternative solution is to allow task number to increase after
> > partition expansion and uses a proper task-to-container assignment to make
> > sure the Samza output is correct." What does the container have to do with
> > stateful processing or output in general?
> > 2. When you use "Join" as an example, you basically mean multiple
> > co-partitioned streams, right? This is opposed to multiple,
> > independently-partitioned streams or a single stream. Would be nice to
> > formulate the proposal in these more general terms.
> > 3. When switching SSP groupers, how will the users avoid the
> > org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues
> > exception?
> > 4. Partition to task assignment is meaningless without key to partition
> > mapping. The real semantics are captured in the external requirement for
> > partitioning via hash+modulo. But in that case, iiuc, only the partition
> > count matters. So why not just store the original partition count rather
> > than the whole mapping?
> >
> > -Jake
> >
> > On Sun, Jun 4, 2017 at 12:32 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Yi, Navina,
> > >
> > > I have updated the SEP-5 document based on our discussion. The difference
> > > can be found here
> > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=70255476&selectedPageVersions=14&selectedPageVersions=15>.
> > > Here is the summary of changes:
> > >
> > > - Add new interface that extends the existing interface
> > > SystemStreamPartitionGrouper. Newly-added grouper class should implement
> > > this interface.
> > > - Explained in the Rejected Alternative Section why we don't add new
> > > method in the existing interface
> > > - Explained in the Rejected Alternative Section why we don't config/class
> > > for user to specify new-partition to old-partition mapping.
> > >
> > > Can you take another look at the proposal and let me know if there is any
> > > concern?
> > >
> > > Cheers,
> > > Dong
> > >
> > >
> > > On Thu, Jun 1, 2017 at 12:58 AM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Yi,
> > > >
> > > > > Thanks much for the comment. I have updated the doc to address all your
> > > > > comments except the one related to the interface. I am not sure I
> > > > > understand your suggestion of the new interface. Will discuss tomorrow.
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Wed, May 31, 2017 at 4:29 PM, Yi Pan <nickpa...@gmail.com> wrote:
> > > >
> > > >> Hi, Don,
> > > >>
> > > >> Thanks for the detailed design doc for a long-waited feature in
> Samza!
> > > >> Really appreciate it! I did a quick pass and have the following
> > > comments:
> > > >>
> > > >> - minor: "limit the maximum size of partition" ==> "limit the
> maximum
> > > size
> > > >> of each partition"
> > > >> - "However, Samza currently is not able to handle partition
> expansion
> > of
> > > >> the input streams"==>better point out "for stateful jobs". For
> > stateless
> > > >> jobs, simply bouncing the job now can pick up the new partitions.
> > > >> - "it is possible (e.g. with Kafka) that messages with a given key
> > > exists
> > > >> in both partition 1 an 3. Because GroupByPartition will assign
> > > partition 1
> > > >> and 3 to different tasks, messages with the same key may be handled
> by
> > > >> different task/container/process and their state will be stored in
> > > >> different changelog partition." The problem statement is not super
> > clear
> > > >> here. The issues with stateful jobs is: after GroupByPartition
> assign
> > > >> partition 1 and 3 to different tasks, the new task handling
> partition
> > 3
> > > >> does not have the previous state to resume the work. e.g. a page-key
> > > based
> > > >> counter would start from 0 in the new task for a specific key,
> instead
> > > of
> > > >> resuming the previous count 50 held by task 1.
> > > >> - minor rewording: "the first solution in this doc" ==> "the
> solution
> > > >> proposed in this doc"
> > > >> - "Thus additional development work is needed in Kafka to meet this
> > > >> requirement" It would be good to link to a KIP if and when it exists
> > > >> - Instead of touching/deprecating the interface
> > > >> SystemStreamPartitionGrouper, I would recommend to have a different
> > > >> implementation class of the interface, which in the constructor of
> the
> > > >> grouper, takes two parameters: a) the previous task number read from
> > the
> > > >> coordinator stream; b) the configured new-partition to old-partition
> > > >> mapping policy. Then, the grouper's interface method stays the same
> > and
> > > >> the
> > > >> behavior of the grouper is more configurable which is good to
> support
> > a
> > > >> broader set of use cases in addition to Kafka's built-in partition
> > > >> expansion policies.
> > > >> - Minor renaming suggestion to the new grouper class names:
> > > >> GroupByPartitionWithFixedTaskNum
> > > >> and GroupBySystemStreamPartitionWithFixedTaskNum
> > > >>
> > > >> Thanks!
> > > >>
> > > >> - Yi
> > > >>
> > > >> On Wed, May 31, 2017 at 10:33 AM, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > >>
> > > >> > Hey Navina,
> > > >> >
> > > >> > Thanks much for the comment. Please see my response below.
> > > >> >
> > > >> > Regarding your biggest gripe with the SEP, I personally think the
> > > >> > operational requirement proposed in the KIP are pretty general and
> > > >> could be
> > > >> > easily enforced by other systems. The reason is that the module
> > > >> operation
> > > >> > is pretty standard and the default option when we choose
> partition.
> > > And
> > > >> > usually the underlying system allows user to select arbitrary
> > > partition
> > > >> > number if it supports partition expansion. Do you know any system
> > that
> > > >> does
> > > >> > not meet these two requirement?
> > > >> >
> > > >> > Regarding your comment of the Motivation section, I renamed the
> > first
> > > >> > section as "Problem and Goal" and specified that "*The goal of
> this
> > > >> > proposal is to enable partition expansion of the input
> streams*.". I
> > > >> also
> > > >> > put a sentence at the end of the Motivation section that "*The
> > feature
> > > >> of
> > > >> > task expansion is out of the scope of this proposal and will be
> > > >> addressed
> > > >> > in a future SEP*". The second paragraph in the Motivation section
> is
> > > >> mainly
> > > >> > used to explain the thinking process that we have gone through,
> what
> > > >> other
> > > >> > alternative we have considered, and we plan to do in Samza in the
> > nex
> > > >> step.
> > > >> >
> > > >> > To answer your question why increasing the partition number will
> > > >> increase
> > > >> > the throughput of the kafka consumer in the container, Kafka
> > consumer
> > > >> can
> > > >> > potentially fetch more data in one FetchResponse with more
> > partitions
> > > in
> > > >> > the FetchRequest. This is because we limit the maximum amount of
> > data
> > > >> that
> > > >> > can be fetch for a given partition in the FetchResponse. This by
> > > >> default is
> > > >> > set to 1 MB. And there is reason that we can not arbitrarily bump
> up
> > > >> this
> > > >> > limit.
> > > >> >
> > > >> > To answer your question how partition expansion in Kafka impacts
> the
> > > >> > clients, Kafka consumer is able to automatically detect new
> > partition
> > > of
> > > >> > the topic and reassign all (both old and new) partitions across
> > > >> consumers
> > > >> > in the consumer group IF you tell consumer the topic to be
> > subscribed.
> > > >> But
> > > >> > consumer in Samza's container uses another way of subscription.
> > > Instead
> > > >> of
> > > >> > subscribing to the topic, the consumer in Samza's container
> > subscribes
> > > >> to
> > > >> > the specific partitions of the topic. In this case, if new
> > partitions
> > > >> have
> > > >> > been added, Samza will need to explicitly subscribe to the new
> > > >> partitions
> > > >> > of the topic. The "Handle partition expansion while tasks are
> > running"
> > > >> > section in the SEP addresses this issue in Samza -- it
> recalculates
> > > the
> > > >> job
> > > >> > model and restart container so that consumer can subscribe to the
> > new
> > > >> > partitions.
> > > >> >
> > > >> > I will ask other dev to take a look at the proposal. I will start
> > the
> > > >> > voting thread tomorrow if there is no further concern with the
> SEP.
> > > >> >
> > > >> > Thanks!
> > > >> > Dong
> > > >> >
> > > >> >
> > > >> > On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh (Apache) <
> > > >> > nav...@apache.org>
> > > >> > wrote:
> > > >> >
> > > >> > > Hey Dong,
> > > >> > >
> > > >> > > >  I have updated the motivation section to clarify this.
> > > >> > >
> > > >> > > Thanks for updating the motivation. Couple of notes here:
> > > >> > >
> > > >> > > 1.
> > > >> > > > "The motivation of increasing partition number of Kafka topic
> > > >> includes
> > > >> > 1)
> > > >> > > limit the maximum size of a partition in order to improve broker
> > > >> > > performance and 2) increase throughput of Kafka consumer in the
> > > Samza
> > > >> > > container."
> > > >> > >
> > > >> > > It's unclear to me how increasing the partition number will
> > increase
> > > >> the
> > > >> > > throughput of the kafka consumer in the container?
> Theoretically,
> > > you
> > > >> > will
> > > >> > > still be consuming the same amount of data in the container,
> > > >> irrespective
> > > >> > > of whether it is coming from one partition or more than one
> > expanded
> > > >> > > partitions. Can you please explain it for me here, what you mean
> > by
> > > >> that?
> > > >> > >
> > > >> > > 2. I believe the second paragraph under motivation is simply
> > talking
> > > >> > about
> > > >> > > the scope of the current SEP. It will be easier to read if what
> > > >> solution
> > > >> > is
> > > >> > > included in this SEP and what is left out as not in scope. (for
> > > >> example,
> > > >> > > expansions for stateful jobs is supported or not).
> > > >> > >
> > > >> > > > We need to persist the task-to-sspList mapping in the
> > > >> > > coordinator stream so that the job can derive the original
> number
> > of
> > > >> > > partitions of each input stream regardless of how many times the
> > > >> > partition
> > > >> > > has expanded. Does this make sense?
> > > >> > >
> > > >> > > Yes. It does!
> > > >> > >
> > > >> > > > I am not sure how this is related to the locality though. Can
> > you
> > > >> > clarify
> > > >> > > your question if I haven't answered your question?
> > > >> > >
> > > >> > > It's not related. I just meant to give an example of yet another
> > > >> > > coordinator message that is persisted. Your ssp-to-task mapping
> is
> > > >> > > following a similar pattern for persisting. Just wanted to
> clarify
> > > >> that.
> > > >> > >
> > > >> > > > Can you let me know if this, together with the answers in the
> > > >> previous
> > > >> > > email, addresses all your questions?
> > > >> > >
> > > >> > > Yes. I believe you have addressed most of my questions. Thanks
> for
> > > >> taking
> > > >> > > time to do that.
> > > >> > >
> > > >> > > > Is there specific question you have regarding partition
> > > >> > > expansion in Kafka?
> > > >> > >
> > > >> > > I guess my questions are on how partition expansion in Kafka
> > impacts
> > > >> the
> > > >> > > clients. Iiuc, partition expansions are done manually in Kafka
> > based
> > > >> on
> > > >> > the
> > > >> > > bytes-in rate of the partition. Do the existing kafka clients
> > handle
> > > >> this
> > > >> > > expansion automatically? if yes, how does it work? If not, are
> > there
> > > >> > plans
> > > >> > > to support it in the future?
> > > >> > >
> > > >> > > > Thus user's job should not need to bootstrap key/value store
> > from
> > > >> the
> > > >> > > changelog topic.
> > > >> > >
> > > >> > > Why is this discussion relevant here? Key/value store /
> changelog
> > > >> topic
> > > >> > > partition is scoped with the context of a task. Since we are not
> > > >> changing
> > > >> > > the number of tasks, I don't think it is required to mention it
> > > here.
> > > >> > >
> > > >> > > > The new method takes the SystemStreamPartition-to-Task
> > assignment
> > > >> from
> > > >> > > the previous job model which can be read from the coordinator
> > > stream.
> > > >> > >
> > > >> > > Jobmodel is currently not persisted to coordinator stream. In
> your
> > > >> > design,
> > > >> > > you talk about writing separate coordinator messages for
> > ssp-to-task
> > > >> > > assignments. Hence, please correct this statement. It is kind of
> > > >> > misleading
> > > >> > > to the reader.
> > > >> > >
> > > >> > > My biggest gripe with this SEP is that it seems like a
> tailor-made
> > > >> > solution
> > > >> > > that relies on the semantics of the Kafka system and yet, we are
> > > >> trying
> > > >> > to
> > > >> > > masquerade that as operational requirements for other systems
> > > >> interacting
> > > >> > > with Samza. (Not to say that this is the first time such a
> choice
> > is
> > > >> > being
> > > >> > > made in the Samza design). I am not seeing how this can a
> > "general"
> > > >> > > solution for all input systems. That's my two cents. I would
> like
> > to
> > > >> hear
> > > >> > > alternative points of view for this from other devs.
> > > >> > >
> > > >> > > Please make sure you have enough eyes on this SEP. If you do,
> > please
> > > >> > start
> > > >> > > a VOTE thread to approve this SEP.
> > > >> > >
> > > >> > > Thanks!
> > > >> > > Navina
> > > >> > >
> > > >> > >
> > > >> > > On Mon, May 29, 2017 at 12:32 AM, Dong Lin <lindon...@gmail.com
> >
> > > >> wrote:
> > > >> > >
> > > >> > > > Hey Navina,
> > > >> > > >
> > > >> > > > I have updated the wiki based on your suggestion. More
> > > >> specifically, I
> > > >> > > have
> > > >> > > > made the following changes:
> > > >> > > >
> > > >> > > > - Improved Problem section and Motivation section to describe
> > why
> > > we
> > > >> > use
> > > >> > > > the solution in this proposal instead of tackling the problem
> of
> > > >> task
> > > >> > > > expansion directly.
> > > >> > > >
> > > >> > > > - Illustrate the design in a way that doesn't bind to Kafka.
> > Kafka
> > > >> is
> > > >> > > only
> > > >> > > > used as example to illustrate why we want to expand partition
> > > >> expansion
> > > >> > > and
> > > >> > > > whether the operational requirement can be supported when
> Kafka
> > is
> > > >> used
> > > >> > > as
> > > >> > > > the input system. Note that the proposed solution should work
> > for
> > > >> any
> > > >> > > input
> > > >> > > > system that meets the operational requirement described in the
> > > wiki.
> > > >> > > >
> > > >> > > > - Fixed the problem in the figure.
> > > >> > > >
> > > >> > > > - Added a new class GroupBySystemStreamPartitionFixedTaskNum
> to
> > > the
> > > >> > > wiki.
> > > >> > > > Together with GroupByPartitionFixedTaskNum, it should ensure
> > that
> > > we
> > > >> > > have a
> > > >> > > > solution to enable partition expansion for all users that are
> > > using
> > > >> > > > pre-defined grouper in Samza. Note that those users who use
> > custom
> > > >> > > grouper
> > > >> > > > would need to update their implementation.
> > > >> > > >
> > > >> > > > Can you let me know if this, together with the answers in the
> > > >> previous
> > > >> > > > email, addresses all your questions? Thanks for taking time to
> > > >> review
> > > >> > the
> > > >> > > > proposal.
> > > >> > > >
> > > >> > > > Regards,
> > > >> > > > Dong
> > > >> > > >
> > > >> > > >
> > > >> > > >
> > > >> > > >
> > > >> > > >
> > > >> > > >
> > > >> > > >
> > > >> > > > On Wed, May 24, 2017 at 11:15 PM, Dong Lin <
> lindon...@gmail.com
> > >
> > > >> > wrote:
> > > >> > > >
> > > >> > > > > Hey Navina,
> > > >> > > > >
> > > >> > > > > Thanks much for your comments. Please see my reply inline.
> > > >> > > > >
> > > >> > > > > On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache) <
> > > >> > > > > nav...@apache.org> wrote:
> > > >> > > > >
> > > >> > > > >> Thanks for the SEP, Dong. I have a couple of questions to
> > > >> understand
> > > >> > > > your
> > > >> > > > >> proposal better:
> > > >> > > > >>
> > > >> > > > >> * Under motivation, you mention that "_We expect this
> > solution
> > > to
> > > >> > work
> > > >> > > > >> similarly with other input system as well._", yet I don't
> see
> > > any
> > > >> > > > >> discussion on how it will work with other input systems.
> That
> > > is,
> > > >> > what
> > > >> > > > >> kind
> > > >> > > > >> of contract does samza expect from other input systems ? If
> > we
> > > >> are
> > > >> > not
> > > >> > > > >> planning to provide a generic solution, it might be worth
> > > >> calling it
> > > >> > > out
> > > >> > > > >> in
> > > >> > > > >> the SEP.
> > > >> > > > >>
> > > >> > > > >
> > > >> > > > > I think the contract we expect from other systems are
> exactly
> > > the
> > > >> > > > > operational requirement mentioned in the SEP, i.e.
> partitions
> > > >> should
> > > >> > > > always
> > > >> > > > > be doubled and the hash algorithm should module the number
> of
> > > >> > > partitions.
> > > >> > > > > SEP-5 should also allow partition expansion of all input
> > systems
> > > >> that
> > > >> > > > meet
> > > >> > > > > these two requirements. I have updated the motivation
> section
> > to
> > > >> > > clarify
> > > >> > > > > this.
> > > >> > > > >
> > > >> > > > >
> > > >> > > > >>
> > > >> > > > >> * I understand the partition mapping logic you have
> proposed.
> > > >> But I
> > > >> > > > think
> > > >> > > > >> the example explanation doesn't match the diagram. In the
> > > >> diagram,
> > > >> > > after
> > > >> > > > >> expansion, partiion-0 and partition-1 are pointing to
> bucket
> > 0
> > > >> and
> > > >> > > > >> partition-3 and partition-4 are pointing to bucket 1. I
> think
> > > the
> > > >> > > former
> > > >> > > > >> has to be partition-0 and partition-2 and the latter, is
> > > >> partition-1
> > > >> > > and
> > > >> > > > >> partition-3. If I am wrong, please help me understand the
> > logic
> > > >> :)
> > > >> > > > >>
> > > >> > > > >
> > > >> > > > > Good catch. I will update the figure to fix this problem.
> > > >> > > > >
> > > >> > > > >
> > > >> > > > >>
> > > >> > > > >> * I don't know how partition expansion in Kafka works. I am
> > > >> familiar
> > > >> > > > with
> > > >> > > > >> how shard splitting happens in Kinesis - there is
> > hierarchical
> > > >> > > relation
> > > >> > > > >> between the parent and child shards. This way, it will also
> > > allow
> > > >> > the
> > > >> > > > >> shards to be merged back. Iiuc, Kafka only supports
> partition
> > > >> > > > "expansion",
> > > >> > > > >> as opposed to "splits". Can you provide some context or
> link
> > > >> related
> > > >> > > to
> > > >> > > > >> how
> > > >> > > > >> partition expansion works in Kafka?
> > > >> > > > >>
> > > >> > > > >
> > > >> > > > > I couldn't find any wiki on partition expansion in Kafka.
> The
> > > >> > partition
> > > >> > > > > expansion logic in Kafka is very simply -- it simply adds
> new
> > > >> > partition
> > > >> > > > to
> > > >> > > > > the existing topic. Is there specific question you have
> > > regarding
> > > >> > > > partition
> > > >> > > > > expansion in Kafka?
> > > >> > > > >
> > > >> > > > >
> > > >> > > > >>
> > > >> > > > >> * Are you only recommending that expansion can be supported
> > for
> > > >> > samza
> > > >> > > > jobs
> > > >> > > > >> that use Kafka as input systems **and** configure the
> > > SSPGrouper
> > > >> as
> > > >> > > > >> GroupByPartitionFixedTaskNum? Sounds to me like this only
> > > applies
> > > >> > for
> > > >> > > > >> GroupByPartition. Please correct me if I am wrong. What is
> > the
> > > >> > > > expectation
> > > >> > > > >> for custom SSP Groupers?
> > > >> > > > >>
> > > >> > > > >
> > > >> > > > > The expansion can be supported for Samza jobs if the input
> > > system
> > > >> > meets
> > > >> > > > > the operational requirement mentioned above. It doesn't have
> > to
> > > >> use
> > > >> > > Kafka
> > > >> > > > > as input system.
> > > >> > > > >
> > > >> > > > > The current proposal provided solution for jobs that
> currently
> > > use
> > > >> > > > > GroupByPartition. The proposal can be extended to support
> jobs
> > > >> that
> > > >> > use
> > > >> > > > > other grouper that are pre-defined in Samza. The custom SSP
> > > >> grouper
> > > >> > > needs
> > > >> > > > > to handle partition expansion similar to how
> > > >> > > GroupByPartitionFixedTaskNum
> > > >> > > > > handles it and it is users' responsibility to update their
> > > custom
> > > >> > > grouper
> > > >> > > > > implementation.
> > > >> > > > >
> > > >> > > > >
> > > >> > > > >>
> > > >> > > > >> * Regarding storing SSP-to-Task assignment to coordinator
> > > stream:
> > > >> > > Today,
> > > >> > > > >> the JobModel encapsulates the data model in samza which
> also
> > > >> > includes
> > > >> > > > >> **TaskModels**. TaskModel, typically shows the
> > task-to-sspList
> > > >> > > mapping.
> > > >> > > > >> What is the reason for using a separate coordinator stream
> > > >> message
> > > >> > > > >> *SetSSPTaskMapping*? Is it because the JobModel itself is
> not
> > > >> > > persisted
> > > >> > > > in
> > > >> > > > >> the coordinator stream today?  The reason locality exists
> > > >> outside of
> > > >> > > the
> > > >> > > > >> jobmodel is because *locality* information is written by
> each
> > > >> > > container,
> > > >> > > > >> where as it is consumed only by the leader
> jobcoordinator/AM.
> > > In
> > > >> > this
> > > >> > > > >> case,
> > > >> > > > >> the writer of the mapping information and the reader is
> still
> > > the
> > > >> > > leader
> > > >> > > > >> jobcoordinator/AM. So, I want to understand the motivation
> > for
> > > >> this
> > > >> > > > >> choice.
> > > >> > > > >>
> > > >> > > > >
> > > >> > > > > Yes, the reason for using a separate coordinate stream
> message
> > > is
> > > >> > > because
> > > >> > > > > the task-to-sspList mapping is not currently persisted in
> the
> > > >> > > coordinator
> > > >> > > > > stream. We wouldn't need to create this new stream message
> if
> > > >> > JobModel
> > > >> > > is
> > > >> > > > > persisted. We need to persist the task-to-sspList mapping in
> > the
> > > >> > > > > coordinator stream so that the job can derive the original
> > > number
> > > >> of
> > > >> > > > > partitions of each input stream regardless of how many times
> > the
> > > >> > > > partition
> > > >> > > > > has expanded. Does this make sense?
> > > >> > > > >
> > > >> > > > > I am not sure how this is related to the locality though.
> Can
> > > you
> > > >> > > clarify
> > > >> > > > > your question if I haven't answered your question?
> > > >> > > > >
> > > >> > > > > Thanks!
> > > >> > > > > Dong
> > > >> > > > >
> > > >> > > > >
> > > >> > > > >>
> > > >> > > > >> Cheers!
> > > >> > > > >> Navina
> > > >> > > > >>
> > > >> > > > >> On Tue, May 23, 2017 at 5:45 PM, Dong Lin <
> > lindon...@gmail.com
> > > >
> > > >> > > wrote:
> > > >> > > > >>
> > > >> > > > >> > Hi all,
> > > >> > > > >> >
> > > >> > > > >> > We created SEP-5: Enable partition expansion of input
> > > streams.
> > > >> > > Please
> > > >> > > > >> find
> > > >> > > > >> > the SEP wiki in the link
> > > >> > > > >> > https://cwiki.apache.org/confluence/display/SAMZA/SEP-
> > > >> > > > >> > 5%3A+Enable+partition+expansion+of+input+streams
> > > >> > > > >> > .
> > > >> > > > >> >
> > > >> > > > >> > You feedback is appreciated!
> > > >> > > > >> >
> > > >> > > > >> > Thanks,
> > > >> > > > >> > Dong
> > > >> > > > >> >
> > > >> > > > >>
> > > >> > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>
