I want to quickly step in here again because the discussion is drifting off again.

The last part of the discussion is a pain to read and has completely diverged from what I suggested, without the reasons being made clear to me.

I don't know why this keeps happening... here are my comments anyway.

@Guozhang: That Streams is working on automatically creating co-partition-ready internal topics is great for Streams, but it has literally nothing to do with this KIP, because we want to grow the input topic. Everyone can reshuffle relatively easily, but that is not what we need to do; we need to grow the topic in question. After Streams has automatically reshuffled, the input topic still has the same size, so it didn't help a bit. I fail to see why this is relevant. What am I missing here?

@Dong
I still hold the position that the current proposal takes us in the wrong direction, especially the introduction of PartitionKeyRebalanceListener. From this point we can never move to proper stateful handling without completely deprecating this creature from hell again. Linear hashing is not the optimization step we need to take here. What is important is an interface under which a topic stays the same topic even after it has grown or shrunk. So from my POV I have major concerns about whether this KIP is beneficial in its current state.

What is it that makes everyone so attached to the idea of linear hashing? It is not attractive at all to me, and with stateful consumers it is still a complete mess. Why not stick with the Kappa architecture?





On 03.04.2018 17:38, Dong Lin wrote:
Hey John,

Thanks much for your comments!!

I have yet to go through the emails from John/Jun/Guozhang in detail, but let
me present my idea for how to minimize the delay of state loading for the
stream use-case.

For ease of understanding, let's assume that the initial partition number
of the input topic and the change log topic are both 10, and the initial
number of stream processors is also 10. If we only increase the partition
number of the input topic to 15 without changing the number of stream
processors, the current KIP already guarantees in-order delivery and no
state needs to be moved between consumers for the stream use-case. Next,
let's say we want to increase the number of processors to expand the
processing capacity for the stream use-case. This requires us to move state
between processors, which will take time. Our goal is to minimize the
impact (i.e. delay) on processing while we increase the number of processors.

Note that a stream processor generally includes both a consumer and a
producer. In addition to consuming from the input topic, the consumer may
also need to consume from the change log topic on startup for recovery, and
the producer may produce state to the change log topic.


The solution will include the following steps:

1) Increase the partition number of the input topic from 10 to 15. Since
messages with the same key will still go to the same consumer before and
after the partition expansion, this step can be done without having to move
state between processors.

2) Increase the partition number of the change log topic from 10 to 15.
Note that this step can also be done without impacting the existing
workflow. After we increase the partition number of the change log topic,
the key space may split and some keys will be produced to the newly-added
partitions. But the same key will still go to the same processor (i.e.
consumer) before and after the partition expansion. Thus this step can also
be done without having to move state between processors.

3) Now, let's add 5 new consumers whose groupId is different from the
existing processors' groupId, so that these new consumers will not impact
the existing workflow. Each of these new consumers should consume two
partitions from the earliest offset, where these two partitions are the
same partitions that would be consumed if the consumers had the same
groupId as the existing processors. For example, the first of the five
consumers will consume partition 0 and partition 10. The purpose of these
consumers is to rebuild the state (e.g. RocksDB) for the processors in
advance. Also note that, by design of the current KIP, each consumer will
consume the existing partitions of the change log topic up to the offset
before the partition expansion. Then they will only need to consume the
state of the new partitions of the change log topic.

4) After the consumers have caught up in step 3), we should stop these
consumers and add 5 new processors to the stream processing job. These 5
new processors should run in the same location as the previous 5 consumers
to re-use the state (e.g. RocksDB), and these processors' consumers should
consume partitions of the change log topic from the offsets committed by
the previous 5 consumers so that no state is missed.
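To make steps 3) and 4) concrete, here is a minimal sketch of such a warm-up consumer (broker address, topic name, group id and the local-store call are all hypothetical; this is only an illustration of the idea, not part of the KIP):

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ChangelogWarmup {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Separate groupId so the warm-up does not disturb the running processors.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "warmup-processor-0");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // The two change log partitions that processor 0 will own after the expansion.
            TopicPartition p0 = new TopicPartition("my-changelog", 0);
            TopicPartition p10 = new TopicPartition("my-changelog", 10);
            consumer.assign(Arrays.asList(p0, p10));
            consumer.seekToBeginning(Arrays.asList(p0, p10));

            // In a real tool this loop would stop once it has caught up to the
            // pre-expansion end offsets; then the new processor takes over (step 4).
            while (true) {
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
                    restoreIntoLocalStore(record);
                }
                // Commit so the new processor can later resume from exactly this point.
                consumer.commitSync();
            }
        }
    }

    private static void restoreIntoLocalStore(ConsumerRecord<byte[], byte[]> record) {
        // placeholder for writing the key/value into the local state store (e.g. RocksDB)
    }
}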

One important trick to note here is that the mapping from partition to
consumer should also use linear hashing. And we need to remember the
initial number of processors in the job, 10 in this example, and use this
number in the linear hashing algorithm. This is pretty much the same as how
we use linear hashing to map keys to partitions. In this case, we get an
identity map from partition -> processor, for both the input topic and the
change log topic. For example, processor 12 will consume partition 12 of
the input topic and produce state to partition 12 of the change log topic.
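For illustration, the linear-hashing arithmetic behind that identity map can be sketched as follows (my own sketch, covering a single expansion round where the count is between n0 and 2*n0, as in the 10 -> 15 example; it is not code from the KIP):

public class LinearHashing {
    // Keep the hash non-negative (same trick as Kafka's Utils.toPositive).
    private static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    // Linear hashing with an initial count n0: only partitions [0, currentCount - n0)
    // have been split so far, and partition p splits into p and p + n0.
    static int partitionFor(int keyHash, int n0, int currentCount) {
        int h = toPositive(keyHash);
        int p = h % n0;                        // partition under the initial layout
        int splitPointer = currentCount - n0;  // how many original partitions are split
        return (p < splitPointer) ? h % (2 * n0) : p;
    }

    public static void main(String[] args) {
        // Map partitions to processors with the same scheme, treating the partition
        // number itself as the key and the initial processor count (10) as n0.
        // With 15 partitions and 15 processors this prints the identity map.
        for (int partition = 0; partition < 15; partition++) {
            int processor = partitionFor(partition, 10, 15);
            System.out.println("partition " + partition + " -> processor " + processor);
        }
    }
}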

There are a few important properties of this solution to note:

- We can increase the number of partitions of the input topic and the
change log topic in any order, asynchronously.
- The expansion of the processors in a given job in step 4) only requires
step 3) for the same job. It does not require coordination across different
jobs for steps 3) and 4). Thus different jobs can independently expand
their capacity without waiting for each other.
- The logic for 1) and 2) is already supported in the current KIP. The
logic for 3) and 4) appears to be independent of the core Kafka logic and
can be implemented separately outside core Kafka. Thus the current KIP is
probably sufficient once we agree on the efficiency and the correctness of
the solution. We can have a separate KIP for Kafka Streams to support 3)
and 4).


Cheers,
Dong


On Mon, Apr 2, 2018 at 3:25 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hey guys, just sharing my two cents here (I promise it will be shorter than
John's article :).

0. Just to quickly recap, the main discussion point now is how to support
"key partitioning preservation" (John's #4 in topic characteristics above)
beyond the "single-key ordering preservation" that KIP-253 was originally
proposed to maintain (John's #6 above).

1. From the Streams project, we are actively working on improving the
elastic scalability of the library. One of the key features is to decouple
the input topics from the parallelism model of Streams: i.e. not enforcing
the topic to be partitioned by the key, not enforcing topics being joined
to be co-partitioned, and not tying the number of parallel tasks to the
number of input topic partitions. This can be achieved by re-shuffling the
input topics to ensure key-partitioning / co-partitioning on the internal
topics. Note that the re-shuffling task is purely stateless and hence does
not require "key partitioning preservation". Operationally it is similar to
the "create a new topic with the new number of partitions, pipe the data to
the new topic and cut over consumers from the old topic" idea, just that
users can optionally let Streams handle it rather than doing it manually
themselves. There are a few more details in that regard but I will skip them
since they are not directly related to this discussion.
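As a rough illustration of that stateless re-shuffle (not the new work described above, just what the public Streams DSL already does when you re-key a stream; topic name and key extractor are made up):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;

public class ReshuffleExample {
    // Hypothetical key extraction; stands in for whatever the real key derivation is.
    static String extractCustomerId(String order) {
        return order.split(",")[0];
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> orders =
                builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()));

        // Re-keying via groupBy() makes Streams pipe the data through an internal,
        // Streams-owned repartition topic that is partitioned by the new key,
        // independently of how (or whether) the input topic "orders" is keyed.
        orders.groupBy((key, value) -> extractCustomerId(value),
                       Grouped.with(Serdes.String(), Serdes.String()))
              .count();

        // builder.build() is then handed to a KafkaStreams instance as usual.
    }
}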

2. Assuming that 1) above is done, then the only topics involved in the
scaling events are all input topics. For these topics the only producers /
consumers of these topics are controlled by Streams clients themselves, and
hence achieving "key partitioning preservation" is simpler than non-Streams
scenarios: consumers know the partitioning scheme that producers are using,
so that for their stateful operations it is doable to split the local state
stores accordingly or execute backfilling on its own. Of course, if we
decide to do server-side backfilling, it can still help Streams to directly
rely on that functionality.

3. As John mentioned, another way inside Streams is to over-partition all
internal topics; then with 1) Streams would not rely on KIP-253 at all. But
personally I'd like to avoid that if possible to reduce the Kafka-side
footprint: say we over-partition each input topic up to 1k partitions; with
a reasonably sized stateful topology that can still contribute tens of
thousands of topic-partitions toward the capacity of a single cluster.

4. Summing up 1/2/3, I think we should focus more on non-Streams users
writing their stateful computations with local state, and think about
whether / how we could enable "key partitioning preservation" for them
easily, rather than thinking heavily about the Streams library. People may
have different opinions on how common a usage pattern it is (I think Jun
might be suggesting that for DIY apps people may be more likely to use
remote state, so that it is not a problem for them). My opinion is that
among non-Streams users such a usage pattern could still be widespread
(think: if you are piping data from Kafka to an external data store which
has single-writer requirements for each shard, even though it is not a
stateful computational application it may still require "key partitioning
preservation"), so I prefer to have backfilling in our KIP rather than only
exposing the API for expansion and requiring consumers to have
pre-knowledge of the producer's partitioning scheme.



Guozhang



On Thu, Mar 29, 2018 at 2:33 PM, John Roesler <j...@confluent.io> wrote:

Hey Dong,

Congrats on becoming a committer!!!

Since I just sent a novel-length email, I'll try and keep this one brief
;)
Regarding producer coordination, I'll grant that in that case, producers
may coordinate among themselves to produce into the same topic or to
produce co-partitioned topics. Nothing in KStreams or the Kafka ecosystem
in general requires such coordination for correctness or in fact for any
optional features, though, so I would not say that we require producer
coordination of partition logic. If producers currently coordinate, it's
completely optional and their own choice.

Regarding the portability of partition algorithms, my observation is that
systems requiring independent implementations of the same algorithm with
100% correctness are a large source of risk and also a burden on those who
have to maintain them. If people could flawlessly implement algorithms in
actual software, the world would be a wonderful place indeed! For a system
as important and widespread as Kafka, I would recommend limiting such
requirements as aggressively as possible.

I'd agree that we can always revisit decisions like allowing arbitrary
partition functions, but of course, we shouldn't do that in a vacuum.
That
feels like the kind of thing we'd need to proactively seek guidance from
the users list about. I do think that the general approach of saying that
"if you use a custom partitioner, you cannot do partition expansion" is
very reasonable (but I don't think we need to go that far with the
current
proposal). It's similar to my statement in my email to Jun that in
principle KStreams doesn't *need* backfill, we only need it if we want to
employ partition expansion.

I reckon that the main motivation for backfill is to support KStreams use
cases and also any other use cases involving stateful consumers.

Thanks for your response, and congrats again!
-John


On Wed, Mar 28, 2018 at 1:34 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey John,

Great! Thanks for all the comments. It seems that we agree that the current
KIP is in good shape for core Kafka. IMO, what we have been discussing in
the recent email exchanges is mostly about the second step, i.e. how to
address the problem for the stream use-case (or stateful processing in
general).

I will comment inline.




On Tue, Mar 27, 2018 at 4:38 PM, John Roesler <j...@confluent.io>
wrote:
Thanks for the response, Dong.

Here are my answers to your questions:

- "Asking producers and consumers, or even two different producers,
to
share code like the partition function is a pretty huge ask. What
if
they
are using different languages?". It seems that today we already
require
different producer's to use the same hash function -- otherwise
messages
with the same key will go to different partitions of the same topic
which
may cause problem for downstream consumption. So not sure if it
adds
any
more constraint by assuming consumers know the hash function of
producer.
Could you explain more why user would want to use a cusmtom
partition
function? Maybe we can check if this is something that can be
supported
in
the default Kafka hash function. Also, can you explain more why it
is
difficuilt to implement the same hash function in different
languages?

Sorry, I meant two different producers as in producers to two
different
topics. This was in response to the suggestion that we already
require
coordination among producers to different topics in order to achieve
co-partitioning. I was saying that we do not (and should not).

It is probably common for producers of different team to produce
message
to
the same topic. In order to ensure that messages with the same key go
to
same partition, we need producers of different team to share the same
partition algorithm, which by definition requires coordination among
producers of different teams in an organization. Even for producers of
different topics, it may be common to require producers to use the same
partition algorithm in order to join two topics for stream processing.
Does
this make it reasonable to say we already require coordination across
producers?


By design, consumers are currently ignorant of the partitioning
scheme.
It
suffices to trust that the producer has partitioned the topic by key,
if
they claim to have done so. If you don't trust that, or even if you
just
need some other partitioning scheme, then you must re-partition it
yourself. Nothing we're discussing can or should change that. The
value
of
backfill is that it preserves the ability for consumers to avoid
re-partitioning before consuming, in the case where they don't need
to
today.

Regarding shared "hash functions", note that it's a bit inaccurate to
talk
about the "hash function" of the producer. Properly speaking, the
producer
has only a "partition function". We do not know that it is a hash.
The
producer can use any method at their disposal to assign a partition
to
a
record. The partition function obviously may we written in any
programming
language, so in general it's not something that can be shared around
without a formal spec or the ability to execute arbitrary executables
in
arbitrary runtime environments.

Yeah, it is probably better to say partition algorithm. I guess it should
not be difficult to implement the same partition algorithm in different
languages, right? Yes, we would need a formal specification of the default
partition algorithm in the producer. I think that can be documented as part
of the producer interface.
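(For reference, the keyed case of the default partition algorithm is already small enough to specify precisely; this is a sketch of what the producer's DefaultPartitioner does for a record with a non-null key, using Kafka's own murmur2 utility:)

import org.apache.kafka.common.utils.Utils;

public final class DefaultKeyedPartitioning {
    // Default partition for a record with a non-null key: murmur2 hash of the
    // serialized key bytes, masked to non-negative, modulo the partition count.
    public static int partition(byte[] serializedKey, int numPartitions) {
        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
    }
}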


Why would a producer want a custom partition function? I don't
know...
why
did we design the interface so that our users can provide one? In
general,
such systems provide custom partitioners because some data sets may
be
unbalanced under the default or because they can provide some
interesting
functionality built on top of the partitioning scheme, etc. Having
provided
this ability, I don't know why we would remove it.

Yeah it is reasonable to assume that there was reason to support custom
partition function in producer. On the other hand it may also be
reasonable
to revisit this interface and discuss whether we actually need to
support
custom partition function. If we don't have a good reason, we can
choose
not to support custom partition function in this KIP in a backward
compatible manner, i.e. user can still use custom partition function
but
they would not get the benefit of in-order delivery when there is
partition
expansion. What do you think?


- Besides the assumption that consumer needs to share the hash
function
of
producer, is there other organization overhead of the proposal in
the
current KIP?

It wasn't clear to me that KIP-253 currently required the producer and
consumer to share the partition function, or in fact that it had a hard
requirement to abandon the general partition function and use a linear hash
function instead.

In my reading, there is a requirement to track the metadata about what
partitions split into what other partitions during an expansion operation.
If the partition function is linear, this is easy. If not, you can always
just record that all old partitions split into all new partitions. This has
the effect of forcing all consumers to wait until the old epoch is
completely consumed before starting on the new epoch. But this may be a
reasonable tradeoff, and it doesn't otherwise alter your design.

You only mention the consumer needing to know that the partition function
is linear, not what the actual function is, so I don't think your design
actually calls for sharing the function. Plus, really all the consumer
needs is the metadata about what old-epoch partitions to wait for before
consuming a new-epoch partition. This information is directly captured in
metadata, so I don't think it actually even cares whether the partition
function is linear or not.
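To illustrate that last point, the consumer-side check reduces to gating on the split metadata; a purely hypothetical helper (none of these names exist in the KIP or the clients):

import java.util.Collections;
import java.util.List;
import java.util.Map;

public final class EpochGate {
    // A new-epoch partition may be consumed once every old-epoch "parent" partition
    // recorded in the split metadata has been drained up to the expansion point.
    // With a linear partition function each new partition has one parent; with an
    // arbitrary function the metadata can simply list all old partitions as parents.
    static boolean mayConsume(int newPartition,
                              Map<Integer, List<Integer>> parentsByPartition,
                              Map<Integer, Boolean> oldEpochDrained) {
        return parentsByPartition.getOrDefault(newPartition, Collections.emptyList())
                                 .stream()
                                 .allMatch(p -> oldEpochDrained.getOrDefault(p, Boolean.FALSE));
    }
}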

You are right that the current KIP does not mention it. My comment about
the partition function coordination was about supporting the stream
use-case, which we have been discussing so far.


So, no, I really think KIP-253 is in good shape. I was really more
talking
about the part of this thread that's outside of KIP-253's scope,
namely,
creating the possibility of backfilling partitions after expansion.

Great! Can you also confirm that the main motivation for backfilling
partitions after expansion is to support the stream use-case?


- Currently the producer can forget about messages that have been
acknowledged by the broker. Thus the producer probably does not know most
of the existing messages in the topic, including those messages produced by
other producers. We can have the owner of the producer do the
split+backfill. In my opinion it will be a new program that wraps around
the existing producer and consumer classes.

This sounds fine by me!

Really, I was just emphasizing that the part of the organization that
produces a topic shouldn't have to export their partition function to the
part(s) of the organization (or other organizations) that consume the
topic. Whether the backfill operation goes into the Producer interface is
secondary, I think.
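For what it's worth, such a wrapper really is just the two existing clients glued together; a minimal sketch (topic names, group id and the target layout are made up, and a real tool would also have to handle completion detection, offset migration and exactly-once concerns):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BackfillCopier {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "backfill-copier");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("orders"));
            while (true) {
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
                    // Re-produce the record and let the producer's (possibly custom)
                    // partitioner place it under the expanded partition layout.
                    producer.send(new ProducerRecord<>("orders-repartitioned", record.key(), record.value()));
                }
            }
        }
    }
}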

- Regarding point 5. The argument is in favor of the split+backfill but
for the changelog topic. And it intends to address the problem for the
stream use-case in general. In this KIP we will provide an interface (i.e.
PartitionKeyRebalanceListener in the KIP) to be used by the stream
use-case, and the goal is that the user can flush/re-consume the state as
part of the interface implementation regardless of whether there is a
change log topic. Maybe you are suggesting that the main reason to do
split+backfill of the input topic is to support log compacted topics? You
mentioned in Point 1 that log compacted topics are out of the scope of this
KIP. Maybe I could understand your position better. Regarding Jan's
proposal to split partitions with backfill, do you think this should
replace the proposal in the existing KIP, or do you think this is something
that we should do in addition to the existing KIP?

I think that interface is a good/necessary component of KIP-253.

I personally (FWIW) feel that KIP-253 is appropriately scoped, but I
do
think its utility will be limited unless there is a later KIP
offering
backfill. But, maybe unlike Jan, I think it makes sense to try and
tackle
the ordering problem independently of backfill, so I'm in support of
the
current KIP.

- Regarding point 6. I guess we can agree that it is better not to have
the performance overhead of copying the input data. Before we discuss more
on whether the performance overhead is acceptable or not, I am trying to
figure out what is the benefit of introducing this overhead. You mentioned
that the benefit is the loose organizational coupling. By "organizational
coupling", are you referring to the requirement that the consumer needs to
know the hash function of the producer? If so, maybe we can discuss the
use-case of a custom partition function and see whether we can find a way
to support such use-cases without having to copy the input data.

I'm not too sure about what an "input" is in this sense, since we are
just
talking about topics. Actually the point I was making there is that
AKAICT
the performance overhead of a backfill is less than any other option,
assuming you split partitions rarely.

By "input" I was referring to source Kafka topic of a stream processing
job.


Separately, yes, "organizational coupling" increases if producers and
consumers have to share code, such as the partition function. This
would
not be the case if producers could only pick from a menu of a few
well-known partition functions, but I think this is a poor tradeoff.

Maybe we can revisit the custom partition function and see whether we
actually need it? Otherwise, I am concerned that every user will pay
the
overhead of data movement to support something that was not really
needed
for most users.


To me, this is two strong arguments in favor of backfill being less
expensive than no backfill, but again, I think that particular debate
comes
after KIP-253, so I don't want to create the impression of opposition
to
your proposal.


Finally, to respond to a new email I just noticed:

BTW, here is my understanding of the scope of this KIP. We want to
allow
consumers to always consume messages with the same key from the
same
producer in the order they are produced. And we need to provide a
way
for
stream use-case to be able to flush/load state when messages with
the
same
key are migrated between consumers. In addition to ensuring that
this
goal
is correctly supported, we should do our best to keep the
performance
and
organization overhead of this KIP as low as possible.

I think we're on the same page there! In fact, I would generalize a
little
more and say that the mechanism you've designed provides *all
consumers*
the ability "to flush/load state when messages with the same key are
migrated between consumers", not just Streams.

Thanks for all the comments!


Thanks for the discussion,
-John



On Tue, Mar 27, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com>
wrote:
Hey John,

Thanks much for the detailed comments. Here are my thoughts:

- The need to delete messages from log compacted topics is mainly
for
performance (e.g. storage space) optimization than for correctness
for
this
KIP. I agree that we probably don't need to focus on this in our
discussion
since it is mostly for performance optimization.

- "Asking producers and consumers, or even two different producers,
to
share code like the partition function is a pretty huge ask. What
if
they
are using different languages?". It seems that today we already
require
different producer's to use the same hash function -- otherwise
messages
with the same key will go to different partitions of the same topic
which
may cause problem for downstream consumption. So not sure if it
adds
any
more constraint by assuming consumers know the hash function of
producer.
Could you explain more why user would want to use a cusmtom
partition
function? Maybe we can check if this is something that can be
supported
in
the default Kafka hash function. Also, can you explain more why it
is
difficuilt to implement the same hash function in different
languages?
- Besides the assumption that consumer needs to share the hash
function
of
producer, is there other organization overhead of the proposal in
the
current KIP?

- Currently producer can forget about the message that has been
acknowledged by the broker. Thus the producer probably does not
know
most
of the exiting messages in topic, including those messages produced
by
other producers. We can have the owner of the producer to
split+backfill.
In my opion it will be a new program that wraps around the existing
producer and consumer classes.

- Regarding point 5. The argument is in favor of the split+backfill
but
for
changelog topic. And it intends to address the problem for stream
use-case
in general. In this KIP we will provide interface (i.e.
PartitionKeyRebalanceListener in the KIP) to be used by sream
use-case
and
the goal is that user can flush/re-consume the state as part of the
interface implementation regardless of whether there is change log
topic.
Maybe you are suggesting that the main reason to do split+backfill
of
input
topic is to support log compacted topics? You mentioned in Point 1
that
log
compacted topics is out of the scope of this KIP. Maybe I could
understand
your position better. Regarding Jan's proposal to split partitions
with
backfill, do you think this should replace the proposal in the
existing
KIP, or do you think this is something that we should do in
addition
to
the
existing KIP?

- Regarding point 6. I guess we can agree that it is better not to
have
the
performance overhread of copying the input data. Before we discuss
more
on
whether the performance overhead is acceptable or not, I am trying
to
figure out what is the benefit of introducing this overhread. You
mentioned
that the benefit is the loose organizational coupling. By
"organizational
coupling", are you referring to the requirement that consumer needs
to
know
the hash function of producer? If so, maybe we can discuss the
use-case
of
custom partiton function and see whether we can find a way to
support
such
use-case without having to copy the input data.

Thanks,
Dong


On Tue, Mar 27, 2018 at 11:34 AM, John Roesler <j...@confluent.io>
wrote:
Hey Dong and Jun,

Thanks for the thoughtful responses. If you don't mind, I'll mix
my
replies
together to try for a coherent response. I'm not too familiar
with
mailing-list etiquette, though.

I'm going to keep numbering my points because it makes it easy
for
you
all
to respond.

Point 1:
As I read it, KIP-253 is *just* about properly fencing the
producers
and
consumers so that you preserve the correct ordering of records
during
partition expansion. This is clearly necessary regardless of
anything
else
we discuss. I think this whole discussion about backfill,
consumers,
streams, etc., is beyond the scope of KIP-253. But it would be
cumbersome
to start a new thread at this point.

I had missed KIP-253's Proposed Change #9 among all the details... I think
this is a nice addition to the proposal. One thought is that it's actually
irrelevant whether the hash function is linear. This is simply an algorithm
for moving a key from one partition to another, so the type of hash
function need not be a precondition. In fact, it also doesn't matter
whether the topic is compacted or not; the algorithm works regardless.

I think this is a good algorithm to keep in mind, as it might solve a
variety of problems, but it does have a downside: the producer won't know
whether or not K1 was actually in P1, it just knows that K1 was in P1's
keyspace before the new epoch. Therefore, it will have to pessimistically
send (K1,null) to P1 just in case. But the next time K1 comes along, the
producer *also* won't remember that it already retracted K1 from P1, so it
will have to send (K1,null) *again*. By extension, every time the producer
sends to P2, it will also have to send a tombstone to P1, which is a pretty
big burden. To make the situation worse, if there is a second split, say P2
becomes P2 and P3, then any key Kx belonging to P3 will also have to be
retracted from P2 *and* P1, since the producer can't know whether Kx had
been last written to P2 or P1. Over a long period of time, this clearly
becomes an issue, as the producer must send an arbitrary number of
retractions along with every update.
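Spelled out as producer-side pseudocode, the pessimistic retraction looks roughly like this (a sketch of the consequence only; the bookkeeping of "ancestor partitions" is hypothetical and not an API proposed in the KIP):

import java.util.List;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public final class PessimisticRetraction {
    // After splits, a key's current partition has a chain of "ancestor" partitions
    // whose keyspace once covered it. Since the producer cannot know whether the key
    // was ever actually written there, every update must retract it from all of them.
    static void send(Producer<byte[], byte[]> producer, String topic,
                     byte[] key, byte[] value,
                     int currentPartition, List<Integer> ancestorPartitions) {
        for (int oldPartition : ancestorPartitions) {
            producer.send(new ProducerRecord<>(topic, oldPartition, key, null)); // tombstone, every time
        }
        producer.send(new ProducerRecord<>(topic, currentPartition, key, value));
    }
}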

In contrast, the proposed backfill operation has an end, and after it ends,
everyone can afford to forget that there ever was a different partition
layout.

Really, though, figuring out how to split compacted topics is beyond the
scope of KIP-253, so I'm not sure #9 really even needs to be in this KIP...
We do need in-order delivery during partition expansion. It would be fine
by me to say that you *cannot* expand partitions of a log-compacted topic
and call it a day. I think it would be better to tackle that in another
KIP.


Point 2:
Regarding whether the consumer re-shuffles its inputs, this is
always
on
the table; any consumer who wants to re-shuffle its input is free
to
do
so.
But this is currently not required. It's just that the current
high-level
story with Kafka encourages the use of partitions as a unit of
concurrency.
As long as consumers are single-threaded, they can happily
consume
a
single
partition without concurrency control of any kind. This is a key
aspect
to
this system that lets folks design high-throughput systems on top
of
it
surprisingly easily. If all consumers were instead
encouraged/required
to
implement a repartition of their own, then the consumer becomes
significantly more complex, requiring either the consumer to
first
produce
to its own intermediate repartition topic or to ensure that
consumer
threads have a reliable, high-bandwith channel of communication
with
every
other consumer thread.

Either of those tradeoffs may be reasonable for a particular user
of
Kafka,
but I don't know if we're in a position to say that they are
reasonable
for
*every* user of Kafka.


Point 3:
Regarding Jun's point about this use case, "(3) stateful and
maintaining
the
states in a local store", I agree that they may use a framework
*like*
Kafka Streams, but that is not the same as using Kafka Streams.
This
is
why
I think it's better to solve it in Core: because it is then
solved
for
KStreams and also for everything else that facilitates local
state
maintenance. To me, Streams is a member of the category of
"stream
processing frameworks", which is itself a subcategory of "things
requiring
local state maintenence". I'm not sure if it makes sense to
assert
that
Streams is a sufficient and practical replacement for everything
in
"things
requiring local state maintenence".

But, yes, I do agree that per-key ordering is an absolute
requirement,
therefore I think that KIP-253 itself is a necessary step.
Regarding
the
coupling of the state store partitioning to the topic
partitioning,
yes,
this is an issue we are discussing solutions to right now. We may
go
ahead
and introduce an overpartition layer on our inputs to solve it,
but
then
again, if we get the ability to split partitions with backfill,
we
may
not
need to!


Point 4:
On this:

Regarding thought 2: If we don't care about the stream
use-case,
then
the
current KIP probably has already addressed problem without
requiring
consumer to know the partition function. If we care about the
stream
use-case, we already need coordination across producers of
different
topics, i.e. the same partition function needs to be used by
producers
of
topics A and B in order to join topics A and B. Thus, it might
be
reasonable to extend coordination a bit and say we need
coordination
across
clients (i.e. producer and consumer), such that consumer knows
the
partition function used by producer. If we do so, then we can
let
consumer
re-copy data for the change log topic using the same partition
function
as
producer. This approach has lower overhead as compared to
having
producer
re-copy data of the input topic.
Also, producer currently does not need to know the data already
produced
to
the topic. If we let producer split/merge partition, it would
require
producer to consume the existing data, which intuitively is the
task
of
consumer.

I think we do care about use cases *like* Streams, I just don't
think
we
should rely on Streams to implement a feature of Core like
partition
expansion.

Note, though, that we (Streams) do not require coordination
across
producers. If two topics are certified to be co-partitioned, then
Streams
apps can make use of that knowledge to optimize their topology
(skipping
a
repartition). But if they don't know whether they are
co-partitioned,
then
they'd better go ahead and repartition within the topology. This
is
the
current state.

A huge selling point of Kafka is enabling different parts of
loosely
coupled organizations to produce and consume data independently.
Some
coordination between producers and consumers is necessary, like
coordinating on the names of topics and their schemas. But
Kafka's
value
proposition w.r.t. ESBs, etc. is inversely proportional to the
amount
of
coordination required. I think it behooves us to be extremely
skeptical
about introducing any coordination beyond correctness protocols.

Asking producers and consumers, or even two different producers,
to
share
code like the partition function is a pretty huge ask. What if
they
are
using different languages?

Comparing organizational overhead vs computational overhead,
there
are
maybe two orders of magnitude difference between them. In other
words,
I
would happily take on the (linear) overhead of having the
producer
re-copy
the data once during a re-partition in order to save the
organizational
overhead of tying all the producers and consumers together across
multiple
boundaries.

On that last paragraph: note that the producer *did* know the
data
it
already produced. It handled it the first time around. Asking it
to
re-produce it into a new partition layout is squarely within its
scope
of
capabilities. Contrast this with the alternative, asking the
consumer
to
re-partition the data. I think this is even less intuitive, when
the
partition function belongs to the producer.


Point 5:
Dong asked this:

For stream use-case that needs to increase consumer number, the
existing consumer can backfill the existing data in the change
log
topic
to
the same change log topic with the new partition number, before
the
new
set
of consumers bootstrap state from the new partitions of the
change
log
topic, right?

In this sense, the "consumer" is actually the producer of the
changelog
topic, so if we support partition expansion + backfill as a
producer/broker
operation, then it would be very straightforward for Streams to
split a
state store. As you say, they would simply instruct the broker to
split
the
changelog topic's partitions, then backfill. Once the backfill is
ready,
they can create a new crop of StandbyTasks to bootstrap the more
granular
state stores and finally switch over to them when they are ready.

But this actually seems to be an argument in favor of
split+backfill,
so
maybe I missed the point.

You also asked me to explain why copying the "input" topic is
better
than
copying the "changelog" topic. I think they are totally
independent,
actually. For one thing, you can't depend on the existence of a
"changelog"
topic in general, only within Streams, but Kafka's user base
clearly
exceeds Streams's user base. Plus, you actually also can't depend
on
the
existence of a changelog topic within Streams, since that is an
optional
feature of *some* state store implementations. Even in the
situation
where
you do have a changelog topic in Streams, there may be use cases
where
it
makes sense to expand the partitions of just the input, or just
the
changelog.

The ask for a Core feature of split+backfill is really about
supporting
the
use case of splitting partitions in log-compacted topics,
regardless
of
whether that topic is an "input" or a "changelog" or anything
else
for
that
matter.


Point 6:
On the concern about the performance overhead of copying data
between
the
brokers, I think it's actually a bit overestimated. Splitting a
topic's
partition is probably rare, certainly rarer in general than
bootstrapping
new consumers on that topic. If "bootstrapping new consumers"
means
that
they have to re-shuffle the data before they consume it, then you
wind
up
copying the same record multiple times:

(broker: input topic) -> (initial consumer) -> (broker:
repartition
topic)
-> (real consumer)

That's 3x, and it's also 3x for every new record after the split
as
well,
since you don't get to stop repartitioning/reshuffling once you
start.
Whereas if you do a backfill in something like the procedure I
outlined,
you only copy the prefix of the partition before the split, and
you
send
it
once to the producer and then once to the new generation
partition.
Plus,
assuming we're splitting the partition for the benefit of
consumers,
there's no reason we can't co-locate the post-split partitions
on
the
same
host as the pre-split partition, making the second copy a local
filesystem
operation.

Even if you follow these two copies up with bootstrapping a new
consumer,
it's still rare for this to occur, so you get to amortize these
copies
over
the lifetime of the topic, whereas a reshuffle just keeps making
copies
for
every new event.

And finally, I really do think that regardless of any performance
concerns
about this operation, if it preserves loose organizational
coupling,
it
is
certainly worth it.


In conclusion:
It might actually be a good idea for us to clarify the scope of
KIP-253.
If
we're all agreed that it's a good algorithm for allowing in-order
message
delivery during partition expansion, then we can continue this
discussion
as a new KIP, something like "backfill with partition expansion".
This
would let Dong proceed with KIP-253. On the other hand, if it
seems
like
this conversation may alter the design of KIP-253, then maybe we
*should*
just finish working it out.

For my part, my only concern about KIP-253 is the one I raised
earlier.
Thanks again, all, for considering these points,
-John


On Tue, Mar 27, 2018 at 2:10 AM, Dong Lin <lindon...@gmail.com>
wrote:
On Tue, Mar 27, 2018 at 12:04 AM, Dong Lin <
lindon...@gmail.com>
wrote:
Hey Jan,

Thanks for the enthusiasm in improving Kafka's design. Now
that I
have
read through your discussion with Jun, here are my thoughts:

- The latest proposal should work with log compacted topics by properly
deleting old messages after a new message with the same key is produced. So
it is probably not a concern anymore. Could you comment if there is still
an issue?

- I wrote SEP-5 and I am pretty familiar with the motivation and the design
of SEP-5. SEP-5 is probably orthogonal to the motivation of this KIP. The
goal of SEP-5 is to allow the user to increase the task number of an
existing Samza job. But if we increase the partition number of the input
topics, messages may still be consumed out-of-order by tasks in Samza,
which would cause incorrect results. Similarly, the approach you proposed
does not seem to ensure that the messages can be delivered in order, even
if we can make sure that each consumer instance is assigned the set of new
partitions covering the same set of keys.

Let me correct this comment. The approach of copying data to a new topic
can ensure in-order message delivery, provided we properly migrate offsets
from the old topic to the new topic.


- I am trying to understand why it is better to copy the data
instead
of
copying the change log topic for streaming use-case. For core
Kafka
use-case, and for the stream use-case that does not need to
increase
consumers, the current KIP already supports in-order delivery
without
the
overhead of copying the data. For stream use-case that needs
to
increase
consumer number, the existing consumer can backfill the
existing
data
in
the change log topic to the same change log topic with the
new
partition
number, before the new set of consumers bootstrap state from
the
new
partitions of the change log topic. If this solution works,
then
could
you
summarize the advantage of copying the data of input topic as
compared
to
copying the change log topic? For example, does it enable
more
use-case,
simplify the implementation of Kafka library, or reduce the
operation
overhead etc?

Thanks,
Dong


On Wed, Mar 21, 2018 at 6:57 AM, Jan Filipiak <
jan.filip...@trivago.com>
wrote:

Hi Jun,

I was really seeing progress in our conversation but your
latest
reply
is
just devastating.
I though we were getting close being on the same page now it
feels
like
we are in different libraries.

I just quickly slam my answers in here. If they are to
brief I
am
sorry
give me a ping and try to go into details more.
Just want to show that your pro/cons listing is broken.

Best Jan

and want to get rid of this horrible compromise


On 19.03.2018 05:48, Jun Rao wrote:

Hi, Jan,

Thanks for the discussion. Great points.

Let me try to summarize the approach that you are
proposing.
On
the
broker
side, we reshuffle the existing data in a topic from
current
partitions
to
the new partitions. Once the reshuffle fully catches up,
switch
the
consumers to start consuming from the new partitions. If a
consumer
needs
to rebuild its local state (due to partition changes), let
the
consumer
rebuild its state by reading all existing data from the new
partitions.
Once all consumers have switches over, cut over the
producer
to
the
new
partitions.

The pros for this approach are that :
1. There is just one way to rebuild the local state, which
is
simpler.
true thanks

The cons for this approach are:
1. Need to copy existing data.

Very unfair and not correct. It does not require you to copy
over
existing data. It _allows_ you to copy all existing data.

2. The cutover of the producer is a bit complicated since it
needs
to
coordinate with all consumer groups.

Also not true. I explicitly tried to make clear that there
is
only
one
special consumer (in the case of actually copying data)
coordination
is
required.

3. The rebuilding of the state in the consumer is from the
input
topic,
which can be more expensive than rebuilding from the
existing
state.
true, but rebuilding state is only required if you want to
increase
processing power, so we assume this is at hand.

4. The broker potentially has to know the partitioning
function.
If
this
needs to be customized at the topic level, it can be a bit
messy.
I would argue against having the operation being performed
by
the
broker.
This was not discussed yet but if you see my original email
i
suggested
otherwise from the beginning.

Here is an alternative approach by applying your idea not
in
the
broker,
but in the consumer. When new partitions are added, we
don't
move
existing
data. In KStreams, we first reshuffle the new input data
to a
new
topic
T1
with the old number of partitions and feed T1's data to the
rest
of
the
pipeline. In the meantime, KStreams reshuffles all existing
data
of
the
change capture topic to another topic C1 with the new
number
of
partitions.
We can then build the state of the new tasks from C1. Once
the
new
states
have been fully built, we can cut over the consumption to
the
input
topic
and delete T1. This approach works with compacted topic
too.
If
an
application reads from the beginning of a compacted topic,
the
consumer
will reshuffle the portion of the input when the number of
partitions
doesn't match the number of tasks.

We all wipe this idea from our heads instantly. Mixing Ideas
from
an
argument is not a resolution strategy
just leads to horrible horrible software.


The pros of this approach are:
1. No need to copy existing data.
2. Each consumer group can cut over to the new partitions
independently.
3. The state is rebuilt from the change capture topic,
which
is
cheaper
than rebuilding from the input topic.
4. Only the KStreams job needs to know the partitioning
function.
The cons of this approach are:
1. Potentially the same input topic needs to be reshuffled
more
than
once
in different consumer groups during the transition phase.

What do you think?

Thanks,

Jun



On Thu, Mar 15, 2018 at 1:04 AM, Jan Filipiak <
jan.filip...@trivago.com>
wrote:

Hi Jun,
thank you for following me on these thoughts. It was
important
to
me
to
feel that kind of understanding for my arguments.

What I was hoping for (I mentioned this earlier) is that
we
can
model
the
case where we do not want to copy the data the exact same
way
as
the
case
when we do copy the data. Maybe you can peek into the
mails
before
to
see
more details for this.

This means we have the same mechanism to transfer consumer
groups
to
switch topic. The offset mapping that would be generated
would
even
be
simpler End Offset of the Old topic => offset 0 off all
the
partitions
of
the new topic. Then we could model the transition of a
non-copy
expansion
the exact same way as a copy-expansion.

I know this only works when topic growth by a factor. But
the
benefits
of
only growing by a factor are to strong anyways. See
Clemens's
hint
and
remember that state reshuffling is entirely not needed if
one
doesn't
want
to grow processing power.

I think these benefits should be clear, and that there is
basically
no
downside to what is currently at hand but just makes
everything
easy.
One thing you need to know is. that if you do not offer
rebuilding a
log
compacted topic like i suggest that even if you have
consumer
state
reshuffling. The topic is broken and can not be used to
bootstrap
new
consumers. They don't know if they need to apply a key
from
and
old
partition or not. This is a horrible downside I haven't
seen a
solution
for
in the email conversation.

I argue to:

Only grow topic by a factor always.
Have the "no copy consumer" transition as the trivial case
of
the
"copy
consumer transition".
If processors needs to be scaled, let them rebuild from
the
new
topic
and
leave the old running in the mean time.
Do not implement key shuffling in streams.

I hope I can convince you especially with the fact how I
want
to
handle
consumer transition. I think
you didn't quite understood me there before. I think the
term
"new
topic"
intimidated you a little.
How we solve this on disc doesn't really matter, If the
data
goes
into
the
same Dir or a different Dir or anything. I do think that
it
needs
to
involve at least rolling a new segment for the existing
partitions.
But most of the transitions should work without restarting
consumers.
(newer consumers with support for this). But with new
topic
i
just
meant
the topic that now has a different partition count. Plenty
of
ways
to
handle that (versions, aliases)

Hope I can further get my idea across.

Best Jan






On 14.03.2018 02:45, Jun Rao wrote:

Hi, Jan,
Thanks for sharing your view.

I agree with you that recopying the data potentially
makes
the
state
management easier since the consumer can just rebuild its
state
from
scratch (i.e., no need for state reshuffling).

On the flip slide, I saw a few disadvantages of the
approach
that
you
suggested. (1) Building the state from the input topic
from
scratch
is
in
general less efficient than state reshuffling. Let's say
one
computes a
count per key from an input topic. The former requires
reading
all
existing
records in the input topic whereas the latter only
requires
reading
data
proportional to the number of unique keys. (2) The
switching
of
the
topic
needs modification to the application. If there are many
applications
on a
topic, coordinating such an effort may not be easy. Also,
it's
not
clear
how to enforce exactly-once semantic during the switch.
(3)
If
a
topic
doesn't need any state management, recopying the data
seems
wasteful.
In
that case, in place partition expansion seems more
desirable.
I understand your concern about adding complexity in
KStreams.
But,
perhaps
we could iterate on that a bit more to see if it can be
simplified.
Jun


On Mon, Mar 12, 2018 at 11:21 PM, Jan Filipiak <
jan.filip...@trivago.com>
wrote:

Hi Jun,

I will focus on point 61 as I think its _the_
fundamental
part
that
I
cant
get across at the moment.

Kafka is the platform to have state materialized
multiple
times
from
one
input. I emphasize this: It is the building block in
architectures
that
allow you to
have your state maintained multiple times. You put a
message
in
once,
and
you have it pop out as often as you like. I believe you
understand
this.

Now! The path of thinking goes the following: I am using
apache
kafka
and
I _want_ my state multiple times. What am I going todo?

A) Am I going to take my state that I build up, plunge
some
sort
of
RPC
layer ontop of it, use that RPC layer to throw my
records
across
instances?
B) Am I just going to read the damn message twice?

Approach A is fundamentally flawed and a violation of
all
that
is
good
and
holy in kafka deployments. I can not understand how this
Idea
can
come in
the first place.
(I do understand: IQ in streams, they polluted the kafka
streams
codebase
really bad already. It is not funny! I think they are
equally
flawed
as
A)

I say, we do what Kafka is good at. We repartition the
topic
once.
We
switch the consumers.
(Those that need more partitions are going to rebuild
their
state
in
multiple partitions by reading the new topic, those that
don't
just
assign
the new partitions properly)
We switch producers. Done!

The best thing! It is trivial, hipster stream processor
will
have
an
easy
time with that aswell. Its so super simple. And simple
IS
good!
It is what kafka was build todo. It is how we do it
today.
All I
am
saying
is that a little broker help doing the producer swap is
super
useful.
For everyone interested in why kafka is so powerful with
approach
B,
please watch https://youtu.be/bEbeZPVo98c?t=1633
I already looked up a good point in time, I think after
5
minutes
the
"state" topic is handled and you should be able to
understand
me
and inch better.

Please do not do A to the project, it deserves better!

Best Jan



On 13.03.2018 02:40, Jun Rao wrote:

Hi, Jan,

Thanks for the reply. A few more comments below.

50. Ok, we can think a bit harder for supporting
compacted
topics.
51. This is a fundamental design question. In the more
common
case,
the
reason why someone wants to increase the number of
partitions
is
that
the
consumer application is slow and one wants to run more
consumer
instances
to increase the degree of parallelism. So, fixing the
number
of
running
consumer instances when expanding the partitions won't
help
this
case.
If
we do need to increase the number of consumer
instances,
we
need
to
somehow
reshuffle the state of the consumer across instances.
What
we
have
been
discussing in this KIP is whether we can do this more
effectively
through
the KStream library (e.g. through a 2-phase partition
expansion).
This
will
add some complexity, but it's probably better than
everyone
doing
this
in
the application space. The recopying approach that you
mentioned
doesn't
seem to address the consumer state management issue
when
the
consumer
switches from an old to a new topic.

52. As for your example, it depends on whether the join
key
is
the
same
between (A,B) and (B,C). If the join key is the same,
we
can
do a
2-phase
partition expansion of A, B, and C together. If the
join
keys
are
different, one would need to repartition the data on a
different
key
for
the second join, then the partition expansion can be
done
independently
between (A,B) and (B,C).

53. If you always fix the number of consumer instances,
we
you
described
works. However, as I mentioned in #51, I am not sure
how
your
proposal
deals with consumer states when the number of consumer
instances
grows.
Also, it just seems that it's better to avoid
re-copying
the
existing
data.

60. "just want to throw in my question from the longer
email
in
the
other
Thread here. How will the bloom filter help a new
consumer
to
decide
to
apply the key or not?" Not sure that I fully understood
your
question.
The
consumer just reads whatever key is in the log. The
bloom
filter
just
helps
clean up the old keys.

61. "Why can we afford having a topic where its
apparently
not
possible
to
start a new application on? I think this is an overall
flaw
of
the
discussed idea here. Not playing attention to the
overall
architecture."
Could you explain a bit more when one can't start a new
application?
Jun



On Sat, Mar 10, 2018 at 1:40 AM, Jan Filipiak <
jan.filip...@trivago.com
wrote:

Hi Jun, thanks for your mail.

Thank you for your questions!
I think they are really good and tackle the core of
the
problem
I
see.

I will answer inline, mostly but still want to set the
tone
here.
The core strength of kafka is what Martin once called
the
kappa-Architecture. How does this work?
You have everything as a log as in kafka. When you
need
to
change
something.
You create the new version of your application and
leave
it
running
in
parallel.
Once the new version is good you switch your users to
use
the
new
application.

The online reshuffling effectively breaks this
architecture
and
I
think
the switch in thinking here is more harmful
than any details about the partitioning function to
allow
such a
change.
I
feel with my suggestion we are the closest to
the original and battle proven architecture and I can
only
warn
to
move
away from it.

I might have forgotten something, sometimes its hard
for
me
to
getting
all
the thoughts captured in a mail, but I hope the
comments
inline
will
further make my concern clear, and put some emphasis
on
why
I
prefer my
solution ;)

One thing we should all be aware of when discussing
this,
and
I
think
Dong
should have mentioned it (maybe he did).
We are not discussing all of this out of thin air but
there
is
an
effort
in the Samza project.

https://cwiki.apache.org/confl
uence/display/SAMZA/SEP-
5%3A+
Enable+partition+expansion+of+input+streams
https://issues.apache.org/jira/browse/SAMZA-1293

To be clear. I think SEP-5 (state of last week, dont
know
if
it
adapted
to
this discussion) is on a way better path than KIP-253,
and I
can't
really
explain why.

Best Jan,

nice weekend everyone


On 09.03.2018 03:36, Jun Rao wrote:

Hi, Jan,

Thanks for the feedback. Just some comments on the
earlier
points
that
you
mentioned.

50. You brought up the question of whether existing
data
needs
to
be
copied
during partition expansion. My understand of your
view
is
that
avoid
copying existing data will be more efficient, but it
doesn't
work
well
with
compacted topics since some keys in the original
partitions
will
never
be
cleaned. It would be useful to understand your use
case
of
compacted
topics
a bit more. In the common use case, the data volume
in
a
compacted
topic
may not be large. So, I am not sure if there is a
strong
need
to
expand
partitions in a compacted topic, at least initially.

I do agree. State is usually smaller. Update rates
might
be
also
competitively high.
Doing Log-compaction (even beeing very efficient and
configurable)
is
also
a more expensive operation than
just discarding old segments. Further if you want to
use
more
consumers
processing the events
you also have to grow the number of partitions.
Especially
for
use-cases
we do (KIP-213) a tiny state full
table might be very expensive to process if it joins
against a
huge
table.

I can just say we have been in the spot of needing to
grow
log
compacted
topics. Mainly for processing power we can bring to
the
table.
Further i am not at all concerned about the extra
spaced
used
by
"garbage
keys". I am more concerned about the correctness of
innocent
consumers.
The
logic becomes complicated. Say for streams one would
need
to
load
the
record into state but not forward it the topology ( to
have
it
available
for shuffeling). I rather have it simple and a topic
clean
regardless
if
it
still has its old partition count. Especially with
multiple
partitions
growth's I think it becomes insanely hard to to this
shuffle
correct.
Maybe
Streams and Samza can do it. Especially if you do
"hipster
stream
processing" <https://www.confluent.io/blog
/introducing-kafka-streams-
stream-processing-made-simple/>. This makes kafka way
to
complicated.
With my approach I think its way simpler because the
topic
has
no
"history"
in terms of partitioning but is always clean.


51. "Growing the topic by an integer factor does not
require
any
state

redistribution at all." Could you clarify this a bit
more?
Let's
say
you
have a consumer app that computes the windowed count
per
key.
If
you
double
the number of partitions from 1 to 2 and grow the
consumer
instances
from
1
to 2, we would need to redistribute some of the
counts
to
the
new
consumer
instance. Regarding to linear hashing, it's true that
it
won't
solve
the
problem with compacted topics. The main benefit is
that
it
redistributes
the keys in one partition to no more than two
partitions,
which
could
help
redistribute the state.

You don't need to spin up a new consumer in this
case.
every
consumer

would just read every partition with the (partition %
num_task)
task.
it
will still have the previous data right there and can
go
on.
This sounds contradictory to what I said before, but
please
bear
with
me.

52. Good point on coordinating the expansion of 2 topics that need to be joined together. This is where the 2-phase partition expansion could potentially help. In the first phase, we could add new partitions to the 2 topics one at a time, but without publishing to the new partitions. Then, we can add new consumer instances to pick up the new partitions. In this transition phase, no reshuffling is needed since no data is coming from the new partitions. Finally, we can enable the publishing to the new partitions.

I think it's even worse than you think. I would like to introduce the term "transitive copartitioning". Imagine 2 Streams applications: one joins (A,B), the other (B,C). Then there is a transitive copartitioning requirement for (A,C) to be copartitioned as well. This can spread significantly and require many consumers to adapt at the same time.

It is also not entirely clear to me how you avoid reshuffling in this case. If A has a record that never gets updated after the expansion and the corresponding B record moves to a new partition, how shall they meet without a shuffle?

53. "Migrating consumer is a step that might be made
completly
unnecessary
if - for example streams - takes the gcd as
partitioning
scheme
instead
of
enforcing 1 to 1." Not sure that I fully understand
this. I
think
you
mean
that a consumer application can run more instances
than
the
number
of
partitions. In that case, the consumer can just
repartitioning
the
input
data according to the number of instances. This is
possible,
but
just
has
the overhead of reshuffling the data.

No, what I meant (that is also your question, I think, Mathias) is that if you grow a topic by an integer factor, then even if your processor is stateful you can just assign all the multiples of the previous partition to the same consumer, and the state needed to keep processing correctly will be present without any shuffling.

Say you have an assignment

stateful consumer => partitions
0 => 0
1 => 1
2 => 2

and you grow your topic by a factor of 4, you get

0 => 0,3,6,9
1 => 1,4,7,10
2 => 2,5,8,11

Say your hashcode is 8. 8 % 3 => 2 before, so the consumer for partition 2 has it. Now you have 12 partitions, so 8 % 12 => 8, and the key goes into partition 8, which is assigned to the same consumer that had partition 2 before and therefore knows the key.
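A small sketch of that assignment rule and its invariant (names are made up for illustration; this is not the Streams assignor):

import java.util.ArrayList;
import java.util.List;

public class FactorGrowthAssignment {

    // Consumer i keeps every partition p with p % oldCount == i,
    // e.g. consumer 2 with 3 -> 12 partitions gets 2, 5, 8, 11.
    static List<Integer> partitionsForConsumer(int consumer, int oldCount, int newCount) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = consumer; p < newCount; p += oldCount) {
            assigned.add(p);
        }
        return assigned;
    }

    public static void main(String[] args) {
        int oldCount = 3, newCount = 12;   // grown by an integer factor of 4
        System.out.println(partitionsForConsumer(2, oldCount, newCount)); // [2, 5, 8, 11]

        // Invariant: because oldCount divides newCount, the consumer that owned a
        // key's state before the expansion also owns the partition it lands in now.
        for (int hash = 0; hash < 1000; hash++) {
            int ownerBefore = hash % oldCount;               // who had the key's state
            int ownerAfter = (hash % newCount) % oldCount;   // who reads it after growth
            if (ownerBefore != ownerAfter) {
                throw new AssertionError("state would have to move");
            }
        }
    }
}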

Userland reshuffling is there as an option, and it does exactly what I suggest; I think it's the perfect strategy. All I am suggesting is broker-side support to switch the producers to the newly partitioned topic, so that the old topic (with too few partitions) can go away. Remember the list of steps at the beginning of this thread: if one has broker support for everything where it is required, and Streams support for the steps that don't strictly need the broker, then one has solved the problem. I repeat it because I think it's important. I am really happy that you brought that up, because it is 100% what I want, just with the difference that I want an option to discard the too-small topic later (after all consumers have adapted) and to have ordering correct there. For that I need broker support managing the copy process plus the producers, and fencing them against each other. I also repeat: the copy process can run for weeks in the worst case. Copying the data is not necessarily the longest task; migrating consumers might very well be. Only once all consumers have switched and the copy is really up to date (think ISR-like up to date) do we stop the producer, wait for the copy to finish, and use the new topic for producing.

After this the topic is in perfect shape, and no one needs to worry about complicated stuff (old keys hanging around that might arrive in some other topic later...). I can only imagine how many tricky bugs are going to arrive after someone has grown and shrunk his topic 10 times.






54. "The other thing I wanted to mention is that I
believe
the
current

suggestion (without copying data over) can be
implemented
in
pure
userland
with a custom partitioner and a small feedbackloop
from
ProduceResponse
=>
Partitionier in coorporation with a change management
system."
I
am
not
sure a customized partitioner itself solves the
problem.
We
probably
need
some broker side support to enforce when the new
partitions
can
be
used.
We
also need some support on the consumer/kstream side
to
preserve
the
per
key
ordering and potentially migrate the processing
state.
This
is
not
trivial
and I am not sure if it's ideal to fully push to the
application
space.

Broker support is definitely the preferred way here; I have nothing against broker support. I tried to say that for what I would prefer - copying the data over, at least for log-compacted topics - I would require more broker support than the KIP currently offers.


Jun

On Tue, Mar 6, 2018 at 10:33 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi Dong,

are you actually reading my emails, or are you just using the thread I started for general announcements regarding the KIP?

I tried to argue really hard against linear hashing. Growing the topic by an integer factor does not require any state redistribution at all. I fail to see completely where linear hashing helps on log-compacted topics.

If you are not willing to explain to me what I might be overlooking: that is fine. But I ask you to not reply to my emails then. Please understand my frustration with this.

Best Jan



On 06.03.2018 19:38, Dong Lin wrote:

Hi everyone,

Thanks for all the comments! It appears that everyone prefers linear hashing because it reduces the amount of state that needs to be moved between consumers (for stream processing). The KIP has been updated to use linear hashing.

Regarding the migration endeavor: it seems that migrating the producer library to use linear hashing should be pretty straightforward without much operational endeavor. If we don't upgrade the client library to use this KIP, we cannot support in-order delivery after the partition number is changed anyway. Suppose we upgrade the client library to use this KIP: if the partition number is not changed, the key -> partition mapping will be exactly the same as it is now, because it is still determined using murmur_hash(key) % original_partition_num. In other words, this change is backward compatible.

Regarding the load distribution: if we use linear hashing, the load may be unevenly distributed, because those partitions which are not split may receive twice as much traffic as the partitions that are split. This issue can be mitigated by creating the topic with a partition count that is several times the number of consumers. And there will be no imbalance if the partition number is always doubled. So this imbalance seems acceptable.

Regarding storing the partition strategy as a per-topic config: it seems unnecessary, since we can still use murmur_hash as the default hash function and additionally apply the linear hashing algorithm if the partition number has increased. I am not sure if there is any use case for a producer to use a different hash function. Jason, can you check if there is some use case that I missed for using the per-topic partition strategy?

Regarding how to reduce latency (due to state store/load) in the stream processing consumer when the partition number changes: I need to read the Kafka Streams code to understand how Kafka Streams currently migrates state between consumers when an instance is added/removed for a given job. I will reply after I finish reading the documentation and code.

Thanks,
Dong


On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <ja...@confluent.io> wrote:

Great discussion. I think I'm wondering whether we can continue to leave Kafka agnostic to the partitioning strategy. The challenge is communicating the partitioning logic from producers to consumers so that the dependencies between each epoch can be determined. For the sake of discussion, imagine you did something like the following:

1. The name (and perhaps version) of a partitioning strategy is stored in topic configuration when a topic is created.
2. The producer looks up the partitioning strategy before writing to a topic and includes it in the produce request (for fencing). If it doesn't have an implementation for the configured strategy, it fails.
3. The consumer also looks up the partitioning strategy and uses it to determine dependencies when reading a new epoch. It could either fail or make the most conservative dependency assumptions if it doesn't know how to implement the partitioning strategy. For the consumer, the new interface might look something like this:

// Return the partition dependencies following an epoch bump
Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump, int numPartitionsAfterEpochBump)

The unordered case then is just a particular implementation which never has any epoch dependencies. To implement this, we would need some way for the consumer to find out how many partitions there were in each epoch, but maybe that's not too unreasonable.
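As a purely illustrative example, a linear-hashing strategy could answer that interface roughly like this (the class name is made up, and it assumes a single expansion round, i.e. numPartitionsAfterEpochBump <= 2 * numPartitionsBeforeEpochBump):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LinearHashingDependencies {

    // Maps each partition of the new epoch to the old partitions whose records
    // must be consumed up to the epoch boundary before it may be read.
    static Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
                                                    int numPartitionsAfterEpochBump) {
        Map<Integer, List<Integer>> deps = new HashMap<>();
        for (int p = numPartitionsBeforeEpochBump; p < numPartitionsAfterEpochBump; p++) {
            // Under linear hashing, new partition p was split off from exactly one
            // old partition, p - numPartitionsBeforeEpochBump.
            List<Integer> parents = new ArrayList<>();
            parents.add(p - numPartitionsBeforeEpochBump);
            deps.put(p, parents);
        }
        return deps;   // the unordered strategy would simply return an empty map
    }

    public static void main(String[] args) {
        // Growing from 10 to 15 partitions: partition 12 depends only on partition 2.
        // Prints something like {10=[0], 11=[1], 12=[2], 13=[3], 14=[4]}.
        System.out.println(dependencies(10, 15));
    }
}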

Thanks,
Jason


On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi Dong

thank you very much for your questions.

Regarding the time spent copying data across: it is correct that copying data from a topic with one partition mapping to a topic with a different partition mapping takes way longer than we can stop producers for. Tens of minutes is a very optimistic estimate here. Many people cannot afford to copy at full steam and therefore will have some rate limiting in place; this can push the timespan into days. The good part is that the vast majority of the data can be copied while the producers are still going. One can then piggyback the consumer migration on top of this timeframe, by the method mentioned (provide them a mapping from their old offsets to the new offsets in their repartitioned topics). In that way we separate the migration of consumers from the migration of producers (decoupling these is what Kafka is strongest at). The time to actually swap over the producers should be kept minimal by ensuring that when a swap attempt is started, the copy process is very close to the log end and is expected to finish within the next fetch. The operation should have a time-out and should be reattemptable.

Importance of log compaction: if a producer produces key A to partition 0, it is going to be there forever unless it gets deleted. The record might sit there for years. A new producer started with the new partition count will fail to delete the record in the correct partition. The record will be there forever and one cannot reliably bootstrap new consumers. I cannot see how linear hashing can solve this.
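A tiny worked example with made-up numbers (hypothetical hash value and partition counts), just to make the problem concrete:

public class StaleKeyExample {
    public static void main(String[] args) {
        int hashOfKeyA = 8;                                   // pretend the key hashes to 8
        int oldPartitions = 3, newPartitions = 5;

        int oldRecordLivesIn = hashOfKeyA % oldPartitions;    // 2: where the old value sits
        int tombstoneGoesTo = hashOfKeyA % newPartitions;     // 3: where a delete sent today lands

        // The tombstone never reaches partition 2, so compaction can never clean the
        // old value there, and a consumer bootstrapping from the compacted topic will
        // still see the "deleted" record for this key. The same thing happens under
        // linear hashing for any key that moved to a newly added partition.
        System.out.println(oldRecordLivesIn + " vs " + tombstoneGoesTo);
    }
}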

Regarding your question about skipping the userland copying:

100%, copying the data across in userland is, as far as I can see, only a use case for log-compacted topics. Even for log compaction + retention it should only be opt-in. Why did I bring it up? I think log compaction is a very important feature for really embracing Kafka as a "data platform". The point I also want to make is that copying data this way is completely in line with the Kafka architecture: it only consists of reading from and writing to topics.

I hope this clarifies more why I think we should aim for more than the current KIP. I fear that once the KIP is done, not much more effort will be taken.

On 04.03.2018 02:28, Dong Lin wrote:
Hey Jan,

In the current proposal, the consumer will be blocked waiting for the other consumers of the group to consume up to a given offset. In most cases, all consumers should be close to the LEO of the partitions when the partition expansion happens. Thus the time spent waiting should not be long, e.g. on the order of seconds. On the other hand, it may take a long time to wait for the entire partition to be copied -- the amount of time is proportional to the amount of existing data in the partition, which can take tens of minutes. So the amount of time that we stop consumers may not be on the same order of magnitude.

If we can implement this suggestion without copying data over, in pure userland, it will be much more valuable. Do you have ideas on how this can be done?

I am not sure why the current KIP would not help people who depend on log compaction. Could you elaborate more on this point?

Thanks,

Dong
On Wed, Feb 28, 2018 at 10:55 PM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:

Hi Dong,

I tried to focus on what the steps are that one can currently perform to expand or shrink a keyed topic while maintaining top-notch semantics. I can understand that there might be confusion about "stopping the consumer". It is exactly the same as proposed in the KIP: there needs to be a point in time at which the producers agree on the new partitioning. The extra semantics I want to put in there is that we have the possibility to wait until all the existing data is copied over into the new partitioning scheme. When I say stopping, I think more of having a memory barrier that ensures the ordering. I am still aiming for latencies on the scale of leader failovers.

Consumers have to explicitly adapt to the new partitioning scheme in the above scenario. The reason is that in these cases where you are dependent on a particular partitioning scheme, you frequently also have other topics that have co-partitioning requirements or the like. Therefore all your other input topics might need to grow accordingly.

What I was suggesting was to streamline all these operations as much as possible, to have "real" partition growth and shrinkage going on. Migrating the producers to a new partitioning scheme can be much more streamlined with proper broker support for this. Migrating consumers is a step that might be made completely unnecessary if - for example Streams - takes the gcd as partitioning scheme instead of enforcing 1 to 1. Connect consumers and other consumers should be fine anyway.

I hope this makes it clearer what I was aiming at. The rest needs to be figured out. The only danger I see is that if we introduce this feature as proposed in the KIP, it won't help any people depending on log compaction.

The other thing I wanted to mention is that I believe the current suggestion (without copying data over) can be implemented in pure userland with a custom partitioner and a small feedback loop from ProduceResponse => Partitioner in cooperation with a change management system.

Best Jan








On 28.02.2018 07:13, Dong Lin wrote:

Hey Jan,

I am not sure if it is acceptable for the producer to be stopped for a while, particularly for an online application which requires low latency. I am also not sure how consumers can switch to a new topic. Does the user application need to explicitly specify a different topic for the producer/consumer to subscribe to? It will be helpful for the discussion if you can provide more detail on the interface changes for this solution.
Thanks,
Dong

On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:

Hi,

just want to throw my thoughts in. In general the functionality is very useful; we should, though, not try to bend the architecture too hard while implementing it.

The manual steps would be to:

create a new topic
then mirrormake from the old topic to the new topic
wait for the mirror making to catch up
then put the consumers onto the new topic
    (having mirrormaker spit out a mapping from old offsets to new offsets - see the sketch after this list:
        if the topic is increased by a factor X, there is going to be a clean mapping from 1 offset in the old topic to X offsets in the new topic;
        if there is no integer factor, then there is no chance to generate a mapping that can reasonably be used for continuing)
    make the consumers stop at appropriate points and continue consumption with the offsets from the mapping
have the producers stop for a minimal time
wait for mirrormaker to finish
let the producers produce with the new metadata.
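Roughly, the bookkeeping for that offset mapping could look like the sketch below (illustrative only: the class and method names are made up, it is not mirrormaker code, and it assumes the copier is the only writer to the new topic and that old partition p fans out to partitions p, p + oldCount, p + 2*oldCount, ...):

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class OffsetMappingSketch {

    // For each old partition: old offset -> resume offsets in its `factor` target partitions.
    private final Map<Integer, TreeMap<Long, long[]>> mapping = new HashMap<>();
    private final int oldCount;
    private final int factor;
    private final long[] nextTargetOffset;   // next unwritten offset per new partition

    OffsetMappingSketch(int oldCount, int factor) {
        this.oldCount = oldCount;
        this.factor = factor;
        this.nextTargetOffset = new long[oldCount * factor];
    }

    // Called before a record read at (oldPartition, oldOffset) is produced to the new
    // topic. In practice one would checkpoint periodically, not on every record.
    void recordCheckpoint(int oldPartition, long oldOffset) {
        long[] resumeOffsets = new long[factor];
        for (int i = 0; i < factor; i++) {
            resumeOffsets[i] = nextTargetOffset[oldPartition + i * oldCount];
        }
        mapping.computeIfAbsent(oldPartition, p -> new TreeMap<>()).put(oldOffset, resumeOffsets);
    }

    // Called with the target partition the record actually went to.
    void recordCopied(int newPartition) {
        nextTargetOffset[newPartition]++;
    }

    // A consumer that stopped at (oldPartition, oldOffset) resumes at these offsets in
    // the `factor` new partitions that took over that key space. Returns null if the
    // copier has not reached that offset yet.
    long[] resumeOffsetsFor(int oldPartition, long oldOffset) {
        TreeMap<Long, long[]> checkpoints = mapping.get(oldPartition);
        if (checkpoints == null || checkpoints.ceilingEntry(oldOffset) == null) {
            return null;
        }
        return checkpoints.ceilingEntry(oldOffset).getValue();
    }
}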


Instead of implementing the approach suggested in the KIP, which will leave log-compacted topics completely crumbled and unusable, I would much rather try to build infrastructure to support the above-mentioned operations more smoothly. Especially having producers stop and use another topic is difficult, and it would be nice if one could trigger "invalid metadata" exceptions for them, and if one could give topics aliases so that their produce requests to the old topic will arrive in the new topic.

The downsides are obvious, I guess (having the same data twice for the transition period, but Kafka tends to scale well with data size). Still, it is a nicer fit into the architecture.

I further want to argue that the functionality of the KIP can be implemented completely in "userland" with a custom partitioner that handles the transition as needed. I would appreciate it if someone could point out what a custom partitioner couldn't handle in this case.
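For illustration, a minimal sketch of such a transition-aware partitioner (the class name, the config keys and the timestamp-based cut-over are invented for this sketch; the hashing mirrors the default partitioner and keyed records are assumed):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class TransitionAwarePartitioner implements Partitioner {

    private int oldPartitionCount;
    private long cutoverTimestampMs;

    @Override
    public void configure(Map<String, ?> configs) {
        // Hypothetical settings, distributed e.g. by the change management system.
        oldPartitionCount = Integer.parseInt(String.valueOf(configs.get("transition.old.partition.count")));
        cutoverTimestampMs = Long.parseLong(String.valueOf(configs.get("transition.cutover.timestamp.ms")));
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int totalPartitions = cluster.partitionsForTopic(topic).size();
        // Before the agreed cut-over, pretend the topic still has its old size so that
        // keys keep landing where existing consumers (and the compacted history) expect them.
        int effectivePartitions = System.currentTimeMillis() < cutoverTimestampMs
                ? Math.min(oldPartitionCount, totalPartitions)
                : totalPartitions;
        return Utils.toPositive(Utils.murmur2(keyBytes)) % effectivePartitions;
    }

    @Override
    public void close() {}
}

In practice the cut-over signal would come from the ProduceResponse feedback loop / change management system mentioned above rather than from a wall-clock timestamp.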
With the above approach, shrinking a topic becomes the same set of steps, without losing keys in the discontinued partitions.

Would love to hear what everyone thinks.

Best Jan


















On 11.02.2018 00:35, Dong Lin wrote:

Hi all,

I have created KIP-253: Support in-order message delivery with partition expansion. See:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion

This KIP provides a way to allow messages of the same key from the same producer to be consumed in the same order they are produced, even if we expand the partitions of the topic.

Thanks,
Dong









--
-- Guozhang

