Hey Dong,

>  I have updated the motivation section to clarify this.

Thanks for updating the motivation. Couple of notes here:

1.
> "The motivation of increasing partition number of Kafka topic includes 1)
limit the maximum size of a partition in order to improve broker
performance and 2) increase throughput of Kafka consumer in the Samza
container."

It's unclear to me how increasing the partition number will increase the
throughput of the kafka consumer in the container? Theoretically, you will
still be consuming the same amount of data in the container, irrespective
of whether it is coming from one partition or more than one expanded
partitions. Can you please explain it for me here, what you mean by that?

2. I believe the second paragraph under motivation is simply talking about
the scope of the current SEP. It will be easier to read if what solution is
included in this SEP and what is left out as not in scope. (for example,
expansions for stateful jobs is supported or not).

> We need to persist the task-to-sspList mapping in the
coordinator stream so that the job can derive the original number of
partitions of each input stream regardless of how many times the partition
has expanded. Does this make sense?

Yes. It does!

> I am not sure how this is related to the locality though. Can you clarify
your question if I haven't answered your question?

It's not related. I just meant to give an example of yet another
coordinator message that is persisted. Your ssp-to-task mapping is
following a similar pattern for persisting. Just wanted to clarify that.

> Can you let me know if this, together with the answers in the previous
email, addresses all your questions?

Yes. I believe you have addressed most of my questions. Thanks for taking
time to do that.

> Is there specific question you have regarding partition
expansion in Kafka?

I guess my questions are on how partition expansion in Kafka impacts the
clients. Iiuc, partition expansions are done manually in Kafka based on the
bytes-in rate of the partition. Do the existing kafka clients handle this
expansion automatically? if yes, how does it work? If not, are there plans
to support it in the future?

> Thus user's job should not need to bootstrap key/value store from the
changelog topic.

Why is this discussion relevant here? Key/value store / changelog topic
partition is scoped with the context of a task. Since we are not changing
the number of tasks, I don't think it is required to mention it here.

> The new method takes the SystemStreamPartition-to-Task assignment from
the previous job model which can be read from the coordinator stream.

Jobmodel is currently not persisted to coordinator stream. In your design,
you talk about writing separate coordinator messages for ssp-to-task
assignments. Hence, please correct this statement. It is kind of misleading
to the reader.

My biggest gripe with this SEP is that it seems like a tailor-made solution
that relies on the semantics of the Kafka system and yet, we are trying to
masquerade that as operational requirements for other systems interacting
with Samza. (Not to say that this is the first time such a choice is being
made in the Samza design). I am not seeing how this can a "general"
solution for all input systems. That's my two cents. I would like to hear
alternative points of view for this from other devs.

Please make sure you have enough eyes on this SEP. If you do, please start
a VOTE thread to approve this SEP.

Thanks!
Navina


