Hi, Jan, Dong, John, Guozhang,

Perhaps it will be useful to have a KIP meeting to discuss this together as
a group. Would Apr. 9 (Monday) at 9:00am PDT work? If so, I will send out
an invite to the mailing list.

Thanks,

Jun


On Wed, Apr 4, 2018 at 1:25 AM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

> Want to quickly step in here again because it is going places again.
>
> The last part of the discussion is just a pain to read and completely
> diverged from what I suggested without making the reasons clear to me.
>
> I don't know why this happens.... here are my comments anyway.
>
> @Guozhang: That Streams is working on automatic creating
> copartition-usuable topics: great for streams, has literally nothing todo
> with the KIP as we want to grow the
> input topic. Everyone can reshuffle rel. easily but that is not what we
> need todo, we need to grow the topic in question. After streams
> automatically reshuffled the input topic
> still has the same size and it didn't help a bit. I fail to see why this
> is relevant. What am i missing here?
>
> @Dong
> I am still on the position that the current proposal brings us into the
> wrong direction. Especially introducing PartitionKeyRebalanceListener
> From this point we can never move away to proper state full handling
> without completely deprecating this creature from hell again.
> Linear hashing is not the optimising step we have todo here. An interface
> that when a topic is a topic its always the same even after it had
> grown or shrunk is important. So from my POV I have major concerns that
> this KIP is benefitial in its current state.
>
> What is it that makes everyone so addicted to the idea of linear hashing?
> not attractive at all for me.
> And with statefull consumers still a complete mess. Why not stick with the
> Kappa architecture???
>
>
>
>
>
> On 03.04.2018 17:38, Dong Lin wrote:
>
>> Hey John,
>>
>> Thanks much for your comments!!
>>
>> I have yet to go through the emails of John/Jun/Guozhang in detail. But
>> let
>> me present my idea for how to minimize the delay for state loading for
>> stream use-case.
>>
>> For ease of understanding, let's assume that the initial partition number
>> of input topics and change log topic are both 10. And initial number of
>> stream processor is also 10. If we only increase initial partition number
>> of input topics to 15 without changing number of stream processor, the
>> current KIP already guarantees in-order delivery and no state needs to be
>> moved between consumers for stream use-case. Next, let's say we want to
>> increase the number of processor to expand the processing capacity for
>> stream use-case. This requires us to move state between processors which
>> will take time. Our goal is to minimize the impact (i.e. delay) for
>> processing while we increase the number of processors.
>>
>> Note that stream processor generally includes both consumer and producer.
>> In addition to consume from the input topic, consumer may also need to
>> consume from change log topic on startup for recovery. And producer may
>> produce state to the change log topic.
>>
>>
> The solution will include the following steps:
>>
>> 1) Increase partition number of the input topic from 10 to 15. Since the
>> messages with the same key will still go to the same consume before and
>> after the partition expansion, this step can be done without having to
>> move
>> state between processors.
>>
>> 2) Increase partition number of the change log topic from 10 to 15. Note
>> that this step can also be done without impacting existing workflow. After
>> we increase partition number of the change log topic, key space may split
>> and some key will be produced to the newly-added partition. But the same
>> key will still go to the same processor (i.e. consumer) before and after
>> the partition. Thus this step can also be done without having to move
>> state
>> between processors.
>>
>> 3) Now, let's add 5 new consumers whose groupId is different from the
>> existing processor's groupId. Thus these new consumers will not impact
>> existing workflow. Each of these new consumers should consume two
>> partitions from the earliest offset, where these two partitions are the
>> same partitions that will be consumed if the consumers have the same
>> groupId as the existing processor's groupId. For example, the first of the
>> five consumers will consume partition 0 and partition 10. The purpose of
>> these consumers is to rebuild the state (e.g. RocksDB) for the processors
>> in advance. Also note that, by design of the current KIP, each consume
>> will
>> consume the existing partition of the change log topic up to the offset
>> before the partition expansion. Then they will only need to consume the
>> state of the new partition of the change log topic.
>>
>> 4) After consumers have caught up in step 3), we should stop these
>> consumers and add 5 new processors to the stream processing job. These 5
>> new processors should run in the same location as the previous 5 consumers
>> to re-use the state (e.g. RocksDB). And these processors' consumers should
>> consume partitions of the change log topic from the committed offset the
>> previous 5 consumers so that no state is missed.
>>
>> One important trick to note here is that, the mapping from partition to
>> consumer should also use linear hashing. And we need to remember the
>> initial number of processors in the job, 10 in this example, and use this
>> number in the linear hashing algorithm. This is pretty much the same as
>> how
>> we use linear hashing to map key to partition. In this case, we get an
>> identity map from partition -> processor, for both input topic and the
>> change log topic. For example, processor 12 will consume partition 12 of
>> the input topic and produce state to the partition 12 of the change log
>> topic.
>>
>> There are a few important properties of this solution to note:
>>
>> - We can increase the number of partitions for input topic and the change
>> log topic in any order asynchronously.
>> - The expansion of the processors in a given job in step 4) only requires
>> the step 3) for the same job. It does not require coordination across
>> different jobs for step 3) and 4). Thus different jobs can independently
>> expand there capacity without waiting for each other.
>> - The logic for 1) and 2) is already supported in the current KIP. The
>> logic for 3) and 4) appears to be independent of the core Kafka logic and
>> can be implemented separately outside core Kafka. Thus the current KIP is
>> probably sufficient after we agree on the efficiency and the correctness
>> of
>> the solution. We can have a separate KIP for Kafka Stream to support 3)
>> and
>> 4).
>>
>>
>> Cheers,
>> Dong
>>
>>
>> On Mon, Apr 2, 2018 at 3:25 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> Hey guys, just sharing my two cents here (I promise it will be shorter
>>> than
>>> John's article :).
>>>
>>> 0. Just to quickly recap, the main discussion point now is how to support
>>> "key partitioning preservation" (John's #4 in topic characteristics
>>> above)
>>> beyond the "single-key ordering preservation" that KIP-253 was originally
>>> proposed to maintain (John's #6 above).
>>>
>>> 1. From the streams project, we are actively working on improving the
>>> elastic scalability of the library. One of the key features is to
>>> decouple
>>> the input topics from the parallelism model of Streams: i.e. not
>>> enforcing
>>> the topic to be partitioned by the key, not enforcing joining topics to
>>> be
>>> co-partitioned, not relying the number of parallel tasks on the input
>>> topic
>>> partitions. This can be achieved by re-shuffling on the input topics to
>>> make sure key-partitioning / co-partitioning on the internal topics. Note
>>> the re-shuffling task is purely stateless and hence does not require "key
>>> partitioning preservation". Operational-wise it is similar to the
>>> "creating
>>> a new topic with new number of partitions, pipe the data to the new topic
>>> and cut over consumers from old topics" idea, just that users can
>>> optionally let Streams to handle such rather than doing it manually
>>> themselves. There are a few more details on that regard but I will skip
>>> since they are not directly related to this discussion.
>>>
>>> 2. Assuming that 1) above is done, then the only topics involved in the
>>> scaling events are all input topics. For these topics the only producers
>>> /
>>> consumers of these topics are controlled by Streams clients themselves,
>>> and
>>> hence achieving "key partitioning preservation" is simpler than
>>> non-Streams
>>> scenarios: consumers know the partitioning scheme that producers are
>>> using,
>>> so that for their stateful operations it is doable to split the local
>>> state
>>> stores accordingly or execute backfilling on its own. Of course, if we
>>> decide to do server-side backfilling, it can still help Streams to
>>> directly
>>> rely on that functionality.
>>>
>>> 3. As John mentioned, another way inside Streams is to do
>>> over-partitioning
>>> on all internal topics; then with 1) Streams would not rely on KIP-253 at
>>> all. But personally I'd like to avoid it if possible to reduce Kafka side
>>> footprint: say we overpartition each input topic up to 1k, with a
>>> reasonable sized stateful topology it can still contribute to tens of
>>> thousands of topics to the topic partition capacity of a single cluster.
>>>
>>> 4. Summing up 1/2/3, I think we should focus more on non-Streams users
>>> writing their stateful computations with local states, and think whether
>>> /
>>> how we could enable "key partitioning preservation" for them easily, than
>>> to think heavily for Streams library. People may have different opinion
>>> on
>>> how common of a usage pattern it is (I think Jun might be suggesting that
>>> for DIY apps people may more likely use remote states so that it is not a
>>> problem for them). My opinion is that for non-Streams users such usage
>>> pattern could still be large (think: if you are piping data from Kafka to
>>> an external data storage which has single-writer requirements for each
>>> single shard, even though it is not a stateful computational application
>>> it
>>> may still require "key partitioning preservation"), so I prefer to have
>>> backfilling in our KIP than only exposing the API for expansion and
>>> requires consumers to have pre-knowledge of the producer's partitioning
>>> scheme.
>>>
>>>
>>>
>>> Guozhang
>>>
>>>
>>>
>>> On Thu, Mar 29, 2018 at 2:33 PM, John Roesler <j...@confluent.io> wrote:
>>>
>>> Hey Dong,
>>>>
>>>> Congrats on becoming a committer!!!
>>>>
>>>> Since I just sent a novel-length email, I'll try and keep this one brief
>>>>
>>> ;)
>>>
>>>> Regarding producer coordination, I'll grant that in that case, producers
>>>> may coordinate among themselves to produce into the same topic or to
>>>> produce co-partitioned topics. Nothing in KStreams or the Kafka
>>>> ecosystem
>>>> in general requires such coordination for correctness or in fact for any
>>>> optional features, though, so I would not say that we require producer
>>>> coordination of partition logic. If producers currently coordinate, it's
>>>> completely optional and their own choice.
>>>>
>>>> Regarding the portability of partition algorithms, my observation is
>>>> that
>>>> systems requiring independent implementations of the same algorithm with
>>>> 100% correctness are a large source of risk and also a burden on those
>>>>
>>> who
>>>
>>>> have to maintain them. If people could flawlessly implement algorithms
>>>> in
>>>> actual software, the world would be a wonderful place indeed! For a
>>>>
>>> system
>>>
>>>> as important and widespread as Kafka, I would recommend restricting
>>>> limiting such requirements as aggressively as possible.
>>>>
>>>> I'd agree that we can always revisit decisions like allowing arbitrary
>>>> partition functions, but of course, we shouldn't do that in a vacuum.
>>>>
>>> That
>>>
>>>> feels like the kind of thing we'd need to proactively seek guidance from
>>>> the users list about. I do think that the general approach of saying
>>>> that
>>>> "if you use a custom partitioner, you cannot do partition expansion" is
>>>> very reasonable (but I don't think we need to go that far with the
>>>>
>>> current
>>>
>>>> proposal). It's similar to my statement in my email to Jun that in
>>>> principle KStreams doesn't *need* backfill, we only need it if we want
>>>> to
>>>> employ partition expansion.
>>>>
>>>> I reckon that the main motivation for backfill is to support KStreams
>>>> use
>>>> cases and also any other use cases involving stateful consumers.
>>>>
>>>> Thanks for your response, and congrats again!
>>>> -John
>>>>
>>>>
>>>> On Wed, Mar 28, 2018 at 1:34 AM, Dong Lin <lindon...@gmail.com> wrote:
>>>>
>>>> Hey John,
>>>>>
>>>>> Great! Thanks for all the comment. It seems that we agree that the
>>>>>
>>>> current
>>>>
>>>>> KIP is in good shape for core Kafka. IMO, what we have been discussing
>>>>>
>>>> in
>>>
>>>> the recent email exchanges is mostly about the second step, i.e. how to
>>>>> address problem for the stream use-case (or stateful processing in
>>>>> general).
>>>>>
>>>>> I will comment inline.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Mar 27, 2018 at 4:38 PM, John Roesler <j...@confluent.io>
>>>>>
>>>> wrote:
>>>
>>>> Thanks for the response, Dong.
>>>>>>
>>>>>> Here are my answers to your questions:
>>>>>>
>>>>>> - "Asking producers and consumers, or even two different producers,
>>>>>>
>>>>> to
>>>
>>>> share code like the partition function is a pretty huge ask. What
>>>>>>>
>>>>>> if
>>>
>>>> they
>>>>>
>>>>>> are using different languages?". It seems that today we already
>>>>>>>
>>>>>> require
>>>>
>>>>> different producer's to use the same hash function -- otherwise
>>>>>>>
>>>>>> messages
>>>>>
>>>>>> with the same key will go to different partitions of the same topic
>>>>>>>
>>>>>> which
>>>>>
>>>>>> may cause problem for downstream consumption. So not sure if it
>>>>>>>
>>>>>> adds
>>>
>>>> any
>>>>>
>>>>>> more constraint by assuming consumers know the hash function of
>>>>>>>
>>>>>> producer.
>>>>>
>>>>>> Could you explain more why user would want to use a cusmtom
>>>>>>>
>>>>>> partition
>>>
>>>> function? Maybe we can check if this is something that can be
>>>>>>>
>>>>>> supported
>>>>
>>>>> in
>>>>>>
>>>>>>> the default Kafka hash function. Also, can you explain more why it
>>>>>>>
>>>>>> is
>>>
>>>> difficuilt to implement the same hash function in different
>>>>>>>
>>>>>> languages?
>>>>
>>>>>
>>>>>> Sorry, I meant two different producers as in producers to two
>>>>>>
>>>>> different
>>>
>>>> topics. This was in response to the suggestion that we already
>>>>>>
>>>>> require
>>>
>>>> coordination among producers to different topics in order to achieve
>>>>>> co-partitioning. I was saying that we do not (and should not).
>>>>>>
>>>>>
>>>>> It is probably common for producers of different team to produce
>>>>>
>>>> message
>>>
>>>> to
>>>>
>>>>> the same topic. In order to ensure that messages with the same key go
>>>>>
>>>> to
>>>
>>>> same partition, we need producers of different team to share the same
>>>>> partition algorithm, which by definition requires coordination among
>>>>> producers of different teams in an organization. Even for producers of
>>>>> different topics, it may be common to require producers to use the same
>>>>> partition algorithm in order to join two topics for stream processing.
>>>>>
>>>> Does
>>>>
>>>>> this make it reasonable to say we already require coordination across
>>>>> producers?
>>>>>
>>>>>
>>>>> By design, consumers are currently ignorant of the partitioning
>>>>>>
>>>>> scheme.
>>>
>>>> It
>>>>>
>>>>>> suffices to trust that the producer has partitioned the topic by key,
>>>>>>
>>>>> if
>>>>
>>>>> they claim to have done so. If you don't trust that, or even if you
>>>>>>
>>>>> just
>>>>
>>>>> need some other partitioning scheme, then you must re-partition it
>>>>>> yourself. Nothing we're discussing can or should change that. The
>>>>>>
>>>>> value
>>>
>>>> of
>>>>>
>>>>>> backfill is that it preserves the ability for consumers to avoid
>>>>>> re-partitioning before consuming, in the case where they don't need
>>>>>>
>>>>> to
>>>
>>>> today.
>>>>>>
>>>>>
>>>>> Regarding shared "hash functions", note that it's a bit inaccurate to
>>>>>>
>>>>> talk
>>>>>
>>>>>> about the "hash function" of the producer. Properly speaking, the
>>>>>>
>>>>> producer
>>>>>
>>>>>> has only a "partition function". We do not know that it is a hash.
>>>>>>
>>>>> The
>>>
>>>> producer can use any method at their disposal to assign a partition
>>>>>>
>>>>> to
>>>
>>>> a
>>>>
>>>>> record. The partition function obviously may we written in any
>>>>>>
>>>>> programming
>>>>>
>>>>>> language, so in general it's not something that can be shared around
>>>>>> without a formal spec or the ability to execute arbitrary executables
>>>>>>
>>>>> in
>>>>
>>>>> arbitrary runtime environments.
>>>>>>
>>>>>> Yeah it is probably better to say partition algorithm. I guess it
>>>>>
>>>> should
>>>
>>>> not be difficult to implement same partition algorithms in different
>>>>> languages, right? Yes we would need a formal specification of the
>>>>>
>>>> default
>>>
>>>> partition algorithm in the producer. I think that can be documented as
>>>>>
>>>> part
>>>>
>>>>> of the producer interface.
>>>>>
>>>>>
>>>>> Why would a producer want a custom partition function? I don't
>>>>>>
>>>>> know...
>>>
>>>> why
>>>>>
>>>>>> did we design the interface so that our users can provide one? In
>>>>>>
>>>>> general,
>>>>>
>>>>>> such systems provide custom partitioners because some data sets may
>>>>>>
>>>>> be
>>>
>>>> unbalanced under the default or because they can provide some
>>>>>>
>>>>> interesting
>>>>
>>>>> functionality built on top of the partitioning scheme, etc. Having
>>>>>>
>>>>> provided
>>>>>
>>>>>> this ability, I don't know why we would remove it.
>>>>>>
>>>>>> Yeah it is reasonable to assume that there was reason to support
>>>>> custom
>>>>> partition function in producer. On the other hand it may also be
>>>>>
>>>> reasonable
>>>>
>>>>> to revisit this interface and discuss whether we actually need to
>>>>>
>>>> support
>>>
>>>> custom partition function. If we don't have a good reason, we can
>>>>>
>>>> choose
>>>
>>>> not to support custom partition function in this KIP in a backward
>>>>> compatible manner, i.e. user can still use custom partition function
>>>>>
>>>> but
>>>
>>>> they would not get the benefit of in-order delivery when there is
>>>>>
>>>> partition
>>>>
>>>>> expansion. What do you think?
>>>>>
>>>>>
>>>>> - Besides the assumption that consumer needs to share the hash
>>>>>>
>>>>> function
>>>
>>>> of
>>>>>
>>>>>> producer, is there other organization overhead of the proposal in
>>>>>>>
>>>>>> the
>>>
>>>> current KIP?
>>>>>>>
>>>>>>> It wasn't clear to me that KIP-253 currently required the producer
>>>>>>
>>>>> and
>>>
>>>> consumer to share the partition function, or in fact that it had a
>>>>>>
>>>>> hard
>>>
>>>> requirement to abandon the general partition function and use a
>>>>>>
>>>>> linear
>>>
>>>> hash
>>>>>
>>>>>> function instead.
>>>>>>
>>>>>
>>>>> In my reading, there is a requirement to track the metadata about
>>>>>>
>>>>> what
>>>
>>>> partitions split into what other partitions during an expansion
>>>>>>
>>>>> operation.
>>>>>
>>>>>> If the partition function is linear, this is easy. If not, you can
>>>>>>
>>>>> always
>>>>
>>>>> just record that all old partitions split into all new partitions.
>>>>>>
>>>>> This
>>>
>>>> has
>>>>>
>>>>>> the effect of forcing all consumers to wait until the old epoch is
>>>>>> completely consumed before starting on the new epoch. But this may
>>>>>>
>>>>> be a
>>>
>>>> reasonable tradeoff, and it doesn't otherwise alter your design.
>>>>>>
>>>>>> You only mention the consumer needing to know that the partition
>>>>>>
>>>>> function
>>>>
>>>>> is linear, not what the actual function is, so I don't think your
>>>>>>
>>>>> design
>>>>
>>>>> actually calls for sharing the function. Plus, really all the
>>>>>>
>>>>> consumer
>>>
>>>> needs is the metadata about what old-epoch partitions to wait for
>>>>>>
>>>>> before
>>>>
>>>>> consuming a new-epoch partition. This information is directly
>>>>>>
>>>>> captured
>>>
>>>> in
>>>>
>>>>> metadata, so I don't think it actually even cares whether the
>>>>>>
>>>>> partition
>>>
>>>> function is linear or not.
>>>>>>
>>>>>> You are right that the current KIP does not mention it. My comment
>>>>>
>>>> related
>>>>
>>>>> to the partition function coordination was related to support the
>>>>> stream-use case which we have been discussing so far.
>>>>>
>>>>>
>>>>> So, no, I really think KIP-253 is in good shape. I was really more
>>>>>>
>>>>> talking
>>>>>
>>>>>> about the part of this thread that's outside of KIP-253's scope,
>>>>>>
>>>>> namely,
>>>>
>>>>> creating the possibility of backfilling partitions after expansion.
>>>>>>
>>>>>> Great! Can you also confirm that the main motivation for backfilling
>>>>> partitions after expansion is to support the stream use-case?
>>>>>
>>>>>
>>>>> - Currently producer can forget about the message that has been
>>>>>>
>>>>>>> acknowledged by the broker. Thus the producer probably does not
>>>>>>>
>>>>>> know
>>>
>>>> most
>>>>>
>>>>>> of the exiting messages in topic, including those messages produced
>>>>>>>
>>>>>> by
>>>>
>>>>> other producers. We can have the owner of the producer to
>>>>>>>
>>>>>> split+backfill.
>>>>>
>>>>>> In my opion it will be a new program that wraps around the existing
>>>>>>> producer and consumer classes.
>>>>>>>
>>>>>>> This sounds fine by me!
>>>>>>
>>>>>> Really, I was just emphasizing that the part of the organization that
>>>>>> produces a topic shouldn't have to export their partition function to
>>>>>>
>>>>> the
>>>>
>>>>> part(s) of the organization (or other organizations) that consume the
>>>>>> topic. Whether the backfill operation goes into the Producer
>>>>>>
>>>>> interface
>>>
>>>> is
>>>>
>>>>> secondary, I think.
>>>>>>
>>>>>> - Regarding point 5. The argument is in favor of the split+backfill
>>>>>>
>>>>> but
>>>
>>>> for
>>>>>
>>>>>> changelog topic. And it intends to address the problem for stream
>>>>>>>
>>>>>> use-case
>>>>>>
>>>>>>> in general. In this KIP we will provide interface (i.e.
>>>>>>> PartitionKeyRebalanceListener in the KIP) to be used by sream
>>>>>>>
>>>>>> use-case
>>>>
>>>>> and
>>>>>>
>>>>>>> the goal is that user can flush/re-consume the state as part of the
>>>>>>> interface implementation regardless of whether there is change log
>>>>>>>
>>>>>> topic.
>>>>>
>>>>>> Maybe you are suggesting that the main reason to do split+backfill
>>>>>>>
>>>>>> of
>>>
>>>> input
>>>>>>
>>>>>>> topic is to support log compacted topics? You mentioned in Point 1
>>>>>>>
>>>>>> that
>>>>
>>>>> log
>>>>>>
>>>>>>> compacted topics is out of the scope of this KIP. Maybe I could
>>>>>>>
>>>>>> understand
>>>>>>
>>>>>>> your position better. Regarding Jan's proposal to split partitions
>>>>>>>
>>>>>> with
>>>>
>>>>> backfill, do you think this should replace the proposal in the
>>>>>>>
>>>>>> existing
>>>>
>>>>> KIP, or do you think this is something that we should do in
>>>>>>>
>>>>>> addition
>>>
>>>> to
>>>>
>>>>> the
>>>>>>
>>>>>>> existing KIP?
>>>>>>>
>>>>>>> I think that interface is a good/necessary component of KIP-253.
>>>>>>
>>>>>> I personally (FWIW) feel that KIP-253 is appropriately scoped, but I
>>>>>>
>>>>> do
>>>
>>>> think its utility will be limited unless there is a later KIP
>>>>>>
>>>>> offering
>>>
>>>> backfill. But, maybe unlike Jan, I think it makes sense to try and
>>>>>>
>>>>> tackle
>>>>
>>>>> the ordering problem independently of backfill, so I'm in support of
>>>>>>
>>>>> the
>>>>
>>>>> current KIP.
>>>>>>
>>>>>> - Regarding point 6. I guess we can agree that it is better not to
>>>>>>
>>>>> have
>>>
>>>> the
>>>>>
>>>>>> performance overhread of copying the input data. Before we discuss
>>>>>>>
>>>>>> more
>>>>
>>>>> on
>>>>>>
>>>>>>> whether the performance overhead is acceptable or not, I am trying
>>>>>>>
>>>>>> to
>>>
>>>> figure out what is the benefit of introducing this overhread. You
>>>>>>>
>>>>>> mentioned
>>>>>>
>>>>>>> that the benefit is the loose organizational coupling. By
>>>>>>>
>>>>>> "organizational
>>>>>
>>>>>> coupling", are you referring to the requirement that consumer needs
>>>>>>>
>>>>>> to
>>>>
>>>>> know
>>>>>>
>>>>>>> the hash function of producer? If so, maybe we can discuss the
>>>>>>>
>>>>>> use-case
>>>>
>>>>> of
>>>>>>
>>>>>>> custom partiton function and see whether we can find a way to
>>>>>>>
>>>>>> support
>>>
>>>> such
>>>>>>
>>>>>>> use-case without having to copy the input data.
>>>>>>>
>>>>>>> I'm not too sure about what an "input" is in this sense, since we are
>>>>>>
>>>>> just
>>>>>
>>>>>> talking about topics. Actually the point I was making there is that
>>>>>>
>>>>> AKAICT
>>>>>
>>>>>> the performance overhead of a backfill is less than any other option,
>>>>>> assuming you split partitions rarely.
>>>>>>
>>>>>> By "input" I was referring to source Kafka topic of a stream
>>>>> processing
>>>>> job.
>>>>>
>>>>>
>>>>> Separately, yes, "organizational coupling" increases if producers and
>>>>>> consumers have to share code, such as the partition function. This
>>>>>>
>>>>> would
>>>>
>>>>> not be the case if producers could only pick from a menu of a few
>>>>>> well-known partition functions, but I think this is a poor tradeoff.
>>>>>>
>>>>>> Maybe we can revisit the custom partition function and see whether we
>>>>> actually need it? Otherwise, I am concerned that every user will pay
>>>>>
>>>> the
>>>
>>>> overhead of data movement to support something that was not really
>>>>>
>>>> needed
>>>
>>>> for most users.
>>>>>
>>>>>
>>>>> To me, this is two strong arguments in favor of backfill being less
>>>>>> expensive than no backfill, but again, I think that particular debate
>>>>>>
>>>>> comes
>>>>>
>>>>>> after KIP-253, so I don't want to create the impression of opposition
>>>>>>
>>>>> to
>>>>
>>>>> your proposal.
>>>>>>
>>>>>>
>>>>>> Finally, to respond to a new email I just noticed:
>>>>>>
>>>>>> BTW, here is my understanding of the scope of this KIP. We want to
>>>>>>>
>>>>>> allow
>>>>>
>>>>>> consumers to always consume messages with the same key from the
>>>>>>>
>>>>>> same
>>>
>>>> producer in the order they are produced. And we need to provide a
>>>>>>>
>>>>>> way
>>>
>>>> for
>>>>>
>>>>>> stream use-case to be able to flush/load state when messages with
>>>>>>>
>>>>>> the
>>>
>>>> same
>>>>>>
>>>>>>> key are migrated between consumers. In addition to ensuring that
>>>>>>>
>>>>>> this
>>>
>>>> goal
>>>>>>
>>>>>>> is correctly supported, we should do our best to keep the
>>>>>>>
>>>>>> performance
>>>
>>>> and
>>>>>
>>>>>> organization overhead of this KIP as low as possible.
>>>>>>>
>>>>>>> I think we're on the same page there! In fact, I would generalize a
>>>>>>
>>>>> little
>>>>>
>>>>>> more and say that the mechanism you've designed provides *all
>>>>>>
>>>>> consumers*
>>>>
>>>>> the ability "to flush/load state when messages with the same key are
>>>>>> migrated between consumers", not just Streams.
>>>>>>
>>>>>> Thanks for all the comment!
>>>>>
>>>>>
>>>>> Thanks for the discussion,
>>>>>> -John
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Mar 27, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com>
>>>>>>
>>>>> wrote:
>>>
>>>> Hey John,
>>>>>>>
>>>>>>> Thanks much for the detailed comments. Here are my thoughts:
>>>>>>>
>>>>>>> - The need to delete messages from log compacted topics is mainly
>>>>>>>
>>>>>> for
>>>
>>>> performance (e.g. storage space) optimization than for correctness
>>>>>>>
>>>>>> for
>>>>
>>>>> this
>>>>>>
>>>>>>> KIP. I agree that we probably don't need to focus on this in our
>>>>>>>
>>>>>> discussion
>>>>>>
>>>>>>> since it is mostly for performance optimization.
>>>>>>>
>>>>>>> - "Asking producers and consumers, or even two different producers,
>>>>>>>
>>>>>> to
>>>>
>>>>> share code like the partition function is a pretty huge ask. What
>>>>>>>
>>>>>> if
>>>
>>>> they
>>>>>
>>>>>> are using different languages?". It seems that today we already
>>>>>>>
>>>>>> require
>>>>
>>>>> different producer's to use the same hash function -- otherwise
>>>>>>>
>>>>>> messages
>>>>>
>>>>>> with the same key will go to different partitions of the same topic
>>>>>>>
>>>>>> which
>>>>>
>>>>>> may cause problem for downstream consumption. So not sure if it
>>>>>>>
>>>>>> adds
>>>
>>>> any
>>>>>
>>>>>> more constraint by assuming consumers know the hash function of
>>>>>>>
>>>>>> producer.
>>>>>
>>>>>> Could you explain more why user would want to use a cusmtom
>>>>>>>
>>>>>> partition
>>>
>>>> function? Maybe we can check if this is something that can be
>>>>>>>
>>>>>> supported
>>>>
>>>>> in
>>>>>>
>>>>>>> the default Kafka hash function. Also, can you explain more why it
>>>>>>>
>>>>>> is
>>>
>>>> difficuilt to implement the same hash function in different
>>>>>>>
>>>>>> languages?
>>>>
>>>>> - Besides the assumption that consumer needs to share the hash
>>>>>>>
>>>>>> function
>>>>
>>>>> of
>>>>>>
>>>>>>> producer, is there other organization overhead of the proposal in
>>>>>>>
>>>>>> the
>>>
>>>> current KIP?
>>>>>>>
>>>>>>> - Currently producer can forget about the message that has been
>>>>>>> acknowledged by the broker. Thus the producer probably does not
>>>>>>>
>>>>>> know
>>>
>>>> most
>>>>>
>>>>>> of the exiting messages in topic, including those messages produced
>>>>>>>
>>>>>> by
>>>>
>>>>> other producers. We can have the owner of the producer to
>>>>>>>
>>>>>> split+backfill.
>>>>>
>>>>>> In my opion it will be a new program that wraps around the existing
>>>>>>> producer and consumer classes.
>>>>>>>
>>>>>>> - Regarding point 5. The argument is in favor of the split+backfill
>>>>>>>
>>>>>> but
>>>>
>>>>> for
>>>>>>
>>>>>>> changelog topic. And it intends to address the problem for stream
>>>>>>>
>>>>>> use-case
>>>>>>
>>>>>>> in general. In this KIP we will provide interface (i.e.
>>>>>>> PartitionKeyRebalanceListener in the KIP) to be used by sream
>>>>>>>
>>>>>> use-case
>>>>
>>>>> and
>>>>>>
>>>>>>> the goal is that user can flush/re-consume the state as part of the
>>>>>>> interface implementation regardless of whether there is change log
>>>>>>>
>>>>>> topic.
>>>>>
>>>>>> Maybe you are suggesting that the main reason to do split+backfill
>>>>>>>
>>>>>> of
>>>
>>>> input
>>>>>>
>>>>>>> topic is to support log compacted topics? You mentioned in Point 1
>>>>>>>
>>>>>> that
>>>>
>>>>> log
>>>>>>
>>>>>>> compacted topics is out of the scope of this KIP. Maybe I could
>>>>>>>
>>>>>> understand
>>>>>>
>>>>>>> your position better. Regarding Jan's proposal to split partitions
>>>>>>>
>>>>>> with
>>>>
>>>>> backfill, do you think this should replace the proposal in the
>>>>>>>
>>>>>> existing
>>>>
>>>>> KIP, or do you think this is something that we should do in
>>>>>>>
>>>>>> addition
>>>
>>>> to
>>>>
>>>>> the
>>>>>>
>>>>>>> existing KIP?
>>>>>>>
>>>>>>> - Regarding point 6. I guess we can agree that it is better not to
>>>>>>>
>>>>>> have
>>>>
>>>>> the
>>>>>>
>>>>>>> performance overhread of copying the input data. Before we discuss
>>>>>>>
>>>>>> more
>>>>
>>>>> on
>>>>>>
>>>>>>> whether the performance overhead is acceptable or not, I am trying
>>>>>>>
>>>>>> to
>>>
>>>> figure out what is the benefit of introducing this overhread. You
>>>>>>>
>>>>>> mentioned
>>>>>>
>>>>>>> that the benefit is the loose organizational coupling. By
>>>>>>>
>>>>>> "organizational
>>>>>
>>>>>> coupling", are you referring to the requirement that consumer needs
>>>>>>>
>>>>>> to
>>>>
>>>>> know
>>>>>>
>>>>>>> the hash function of producer? If so, maybe we can discuss the
>>>>>>>
>>>>>> use-case
>>>>
>>>>> of
>>>>>>
>>>>>>> custom partiton function and see whether we can find a way to
>>>>>>>
>>>>>> support
>>>
>>>> such
>>>>>>
>>>>>>> use-case without having to copy the input data.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Dong
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Mar 27, 2018 at 11:34 AM, John Roesler <j...@confluent.io>
>>>>>>>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey Dong and Jun,
>>>>>>>>
>>>>>>>> Thanks for the thoughtful responses. If you don't mind, I'll mix
>>>>>>>>
>>>>>>> my
>>>
>>>> replies
>>>>>>>
>>>>>>>> together to try for a coherent response. I'm not too familiar
>>>>>>>>
>>>>>>> with
>>>
>>>> mailing-list etiquette, though.
>>>>>>>>
>>>>>>>> I'm going to keep numbering my points because it makes it easy
>>>>>>>>
>>>>>>> for
>>>
>>>> you
>>>>>
>>>>>> all
>>>>>>>
>>>>>>>> to respond.
>>>>>>>>
>>>>>>>> Point 1:
>>>>>>>> As I read it, KIP-253 is *just* about properly fencing the
>>>>>>>>
>>>>>>> producers
>>>>
>>>>> and
>>>>>>
>>>>>>> consumers so that you preserve the correct ordering of records
>>>>>>>>
>>>>>>> during
>>>>
>>>>> partition expansion. This is clearly necessary regardless of
>>>>>>>>
>>>>>>> anything
>>>>
>>>>> else
>>>>>>>
>>>>>>>> we discuss. I think this whole discussion about backfill,
>>>>>>>>
>>>>>>> consumers,
>>>>
>>>>> streams, etc., is beyond the scope of KIP-253. But it would be
>>>>>>>>
>>>>>>> cumbersome
>>>>>>
>>>>>>> to start a new thread at this point.
>>>>>>>>
>>>>>>>> I had missed KIP-253's Proposed Change #9 among all the
>>>>>>>>
>>>>>>> details...
>>>
>>>> I
>>>>
>>>>> think
>>>>>>>
>>>>>>>> this is a nice addition to the proposal. One thought is that it's
>>>>>>>>
>>>>>>> actually
>>>>>>>
>>>>>>>> irrelevant whether the hash function is linear. This is simply an
>>>>>>>>
>>>>>>> algorithm
>>>>>>>
>>>>>>>> for moving a key from one partition to another, so the type of
>>>>>>>>
>>>>>>> hash
>>>
>>>> function need not be a precondition. In fact, it also doesn't
>>>>>>>>
>>>>>>> matter
>>>>
>>>>> whether the topic is compacted or not, the algorithm works
>>>>>>>>
>>>>>>> regardless.
>>>>>
>>>>>> I think this is a good algorithm to keep in mind, as it might
>>>>>>>>
>>>>>>> solve a
>>>>
>>>>> variety of problems, but it does have a downside: that the
>>>>>>>>
>>>>>>> producer
>>>
>>>> won't
>>>>>>
>>>>>>> know whether or not K1 was actually in P1, it just knows that K1
>>>>>>>>
>>>>>>> was
>>>>
>>>>> in
>>>>>
>>>>>> P1's keyspace before the new epoch. Therefore, it will have to
>>>>>>>> pessimistically send (K1,null) to P1 just in case. But the next
>>>>>>>>
>>>>>>> time
>>>>
>>>>> K1
>>>>>
>>>>>> comes along, the producer *also* won't remember that it already
>>>>>>>>
>>>>>>> retracted
>>>>>>
>>>>>>> K1 from P1, so it will have to send (K1,null) *again*. By
>>>>>>>>
>>>>>>> extension,
>>>>
>>>>> every
>>>>>>>
>>>>>>>> time the producer sends to P2, it will also have to send a
>>>>>>>>
>>>>>>> tombstone
>>>>
>>>>> to
>>>>>
>>>>>> P1,
>>>>>>>
>>>>>>>> which is a pretty big burden. To make the situation worse, if
>>>>>>>>
>>>>>>> there
>>>
>>>> is
>>>>>
>>>>>> a
>>>>>>
>>>>>>> second split, say P2 becomes P2 and P3, then any key Kx belonging
>>>>>>>>
>>>>>>> to
>>>>
>>>>> P3
>>>>>
>>>>>> will also have to be retracted from P2 *and* P1, since the
>>>>>>>>
>>>>>>> producer
>>>
>>>> can't
>>>>>>
>>>>>>> know whether Kx had been last written to P2 or P1. Over a long
>>>>>>>>
>>>>>>> period
>>>>
>>>>> of
>>>>>>
>>>>>>> time, this clearly becomes a issue, as the producer must send an
>>>>>>>>
>>>>>>> arbitrary
>>>>>>>
>>>>>>>> number of retractions along with every update.
>>>>>>>>
>>>>>>>> In contrast, the proposed backfill operation has an end, and
>>>>>>>>
>>>>>>> after
>>>
>>>> it
>>>>
>>>>> ends,
>>>>>>>
>>>>>>>> everyone can afford to forget that there ever was a different
>>>>>>>>
>>>>>>> partition
>>>>>
>>>>>> layout.
>>>>>>>>
>>>>>>>> Really, though, figuring out how to split compacted topics is
>>>>>>>>
>>>>>>> beyond
>>>>
>>>>> the
>>>>>>
>>>>>>> scope of KIP-253, so I'm not sure #9 really even needs to be in
>>>>>>>>
>>>>>>> this
>>>>
>>>>> KIP...
>>>>>>>
>>>>>>>> We do need in-order delivery during partition expansion. It would
>>>>>>>>
>>>>>>> be
>>>>
>>>>> fine
>>>>>>
>>>>>>> by me to say that you *cannot* expand partitions of a
>>>>>>>>
>>>>>>> log-compacted
>>>
>>>> topic
>>>>>>
>>>>>>> and call it a day. I think it would be better to tackle that in
>>>>>>>>
>>>>>>> another
>>>>>
>>>>>> KIP.
>>>>>>>>
>>>>>>>>
>>>>>>>> Point 2:
>>>>>>>> Regarding whether the consumer re-shuffles its inputs, this is
>>>>>>>>
>>>>>>> always
>>>>
>>>>> on
>>>>>>
>>>>>>> the table; any consumer who wants to re-shuffle its input is free
>>>>>>>>
>>>>>>> to
>>>>
>>>>> do
>>>>>
>>>>>> so.
>>>>>>>
>>>>>>>> But this is currently not required. It's just that the current
>>>>>>>>
>>>>>>> high-level
>>>>>>
>>>>>>> story with Kafka encourages the use of partitions as a unit of
>>>>>>>>
>>>>>>> concurrency.
>>>>>>>
>>>>>>>> As long as consumers are single-threaded, they can happily
>>>>>>>>
>>>>>>> consume
>>>
>>>> a
>>>>
>>>>> single
>>>>>>>
>>>>>>>> partition without concurrency control of any kind. This is a key
>>>>>>>>
>>>>>>> aspect
>>>>>
>>>>>> to
>>>>>>>
>>>>>>>> this system that lets folks design high-throughput systems on top
>>>>>>>>
>>>>>>> of
>>>>
>>>>> it
>>>>>
>>>>>> surprisingly easily. If all consumers were instead
>>>>>>>>
>>>>>>> encouraged/required
>>>>>
>>>>>> to
>>>>>>
>>>>>>> implement a repartition of their own, then the consumer becomes
>>>>>>>> significantly more complex, requiring either the consumer to
>>>>>>>>
>>>>>>> first
>>>
>>>> produce
>>>>>>>
>>>>>>>> to its own intermediate repartition topic or to ensure that
>>>>>>>>
>>>>>>> consumer
>>>>
>>>>> threads have a reliable, high-bandwith channel of communication
>>>>>>>>
>>>>>>> with
>>>>
>>>>> every
>>>>>>>
>>>>>>>> other consumer thread.
>>>>>>>>
>>>>>>>> Either of those tradeoffs may be reasonable for a particular user
>>>>>>>>
>>>>>>> of
>>>>
>>>>> Kafka,
>>>>>>>
>>>>>>>> but I don't know if we're in a position to say that they are
>>>>>>>>
>>>>>>> reasonable
>>>>>
>>>>>> for
>>>>>>>
>>>>>>>> *every* user of Kafka.
>>>>>>>>
>>>>>>>>
>>>>>>>> Point 3:
>>>>>>>> Regarding Jun's point about this use case, "(3) stateful and
>>>>>>>>
>>>>>>> maintaining
>>>>>>
>>>>>>> the
>>>>>>>> states in a local store", I agree that they may use a framework
>>>>>>>>
>>>>>>> *like*
>>>>>
>>>>>> Kafka Streams, but that is not the same as using Kafka Streams.
>>>>>>>>
>>>>>>> This
>>>>
>>>>> is
>>>>>
>>>>>> why
>>>>>>>
>>>>>>>> I think it's better to solve it in Core: because it is then
>>>>>>>>
>>>>>>> solved
>>>
>>>> for
>>>>>
>>>>>> KStreams and also for everything else that facilitates local
>>>>>>>>
>>>>>>> state
>>>
>>>> maintenance. To me, Streams is a member of the category of
>>>>>>>>
>>>>>>> "stream
>>>
>>>> processing frameworks", which is itself a subcategory of "things
>>>>>>>>
>>>>>>> requiring
>>>>>>>
>>>>>>>> local state maintenence". I'm not sure if it makes sense to
>>>>>>>
>>>>>>>

Reply via email to