Hi Jan, Dong, John, Guozhang,

Perhaps it will be useful to have a KIP meeting to discuss this together as a group. Would Apr. 9 (Monday) at 9:00am PDT work? If so, I will send out an invite to the mailing list.
Thanks,
Jun

On Wed, Apr 4, 2018 at 1:25 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

> Want to quickly step in here again because it is going places again. The last part of the discussion is just a pain to read and completely diverged from what I suggested, without making the reasons clear to me. I don't know why this happens.... here are my comments anyway.
>
> @Guozhang: That Streams is working on automatically creating copartition-usable topics: great for Streams, but it has literally nothing to do with the KIP, as we want to grow the input topic. Everyone can reshuffle relatively easily, but that is not what we need to do; we need to grow the topic in question. After Streams has automatically reshuffled, the input topic still has the same size, so it didn't help a bit. I fail to see why this is relevant. What am I missing here?
>
> @Dong: I am still of the position that the current proposal takes us in the wrong direction. Especially after introducing PartitionKeyRebalanceListener, we can never move to proper stateful handling without completely deprecating this creature from hell again. Linear hashing is not the optimising step we have to do here. An interface under which a topic stays the same topic even after it has grown or shrunk is important. So from my POV I have major concerns that this KIP is beneficial in its current state.
>
> What is it that makes everyone so addicted to the idea of linear hashing? It is not attractive at all to me. And with stateful consumers it is still a complete mess. Why not stick with the Kappa architecture???
>
> On 03.04.2018 17:38, Dong Lin wrote:
>
>> Hey John,
>>
>> Thanks much for your comments!!
>>
>> I have yet to go through the emails of John/Jun/Guozhang in detail. But let me present my idea for how to minimize the delay of state loading for the stream use-case.
>>
>> For ease of understanding, let's assume that the initial partition number of the input topics and the change log topic are both 10, and the initial number of stream processors is also 10. If we only increase the partition number of the input topics to 15 without changing the number of stream processors, the current KIP already guarantees in-order delivery and no state needs to be moved between consumers for the stream use-case. Next, let's say we want to increase the number of processors to expand the processing capacity of the stream job. This requires us to move state between processors, which will take time. Our goal is to minimize the impact (i.e. delay) on processing while we increase the number of processors.
>>
>> Note that a stream processor generally includes both a consumer and a producer. In addition to consuming from the input topic, the consumer may also need to consume from the change log topic on startup for recovery. And the producer may produce state to the change log topic.
>>
>> The solution will include the following steps:
>>
>> 1) Increase the partition number of the input topic from 10 to 15. Since messages with the same key will still go to the same consumer before and after the partition expansion, this step can be done without having to move state between processors.
>>
>> 2) Increase the partition number of the change log topic from 10 to 15. Note that this step can also be done without impacting the existing workflow. After we increase the partition number of the change log topic, the key space may split and some keys will be produced to the newly-added partitions. But the same key will still go to the same processor (i.e. consumer) before and after the partition expansion. Thus this step can also be done without having to move state between processors.
>>
>> 3) Now, let's add 5 new consumers whose groupId is different from the existing processors' groupId. Thus these new consumers will not impact the existing workflow. Each of these new consumers should consume two partitions from the earliest offset, where these two partitions are the same partitions that would be consumed if these consumers had the same groupId as the existing processors. For example, the first of the five consumers will consume partition 0 and partition 10. The purpose of these consumers is to rebuild the state (e.g. RocksDB) for the processors in advance. Also note that, by design of the current KIP, each consumer will consume the existing partition of the change log topic up to the offset before the partition expansion. Then it will only need to consume the state of the new partition of the change log topic.
>>
>> 4) After the consumers have caught up in step 3), we should stop these consumers and add 5 new processors to the stream processing job. These 5 new processors should run in the same location as the previous 5 consumers to re-use the state (e.g. RocksDB). And these processors' consumers should consume partitions of the change log topic from the committed offsets of the previous 5 consumers so that no state is missed.
>>
>> One important trick to note here is that the mapping from partition to consumer should also use linear hashing, and we need to remember the initial number of processors in the job, 10 in this example, and use this number in the linear hashing algorithm. This is pretty much the same as how we use linear hashing to map a key to a partition. In this case, we get an identity map from partition -> processor, for both the input topic and the change log topic. For example, processor 12 will consume partition 12 of the input topic and produce state to partition 12 of the change log topic.
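For illustration, here is a minimal, non-normative sketch in Java of the linear-hashing arithmetic described above, assuming 10 initial and 15 current partitions. The hash helper is a stand-in rather than the producer's actual murmur2, and the exact formula in KIP-253 may differ; the point is only that under linear hashing a key either stays in its old partition p or moves to exactly p + 10, so each old partition splits into at most two.

    import java.nio.charset.StandardCharsets;

    public class LinearHashingSketch {

        // Stand-in positive hash; the real producer hashes the serialized key with murmur2.
        static int positiveHash(String key) {
            int h = 0;
            for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
                h = 31 * h + (b & 0xff);
            }
            return h & 0x7fffffff;
        }

        // One common formulation of linear hashing, valid while
        // initialPartitions <= currentPartitions <= 2 * initialPartitions.
        static int partitionFor(String key, int initialPartitions, int currentPartitions) {
            int h = positiveHash(key);
            int p = h % (2 * initialPartitions);
            // Buckets that have not been split yet fall back to the old modulo.
            return p < currentPartitions ? p : h % initialPartitions;
        }

        public static void main(String[] args) {
            String key = "user-42";                     // hypothetical key
            int before = positiveHash(key) % 10;        // partition when there were 10 partitions
            int after = partitionFor(key, 10, 15);      // partition now that there are 15
            // 'after' is either 'before' or 'before + 10', never anything else.
            System.out.printf("before=%d, after=%d%n", before, after);
        }
    }

Under this scheme only old partitions 0-4 split (into 10-14), which is why the warm-up consumer for new partition 10 also consumes partition 0, and why the identity partition -> processor mapping in step 4) works out.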
>> There are a few important properties of this solution to note:
>>
>> - We can increase the number of partitions for the input topic and the change log topic in any order, asynchronously.
>> - The expansion of the processors in a given job in step 4) only requires step 3) for the same job. It does not require coordination across different jobs for steps 3) and 4). Thus different jobs can independently expand their capacity without waiting for each other.
>> - The logic for 1) and 2) is already supported in the current KIP. The logic for 3) and 4) appears to be independent of the core Kafka logic and can be implemented separately outside core Kafka. Thus the current KIP is probably sufficient after we agree on the efficiency and the correctness of the solution. We can have a separate KIP for Kafka Streams to support 3) and 4).
>>
>> Cheers,
>> Dong
>>
>> On Mon, Apr 2, 2018 at 3:25 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Hey guys, just sharing my two cents here (I promise it will be shorter than John's article :).
>>>
>>> 0. Just to quickly recap, the main discussion point now is how to support "key partitioning preservation" (John's #4 in topic characteristics above) beyond the "single-key ordering preservation" that KIP-253 was originally proposed to maintain (John's #6 above).
>>>
>>> 1. From the streams project, we are actively working on improving the elastic scalability of the library. One of the key features is to decouple the input topics from the parallelism model of Streams: i.e. not enforcing the topic to be partitioned by the key, not enforcing joining topics to be co-partitioned, and not tying the number of parallel tasks to the number of input topic partitions. This can be achieved by re-shuffling the input topics to ensure key-partitioning / co-partitioning on the internal topics. Note that the re-shuffling task is purely stateless and hence does not require "key partitioning preservation". Operationally it is similar to the "create a new topic with the new number of partitions, pipe the data to the new topic, and cut over consumers from the old topic" idea, just that users can optionally let Streams handle it rather than doing it manually themselves. There are a few more details in that regard, but I will skip them since they are not directly related to this discussion.
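As a concrete illustration of the purely stateless re-shuffling mentioned above, such a repartition job can be little more than a consume-and-reproduce loop into an internal topic whose partitioning the streams job owns. This is only a hedged sketch: the topic names, group id, and serializers are assumptions, and a real implementation would also need offset management and error handling.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class StatelessReshuffle {
        public static void main(String[] args) {
            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", "localhost:9092");
            consumerProps.put("group.id", "reshuffle-job");
            consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", "localhost:9092");
            producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
                 KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                consumer.subscribe(Collections.singletonList("input-topic"));
                while (true) {
                    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                        // Re-produce by key into an internal topic; its partition count and
                        // partitioning scheme belong to the streams job, not to the upstream producers.
                        producer.send(new ProducerRecord<>("internal-repartition-topic", record.key(), record.value()));
                    }
                }
            }
        }
    }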
>>> 2. Assuming that 1) above is done, the only topics involved in the scaling events are the internal topics. For these topics the only producers / consumers are controlled by Streams clients themselves, and hence achieving "key partitioning preservation" is simpler than in non-Streams scenarios: consumers know the partitioning scheme that producers are using, so for their stateful operations it is doable to split the local state stores accordingly or execute backfilling on their own. Of course, if we decide to do server-side backfilling, it can still help Streams to directly rely on that functionality.
>>>
>>> 3. As John mentioned, another way inside Streams is to do over-partitioning on all internal topics; then with 1) Streams would not rely on KIP-253 at all. But personally I'd like to avoid that if possible to reduce the Kafka-side footprint: say we over-partition each input topic up to 1k partitions; with a reasonably sized stateful topology that can still contribute tens of thousands of partitions toward the topic-partition capacity of a single cluster.
>>>
>>> 4. Summing up 1/2/3, I think we should focus more on non-Streams users writing their stateful computations with local state, and think about whether / how we could enable "key partitioning preservation" for them easily, rather than think heavily about the Streams library. People may have different opinions on how common a usage pattern it is (I think Jun might be suggesting that for DIY apps people are more likely to use remote state, so that it is not a problem for them). My opinion is that for non-Streams users such a usage pattern could still be common (think: if you are piping data from Kafka to an external data store which has single-writer requirements for each shard, then even though it is not a stateful computational application it may still require "key partitioning preservation"), so I prefer to have backfilling in our KIP rather than only exposing the API for expansion and requiring consumers to have pre-knowledge of the producer's partitioning scheme.
>>>
>>> Guozhang
>>>
>>> On Thu, Mar 29, 2018 at 2:33 PM, John Roesler <j...@confluent.io> wrote:
>>>
>>>> Hey Dong,
>>>>
>>>> Congrats on becoming a committer!!!
>>>> Since I just sent a novel-length email, I'll try and keep this one brief ;)
>>>>
>>>> Regarding producer coordination, I'll grant that in that case, producers may coordinate among themselves to produce into the same topic or to produce co-partitioned topics. Nothing in KStreams or the Kafka ecosystem in general requires such coordination for correctness or in fact for any optional features, though, so I would not say that we require producer coordination of partition logic. If producers currently coordinate, it's completely optional and their own choice.
>>>>
>>>> Regarding the portability of partition algorithms, my observation is that systems requiring independent implementations of the same algorithm with 100% correctness are a large source of risk and also a burden on those who have to maintain them. If people could flawlessly implement algorithms in actual software, the world would be a wonderful place indeed! For a system as important and widespread as Kafka, I would recommend limiting such requirements as aggressively as possible.
>>>>
>>>> I'd agree that we can always revisit decisions like allowing arbitrary partition functions, but of course, we shouldn't do that in a vacuum. That feels like the kind of thing we'd need to proactively seek guidance from the users list about. I do think that the general approach of saying that "if you use a custom partitioner, you cannot do partition expansion" is very reasonable (but I don't think we need to go that far with the current proposal). It's similar to my statement in my email to Jun that in principle KStreams doesn't *need* backfill, we only need it if we want to employ partition expansion.
>>>>
>>>> I reckon that the main motivation for backfill is to support KStreams use cases and also any other use cases involving stateful consumers.
>>>>
>>>> Thanks for your response, and congrats again!
>>>> -John
>>>>
>>>> On Wed, Mar 28, 2018 at 1:34 AM, Dong Lin <lindon...@gmail.com> wrote:
>>>>
>>>>> Hey John,
>>>>>
>>>>> Great! Thanks for all the comments. It seems that we agree that the current KIP is in good shape for core Kafka. IMO, what we have been discussing in the recent email exchanges is mostly about the second step, i.e. how to address the problem for the stream use-case (or stateful processing in general).
>>>>>
>>>>> I will comment inline.
>>>>>
>>>>> On Tue, Mar 27, 2018 at 4:38 PM, John Roesler <j...@confluent.io> wrote:
>>>>>
>>>>>> Thanks for the response, Dong.
>>>>>>
>>>>>> Here are my answers to your questions:
>>>>>>
>>>>>>> - "Asking producers and consumers, or even two different producers, to share code like the partition function is a pretty huge ask. What if they are using different languages?". It seems that today we already require different producers to use the same hash function -- otherwise messages with the same key will go to different partitions of the same topic, which may cause problems for downstream consumption. So not sure if it adds any more constraint by assuming consumers know the hash function of the producer. Could you explain more why a user would want to use a custom partition function? Maybe we can check if this is something that can be supported in the default Kafka hash function. Also, can you explain more why it is difficult to implement the same hash function in different languages?
>>>>>> Sorry, I meant two different producers as in producers to two different topics. This was in response to the suggestion that we already require coordination among producers to different topics in order to achieve co-partitioning. I was saying that we do not (and should not).
>>>>>
>>>>> It is probably common for producers of different teams to produce messages to the same topic. In order to ensure that messages with the same key go to the same partition, we need producers of different teams to share the same partition algorithm, which by definition requires coordination among producers of different teams in an organization. Even for producers of different topics, it may be common to require producers to use the same partition algorithm in order to join two topics for stream processing. Does this make it reasonable to say we already require coordination across producers?
>>>>>
>>>>>> By design, consumers are currently ignorant of the partitioning scheme. It suffices to trust that the producer has partitioned the topic by key, if they claim to have done so. If you don't trust that, or even if you just need some other partitioning scheme, then you must re-partition it yourself. Nothing we're discussing can or should change that. The value of backfill is that it preserves the ability for consumers to avoid re-partitioning before consuming, in the case where they don't need to today.
>>>>>>
>>>>>> Regarding shared "hash functions", note that it's a bit inaccurate to talk about the "hash function" of the producer. Properly speaking, the producer has only a "partition function". We do not know that it is a hash. The producer can use any method at their disposal to assign a partition to a record. The partition function obviously may be written in any programming language, so in general it's not something that can be shared around without a formal spec or the ability to execute arbitrary executables in arbitrary runtime environments.
>>>>>
>>>>> Yeah, it is probably better to say partition algorithm. I guess it should not be difficult to implement the same partition algorithm in different languages, right? Yes, we would need a formal specification of the default partition algorithm in the producer. I think that can be documented as part of the producer interface.
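For reference, the Java producer's default partitioning for records with a non-null key is, roughly, murmur2 over the serialized key taken modulo the partition count; any cross-language implementation would have to reproduce that hash bit-for-bit, which is exactly the portability concern raised above. A minimal sketch of that contract, using helpers shipped in the Java clients library:

    import org.apache.kafka.common.utils.Utils;

    public class DefaultKeyedPartitioning {
        // Roughly what the Java producer's DefaultPartitioner does when the key is non-null
        // (null-keyed records are spread differently and are not shown here).
        static int partitionForKey(byte[] serializedKey, int numPartitions) {
            return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
        }
    }

A formal spec would have to pin down the murmur2 variant and the key serialization, since two producers that disagree on either will co-partition nothing.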
In >>>>>> >>>>> general, >>>>> >>>>>> such systems provide custom partitioners because some data sets may >>>>>> >>>>> be >>> >>>> unbalanced under the default or because they can provide some >>>>>> >>>>> interesting >>>> >>>>> functionality built on top of the partitioning scheme, etc. Having >>>>>> >>>>> provided >>>>> >>>>>> this ability, I don't know why we would remove it. >>>>>> >>>>>> Yeah it is reasonable to assume that there was reason to support >>>>> custom >>>>> partition function in producer. On the other hand it may also be >>>>> >>>> reasonable >>>> >>>>> to revisit this interface and discuss whether we actually need to >>>>> >>>> support >>> >>>> custom partition function. If we don't have a good reason, we can >>>>> >>>> choose >>> >>>> not to support custom partition function in this KIP in a backward >>>>> compatible manner, i.e. user can still use custom partition function >>>>> >>>> but >>> >>>> they would not get the benefit of in-order delivery when there is >>>>> >>>> partition >>>> >>>>> expansion. What do you think? >>>>> >>>>> >>>>> - Besides the assumption that consumer needs to share the hash >>>>>> >>>>> function >>> >>>> of >>>>> >>>>>> producer, is there other organization overhead of the proposal in >>>>>>> >>>>>> the >>> >>>> current KIP? >>>>>>> >>>>>>> It wasn't clear to me that KIP-253 currently required the producer >>>>>> >>>>> and >>> >>>> consumer to share the partition function, or in fact that it had a >>>>>> >>>>> hard >>> >>>> requirement to abandon the general partition function and use a >>>>>> >>>>> linear >>> >>>> hash >>>>> >>>>>> function instead. >>>>>> >>>>> >>>>> In my reading, there is a requirement to track the metadata about >>>>>> >>>>> what >>> >>>> partitions split into what other partitions during an expansion >>>>>> >>>>> operation. >>>>> >>>>>> If the partition function is linear, this is easy. If not, you can >>>>>> >>>>> always >>>> >>>>> just record that all old partitions split into all new partitions. >>>>>> >>>>> This >>> >>>> has >>>>> >>>>>> the effect of forcing all consumers to wait until the old epoch is >>>>>> completely consumed before starting on the new epoch. But this may >>>>>> >>>>> be a >>> >>>> reasonable tradeoff, and it doesn't otherwise alter your design. >>>>>> >>>>>> You only mention the consumer needing to know that the partition >>>>>> >>>>> function >>>> >>>>> is linear, not what the actual function is, so I don't think your >>>>>> >>>>> design >>>> >>>>> actually calls for sharing the function. Plus, really all the >>>>>> >>>>> consumer >>> >>>> needs is the metadata about what old-epoch partitions to wait for >>>>>> >>>>> before >>>> >>>>> consuming a new-epoch partition. This information is directly >>>>>> >>>>> captured >>> >>>> in >>>> >>>>> metadata, so I don't think it actually even cares whether the >>>>>> >>>>> partition >>> >>>> function is linear or not. >>>>>> >>>>>> You are right that the current KIP does not mention it. My comment >>>>> >>>> related >>>> >>>>> to the partition function coordination was related to support the >>>>> stream-use case which we have been discussing so far. >>>>> >>>>> >>>>> So, no, I really think KIP-253 is in good shape. I was really more >>>>>> >>>>> talking >>>>> >>>>>> about the part of this thread that's outside of KIP-253's scope, >>>>>> >>>>> namely, >>>> >>>>> creating the possibility of backfilling partitions after expansion. >>>>>> >>>>>> Great! 
>>>>>>> - Currently the producer can forget about a message that has been acknowledged by the broker. Thus the producer probably does not know most of the existing messages in the topic, including those messages produced by other producers. We can have the owner of the producer do the split+backfill. In my opinion it will be a new program that wraps around the existing producer and consumer classes.
>>>>>>
>>>>>> This sounds fine by me!
>>>>>>
>>>>>> Really, I was just emphasizing that the part of the organization that produces a topic shouldn't have to export their partition function to the part(s) of the organization (or other organizations) that consume the topic. Whether the backfill operation goes into the Producer interface is secondary, I think.
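As a rough sketch of the kind of wrapper program described above (all names, counts, and the stand-in partition function are assumptions, not part of the KIP): a backfill tool could read an old partition from the beginning and copy every record whose key now maps to a newly created partition, using whatever partition algorithm the producers share.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class BackfillOldPartition {
        // Shared partition function; must match what the producers use (the coordination being debated).
        static int partitionFor(String key, int numPartitions) {
            return (key.hashCode() & 0x7fffffff) % numPartitions;   // stand-in, not murmur2
        }

        public static void main(String[] args) {
            String topic = "expanded-topic";   // hypothetical
            int oldPartition = 2;              // the partition being split
            int newPartitionCount = 15;

            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", "localhost:9092");
            consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", "localhost:9092");
            producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
                 KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                TopicPartition tp = new TopicPartition(topic, oldPartition);
                consumer.assign(Collections.singletonList(tp));
                consumer.seekToBeginning(Collections.singletonList(tp));
                long end = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
                while (consumer.position(tp) < end) {
                    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                        // Copy keyed records into the partition their key maps to after expansion.
                        int target = record.key() == null ? oldPartition : partitionFor(record.key(), newPartitionCount);
                        if (target != oldPartition) {
                            producer.send(new ProducerRecord<>(topic, target, record.key(), record.value()));
                        }
                    }
                }
            }
        }
    }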
>>>>>>> - Regarding point 5. The argument is in favor of the split+backfill, but for the changelog topic. And it intends to address the problem for the stream use-case in general. In this KIP we will provide an interface (i.e. PartitionKeyRebalanceListener in the KIP) to be used by the stream use-case, and the goal is that the user can flush/re-consume the state as part of the interface implementation regardless of whether there is a change log topic. Maybe you are suggesting that the main reason to do split+backfill of the input topic is to support log compacted topics? You mentioned in Point 1 that log compacted topics are out of the scope of this KIP. Maybe I could understand your position better. Regarding Jan's proposal to split partitions with backfill, do you think this should replace the proposal in the existing KIP, or do you think this is something that we should do in addition to the existing KIP?
>>>>>>
>>>>>> I think that interface is a good/necessary component of KIP-253.
>>>>>>
>>>>>> I personally (FWIW) feel that KIP-253 is appropriately scoped, but I do think its utility will be limited unless there is a later KIP offering backfill. But, maybe unlike Jan, I think it makes sense to try and tackle the ordering problem independently of backfill, so I'm in support of the current KIP.
>>>>>>
>>>>>>> - Regarding point 6. I guess we can agree that it is better not to have the performance overhead of copying the input data. Before we discuss more on whether the performance overhead is acceptable or not, I am trying to figure out what the benefit of introducing this overhead is. You mentioned that the benefit is the loose organizational coupling. By "organizational coupling", are you referring to the requirement that the consumer needs to know the hash function of the producer? If so, maybe we can discuss the use-case of a custom partition function and see whether we can find a way to support such a use-case without having to copy the input data.
>>>>>>
>>>>>> I'm not too sure about what an "input" is in this sense, since we are just talking about topics. Actually the point I was making there is that AFAICT the performance overhead of a backfill is less than any other option, assuming you split partitions rarely.
>>>>>
>>>>> By "input" I was referring to the source Kafka topic of a stream processing job.
>>>>>
>>>>>> Separately, yes, "organizational coupling" increases if producers and consumers have to share code, such as the partition function. This would not be the case if producers could only pick from a menu of a few well-known partition functions, but I think this is a poor tradeoff.
>>>>>
>>>>> Maybe we can revisit the custom partition function and see whether we actually need it? Otherwise, I am concerned that every user will pay the overhead of data movement to support something that is not really needed for most users.
>>>>>
>>>>>> To me, this is two strong arguments in favor of backfill being less expensive than no backfill, but again, I think that particular debate comes after KIP-253, so I don't want to create the impression of opposition to your proposal.
>>>>>>
>>>>>> Finally, to respond to a new email I just noticed:
>>>>>>
>>>>>>> BTW, here is my understanding of the scope of this KIP. We want to allow consumers to always consume messages with the same key from the same producer in the order they are produced. And we need to provide a way for the stream use-case to be able to flush/load state when messages with the same key are migrated between consumers. In addition to ensuring that this goal is correctly supported, we should do our best to keep the performance and organizational overhead of this KIP as low as possible.
>>>>>>
>>>>>> I think we're on the same page there! In fact, I would generalize a little more and say that the mechanism you've designed provides *all consumers* the ability "to flush/load state when messages with the same key are migrated between consumers", not just Streams.
>>>>>
>>>>> Thanks for all the comments!
>>>>>
>>>>>> Thanks for the discussion,
>>>>>> -John
>>>>>>
>>>>>> On Tue, Mar 27, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com> wrote:
>>>>>>
>>>>>>> Hey John,
>>>>>>>
>>>>>>> Thanks much for the detailed comments. Here are my thoughts:
>>>>>>>
>>>>>>> - The need to delete messages from log compacted topics is mainly for performance (e.g. storage space) optimization rather than for correctness for this KIP. I agree that we probably don't need to focus on this in our discussion since it is mostly for performance optimization.
>>>>>>> >>>>>>> - "Asking producers and consumers, or even two different producers, >>>>>>> >>>>>> to >>>> >>>>> share code like the partition function is a pretty huge ask. What >>>>>>> >>>>>> if >>> >>>> they >>>>> >>>>>> are using different languages?". It seems that today we already >>>>>>> >>>>>> require >>>> >>>>> different producer's to use the same hash function -- otherwise >>>>>>> >>>>>> messages >>>>> >>>>>> with the same key will go to different partitions of the same topic >>>>>>> >>>>>> which >>>>> >>>>>> may cause problem for downstream consumption. So not sure if it >>>>>>> >>>>>> adds >>> >>>> any >>>>> >>>>>> more constraint by assuming consumers know the hash function of >>>>>>> >>>>>> producer. >>>>> >>>>>> Could you explain more why user would want to use a cusmtom >>>>>>> >>>>>> partition >>> >>>> function? Maybe we can check if this is something that can be >>>>>>> >>>>>> supported >>>> >>>>> in >>>>>> >>>>>>> the default Kafka hash function. Also, can you explain more why it >>>>>>> >>>>>> is >>> >>>> difficuilt to implement the same hash function in different >>>>>>> >>>>>> languages? >>>> >>>>> - Besides the assumption that consumer needs to share the hash >>>>>>> >>>>>> function >>>> >>>>> of >>>>>> >>>>>>> producer, is there other organization overhead of the proposal in >>>>>>> >>>>>> the >>> >>>> current KIP? >>>>>>> >>>>>>> - Currently producer can forget about the message that has been >>>>>>> acknowledged by the broker. Thus the producer probably does not >>>>>>> >>>>>> know >>> >>>> most >>>>> >>>>>> of the exiting messages in topic, including those messages produced >>>>>>> >>>>>> by >>>> >>>>> other producers. We can have the owner of the producer to >>>>>>> >>>>>> split+backfill. >>>>> >>>>>> In my opion it will be a new program that wraps around the existing >>>>>>> producer and consumer classes. >>>>>>> >>>>>>> - Regarding point 5. The argument is in favor of the split+backfill >>>>>>> >>>>>> but >>>> >>>>> for >>>>>> >>>>>>> changelog topic. And it intends to address the problem for stream >>>>>>> >>>>>> use-case >>>>>> >>>>>>> in general. In this KIP we will provide interface (i.e. >>>>>>> PartitionKeyRebalanceListener in the KIP) to be used by sream >>>>>>> >>>>>> use-case >>>> >>>>> and >>>>>> >>>>>>> the goal is that user can flush/re-consume the state as part of the >>>>>>> interface implementation regardless of whether there is change log >>>>>>> >>>>>> topic. >>>>> >>>>>> Maybe you are suggesting that the main reason to do split+backfill >>>>>>> >>>>>> of >>> >>>> input >>>>>> >>>>>>> topic is to support log compacted topics? You mentioned in Point 1 >>>>>>> >>>>>> that >>>> >>>>> log >>>>>> >>>>>>> compacted topics is out of the scope of this KIP. Maybe I could >>>>>>> >>>>>> understand >>>>>> >>>>>>> your position better. Regarding Jan's proposal to split partitions >>>>>>> >>>>>> with >>>> >>>>> backfill, do you think this should replace the proposal in the >>>>>>> >>>>>> existing >>>> >>>>> KIP, or do you think this is something that we should do in >>>>>>> >>>>>> addition >>> >>>> to >>>> >>>>> the >>>>>> >>>>>>> existing KIP? >>>>>>> >>>>>>> - Regarding point 6. I guess we can agree that it is better not to >>>>>>> >>>>>> have >>>> >>>>> the >>>>>> >>>>>>> performance overhread of copying the input data. Before we discuss >>>>>>> >>>>>> more >>>> >>>>> on >>>>>> >>>>>>> whether the performance overhead is acceptable or not, I am trying >>>>>>> >>>>>> to >>> >>>> figure out what is the benefit of introducing this overhread. 
>>>>>>> Thanks,
>>>>>>> Dong
>>>>>>>
>>>>>>> On Tue, Mar 27, 2018 at 11:34 AM, John Roesler <j...@confluent.io> wrote:
>>>>>>>
>>>>>>>> Hey Dong and Jun,
>>>>>>>>
>>>>>>>> Thanks for the thoughtful responses. If you don't mind, I'll mix my replies together to try for a coherent response. I'm not too familiar with mailing-list etiquette, though.
>>>>>>>>
>>>>>>>> I'm going to keep numbering my points because it makes it easy for you all to respond.
>>>>>>>>
>>>>>>>> Point 1:
>>>>>>>> As I read it, KIP-253 is *just* about properly fencing the producers and consumers so that you preserve the correct ordering of records during partition expansion. This is clearly necessary regardless of anything else we discuss. I think this whole discussion about backfill, consumers, streams, etc., is beyond the scope of KIP-253. But it would be cumbersome to start a new thread at this point.
>>>>>>>>
>>>>>>>> I had missed KIP-253's Proposed Change #9 among all the details... I think this is a nice addition to the proposal. One thought is that it's actually irrelevant whether the hash function is linear. This is simply an algorithm for moving a key from one partition to another, so the type of hash function need not be a precondition. In fact, it also doesn't matter whether the topic is compacted or not; the algorithm works regardless. I think this is a good algorithm to keep in mind, as it might solve a variety of problems, but it does have a downside: the producer won't know whether or not K1 was actually in P1, it just knows that K1 was in P1's keyspace before the new epoch. Therefore, it will have to pessimistically send (K1,null) to P1 just in case. But the next time K1 comes along, the producer *also* won't remember that it already retracted K1 from P1, so it will have to send (K1,null) *again*. By extension, every time the producer sends to P2, it will also have to send a tombstone to P1, which is a pretty big burden.
>>>>>>>> To make the situation worse, if there is a second split, say P2 becomes P2 and P3, then any key Kx belonging to P3 will also have to be retracted from P2 *and* P1, since the producer can't know whether Kx had last been written to P2 or P1. Over a long period of time, this clearly becomes an issue, as the producer must send an arbitrary number of retractions along with every update.
>>>>>>>>
>>>>>>>> In contrast, the proposed backfill operation has an end, and after it ends, everyone can afford to forget that there ever was a different partition layout.
>>>>>>>>
>>>>>>>> Really, though, figuring out how to split compacted topics is beyond the scope of KIP-253, so I'm not sure #9 really even needs to be in this KIP... We do need in-order delivery during partition expansion. It would be fine by me to say that you *cannot* expand partitions of a log-compacted topic and call it a day. I think it would be better to tackle that in another KIP.
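To make the retraction pattern above concrete, here is a hedged sketch of what a producer following Proposed Change #9 would have to do on every send once a key's partition has changed; the topic name and partition numbers are placeholders, and the point is only that the tombstone to the old partition must be repeated because the producer keeps no memory of earlier retractions.

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class RetractionOnSplit {
        // After a split, a key that now maps to newPartition must be pessimistically
        // retracted from oldPartition, because the producer cannot know whether the
        // key was ever actually written there.
        static void sendWithRetraction(KafkaProducer<String, String> producer, String topic,
                                       String key, String value, int newPartition, int oldPartition) {
            producer.send(new ProducerRecord<>(topic, newPartition, key, value)); // the real update
            producer.send(new ProducerRecord<>(topic, oldPartition, key, null));  // repeated tombstone
            // With no memory of prior retractions, this tombstone is re-sent on every update,
            // and after further splits it fans out to every ancestor partition of newPartition.
        }
    }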
>>>>>>>> Point 2:
>>>>>>>> Regarding whether the consumer re-shuffles its inputs, this is always on the table; any consumer who wants to re-shuffle its input is free to do so. But this is currently not required. It's just that the current high-level story with Kafka encourages the use of partitions as a unit of concurrency. As long as consumers are single-threaded, they can happily consume a single partition without concurrency control of any kind. This is a key aspect of this system that lets folks design high-throughput systems on top of it surprisingly easily. If all consumers were instead encouraged/required to implement a repartition of their own, then the consumer becomes significantly more complex, requiring either the consumer to first produce to its own intermediate repartition topic or to ensure that consumer threads have a reliable, high-bandwidth channel of communication with every other consumer thread.
>>>>>>>>
>>>>>>>> Either of those tradeoffs may be reasonable for a particular user of Kafka, but I don't know if we're in a position to say that they are reasonable for *every* user of Kafka.
>>>>>>>>
>>>>>>>> Point 3:
>>>>>>>> Regarding Jun's point about this use case, "(3) stateful and maintaining the states in a local store", I agree that they may use a framework *like* Kafka Streams, but that is not the same as using Kafka Streams. This is why I think it's better to solve it in Core: because it is then solved for KStreams and also for everything else that facilitates local state maintenance. To me, Streams is a member of the category of "stream processing frameworks", which is itself a subcategory of "things requiring local state maintenance". I'm not sure if it makes sense to