Dong,

Thanks for your prompt responses.

>  And usually the underlying system allows user to select arbitrary partition
number if it supports partition expansion. Do you know any system that does
not meet these two requirements?

I am not aware of a system that won't meet the modulo requirement. I was
mostly questioning the requirement around *Stream Management* - which
expects the expansion of partitions to always happen by doubling the
partition count. That is different from saying "underlying system allows
user to select arbitrary partition number if it supports partition
expansion". Please correct me if I have misunderstood what you meant in
that requirement :)
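
For reference, the interplay between the modulo requirement and the
doubling requirement can be sketched with a toy model. The function names
below are hypothetical and are not Samza or Kafka APIs; the sketch only
assumes that the input system picks a partition as (key hash) modulo
(partition count), as discussed in this thread.

```python
def partition_for(key_hash: int, num_partitions: int) -> int:
    """Modulo partitioner: a hypothetical stand-in for the input system's hashing."""
    return key_hash % num_partitions


def original_partition(new_partition: int, original_count: int) -> int:
    """Map a post-expansion partition index back to the pre-expansion
    partition that is assumed to have held the same keys."""
    return new_partition % original_count


def mapping_is_stable(original_count: int, expanded_count: int,
                      key_hashes=range(10_000)) -> bool:
    """Return True if, for every sampled key hash, the key's new partition
    maps back to the partition it lived in before the expansion."""
    return all(
        original_partition(partition_for(h, expanded_count), original_count)
        == partition_for(h, original_count)
        for h in key_hashes
    )
```

In this toy model, mapping_is_stable(2, 4) holds, while mapping_is_stable(2, 3)
does not, because a key can land in a new partition that does not map back to
its old one. The check also passes for other integer multiples of the original
count, which may be relevant to the question of whether strict doubling is
needed.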

> Regarding your comment of the Motivation section

Thanks for updating it.

> Kafka consumer can potentially fetch more data in one FetchResponse
with more partitions in the FetchRequest. This is because we limit the
maximum amount of data that can be fetched for a given partition in the
FetchResponse.

That makes sense. I didn't know that you had this reasoning in mind. Thanks
for explaining.
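
As a rough illustration of that reasoning, here is a back-of-the-envelope
sketch. It assumes only the 1 MB default per-partition limit mentioned in
this thread; the function name and the optional overall cap are hypothetical
and do not model any real Kafka client call.

```python
# 1 MB default per-partition fetch limit, as stated in this thread.
DEFAULT_MAX_PARTITION_FETCH_BYTES = 1024 * 1024


def max_fetch_response_bytes(num_partitions: int,
                             per_partition_limit: int = DEFAULT_MAX_PARTITION_FETCH_BYTES,
                             overall_limit=None) -> int:
    """Rough upper bound on the data returned by one fetch.

    Each partition contributes at most per_partition_limit bytes, so the
    bound grows linearly with the number of partitions in the request.
    overall_limit is an optional, hypothetical cap on the whole response.
    """
    total = num_partitions * per_partition_limit
    return total if overall_limit is None else min(total, overall_limit)
```

Under these assumptions, a request covering one partition is bounded by 1 MB
and a request covering four partitions by 4 MB, which is why more partitions
can raise the consumer's throughput per fetch.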

> To answer your question how partition expansion in Kafka impacts the
clients, Kafka consumer is able to automatically detect new partitions of
the topic and reassign all (both old and new) partitions across consumers
in the consumer group IF you tell the consumer which topic to subscribe to.
But the consumer in Samza's container uses another way of subscription.

Got it.

Thanks!
Navina



On Wed, May 31, 2017 at 4:29 PM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Dong,
>
> Thanks for the detailed design doc for a long-awaited feature in Samza!
> Really appreciate it! I did a quick pass and have the following comments:
>
> - minor: "limit the maximum size of partition" ==> "limit the maximum size
> of each partition"
> - "However, Samza currently is not able to handle partition expansion of
> the input streams"==>better point out "for stateful jobs". For stateless
> jobs, simply bouncing the job now can pick up the new partitions.
> - "it is possible (e.g. with Kafka) that messages with a given key exist
> in both partition 1 and 3. Because GroupByPartition will assign partition 1
> and 3 to different tasks, messages with the same key may be handled by
> different task/container/process and their state will be stored in
> different changelog partition." The problem statement is not super clear
> here. The issue with stateful jobs is: after GroupByPartition assigns
> partitions 1 and 3 to different tasks, the new task handling partition 3
> does not have the previous state to resume the work, e.g. a page-key based
> counter would start from 0 in the new task for a specific key, instead of
> resuming the previous count of 50 held by task 1.
> - minor rewording: "the first solution in this doc" ==> "the solution
> proposed in this doc"
> - "Thus additional development work is needed in Kafka to meet this
> requirement" It would be good to link to a KIP if and when it exists
> - Instead of touching/deprecating the interface
> SystemStreamPartitionGrouper, I would recommend to have a different
> implementation class of the interface, which in the constructor of the
> grouper, takes two parameters: a) the previous task number read from the
> coordinator stream; b) the configured new-partition to old-partition
> mapping policy. Then, the grouper's interface method stays the same and the
> behavior of the grouper is more configurable which is good to support a
> broader set of use cases in addition to Kafka's built-in partition
> expansion policies.
> - Minor renaming suggestion to the new grouper class names:
> GroupByPartitionWithFixedTaskNum
> and GroupBySystemStreamPartitionWithFixedTaskNum
>
> Thanks!
>
> - Yi
>
> On Wed, May 31, 2017 at 10:33 AM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Navina,
> >
> > Thanks much for the comment. Please see my response below.
> >
> > Regarding your biggest gripe with the SEP, I personally think the
> > operational requirements proposed in the SEP are pretty general and could
> > be easily enforced by other systems. The reason is that the modulo
> > operation is pretty standard and is the default option when we choose a
> > partition. And usually the underlying system allows the user to select an
> > arbitrary partition number if it supports partition expansion. Do you know
> > any system that does not meet these two requirements?
> >
> > Regarding your comment on the Motivation section, I renamed the first
> > section to "Problem and Goal" and specified that "*The goal of this
> > proposal is to enable partition expansion of the input streams*.". I also
> > put a sentence at the end of the Motivation section that "*The feature of
> > task expansion is out of the scope of this proposal and will be addressed
> > in a future SEP*". The second paragraph in the Motivation section is
> > mainly used to explain the thinking process that we have gone through,
> > what other alternatives we have considered, and what we plan to do in
> > Samza in the next step.
> >
> > To answer your question of why increasing the partition number will
> > increase the throughput of the Kafka consumer in the container: the Kafka
> > consumer can potentially fetch more data in one FetchResponse with more
> > partitions in the FetchRequest. This is because we limit the maximum
> > amount of data that can be fetched for a given partition in the
> > FetchResponse. This limit is set to 1 MB by default, and there are
> > reasons why we cannot arbitrarily bump it up.
> >
> > To answer your question of how partition expansion in Kafka impacts the
> > clients: the Kafka consumer is able to automatically detect new partitions
> > of the topic and reassign all (both old and new) partitions across
> > consumers in the consumer group IF you tell the consumer which topic to
> > subscribe to. But the consumer in Samza's container uses another way of
> > subscription. Instead of subscribing to the topic, the consumer in Samza's
> > container subscribes to specific partitions of the topic. In this case,
> > if new partitions have been added, Samza will need to explicitly subscribe
> > to the new partitions of the topic. The "Handle partition expansion while
> > tasks are running" section in the SEP addresses this issue in Samza -- it
> > recalculates the job model and restarts the container so that the consumer
> > can subscribe to the new partitions.
> >
> > I will ask other dev to take a look at the proposal. I will start the
> > voting thread tomorrow if there is no further concern with the SEP.
> >
> > Thanks!
> > Dong
> >
> >
> > On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh (Apache) <
> > nav...@apache.org>
> > wrote:
> >
> > > Hey Dong,
> > >
> > > >  I have updated the motivation section to clarify this.
> > >
> > > Thanks for updating the motivation. Couple of notes here:
> > >
> > > 1.
> > > > "The motivation of increasing partition number of Kafka topic
> includes
> > 1)
> > > limit the maximum size of a partition in order to improve broker
> > > performance and 2) increase throughput of Kafka consumer in the Samza
> > > container."
> > >
> > > It's unclear to me how increasing the partition number will increase
> the
> > > throughput of the kafka consumer in the container? Theoretically, you
> > will
> > > still be consuming the same amount of data in the container,
> irrespective
> > > of whether it is coming from one partition or more than one expanded
> > > partitions. Can you please explain it for me here, what you mean by
> that?
> > >
> > > 2. I believe the second paragraph under motivation is simply talking
> > > about the scope of the current SEP. It will be easier to read if it
> > > states which solution is included in this SEP and what is left out as
> > > not in scope (for example, whether expansion for stateful jobs is
> > > supported or not).
> > >
> > > > We need to persist the task-to-sspList mapping in the
> > > coordinator stream so that the job can derive the original number of
> > > partitions of each input stream regardless of how many times the
> > partition
> > > has expanded. Does this make sense?
> > >
> > > Yes. It does!
> > >
> > > > I am not sure how this is related to the locality though. Can you
> > clarify
> > > your question if I haven't answered your question?
> > >
> > > It's not related. I just meant to give an example of yet another
> > > coordinator message that is persisted. Your ssp-to-task mapping is
> > > following a similar pattern for persisting. Just wanted to clarify
> that.
> > >
> > > > Can you let me know if this, together with the answers in the
> previous
> > > email, addresses all your questions?
> > >
> > > Yes. I believe you have addressed most of my questions. Thanks for
> taking
> > > time to do that.
> > >
> > > > Is there specific question you have regarding partition
> > > expansion in Kafka?
> > >
> > > I guess my questions are on how partition expansion in Kafka impacts
> the
> > > clients. Iiuc, partition expansions are done manually in Kafka based on
> > the
> > > bytes-in rate of the partition. Do the existing kafka clients handle
> this
> > > expansion automatically? if yes, how does it work? If not, are there
> > plans
> > > to support it in the future?
> > >
> > > > Thus user's job should not need to bootstrap key/value store from the
> > > changelog topic.
> > >
> > > Why is this discussion relevant here? Key/value store / changelog topic
> > > partition is scoped with the context of a task. Since we are not
> changing
> > > the number of tasks, I don't think it is required to mention it here.
> > >
> > > > The new method takes the SystemStreamPartition-to-Task assignment
> from
> > > the previous job model which can be read from the coordinator stream.
> > >
> > > Jobmodel is currently not persisted to coordinator stream. In your
> > design,
> > > you talk about writing separate coordinator messages for ssp-to-task
> > > assignments. Hence, please correct this statement. It is kind of
> > misleading
> > > to the reader.
> > >
> > > My biggest gripe with this SEP is that it seems like a tailor-made
> > > solution that relies on the semantics of the Kafka system, and yet we are
> > > trying to masquerade that as operational requirements for other systems
> > > interacting with Samza. (Not to say that this is the first time such a
> > > choice is being made in the Samza design). I am not seeing how this can
> > > be a "general" solution for all input systems. That's my two cents. I
> > > would like to hear alternative points of view on this from other devs.
> > >
> > > Please make sure you have enough eyes on this SEP. If you do, please
> > start
> > > a VOTE thread to approve this SEP.
> > >
> > > Thanks!
> > > Navina
> > >
> > >
> > > On Mon, May 29, 2017 at 12:32 AM, Dong Lin <lindon...@gmail.com>
> wrote:
> > >
> > > > Hey Navina,
> > > >
> > > > I have updated the wiki based on your suggestion. More specifically,
> I
> > > have
> > > > made the following changes:
> > > >
> > > > - Improved Problem section and Motivation section to describe why we
> > use
> > > > the solution in this proposal instead of tackling the problem of task
> > > > expansion directly.
> > > >
> > > > - Illustrated the design in a way that doesn't bind to Kafka. Kafka is
> > > > only used as an example to illustrate why we want to support partition
> > > > expansion and whether the operational requirements can be met when
> > > > Kafka is used as the input system. Note that the proposed solution
> > > > should work for any input system that meets the operational
> > > > requirements described in the wiki.
> > > >
> > > > - Fixed the problem in the figure.
> > > >
> > > > - Added a new class GroupBySystemStreamPartitionFixedTaskNum to the
> > > wiki.
> > > > Together with GroupByPartitionFixedTaskNum, it should ensure that we
> > > have a
> > > > solution to enable partition expansion for all users that are using
> > > > pre-defined grouper in Samza. Note that those users who use custom
> > > grouper
> > > > would need to update their implementation.
> > > >
> > > > Can you let me know if this, together with the answers in the
> previous
> > > > email, addresses all your questions? Thanks for taking time to review
> > the
> > > > proposal.
> > > >
> > > > Regards,
> > > > Dong
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, May 24, 2017 at 11:15 PM, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > >
> > > > > Hey Navina,
> > > > >
> > > > > Thanks much for your comments. Please see my reply inline.
> > > > >
> > > > > On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache) <
> > > > > nav...@apache.org> wrote:
> > > > >
> > > > >> Thanks for the SEP, Dong. I have a couple of questions to
> understand
> > > > your
> > > > >> proposal better:
> > > > >>
> > > > >> * Under motivation, you mention that "_We expect this solution to
> > work
> > > > >> similarly with other input system as well._", yet I don't see any
> > > > >> discussion on how it will work with other input systems. That is,
> > what
> > > > >> kind
> > > > >> of contract does samza expect from other input systems ? If we are
> > not
> > > > >> planning to provide a generic solution, it might be worth calling
> it
> > > out
> > > > >> in
> > > > >> the SEP.
> > > > >>
> > > > >
> > > > > I think the contract we expect from other systems is exactly the
> > > > > operational requirements mentioned in the SEP, i.e. partitions
> > > > > should always be doubled and the hash algorithm should use modulo
> > > > > over the number of partitions. SEP-5 should also allow partition
> > > > > expansion of all input systems that meet these two requirements. I
> > > > > have updated the motivation section to clarify this.
> > > > >
> > > > >
> > > > >>
> > > > > >> * I understand the partition mapping logic you have proposed. But I
> > > > > >> think the example explanation doesn't match the diagram. In the
> > > > > >> diagram, after expansion, partition-0 and partition-1 are pointing
> > > > > >> to bucket 0 and partition-3 and partition-4 are pointing to bucket 1.
> > > > > >> I think the former has to be partition-0 and partition-2, and the
> > > > > >> latter partition-1 and partition-3. If I am wrong, please help me
> > > > > >> understand the logic :)
> > > > >>
> > > > >
> > > > > Good catch. I will update the figure to fix this problem.
> > > > >
> > > > >
> > > > >>
> > > > >> * I don't know how partition expansion in Kafka works. I am
> familiar
> > > > with
> > > > >> how shard splitting happens in Kinesis - there is hierarchical
> > > relation
> > > > >> between the parent and child shards. This way, it will also allow
> > the
> > > > >> shards to be merged back. Iiuc, Kafka only supports partition
> > > > "expansion",
> > > > >> as opposed to "splits". Can you provide some context or link
> related
> > > to
> > > > >> how
> > > > >> partition expansion works in Kafka?
> > > > >>
> > > > >
> > > > > I couldn't find any wiki on partition expansion in Kafka. The
> > > > > partition expansion logic in Kafka is very simple -- it simply adds
> > > > > new partitions to the existing topic. Is there a specific question
> > > > > you have regarding partition expansion in Kafka?
> > > > >
> > > > >
> > > > >>
> > > > >> * Are you only recommending that expansion can be supported for
> > samza
> > > > jobs
> > > > >> that use Kafka as input systems **and** configure the SSPGrouper
> as
> > > > >> GroupByPartitionFixedTaskNum? Sounds to me like this only applies
> > for
> > > > >> GroupByPartition. Please correct me if I am wrong. What is the
> > > > expectation
> > > > >> for custom SSP Groupers?
> > > > >>
> > > > >
> > > > > The expansion can be supported for Samza jobs if the input system
> > meets
> > > > > the operational requirement mentioned above. It doesn't have to use
> > > Kafka
> > > > > as input system.
> > > > >
> > > > > The current proposal provides a solution for jobs that currently use
> > > > > GroupByPartition. The proposal can be extended to support jobs that
> > > > > use other groupers that are pre-defined in Samza. A custom SSP grouper
> > > > > needs to handle partition expansion similarly to how
> > > > > GroupByPartitionFixedTaskNum handles it, and it is the users'
> > > > > responsibility to update their custom grouper implementation.
> > > > >
> > > > >
> > > > >>
> > > > >> * Regarding storing SSP-to-Task assignment to coordinator stream:
> > > Today,
> > > > >> the JobModel encapsulates the data model in samza which also
> > includes
> > > > >> **TaskModels**. TaskModel, typically shows the task-to-sspList
> > > mapping.
> > > > >> What is the reason for using a separate coordinator stream message
> > > > >> *SetSSPTaskMapping*? Is it because the JobModel itself is not
> > > persisted
> > > > in
> > > > >> the coordinator stream today?  The reason locality exists outside
> of
> > > the
> > > > >> jobmodel is because *locality* information is written by each
> > > container,
> > > > >> where as it is consumed only by the leader jobcoordinator/AM. In
> > this
> > > > >> case,
> > > > >> the writer of the mapping information and the reader is still the
> > > leader
> > > > >> jobcoordinator/AM. So, I want to understand the motivation for
> this
> > > > >> choice.
> > > > >>
> > > > >
> > > > > Yes, the reason for using a separate coordinator stream message is
> > > > > that the task-to-sspList mapping is not currently persisted in the
> > > > > coordinator stream. We wouldn't need to create this new stream message
> > > > > if JobModel were persisted. We need to persist the task-to-sspList
> > > > > mapping in the coordinator stream so that the job can derive the
> > > > > original number of partitions of each input stream regardless of how
> > > > > many times the partition has expanded. Does this make sense?
> > > > >
> > > > > I am not sure how this is related to the locality though. Can you
> > > clarify
> > > > > your question if I haven't answered your question?
> > > > >
> > > > > Thanks!
> > > > > Dong
> > > > >
> > > > >
> > > > >>
> > > > >> Cheers!
> > > > >> Navina
> > > > >>
> > > > >> On Tue, May 23, 2017 at 5:45 PM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > >>
> > > > >> > Hi all,
> > > > >> >
> > > > > >> > We created SEP-5: Enable partition expansion of input streams.
> > > > > >> > Please find the SEP wiki at the link
> > > > > >> > https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams
> > > > >> >
> > > > > >> > Your feedback is appreciated!
> > > > >> >
> > > > >> > Thanks,
> > > > >> > Dong
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
