Hey Jeff,

Thanks for the review. The scheme for expanding the processors of a stateful
processing job is described in the "Support processor expansion" section of
KIP-287 (link
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-287%3A+Support+partition+and+processor+expansion+for+stateful+processing+jobs#KIP-287:Supportpartitionandprocessorexpansionforstatefulprocessingjobs-Supportprocessorexpansion>).
In particular, we will expand the partitions of the input topics before
expanding processors of the processing job. While the new processor is
catching up, the producer would already be producing using the new
partitioning scheme. Does this answer your question?
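
To make the catch-up concern concrete, here is a rough back-of-the-envelope
sketch. The function name and the rates are made up for illustration and are
not taken from the KIP. The only point is that a reader replaying a backlog
catches up if and only if it reads faster than data keeps arriving in the log
it replays; once producers have switched to the new scheme, that backlog stops
growing.

```python
import math


def catch_up_seconds(backlog, read_rate, write_rate):
    """Seconds until a reader replaying `backlog` records reaches the log end.

    read_rate:  records/s the reader consumes (hypothetical number).
    write_rate: records/s still being appended to the log it replays.
    Returns math.inf when the reader can never catch up.
    """
    if backlog <= 0:
        return 0.0
    gap_closing_rate = read_rate - write_rate
    if gap_closing_rate <= 0:
        return math.inf
    return backlog / gap_closing_rate


# Producers keep writing with the old scheme faster than the copy runs.
print(catch_up_seconds(1_000_000, read_rate=5_000, write_rate=6_000))  # inf
# Producers have already switched, so the old backlog no longer grows.
print(catch_up_seconds(1_000_000, read_rate=5_000, write_rate=0))  # 200.0
```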

Thanks,
Dong


On Sat, Apr 14, 2018 at 1:44 PM, Jeff Chao <jc...@salesforce.com> wrote:

> Hi, I had a question from Monday's meeting. The current mechanism, as Ray's
> notes point out, is to create a copy consumer and switch over to it once it
> catches up. Meanwhile, it looks like a producer would still be writing using
> the old partitioning scheme. Wouldn't there be a case where the copy consumer
> never catches up given a high enough produce rate? I'm imagining this to work
> similarly to Kinesis's resharding mechanism, though I may be missing
> something from the current proposal. Anyway, with Kinesis as an example, this
> would mean a producer would stop writing using the old scheme and start
> writing using the new scheme. That way, the consumer will be able to catch
> up, after which it will start consuming using the new scheme.
>
> Thanks,
> Jeff Chao
>
> On Sat, Apr 14, 2018 at 6:44 AM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Thanks for the notes by Jun and Ray. I have read through the notes. It
> > seems that there are a few questions for the alternative solution by Jan
> > and maybe Jan will answer these questions later?
> >
> > I have summarized the solution, which I previously provided in this thread,
> > in KIP-287, to hopefully show that we may have a promising direction to
> > solve the partition and processor expansion problem for stateful processing
> > jobs. Maybe we can continue the discussion related to stateful processing
> > in that discussion thread. If that looks good, maybe we can conclude the
> > discussion for KIP-253.
> >
> > Not sure if I missed some ideas or questions in the notes. Is there any
> > specific concern about the latest KIP-253?
> >
> >
> > On Mon, Apr 9, 2018 at 11:18 AM, Ray Chiang <rchi...@apache.org> wrote:
> >
> > > > My notes from today's meeting.  Sorry if I got anyone's name wrong.
> > > > Plus I missed a few moments with noise at home and/or dropped video.
> > >
> > > -Ray
> > >
> > > =====
> > >
> > > KIP-253 Discussion
> > >
> > > > - Currently, adding partitions can cause keys to be read out-of-order.
> > > >   This KIP is trying to preserve the key ordering when adding
> > > >   partitions.
> > > >
> > > > - State management in applications (i.e. Kafka Streams) can maintain
> > > >   local state via caching.  If the number of partitions changes, how
> > > >   would those applications update their local state?  This is the
> > > >   current point of discussion/disagreement.
> > >
> > > > - Jan Filipiak is mostly worried about log compacted topics.  Not as
> > > >   concerned about producer swapping.  Worried that the consumer design
> > > >   is a bit contradictory compared to the architecture.
> > > >
> > > >   Current design is to start up a new consumer in parallel with old
> > > >   topic/consumer.  Run until consumer finishes "copying" to the new
> > > >   topic.  Once the consumer is caught up, point the producer at the
> > > >   new topic.
> > >
> > >   Would like to have this technique as a "core primitive" to Kafka.
> > >   - Is this a useful primitive?
> > >   - What's the best way to support it?
> > >
> > >   - Topic expansion as it currently exists just "adds partitions". But
> > >     how does this affect bootstrapping applications?  How to deal with
> > >     "moved" (from "old partition" to "new expanded partition") keys?
> > >
> > >   - Dong's proposal example.  10 partitions growing to 15.  5 of the
> > >     first 10 partitions are split into 2 each.  Say Kafka remembers
> > >     parent->child relationship.  Then for each parent partition, there
> > >     are two child partitions.  Initially, there were 10 states to
> > >     manage.  Then bootstrapping new application would have 15 states.
> > > >     Need to know which "generation" of partition you are consuming
> > > >     from.  Until you get to "newer" generation of data, the keys
> > > >     will be fine (i.e. reading from old partition).
> > >
> > > >   - Scheme works well for transient events.  Any stateful processor
> > > >     will likely break.
> > > >
> > > >   - Tracking can become extremely complicated, since each split
> > > >     requires potentially more and more offset/partition combos.
> > > >
> > > >   - Need to support continuation for consumers to read the new
> > > >     partitions.
> > >
> > >   - With linear hashing, integral multiple increase (2x, 3x, 4x, etc).
> > >     Easier mapping from old partition sets to new partition sets.
> > >     Keys end up with a clean hierarchy instead of a major reshuffling.
> > >
> > > >   - Dong's approach piggybacks on the existing leader epoch.  Log
> > > >     segment could be tagged with version in linear hashing case.
> > >
> > >   - In Jan's case, existing consumers bootstrap from the beginning.
> > >
> > >   - James' use case.  Using Kafka as a long term persistent data store.
> > >     Putting "source of truth" information into Kafka.  Bootstrap case
> > >     is very important.  New applications could be bootstrapping as they
> > >     come up.
> > >
> > > >     - Increasing partitions will help with load from producer and
> > > >       increasing consumer parallelism.
> > > >     - How does Kinesis handle partition splits?  They don't have
> > > >       compacted logs, so no issue with bootstrapping.  Kinesis uses
> > > >       MD5 and splits results based on md5sum into bucket ranges.
> > >     - Is it useful for the server to know the partitioning function?
> > >       Consumer has some implicit assumptions about keyed partitions,
> > >       but not strongly enforced on server side.
> > >
> > >     - KIP-213 (one to many joins in Kafka Streams)
> > >
> > >       - MySQL case.  Primary key forced to be used as Kafka key.
> > >
> > >         (Sorry had some audio and video drop at this point)
> > >
> > >       - Mirror Maker.  Source cluster has custom partitioning function.
> > >         Producer won't duplicate to same partitioning setup as source.
> > >         Need to provide same partitioning function to producer.
> > >         Would need to determine partitioning function based on topic.
> > >
> > > >         - Should server validate partitioning?
> > > >         - Who does actual determination of which key goes to which
> > > >           partition?
> > >
> > >       - How to implement backfill?
> > >
> > > >         - Who will do it?  In producer?  Hard to do.  Every client
> > > >           would need to add this functionality.  Better to do on
> > > >           server side.
> > > >         - Add a type of "copy consumer"?  Add backoff to producer?
> > > >           Benefit of doing in consumer vs. producer?
> > >
> > >   - Still TBD
> > >     - How to dedupe control messages?
> > >     - How to deal with subtle cases during transition?
> > > >     - Is it useful for the server to have the option to validate the
> > > >       key distribution?
> > > >     - Jan concerned about how a consumer application would look with
> > > >       the new "split partition" design.
> > > >     - KIP introduced callback.  Jan doesn't think it is useful.
> > > >       Callback for switching "between Partition 1 and can start on
> > > >       Partition 11".  Rely on marker in Partition 1 instead.  Intent
> > > >       for callback is for possibility that delivery of messages for
> > > >       given key is moved to a different consumer instance.
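
The point in the notes above about integral-multiple increases giving keys a
"clean hierarchy" can be illustrated with a small sketch. This uses plain
modulo partitioning over a stable hash, which is an assumption made for the
example and not necessarily the scheme either proposal would adopt; CRC32
stands in for the real hash, and the function names are hypothetical.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Modulo partitioning over a stable hash.

    CRC32 is only a stand-in here; Kafka's default partitioner uses murmur2.
    """
    return zlib.crc32(key) % num_partitions


def children_of(parent: int, old_count: int, new_count: int) -> list:
    """Child partitions of `parent` when new_count is a multiple of old_count."""
    if new_count % old_count != 0:
        raise ValueError("new_count must be an integral multiple of old_count")
    return [parent + i * old_count for i in range(new_count // old_count)]


old_count, new_count = 10, 20
for i in range(1000):
    key = f"user-{i}".encode()
    old_p = partition_for(key, old_count)
    new_p = partition_for(key, new_count)
    # Every key lands in one of its old partition's children, never elsewhere.
    assert new_p in children_of(old_p, old_count, new_count)

print(children_of(3, old_count, new_count))  # [3, 13]
```

Because h mod 10 equals (h mod 20) mod 10, the parent of any new partition is
recoverable from the partition number alone, without a lookup table.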
> > >
> > >
> > >
> > > On 4/6/18 9:44 AM, Dong Lin wrote:
> > >
> > >> Hey John,
> > >>
> > >> Thanks much for your super-detailed explanation. This is very helpful.
> > >>
> > > >> Now that I have finished reading through your email, I think the
> > > >> proposed solution in my previous email probably meets requirement #6
> > > >> without requiring additional coordination (w.r.t. partition function)
> > > >> among clients. My understanding of requirement #6 is that messages
> > > >> with a given key will go to the same consumer before and after the
> > > >> partition expansion, so that stream processing jobs won't be affected.
> > > >> Thus this approach seems to be better than backfilling since it does
> > > >> not require data copy for input topics.
> > > >>
> > > >> In order for the proposed solution to meet requirement #6, we need two
> > > >> extra requirements in addition to what has been described in the
> > > >> previous email: 1) stream processing job starts with the same number
> > > >> of processors as the initial number of partitions of the input topics;
> > > >> and 2) at any given time the number of partitions of the input topic
> > > >> >= the number of processors of the given stream processing job.
> > > >>
> > > >> Could you take a look at the proposed solution and see if any of the
> > > >> claims above is false?
> > >>
> > >>
> > >> Hey Jan,
> > >>
> > >> Maybe it is more efficient for us to discuss your concern in the KIP
> > >> Meeting.
> > >>
> > >>
> > >> Thanks,
> > >> Dong
> > >>
> > >>
> > > >> On Thu, Mar 29, 2018 at 2:05 PM, John Roesler <j...@confluent.io> wrote:
> > > >>
> > > >>> Hi Jun,
> > >>>
> > > >>> Thanks for the response. I'm very new to this project, but I will
> > > >>> share my perspective. I'm going to say a bunch of stuff that I know
> > > >>> you know already, but just so we're on the same page...
> > > >>>
> > > >>> This may also be a good time to get feedback from the other KStreams
> > > >>> folks.
> > > >>>
> > > >>> Using KStreams as a reference implementation for how stream processing
> > > >>> frameworks may interact with Kafka, I think it's important to eschew
> > > >>> knowledge about how KStreams currently handles internal communication,
> > > >>> making state durable, etc. Both because these details may change, and
> > > >>> because they won't be shared with other stream processors.
> > >>>
> > >>> =================================
> > >>> Background
> > >>>
> > >>> We are looking at a picture like this:
> > >>>
> > > >>>       input input input
> > > >>>           \   |   /
> > > >>>        +-------------+
> > > >>>  +-----+ Consumer(s) +-------+
> > > >>>  |     +-------------+       |
> > > >>>  |                           |
> > > >>>  |    KStreams Application   |
> > > >>>  |                           |
> > > >>>  |     +-------------+       |
> > > >>>  +-----+ Producer(s) +-------+
> > > >>>        +-------------+
> > > >>>             /    \
> > > >>>          output output
> > >>>
> > >>> The inputs and outputs are Kafka topics (and therefore have 1 or more
> > >>> partitions). We'd have at least 1 input and 0 or more outputs. The
> > >>> Consumers and Producers are both the official KafkaConsumer and
> > >>> KafkaProducer.
> > >>>
> > > >>> In general, we'll assume that the input topics are provided by actors
> > > >>> over which we have no control, although we may as well assume they are
> > > >>> friendly and amenable to requests, and also that their promises are
> > > >>> trustworthy. This is important because we must depend on them to
> > > >>> uphold some promises:
> > > >>> * That they tell us the schema of the data they publish, and abide by
> > > >>>   that schema. Without this, the inputs are essentially garbage.
> > > >>> * That they tell us some defining characteristics of the topics (more
> > > >>>   on this in a sec.) and again strictly abide by that promise.
> > >>>
> > >>> What are the topic characteristics we care about?
> > >>> 1. The name (or name pattern)
> > >>> 2. How the messages are keyed (if at all)
> > > >>> 3. Whether the message timestamps are meaningful, and if so, what
> > > >>>    their meaning is
> > > >>> 4. Assuming the records have identity, whether the partitions
> > > >>>    partition the records' identity space
> > >>> 5. Whether the topic completely contains the data set
> > >>> 6. Whether the messages in the topic are ordered
> > >>>
> > >>> #1 is obvious: without this information, we cannot access the data at
> > >>> all.
> > >>>
> > >>> For #2, #3, #4, and #6, we may or may not need this information,
> > >>> depending
> > >>> on the logic of the application. For example, a trivial application
> > that
> > >>> simply counts all events it sees doesn't care about #2, #3, #4, or
> #6.
> > >>> But
> > >>> an application that groups by some attribute can take advantage of #2
> > and
> > >>> #4 if the topic data is already keyed and partitioned over that
> > >>> attribute.
> > >>> Likewise, if the application includes some temporal semantics on a
> > >>> temporal
> > >>> dimension that is already captured in #3, it may take advantage of
> that
> > >>> fact.
> > >>>
> > >>> Note that #2, #3, #4, and #6 are all optional. If they are not
> > promised,
> > >>> we
> > >>> can do extra work inside the application to accomplish what we need.
> > >>> However, if they are promised (and if we depend on that promise), it
> is
> > >>> essential that the topic providers uphold those promises, as we may
> not
> > >>> be
> > >>> in a position to verify them.
> > >>>
> > >>> Note also that if they make a promise, but it doesn't happen to line
> up
> > >>> with our needs (data is keyed by attr1, but we need it by attr2, or
> > >>> timestamp is produce-time, but we need it by event-time, etc.), then
> we
> > >>> will have to go ahead and do that extra work internally anyway. This
> > also
> > >>> captures the situation in which two inputs are produced by different
> > >>> providers, one of which meets our needs, and the other does not. The
> > fact
> > >>> that we can cope with this situation is the basis for my statement
> that
> > >>> we
> > >>> do not require coordination among producers.
> > >>>
> > >>> (Key Point A): In terms of optimization, #4 and #6 are the most
> > valuable.
> > >>> If these characteristics happen to line up with our needs, then
> > KStreams
> > >>> can be incredibly efficient in both time and computational resources.
> > >>>
> > >>>   #5 is similar to knowing the schema in that it tells us whether the
> > >>> computation we want to do is possible or not. For example, suppose we
> > >>> have
> > >>> a topic of "users", and we want to construct a table for querying. If
> > the
> > >>> user topic doesn't completely contain the dataset, we cannot
> construct
> > >>> the
> > >>> table. Note that it doesn't matter whether the topic is compacted or
> > not.
> > >>> If the topic is complete, I can consume it starting at "earliest" and
> > >>> build
> > >>> my table. If it is not complete, I can do other computations on it.
> In
> > >>> both
> > >>> cases, it may or may not be compacted; it just doesn't matter.
> > >>>
> > >>> On the output side, the roles are reversed. We provide (or not)
> exactly
> > >>> the
> > >>> same set of guarantees to consumers of our outputs, and we likewise
> > must
> > >>> abide by the promises we make.
> > >>>
> > >>>
> > >>> =================================
> > >>> Partition Expansion
> > >>>
> > > >>> With this foundation in place, let's talk about partition expansion.
> > >>>
> > >>> Why do we have partitions in the first place? (let me know if I miss
> > >>> something here)
> > >>> * For logical data streams that are themselves partitionable, it
> allows
> > >>> producers to operate concurrently without coordination. For example,
> > >>> streaming data from a sensor in a particle accelerator, the sensor
> can
> > be
> > >>> subdivided into a grid and each grid square can produce independently
> > to
> > >>> a
> > >>> different topic. This may be valuable because the total rate of data
> > >>> exceeds the throughput to a single broker node or just because it
> > allows
> > >>> for failure of a single producer to cause the loss of only part of
> the
> > >>> data.
> > >>> * The brokers can offer linearly scaling throughput on the number of
> > >>> partitions by hosting each partition on a separate broker node
> > >>> * The brokers can host topics that are too large to fit on a single
> > >>> broker's storage by hosting some partitions on separate broker nodes
> > >>> * In cases where the use case permits handling partitions
> > independently,
> > >>> consumers can have algorithmic simplicity by processing the data for
> > >>> separate partitions in separate threads, avoiding costly and
> > error-prone
> > >>> concurrent coordination code
> > >>> * In cases where the use case permits handling partitions
> > independently,
> > >>> consumers can exceed the total throughput of a single broker-consumer
> > >>> pair
> > > >>> * Just to throw this in as well, in cases where some network links
> > > >>> are less costly than others (or lower latency or more reliable), such
> > > >>> as when brokers, producers, and consumers are running in racks:
> > > >>> producers and consumers can both benefit (independently) by locating
> > > >>> work on each partition in the same rack as the broker hosting that
> > > >>> partition.
> > >>>
> > >>> In other words, we have three actors in this system: producers,
> > brokers,
> > >>> and consumers, and they all benefit from partitioning for different
> > (but
> > >>> sometimes related) reasons.
> > >>>
> > >>> This leads naturally to the conclusion that any of these actors may
> > find
> > >>> themselves in a sub-optimal or even dangerous situation in which
> > >>> partition
> > >>> expansion would be the solution. For example, the producer may find
> > that
> > >>> the existing throughput to the brokers is insufficient to match the
> > data
> > >>> rate, forcing them to drop data. Or a broker hosting a single
> partition
> > >>> may
> > >>> be running out of disk space. Or a consumer node handling a single
> > >>> partition cannot match the rate of production for that partition,
> > causing
> > >>> it to fall behind.
> > >>>
> > > >>> I think it's reasonable to assume that all the actors in the system
> > > >>> can't just arbitrarily expand a topic's partitions. I think it's
> > > >>> reasonable to align this responsibility with the provider of the data,
> > > >>> namely the producer (the logical producer, not the KafkaProducer
> > > >>> class). Therefore, the producer who finds themselves in trouble can
> > > >>> unilaterally expand partitions to solve their problem.
> > >>>
> > >>> For the broker or a consumer in trouble, they have only one resort:
> to
> > >>> request the producer to expand partitions. This is where it's helpful
> > to
> > >>> assume the producer is friendly.
> > >>>
> > >>>
> > >>> Now, let's look at how a KStreams application fits into this
> scenario.
> > >>>
> > > >>> (Key Point B1): As a consumer, we may find that the producer expands
> > > >>> the partitions of a topic, either for their own benefit or for the
> > > >>> brokers'. In this situation, the expand operation MUST NOT violate any
> > > >>> promises that have previously been made to us. This is the essence of
> > > >>> KIP-253, to ensure the maintenance of promises #6 and #4. It would be
> > > >>> great if the mechanics of the expansion required no major disruption
> > > >>> to processing or human intervention.
> > >>>
> > >>> Specifically, let's say that input partition X splits into X1 and X2.
> > #6
> > >>> requires that the same old ordering guarantees of Kafka continue to
> > hold.
> > >>> Obviously, this is what KIP-253's title is about. #4 requires that we
> > >>> either ensure that X1 and X2 are assigned to the same thread that was
> > >>> previously assigned X OR that we immediately pause processing and
> split
> > >>> any
> > >>> state such that it appears X1 and X2 were *always* separate
> partitions.
> > >>>
> > >>> In other words, Option 1 is we treat X1 and X2 as still logically one
> > >>> partition, equal to X. This is ideal, since in this scenario,
> > partitions
> > >>> are expanding for external reasons. We don't need to expand our
> > >>> processing
> > >>> to match. Option 2 requires a major disruption, since we'd have to
> > pause
> > >>> processing while we split our state. Clearly, KStreams or any other
> > >>> stateful consumer would advocate for Option 1.
> > >>>
> > >>>
> > > >>> (Corollary to Key Point A): Still on the consumer side, we may find
> > > >>> that we ourselves can benefit from partition expansion of an input.
> > > >>> Since we can cope with the absence of promise #4, partition expansion
> > > >>> is not a hard requirement for us, but assuming we were already
> > > >>> benefiting from the major performance optimizations afforded by #4, it
> > > >>> would be nice to be able to ask the producer to expand partitions
> > > >>> **and to be able to benefit from it**.
> > >>>
> > >>> What does it mean to be able to benefit from partition expansion?
> > >>> Assuming
> > >>> input topic partition X splits into X1 and X2, in this scenario, we
> > >>> *would*
> > >>> wish to be able to split our state such that it appears X1 and X2
> were
> > >>> *always* separate partitions. Of course, the conclusion of Key Point
> B1
> > >>> still applies: we should be able to continue operating on (X1+X2 = X)
> > as
> > >>> one partition while asynchronously building the state of X1 and X2
> > >>> separately.
> > >>>
> > >>> When it comes to the mechanics of building the state of X1 and X2
> > >>> separately, we have really just two high-level options. Either this
> > >>> problem
> > >>> is solved by Kafka itself, giving me a view in which X1 and X2 were
> > >>> always
> > >>> separate partitions, or I have to do it myself. The latter means
> that I
> > >>> have to take on substantially more complexity than I do today:
> > > >>> Bummer 1: My state has to be splittable to begin with, implying at
> > > >>> the least that I need to be able to scan every record in my state, a
> > > >>> requirement that otherwise does not exist.
> > > >>> Bummer 2: After splitting the state of X1 and X2, I need to be able
> > > >>> to send at least one of those tasks, state included, to another
> > > >>> application node (in order to realize the benefit of the split). This
> > > >>> is also a requirement that does not currently exist.
> > >>> Bummer 3: In order to actually perform the split, I must know and be
> > able
> > >>> to execute the exact same partition function the producer of my topic
> > >>> uses.
> > >>> This introduces a brand-new a priori commitment from my input
> > producers:
> > >>> (would-be #7: convey the partition function and abide by it). This
> is a
> > >>> big
> > >>> restriction over #4, which only requires them to guarantee *that
> there
> > >>> is a
> > >>> partition function*. Now they actually have to share the function
> with
> > >>> me.
> > >>> And I have to be able to implement and execute it myself. And if the
> > >>> producer wishes to refine the partition function for an existing
> topic,
> > >>> we
> > >>> have another round of coordination, as they have to be sure that I,
> and
> > >>> all
> > >>> other consumers, begin using the new function *before* they do. This
> is
> > >>> similar to the schema problem, with a similar set of solutions. We
> > would
> > >>> likely need a partition function registry and another magic byte on
> > every
> > >>> record to be sure we do this right. Not to mention some way to
> express
> > >>> arbitrary partitioning logic over arbitrary data in a way that is
> > >>> portable
> > >>> across programming languages.
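
As a rough illustration of Bummers 1 and 3, here is what a do-it-yourself
split might look like. Everything in this sketch is hypothetical: the state is
a plain dict, the partition function is CRC32 modulo the partition count
(Kafka's default partitioner uses murmur2), and X is assumed to be partition 3
of 10 splitting into partitions 3 and 13 of 20.

```python
import zlib
from typing import Callable, Dict, Tuple


def split_state(
    state: Dict[bytes, int],
    partition_fn: Callable[[bytes], int],
    x1: int,
    x2: int,
) -> Tuple[Dict[bytes, int], Dict[bytes, int]]:
    """Split one task's state into the states of child partitions x1 and x2."""
    state_x1: Dict[bytes, int] = {}
    state_x2: Dict[bytes, int] = {}
    for key, value in state.items():  # Bummer 1: every record must be scanned
        target = partition_fn(key)    # Bummer 3: must match the producer exactly
        if target == x1:
            state_x1[key] = value
        elif target == x2:
            state_x2[key] = value
        else:
            # The consumer's copy of the function disagrees with the producer's.
            raise ValueError(f"key {key!r} maps to unexpected partition {target}")
    return state_x1, state_x2


def crc_partition(key: bytes, num_partitions: int) -> int:
    # Stand-in partition function, assumed for this example only.
    return zlib.crc32(key) % num_partitions


# State accumulated by the task that owned old partition 3 of 10.
state_x = {
    f"user-{i}".encode(): i
    for i in range(500)
    if crc_partition(f"user-{i}".encode(), 10) == 3
}
state_x1, state_x2 = split_state(state_x, lambda k: crc_partition(k, 20), 3, 13)
assert len(state_x1) + len(state_x2) == len(state_x)
```

If the consumer's copy of the partition function drifts from the producer's,
the split either fails loudly as above or, worse, silently misplaces keys.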
> > >>>
> > >>> Alternatively, if Kafka gives me a view in which X1 and X2 were
> always
> > >>> separate, then I can create tasks for X1 and X2 and allow them to
> > >>> bootstrap
> > >>> while I continue to process X. Once they are ready, I can coordinate
> a
> > >>> transition to stop X's task and switch to X1 and X2. None of those
> > >>> bummers
> > >>> are present, so this is a significantly better option for me.
> > >>>
> > >>> (Key Point B2): As a (friendly) producer, we may once again want on
> our
> > >>> own
> > >>> to expand partitions, or we may want to satisfy a request from the
> > broker
> > >>> or our consumers to do so. Again, we MUST NOT violate any promises we
> > >>> have
> > >>> previously given, and it would be great if the expansion required no
> > >>> major
> > >>> disruption to processing or human intervention. Additionally, since
> we
> > >>> are
> > >>> actually driving the expansion, it would also be great if we could
> > avoid
> > >>> Bummer 3's coordination problems from the producer's perspective.
> > >>>
> > >>>
> > >>> ======================================
> > >>> Briefly: KStreams internals
> > >>>
> > >>> I'm pretty sure you were asking me to comment on the implementation
> > >>> details
> > >>> of KStreams, so I'll say a few words about it. The most important
> thing
> > >>> is
> > >>> that KStreams is still very early in its development. Maybe
> > "early-middle
> > >>> maturity" is a good way to put it. We are actively discussing
> > >>> more-or-less
> > >>> major implementation changes to improve performance, footprint,
> > >>> scalability, and ergonomics. So it may actually be misleading to
> > discuss
> > >>> deeply how KStreams internally uses Kafka topics.
> > >>>
> > >>> Nevertheless, it is currently true that KStreams uses Kafka topics
> for
> > >>> communication between some internal computation nodes. We partition
> > these
> > >>> topics as the base unit of concurrency granularity, so it would
> > >>> potentially
> > >>> be beneficial to be able to expand partitions for some of these
> > internal
> > >>> topics at some point. However, we can alternatively just
> overpartition
> > >>> these internal topics, creating in the low 100s of partitions instead
> > of
> > >>> the low 10s, for example. (Side note: if Kafka were someday to
> support
> > >>> higher numbers of partitions, we could expand this scheme to
> > >>> overpartition
> > >>> in the 1000s of partitions.) With the option to overpartition, we
> don't
> > >>> have a strong need for partition expansion internally.
> > >>>
> > >>> It is also currently true that KStreams uses Kafka to store a durable
> > >>> changelog for some of our internal state stores. But we *only* read
> > from
> > >>> this topic *if* we need to restore a state store after node loss (or
> to
> > >>> maintain a hot mirror of the state store), so I think it's unlikely
> > that
> > >>> we
> > >>> would ever make use of partition expansion on the changelog topics.
> > >>>
> > >>> But once again, I'd like to emphasize that we may choose an
> alternative
> > >>> implementation for either interprocess communication or state
> > durability.
> > >>>
> > >>>
> > >>> ======================================
> > >>> Concluding thoughts
> > >>>
> > >>> I know this is a very long email, and I really appreciate you
> sticking
> > >>> with
> > >>> me this long. I hope it was useful for syncing our mental picture of
> > this
> > >>> system. Also, you're far more knowledgeable than I am about this
> system
> > >>> and
> > >>> this domain, so please correct me if I've said anything wrong.
> > >>>
> > >>> To me the key takeaways are that:
> > >>> - KIP-253 satisfies all we need for correctness, since it contains
> > >>> solutions to guarantee producers can abide by their promises w.r.t.
> #4
> > >>> and
> > >>> #6.
> > >>> - From Key Point A: #4 is actually optional for KIP-253, but without
> > it,
> > >>> we
> > >>> lose a potentially valuable optimization in KStreams (and all other
> > >>> consumer applications)
> > > >>> - From Corollary to Key Point A: Without low-level support for
> > > >>> partition expansion with backfill, we cannot, as a consumer, request
> > > >>> partition expansion to improve application performance. In that case,
> > > >>> to ensure performance scalability, we would have to discard for all
> > > >>> KStreams applications the performance optimization afforded by #4.
> > >>> - From Key Point B1: After a partition split, we really need to be
> able
> > >>> to
> > >>> seamlessly continue operating as if it had not split.
> > >>> - From Key Point B2: Since KIP-253 allows us to maintain all our
> > >>> promises,
> > >>> we have the option of expanding partitions in the topics we produce.
> > >>> Without a backfill operation, though, our consumers may not be able
> to
> > >>> realize the benefits of that split, if they were hoping to.
> > >>>
> > >>> In general, faced with the possibility of having to coordinate the
> > >>> partition function with our inputs' producers or with our outputs'
> > >>> consumers, I would personally lean toward overprovisioning and
> > completely
> > >>> avoiding resize for our use case. This doesn't mean that it's not
> > useful
> > >>> in
> > >>> the ecosystem at large without backfill, just that it loses its
> luster
> > >>> for
> > >>> me. It also means that we can no longer take advantage of some of our
> > >>> current optimizations, and in fact that we must introduce an extra
> hop
> > of
> > >>> repartitioning on every single input.
> > >>>
> > >>> I think this is actually a pretty good picture of the opportunities
> and
> > >>> challenges that other consumers and producers in the Kafka ecosystem
> > will
> > >>> face.
> > >>>
> > >>> I hope this helps!
> > >>>
> > >>> Thanks,
> > >>> -John
> > >>>
> > > >>> On Wed, Mar 28, 2018 at 11:51 AM, Jun Rao <j...@confluent.io> wrote:
> > > >>>
> > > >>>> Hi, John,
> > > >>>>
> > > >>>> I actually think it's important to think through how KStreams handles
> > > >>>> partition expansion in this KIP. If we do decide that we truly need
> > > >>>> backfilling, it's much better to think through how to add it now,
> > > >>>> instead of retrofitting it later. It would be useful to outline how
> > > >>>> both existing KStreams jobs and new KStreams jobs work to see if
> > > >>>> backfilling is really needed.
> > > >>>>
> > > >>>> If we can figure out how KStreams works, at least we have one
> > > >>>> reference implementation for other stream processing frameworks that
> > > >>>> face the same issue.
> > >>>>
> > >>>> Thanks,
> > >>>>
> > >>>> Jun
> > >>>>
> > >>>>
> > >>>> On Tue, Mar 27, 2018 at 4:56 PM, John Roesler <j...@confluent.io>
> > >>>> wrote:
> > >>>>
> > >>>> Hi Jun,
> > >>>>>
> > >>>>> That's a good point.
> > >>>>>
> > >>>>> Yeah, I don't think it would work too well for existing consumers
> in
> > >>>>>
> > >>>> the
> > >>>
> > >>>> middle of gen 0 to try and switch to a newly backfilled prefix of
> gen
> > >>>>>
> > >>>> 1.
> > >>>
> > >>>> They probably just need to finish up until they get to the end of
> gen
> > 0
> > >>>>>
> > >>>> and
> > >>>>
> > >>>>> transition just as if there were no backfill available yet.
> > >>>>>
> > >>>>> This isn't terrible, since consumer applications that care about
> > >>>>>
> > >>>> scaling
> > >>>
> > >>>> up
> > >>>>
> > >>>>> to match a freshly split partition would wait until after the
> > backfill
> > >>>>>
> > >>>> is
> > >>>
> > >>>> available to scale up. The consumer that starts out in gen=0, part=0
> > is
> > >>>>> going to be stuck with part=0 and part=3 in gen=1 in my example
> > >>>>>
> > >>>> regardless
> > >>>>
> > >>>>> of whether they finish scanning gen=0 before or after the backfill
> is
> > >>>>> available.
> > >>>>>
> > >>>>> The broker knowing when it's ok to delete gen 0, including its
> offset
> > >>>>> mappings, is a big issue, though. I don't have any immediate ideas
> > for
> > >>>>> solving it, but it doesn't feel impossible. Hopefully, you agree
> this
> > >>>>>
> > >>>> is
> > >>>
> > >>>> outside of KIP-253's scope, so maybe we don't need to worry about it
> > >>>>>
> > >>>> right
> > >>>>
> > >>>>> now.
> > >>>>>
> > >>>>> I do agree that reshuffling in KStreams effectively solves the scalability
> > >>>>> problem as well, as it decouples the partition count (and the partition
> > >>>>> scheme) upstream from the parallelism of the streams application. Likely,
> > >>>>> we will do this in any case. I'm predominantly advocating for follow-on
> > >>>>> work to enable backfill for the *other* Kafka users that are not KStreams.
> > >>>>
> > >>>>> Thanks for your consideration,
> > >>>>> -John
> > >>>>>
> > >>>>> On Tue, Mar 27, 2018 at 6:19 PM, Jun Rao <j...@confluent.io> wrote:
> > >>>>>
> > >>>>>> Hi, John,
> > >>>>>>
> > >>>>>> Thanks for the reply. I agree that the backfill approach works cleaner for
> > >>>>>> newly started consumers. I am just not sure if it's a good primitive to
> > >>>>>> support for existing consumers. One of the challenges that I see is the
> > >>>>>> remapping of the offsets. In your approach, we need to copy the existing
> > >>>>>> records from the partitions in generation 0 to generation 1. Those records
> > >>>>>> will get different offsets in the new generation. The broker will have to
> > >>>>>> store those offset mappings somewhere. When the backfill completes, you can
> > >>>>>> delete generation 0's data. However, the broker can't throw away the offset
> > >>>>>> mappings immediately since it doesn't know if there is any existing
> > >>>>>> consumer still consuming generation 0's records. In a compacted topic, the
> > >>>>>> broker probably can only safely remove the offset mappings when all records
> > >>>>>> in generation 0 are removed by the cleaner. This may never happen though.
> > >>>>>> If we reshuffle the input inside a KStreams job, it obviates the need for
> > >>>>>> offset remapping on the broker.
> > >>>>>>
> > >>>>>> Jun
> > >>>>>>
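To make the offset-remapping concern above concrete, here is a minimal sketch of a backfill that copies generation 0 records into generation 1 and records where each one lands. Everything in it is an assumption for illustration: the `partition_for` and `backfill` helpers are hypothetical, the logs are plain in-memory lists, and CRC32 stands in for whatever partition function the producer really uses. It is not how the broker does or would implement this.

```python
import zlib

def partition_for(key, num_partitions):
    # Hypothetical partition function; CRC32 is only a deterministic stand-in.
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def backfill(gen0, num_new_partitions):
    """Copy every (key, value) record of generation 0 into generation 1.

    gen0 is a list of partitions, each a list of (key, value) records whose
    list index plays the role of the offset. Returns the new partitions and
    the mapping (old_partition, old_offset) -> (new_partition, new_offset).
    """
    gen1 = [[] for _ in range(num_new_partitions)]
    offset_map = {}
    for old_p, log in enumerate(gen0):
        for old_off, (key, value) in enumerate(log):
            new_p = partition_for(key, num_new_partitions)
            new_off = len(gen1[new_p])  # next free offset in the target log
            gen1[new_p].append((key, value))
            offset_map[(old_p, old_off)] = (new_p, new_off)
    return gen1, offset_map

gen0 = [[("a", 1), ("b", 2), ("a", 3)], [("c", 4), ("d", 5)]]
gen1, offset_map = backfill(gen0, 4)
```

The mapping has one entry per copied record, which is the state the broker would have to keep for as long as any consumer might still hold a generation 0 offset.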
> > >>>>>> On Tue, Mar 27, 2018 at 11:34 AM, John Roesler <j...@confluent.io
> >
> > >>>>>>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Hey Dong and Jun,
> > >>>>>>>
> > >>>>>>> Thanks for the thoughtful responses. If you don't mind, I'll mix
> my
> > >>>>>>>
> > >>>>>> replies
> > >>>>>>
> > >>>>>>> together to try for a coherent response. I'm not too familiar
> with
> > >>>>>>> mailing-list etiquette, though.
> > >>>>>>>
> > >>>>>>> I'm going to keep numbering my points because it makes it easy
> for
> > >>>>>>>
> > >>>>>> you
> > >>>>
> > >>>>> all
> > >>>>>>
> > >>>>>>> to respond.
> > >>>>>>>
> > >>>>>>> Point 1:
> > >>>>>>> As I read it, KIP-253 is *just* about properly fencing the
> > >>>>>>>
> > >>>>>> producers
> > >>>
> > >>>> and
> > >>>>>
> > >>>>>> consumers so that you preserve the correct ordering of records
> > >>>>>>>
> > >>>>>> during
> > >>>
> > >>>> partition expansion. This is clearly necessary regardless of
> > >>>>>>>
> > >>>>>> anything
> > >>>
> > >>>> else
> > >>>>>>
> > >>>>>>> we discuss. I think this whole discussion about backfill,
> > >>>>>>>
> > >>>>>> consumers,
> > >>>
> > >>>> streams, etc., is beyond the scope of KIP-253. But it would be
> > >>>>>>>
> > >>>>>> cumbersome
> > >>>>>
> > >>>>>> to start a new thread at this point.
> > >>>>>>>
> > >>>>>>> I had missed KIP-253's Proposed Change #9 among all the
> details...
> > >>>>>>>
> > >>>>>> I
> > >>>
> > >>>> think
> > >>>>>>
> > >>>>>>> this is a nice addition to the proposal. One thought is that it's
> > >>>>>>>
> > >>>>>> actually
> > >>>>>>
> > >>>>>>> irrelevant whether the hash function is linear. This is simply an
> > >>>>>>>
> > >>>>>> algorithm
> > >>>>>>
> > >>>>>>> for moving a key from one partition to another, so the type of
> hash
> > >>>>>>> function need not be a precondition. In fact, it also doesn't
> > >>>>>>>
> > >>>>>> matter
> > >>>
> > >>>> whether the topic is compacted or not, the algorithm works
> > >>>>>>>
> > >>>>>> regardless.
> > >>>>
> > >>>>> I think this is a good algorithm to keep in mind, as it might
> > >>>>>>>
> > >>>>>> solve a
> > >>>
> > >>>> variety of problems, but it does have a downside: that the producer
> > >>>>>>>
> > >>>>>> won't
> > >>>>>
> > >>>>>> know whether or not K1 was actually in P1, it just knows that K1
> > >>>>>>>
> > >>>>>> was
> > >>>
> > >>>> in
> > >>>>
> > >>>>> P1's keyspace before the new epoch. Therefore, it will have to
> > >>>>>>> pessimistically send (K1,null) to P1 just in case. But the next
> > >>>>>>>
> > >>>>>> time
> > >>>
> > >>>> K1
> > >>>>
> > >>>>> comes along, the producer *also* won't remember that it already
> > >>>>>>>
> > >>>>>> retracted
> > >>>>>
> > >>>>>> K1 from P1, so it will have to send (K1,null) *again*. By
> > >>>>>>>
> > >>>>>> extension,
> > >>>
> > >>>> every
> > >>>>>>
> > >>>>>>> time the producer sends to P2, it will also have to send a
> > >>>>>>>
> > >>>>>> tombstone
> > >>>
> > >>>> to
> > >>>>
> > >>>>> P1,
> > >>>>>>
> > >>>>>>> which is a pretty big burden. To make the situation worse, if
> there
> > >>>>>>>
> > >>>>>> is
> > >>>>
> > >>>>> a
> > >>>>>
> > >>>>>> second split, say P2 becomes P2 and P3, then any key Kx belonging
> > >>>>>>>
> > >>>>>> to
> > >>>
> > >>>> P3
> > >>>>
> > >>>>> will also have to be retracted from P2 *and* P1, since the producer
> > >>>>>>>
> > >>>>>> can't
> > >>>>>
> > >>>>>> know whether Kx had been last written to P2 or P1. Over a long
> > >>>>>>>
> > >>>>>> period
> > >>>
> > >>>> of
> > >>>>>
> > >>>>>> time, this clearly becomes an issue, as the producer must send an
> > >>>>>>>
> > >>>>>> arbitrary
> > >>>>>>
> > >>>>>>> number of retractions along with every update.
> > >>>>>>>
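The growth in retractions described above can be sketched as follows. This is only an illustration under stated assumptions: the `lineage` map (child partition to the partition it was split from) and both helper names are hypothetical, and a tombstone is modelled as a record with a `None` value.

```python
def retraction_targets(lineage, current):
    """Return every ancestor partition that might still hold the key."""
    targets = []
    p = current
    while p in lineage:  # walk back through each earlier split
        p = lineage[p]
        targets.append(p)
    return targets

def records_for_update(lineage, current, key, value):
    """The real update plus one pessimistic tombstone per ancestor."""
    records = [(current, key, value)]
    records += [(p, key, None) for p in retraction_targets(lineage, current)]
    return records

# P1 was split into P1 and P2, then P2 was split into P2 and P3.
lineage = {"P2": "P1", "P3": "P2"}
update = records_for_update(lineage, "P3", "Kx", "v1")
# update == [("P3", "Kx", "v1"), ("P2", "Kx", None), ("P1", "Kx", None)]
```

Each further split adds one more tombstone to every update for the affected keys, since the producer keeps no memory of what it has already retracted.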
> > >>>>>>> In contrast, the proposed backfill operation has an end, and
> after
> > >>>>>>>
> > >>>>>> it
> > >>>
> > >>>> ends,
> > >>>>>>
> > >>>>>>> everyone can afford to forget that there ever was a different
> > >>>>>>>
> > >>>>>> partition
> > >>>>
> > >>>>> layout.
> > >>>>>>>
> > >>>>>>> Really, though, figuring out how to split compacted topics is
> > >>>>>>>
> > >>>>>> beyond
> > >>>
> > >>>> the
> > >>>>>
> > >>>>>> scope of KIP-253, so I'm not sure #9 really even needs to be in
> > >>>>>>>
> > >>>>>> this
> > >>>
> > >>>> KIP...
> > >>>>>>
> > >>>>>>> We do need in-order delivery during partition expansion. It would
> > >>>>>>>
> > >>>>>> be
> > >>>
> > >>>> fine
> > >>>>>
> > >>>>>> by me to say that you *cannot* expand partitions of a
> log-compacted
> > >>>>>>>
> > >>>>>> topic
> > >>>>>
> > >>>>>> and call it a day. I think it would be better to tackle that in
> > >>>>>>>
> > >>>>>> another
> > >>>>
> > >>>>> KIP.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Point 2:
> > >>>>>>> Regarding whether the consumer re-shuffles its inputs, this is
> > >>>>>>>
> > >>>>>> always
> > >>>
> > >>>> on
> > >>>>>
> > >>>>>> the table; any consumer who wants to re-shuffle its input is free
> > >>>>>>>
> > >>>>>> to
> > >>>
> > >>>> do
> > >>>>
> > >>>>> so.
> > >>>>>>
> > >>>>>>> But this is currently not required. It's just that the current
> > >>>>>>>
> > >>>>>> high-level
> > >>>>>
> > >>>>>> story with Kafka encourages the use of partitions as a unit of
> > >>>>>>>
> > >>>>>> concurrency.
> > >>>>>>
> > >>>>>>> As long as consumers are single-threaded, they can happily
> consume
> > >>>>>>>
> > >>>>>> a
> > >>>
> > >>>> single
> > >>>>>>
> > >>>>>>> partition without concurrency control of any kind. This is a key
> > >>>>>>>
> > >>>>>> aspect
> > >>>>
> > >>>>> to
> > >>>>>>
> > >>>>>>> this system that lets folks design high-throughput systems on top
> > >>>>>>>
> > >>>>>> of
> > >>>
> > >>>> it
> > >>>>
> > >>>>> surprisingly easily. If all consumers were instead
> > >>>>>>>
> > >>>>>> encouraged/required
> > >>>>
> > >>>>> to
> > >>>>>
> > >>>>>> implement a repartition of their own, then the consumer becomes
> > >>>>>>> significantly more complex, requiring either the consumer to
> first
> > >>>>>>>
> > >>>>>> produce
> > >>>>>>
> > >>>>>>> to its own intermediate repartition topic or to ensure that
> > >>>>>>>
> > >>>>>> consumer
> > >>>
> > >>>> threads have a reliable, high-bandwidth channel of communication
> > >>>>>>>
> > >>>>>> with
> > >>>
> > >>>> every
> > >>>>>>
> > >>>>>>> other consumer thread.
> > >>>>>>>
> > >>>>>>> Either of those tradeoffs may be reasonable for a particular user
> > >>>>>>>
> > >>>>>> of
> > >>>
> > >>>> Kafka,
> > >>>>>>
> > >>>>>>> but I don't know if we're in a position to say that they are
> > >>>>>>>
> > >>>>>> reasonable
> > >>>>
> > >>>>> for
> > >>>>>>
> > >>>>>>> *every* user of Kafka.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Point 3:
> > >>>>>>> Regarding Jun's point about this use case, "(3) stateful and
> > >>>>>>>
> > >>>>>> maintaining
> > >>>>>
> > >>>>>> the
> > >>>>>>> states in a local store", I agree that they may use a framework
> > >>>>>>>
> > >>>>>> *like*
> > >>>>
> > >>>>> Kafka Streams, but that is not the same as using Kafka Streams.
> > >>>>>>>
> > >>>>>> This
> > >>>
> > >>>> is
> > >>>>
> > >>>>> why
> > >>>>>>
> > >>>>>>> I think it's better to solve it in Core: because it is then
> solved
> > >>>>>>>
> > >>>>>> for
> > >>>>
> > >>>>> KStreams and also for everything else that facilitates local state
> > >>>>>>> maintenance. To me, Streams is a member of the category of
> "stream
> > >>>>>>> processing frameworks", which is itself a subcategory of "things
> > >>>>>>>
> > >>>>>> requiring
> > >>>>>>
> > >>>>>>> local state maintenance". I'm not sure if it makes sense to
> assert
> > >>>>>>>
> > >>>>>> that
> > >>>>
> > >>>>> Streams is a sufficient and practical replacement for everything in
> > >>>>>>>
> > >>>>>> "things
> > >>>>>>
> > >>>>>>> requiring local state maintenance".
> > >>>>>>>
> > >>>>>>> But, yes, I do agree that per-key ordering is an absolute
> > >>>>>>>
> > >>>>>> requirement,
> > >>>>
> > >>>>> therefore I think that KIP-253 itself is a necessary step.
> > >>>>>>>
> > >>>>>> Regarding
> > >>>
> > >>>> the
> > >>>>>
> > >>>>>> coupling of the state store partitioning to the topic
> partitioning,
> > >>>>>>>
> > >>>>>> yes,
> > >>>>>
> > >>>>>> this is an issue we are discussing solutions to right now. We may
> > >>>>>>>
> > >>>>>> go
> > >>>
> > >>>> ahead
> > >>>>>>
> > >>>>>>> and introduce an overpartition layer on our inputs to solve it,
> but
> > >>>>>>>
> > >>>>>> then
> > >>>>>
> > >>>>>> again, if we get the ability to split partitions with backfill, we
> > >>>>>>>
> > >>>>>> may
> > >>>>
> > >>>>> not
> > >>>>>>
> > >>>>>>> need to!
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Point 4:
> > >>>>>>> On this:
> > >>>>>>>
> > >>>>>>> Regarding thought 2: If we don't care about the stream use-case,
> > >>>>>>>>
> > >>>>>>> then
> > >>>>
> > >>>>> the
> > >>>>>>
> > >>>>>>> current KIP probably has already addressed the problem without
> > >>>>>>>>
> > >>>>>>> requiring
> > >>>>
> > >>>>> consumer to know the partition function. If we care about the
> > >>>>>>>>
> > >>>>>>> stream
> > >>>>
> > >>>>> use-case, we already need coordination across producers of
> > >>>>>>>>
> > >>>>>>> different
> > >>>>
> > >>>>> topics, i.e. the same partition function needs to be used by
> > >>>>>>>>
> > >>>>>>> producers
> > >>>>>
> > >>>>>> of
> > >>>>>>
> > >>>>>>> topics A and B in order to join topics A and B. Thus, it might be
> > >>>>>>>> reasonable to extend coordination a bit and say we need
> > >>>>>>>>
> > >>>>>>> coordination
> > >>>>
> > >>>>> across
> > >>>>>>>
> > >>>>>>>> clients (i.e. producer and consumer), such that consumer knows
> > >>>>>>>>
> > >>>>>>> the
> > >>>
> > >>>> partition function used by producer. If we do so, then we can let
> > >>>>>>>>
> > >>>>>>> consumer
> > >>>>>>>
> > >>>>>>>> re-copy data for the change log topic using the same partition
> > >>>>>>>>
> > >>>>>>> function
> > >>>>>
> > >>>>>> as
> > >>>>>>>
> > >>>>>>>> producer. This approach has lower overhead as compared to having
> > >>>>>>>>
> > >>>>>>> producer
> > >>>>>>
> > >>>>>>> re-copy data of the input topic.
> > >>>>>>>> Also, producer currently does not need to know the data already
> > >>>>>>>>
> > >>>>>>> produced
> > >>>>>>
> > >>>>>>> to
> > >>>>>>>
> > >>>>>>>> the topic. If we let producer split/merge partition, it would
> > >>>>>>>>
> > >>>>>>> require
> > >>>>
> > >>>>> producer to consume the existing data, which intuitively is the
> > >>>>>>>>
> > >>>>>>> task
> > >>>>
> > >>>>> of
> > >>>>>
> > >>>>>> consumer.
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>> I think we do care about use cases *like* Streams, I just don't
> > >>>>>>>
> > >>>>>> think
> > >>>
> > >>>> we
> > >>>>>
> > >>>>>> should rely on Streams to implement a feature of Core like
> > >>>>>>>
> > >>>>>> partition
> > >>>
> > >>>> expansion.
> > >>>>>>>
> > >>>>>>> Note, though, that we (Streams) do not require coordination
> across
> > >>>>>>> producers. If two topics are certified to be co-partitioned, then
> > >>>>>>>
> > >>>>>> Streams
> > >>>>>
> > >>>>>> apps can make use of that knowledge to optimize their topology
> > >>>>>>>
> > >>>>>> (skipping
> > >>>>>
> > >>>>>> a
> > >>>>>>
> > >>>>>>> repartition). But if they don't know whether they are
> > >>>>>>>
> > >>>>>> co-partitioned,
> > >>>
> > >>>> then
> > >>>>>>
> > >>>>>>> they'd better go ahead and repartition within the topology. This
> is
> > >>>>>>>
> > >>>>>> the
> > >>>>
> > >>>>> current state.
> > >>>>>>>
> > >>>>>>> A huge selling point of Kafka is enabling different parts of
> > >>>>>>>
> > >>>>>> loosely
> > >>>
> > >>>> coupled organizations to produce and consume data independently.
> > >>>>>>>
> > >>>>>> Some
> > >>>
> > >>>> coordination between producers and consumers is necessary, like
> > >>>>>>> coordinating on the names of topics and their schemas. But
> Kafka's
> > >>>>>>>
> > >>>>>> value
> > >>>>>
> > >>>>>> proposition w.r.t. ESBs, etc. is inversely proportional to the
> > >>>>>>>
> > >>>>>> amount
> > >>>
> > >>>> of
> > >>>>>
> > >>>>>> coordination required. I think it behooves us to be extremely
> > >>>>>>>
> > >>>>>> skeptical
> > >>>>
> > >>>>> about introducing any coordination beyond correctness protocols.
> > >>>>>>>
> > >>>>>>> Asking producers and consumers, or even two different producers,
> to
> > >>>>>>>
> > >>>>>> share
> > >>>>>
> > >>>>>> code like the partition function is a pretty huge ask. What if
> they
> > >>>>>>>
> > >>>>>> are
> > >>>>
> > >>>>> using different languages?
> > >>>>>>>
> > >>>>>>> Comparing organizational overhead vs computational overhead,
> there
> > >>>>>>>
> > >>>>>> are
> > >>>>
> > >>>>> maybe two orders of magnitude difference between them. In other
> > >>>>>>>
> > >>>>>> words,
> > >>>>
> > >>>>> I
> > >>>>>
> > >>>>>> would happily take on the (linear) overhead of having the producer
> > >>>>>>>
> > >>>>>> re-copy
> > >>>>>>
> > >>>>>>> the data once during a re-partition in order to save the
> > >>>>>>>
> > >>>>>> organizational
> > >>>>
> > >>>>> overhead of tying all the producers and consumers together across
> > >>>>>>>
> > >>>>>> multiple
> > >>>>>>
> > >>>>>>> boundaries.
> > >>>>>>>
> > >>>>>>> On that last paragraph: note that the producer *did* know the
> data
> > >>>>>>>
> > >>>>>> it
> > >>>
> > >>>> already produced. It handled it the first time around. Asking it to
> > >>>>>>> re-produce it into a new partition layout is squarely within its
> > >>>>>>>
> > >>>>>> scope
> > >>>>
> > >>>>> of
> > >>>>>
> > >>>>>> capabilities. Contrast this with the alternative, asking the
> > >>>>>>>
> > >>>>>> consumer
> > >>>
> > >>>> to
> > >>>>>
> > >>>>>> re-partition the data. I think this is even less intuitive, when
> > >>>>>>>
> > >>>>>> the
> > >>>
> > >>>> partition function belongs to the producer.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Point 5:
> > >>>>>>> Dong asked this:
> > >>>>>>>
> > >>>>>>> For stream use-case that needs to increase consumer number, the
> > >>>>>>>> existing consumer can backfill the existing data in the change
> > >>>>>>>>
> > >>>>>>> log
> > >>>
> > >>>> topic
> > >>>>>>
> > >>>>>>> to
> > >>>>>>>
> > >>>>>>>> the same change log topic with the new partition number, before
> > >>>>>>>>
> > >>>>>>> the
> > >>>
> > >>>> new
> > >>>>>
> > >>>>>> set
> > >>>>>>>
> > >>>>>>>> of consumers bootstrap state from the new partitions of the
> > >>>>>>>>
> > >>>>>>> change
> > >>>
> > >>>> log
> > >>>>>
> > >>>>>> topic, right?
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>> In this sense, the "consumer" is actually the producer of the
> > >>>>>>>
> > >>>>>> changelog
> > >>>>
> > >>>>> topic, so if we support partition expansion + backfill as a
> > >>>>>>>
> > >>>>>> producer/broker
> > >>>>>>
> > >>>>>>> operation, then it would be very straightforward for Streams to
> > >>>>>>>
> > >>>>>> split a
> > >>>>
> > >>>>> state store. As you say, they would simply instruct the broker to
> > >>>>>>>
> > >>>>>> split
> > >>>>
> > >>>>> the
> > >>>>>>
> > >>>>>>> changelog topic's partitions, then backfill. Once the backfill is
> > >>>>>>>
> > >>>>>> ready,
> > >>>>>
> > >>>>>> they can create a new crop of StandbyTasks to bootstrap the more
> > >>>>>>>
> > >>>>>> granular
> > >>>>>
> > >>>>>> state stores and finally switch over to them when they are ready.
> > >>>>>>>
> > >>>>>>> But this actually seems to be an argument in favor of
> > >>>>>>>
> > >>>>>> split+backfill,
> > >>>
> > >>>> so
> > >>>>>
> > >>>>>> maybe I missed the point.
> > >>>>>>>
> > >>>>>>> You also asked me to explain why copying the "input" topic is
> > >>>>>>>
> > >>>>>> better
> > >>>
> > >>>> than
> > >>>>>
> > >>>>>> copying the "changelog" topic. I think they are totally
> > >>>>>>>
> > >>>>>> independent,
> > >>>
> > >>>> actually. For one thing, you can't depend on the existence of a
> > >>>>>>>
> > >>>>>> "changelog"
> > >>>>>>
> > >>>>>>> topic in general, only within Streams, but Kafka's user base
> > >>>>>>>
> > >>>>>> clearly
> > >>>
> > >>>> exceeds Streams's user base. Plus, you actually also can't depend
> > >>>>>>>
> > >>>>>> on
> > >>>
> > >>>> the
> > >>>>>
> > >>>>>> existence of a changelog topic within Streams, since that is an
> > >>>>>>>
> > >>>>>> optional
> > >>>>>
> > >>>>>> feature of *some* state store implementations. Even in the
> > >>>>>>>
> > >>>>>> situation
> > >>>
> > >>>> where
> > >>>>>>
> > >>>>>>> you do have a changelog topic in Streams, there may be use cases
> > >>>>>>>
> > >>>>>> where
> > >>>>
> > >>>>> it
> > >>>>>
> > >>>>>> makes sense to expand the partitions of just the input, or just
> the
> > >>>>>>> changelog.
> > >>>>>>>
> > >>>>>>> The ask for a Core feature of split+backfill is really about
> > >>>>>>>
> > >>>>>> supporting
> > >>>>
> > >>>>> the
> > >>>>>>
> > >>>>>>> use case of splitting partitions in log-compacted topics,
> > >>>>>>>
> > >>>>>> regardless
> > >>>
> > >>>> of
> > >>>>
> > >>>>> whether that topic is an "input" or a "changelog" or anything else
> > >>>>>>>
> > >>>>>> for
> > >>>>
> > >>>>> that
> > >>>>>>
> > >>>>>>> matter.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Point 6:
> > >>>>>>> On the concern about the performance overhead of copying data
> > >>>>>>>
> > >>>>>> between
> > >>>
> > >>>> the
> > >>>>>
> > >>>>>> brokers, I think it's actually a bit overestimated. Splitting a
> > >>>>>>>
> > >>>>>> topic's
> > >>>>
> > >>>>> partition is probably rare, certainly rarer in general than
> > >>>>>>>
> > >>>>>> bootstrapping
> > >>>>>
> > >>>>>> new consumers on that topic. If "bootstrapping new consumers"
> means
> > >>>>>>>
> > >>>>>> that
> > >>>>>
> > >>>>>> they have to re-shuffle the data before they consume it, then you
> > >>>>>>>
> > >>>>>> wind
> > >>>>
> > >>>>> up
> > >>>>>
> > >>>>>> copying the same record multiple times:
> > >>>>>>>
> > >>>>>>> (broker: input topic) -> (initial consumer) -> (broker:
> repartition
> > >>>>>>>
> > >>>>>> topic)
> > >>>>>>
> > >>>>>>> -> (real consumer)
> > >>>>>>>
> > >>>>>>> That's 3x, and it's also 3x for every new record after the split
> as
> > >>>>>>>
> > >>>>>> well,
> > >>>>>
> > >>>>>> since you don't get to stop repartitioning/reshuffling once you
> > >>>>>>>
> > >>>>>> start.
> > >>>>
> > >>>>> Whereas if you do a backfill in something like the procedure I
> > >>>>>>>
> > >>>>>> outlined,
> > >>>>>
> > >>>>>> you only copy the prefix of the partition before the split, and
> you
> > >>>>>>>
> > >>>>>> send
> > >>>>>
> > >>>>>> it
> > >>>>>>
> > >>>>>>> once to the producer and then once to the new generation
> partition.
> > >>>>>>>
> > >>>>>> Plus,
> > >>>>>
> > >>>>>> assuming we're splitting the partition for the benefit of
> > >>>>>>>
> > >>>>>> consumers,
> > >>>
> > >>>> there's no reason we can't co-locate the post-split partitions on
> > >>>>>>>
> > >>>>>> the
> > >>>
> > >>>> same
> > >>>>>>
> > >>>>>>> host as the pre-split partition, making the second copy a local
> > >>>>>>>
> > >>>>>> filesystem
> > >>>>>>
> > >>>>>>> operation.
> > >>>>>>>
> > >>>>>>> Even if you follow these two copies up with bootstrapping a new
> > >>>>>>>
> > >>>>>> consumer,
> > >>>>>
> > >>>>>> it's still rare for this to occur, so you get to amortize these
> > >>>>>>>
> > >>>>>> copies
> > >>>>
> > >>>>> over
> > >>>>>>
> > >>>>>>> the lifetime of the topic, whereas a reshuffle just keeps making
> > >>>>>>>
> > >>>>>> copies
> > >>>>
> > >>>>> for
> > >>>>>>
> > >>>>>>> every new event.
> > >>>>>>>
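A rough way to compare the two copy counts in Point 6, using a deliberately simplified model. The function names and the counting rules are assumptions made for illustration only: every hop between a broker and a client counts as one transfer, the co-location optimisation is ignored, and nothing here is measured.

```python
def reshuffle_transfers(existing, new):
    # input topic -> initial consumer -> repartition topic -> real consumer:
    # three transfers for every record, old or new, for as long as the job runs.
    return 3 * (existing + new)

def backfill_transfers(existing, new):
    # One-off backfill: the pre-split prefix goes to the producer and then to
    # the new generation partition (two transfers per existing record).
    backfill = 2 * existing
    # After that, consumers read every record directly from the broker once.
    consume = existing + new
    return backfill + consume

# 1,000 records before the split, 9,000 produced afterwards.
reshuffle_total = reshuffle_transfers(1000, 9000)  # 30000
backfill_total = backfill_transfers(1000, 9000)    # 12000
```

In this model the two approaches cost the same for the pre-split prefix, and the gap comes entirely from records produced after the split, which matches the amortization argument above.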
> > >>>>>>> And finally, I really do think that regardless of any performance
> > >>>>>>>
> > >>>>>> concerns
> > >>>>>>
> > >>>>>>> about this operation, if it preserves loose organizational
> > >>>>>>>
> > >>>>>> coupling,
> > >>>
> > >>>> it
> > >>>>
> > >>>>> is
> > >>>>>>
> > >>>>>>> certainly worth it.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> In conclusion:
> > >>>>>>> It might actually be a good idea for us to clarify the scope of
> > >>>>>>>
> > >>>>>> KIP-253.
> > >>>>>
> > >>>>>> If
> > >>>>>>
> > >>>>>>> we're all agreed that it's a good algorithm for allowing in-order
> > >>>>>>>
> > >>>>>> message
> > >>>>>
> > >>>>>> delivery during partition expansion, then we can continue this
> > >>>>>>>
> > >>>>>> discussion
> > >>>>>
> > >>>>>> as a new KIP, something like "backfill with partition expansion".
> > >>>>>>>
> > >>>>>> This
> > >>>>
> > >>>>> would let Dong proceed with KIP-253. On the other hand, if it seems
> > >>>>>>>
> > >>>>>> like
> > >>>>>
> > >>>>>> this conversation may alter the design of KIP-253, then maybe we
> > >>>>>>>
> > >>>>>> *should*
> > >>>>>
> > >>>>>> just finish working it out.
> > >>>>>>>
> > >>>>>>> For my part, my only concern about KIP-253 is the one I raised
> > >>>>>>>
> > >>>>>> earlier.
> > >>>>
> > >>>>> Thanks again, all, for considering these points,
> > >>>>>>> -John
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Tue, Mar 27, 2018 at 2:10 AM, Dong Lin <lindon...@gmail.com>
> > >>>>>>>
> > >>>>>> wrote:
> > >>>>
> > >>>>> On Tue, Mar 27, 2018 at 12:04 AM, Dong Lin <lindon...@gmail.com>
> > >>>>>>>>
> > >>>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hey Jan,
> > >>>>>>>>>
> > >>>>>>>>> Thanks for the enthusiasm in improving Kafka's design. Now
> > >>>>>>>>>
> > >>>>>>>> that I
> > >>>
> > >>>> have
> > >>>>>>
> > >>>>>>> read through your discussion with Jun, here are my thoughts:
> > >>>>>>>>>
> > >>>>>>>>> - The latest proposal should work with log compacted topics by
> > >>>>>>>>>
> > >>>>>>>> properly
> > >>>>
> > >>>>> deleting old messages after a new message with the same key is
> > >>>>>>>>>
> > >>>>>>>> produced.
> > >>>>>>>
> > >>>>>>>> So
> > >>>>>>>>
> > >>>>>>>>> it is probably not a concern anymore. Could you comment if
> > >>>>>>>>>
> > >>>>>>>> there
> > >>>
> > >>>> is
> > >>>>
> > >>>>> still
> > >>>>>>>
> > >>>>>>>> issue?
> > >>>>>>>>>
> > >>>>>>>>> - I wrote the SEP-5 and I am pretty familiar with the
> > >>>>>>>>>
> > >>>>>>>> motivation
> > >>
> > >>
> >
>