On Mon, May 29, 2017 at 12:32 AM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Navina,
>
> I have updated the wiki based on your suggestion. More specifically, I have
> made the following changes:
>
> - Improved Problem section and Motivation section to describe why we use
> the solution in this proposal instead of tackling the problem of task
> expansion directly.
>
> - Illustrate the design in a way that doesn't bind to Kafka. Kafka is only
> used as example to illustrate why we want to expand partition expansion and
> whether the operational requirement can be supported when Kafka is used as
> the input system. Note that the proposed solution should work for any input
> system that meets the operational requirement described in the wiki.
>
> - Fixed the problem in the figure.
>
> - Added a new class GroupBySystemStreamPartitionFixedTaskNum to the wiki.
> Together with GroupByPartitionFixedTaskNum, it should ensure that we have a
> solution to enable partition expansion for all users that are using
> pre-defined grouper in Samza. Note that those users who use custom grouper
> would need to update their implementation.
>
> Can you let me know if this, together with the answers in the previous
> email, addresses all your questions? Thanks for taking time to review the
> proposal.
>
> Regards,
> Dong
>
>
>
>
>
>
>
> On Wed, May 24, 2017 at 11:15 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Navina,
> >
> > Thanks much for your comments. Please see my reply inline.
> >
> > On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache) <
> > nav...@apache.org> wrote:
> >
> >> Thanks for the SEP, Dong. I have a couple of questions to understand
> your
> >> proposal better:
> >>
> >> * Under motivation, you mention that "_We expect this solution to work
> >> similarly with other input system as well._", yet I don't see any
> >> discussion on how it will work with other input systems. That is, what
> >> kind
> >> of contract does samza expect from other input systems ? If we are not
> >> planning to provide a generic solution, it might be worth calling it out
> >> in
> >> the SEP.
> >>
> >
> > I think the contract we expect from other systems are exactly the
> > operational requirement mentioned in the SEP, i.e. partitions should
> always
> > be doubled and the hash algorithm should module the number of partitions.
> > SEP-5 should also allow partition expansion of all input systems that
> meet
> > these two requirements. I have updated the motivation section to clarify
> > this.
> >
> >
> >>
> >> * I understand the partition mapping logic you have proposed. But I
> think
> >> the example explanation doesn't match the diagram. In the diagram, after
> >> expansion, partiion-0 and partition-1 are pointing to bucket 0 and
> >> partition-3 and partition-4 are pointing to bucket 1. I think the former
> >> has to be partition-0 and partition-2 and the latter, is partition-1 and
> >> partition-3. If I am wrong, please help me understand the logic :)
> >>
> >
> > Good catch. I will update the figure to fix this problem.
> >
> >
> >>
> >> * I don't know how partition expansion in Kafka works. I am familiar
> with
> >> how shard splitting happens in Kinesis - there is hierarchical relation
> >> between the parent and child shards. This way, it will also allow the
> >> shards to be merged back. Iiuc, Kafka only supports partition
> "expansion",
> >> as opposed to "splits". Can you provide some context or link related to
> >> how
> >> partition expansion works in Kafka?
> >>
> >
> > I couldn't find any wiki on partition expansion in Kafka. The partition
> > expansion logic in Kafka is very simply -- it simply adds new partition
> to
> > the existing topic. Is there specific question you have regarding
> partition
> > expansion in Kafka?
> >
> >
> >>
> >> * Are you only recommending that expansion can be supported for samza
> jobs
> >> that use Kafka as input systems **and** configure the SSPGrouper as
> >> GroupByPartitionFixedTaskNum? Sounds to me like this only applies for
> >> GroupByPartition. Please correct me if I am wrong. What is the
> expectation
> >> for custom SSP Groupers?
> >>
> >
> > The expansion can be supported for Samza jobs if the input system meets
> > the operational requirement mentioned above. It doesn't have to use Kafka
> > as input system.
> >
> > The current proposal provided solution for jobs that currently use
> > GroupByPartition. The proposal can be extended to support jobs that use
> > other grouper that are pre-defined in Samza. The custom SSP grouper needs
> > to handle partition expansion similar to how GroupByPartitionFixedTaskNum
> > handles it and it is users' responsibility to update their custom grouper
> > implementation.
> >
> >
> >>
> >> * Regarding storing SSP-to-Task assignment to coordinator stream: Today,
> >> the JobModel encapsulates the data model in samza which also includes
> >> **TaskModels**. TaskModel, typically shows the task-to-sspList mapping.
> >> What is the reason for using a separate coordinator stream message
> >> *SetSSPTaskMapping*? Is it because the JobModel itself is not persisted
> in
> >> the coordinator stream today?  The reason locality exists outside of the
> >> jobmodel is because *locality* information is written by each container,
> >> where as it is consumed only by the leader jobcoordinator/AM. In this
> >> case,
> >> the writer of the mapping information and the reader is still the leader
> >> jobcoordinator/AM. So, I want to understand the motivation for this
> >> choice.
> >>
> >
> > Yes, the reason for using a separate coordinate stream message is because
> > the task-to-sspList mapping is not currently persisted in the coordinator
> > stream. We wouldn't need to create this new stream message if JobModel is
> > persisted. We need to persist the task-to-sspList mapping in the
> > coordinator stream so that the job can derive the original number of
> > partitions of each input stream regardless of how many times the
> partition
> > has expanded. Does this make sense?
> >
> > I am not sure how this is related to the locality though. Can you clarify
> > your question if I haven't answered your question?
> >
> > Thanks!
> > Dong
> >
> >
> >>
> >> Cheers!
> >> Navina
> >>
> >> On Tue, May 23, 2017 at 5:45 PM, Dong Lin <lindon...@gmail.com> wrote:
> >>
> >> > Hi all,
> >> >
> >> > We created SEP-5: Enable partition expansion of input streams. Please
> >> find
> >> > the SEP wiki in the link
> >> > https://cwiki.apache.org/confluence/display/SAMZA/SEP-
> >> > 5%3A+Enable+partition+expansion+of+input+streams
> >> > .
> >> >
> >> > You feedback is appreciated!
> >> >
> >> > Thanks,
> >> > Dong
> >> >
> >>
> >
> >
>

Reply via email to