Thank you so much John Roesler! <3

Thank you for also seeing the core strengths of Apache Kafka!

We just cannot make sacrifices like this to the architecture for the benefit of Streams.

Streams somehow got lost in the whole "Interactive Query" idea that is spreading like cancer across the project and is already one of the fundamental breaks in the architecture. I am so, so happy to see your feedback!

The discussion can be held even simpler than it currently is. I will try to catch the threads that have started and point them here (maybe to the first email in this thread) to outline that what I am asking for is not much.

The only two things I need in broker support are:
1. A way to block a producer in its produce requests when its partition number is incorrect (initial KIP). I wanted to be able to stop it until the copy-making process is done. Same pattern as waiting for ISR / min replicas.
Fully optional of course, so only for log-compacted topics.

2. A way to distinguish the topics: by name, version number, epoch, anything really. For convenience, different versions should not share a log segment, so one does not need to include the version in the format.

All the other things from my initial description of how to grow can be dealt with in different JVMs
(copy-making MirrorMaker consumers).

I hope I can outline further how little it is that I am asking for.

Thank you John for outlining the !!weirdness!!! that comes from the current suggestion on the producer side. On the consumption side the problems are by no means fewer. It's an ugly proposal fighting its own architecture, and it can only show its ugly face in so many problems upfront. No one has yet dared to ask about the race conditions that occur when the same key runs across two partitions. It opens the whole flower of "key widening / key opening" that is discussed extensively in the Streams KIP-213! The partitionId essentially becomes part of the key for downstream processing, and no one will get that! It is so horribly complicated for joins already. I cannot think of all the bugs that will come out if we have to apply the carefulness of a 1:n join in every basic operation.

I cannot thank you more John! Your mail gave purpose to all my mailing list efforts. Thank you again so much

<3 Jan



On 27.03.2018 20:34, John Roesler wrote:
Hey Dong and Jun,

Thanks for the thoughtful responses. If you don't mind, I'll mix my replies
together to try for a coherent response. I'm not too familiar with
mailing-list etiquette, though.

I'm going to keep numbering my points because it makes it easy for you all
to respond.

Point 1:
As I read it, KIP-253 is *just* about properly fencing the producers and
consumers so that you preserve the correct ordering of records during
partition expansion. This is clearly necessary regardless of anything else
we discuss. I think this whole discussion about backfill, consumers,
streams, etc., is beyond the scope of KIP-253. But it would be cumbersome
to start a new thread at this point.

I had missed KIP-253's Proposed Change #9 among all the details... I think
this is a nice addition to the proposal. One thought is that it's actually
irrelevant whether the hash function is linear. This is simply an algorithm
for moving a key from one partition to another, so the type of hash
function need not be a precondition. In fact, it also doesn't matter
whether the topic is compacted or not, the algorithm works regardless.

I think this is a good algorithm to keep in mind, as it might solve a
variety of problems, but it does have a downside: that the producer won't
know whether or not K1 was actually in P1, it just knows that K1 was in
P1's keyspace before the new epoch. Therefore, it will have to
pessimistically send (K1,null) to P1 just in case. But the next time K1
comes along, the producer *also* won't remember that it already retracted
K1 from P1, so it will have to send (K1,null) *again*. By extension, every
time the producer sends to P2, it will also have to send a tombstone to P1,
which is a pretty big burden. To make the situation worse, if there is a
second split, say P2 becomes P2 and P3, then any key Kx belonging to P3
will also have to be retracted from P2 *and* P1, since the producer can't
know whether Kx had been last written to P2 or P1. Over a long period of
time, this clearly becomes an issue, as the producer must send an arbitrary
number of retractions along with every update.
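
The growth in retractions described above can be sketched as follows. This
is only a minimal illustration, not part of KIP-253: the `lineage` map and
both helper names are hypothetical, and it assumes the producer knows
nothing except which partition each partition was split from.

```python
def ancestors(partition, lineage):
    """Return every older partition that may still hold keys now owned by `partition`."""
    chain = []
    while partition in lineage:
        partition = lineage[partition]
        chain.append(partition)
    return chain


def records_for_update(key, value, partition, lineage):
    """One real write plus one pessimistic tombstone per ancestor partition."""
    writes = [(partition, key, value)]
    writes += [(old, key, None) for old in ancestors(partition, lineage)]
    return writes


# Hypothetical split history: P1 was split into P1 and P2,
# then P2 was split into P2 and P3.
lineage = {"P2": "P1", "P3": "P2"}

writes = records_for_update("Kx", "v1", "P3", lineage)
for record in writes:
    print(record)
```

Under these assumptions every update to a key in P3 carries two extra
tombstones, and each further split along the same lineage adds one more.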

In contrast, the proposed backfill operation has an end, and after it ends,
everyone can afford to forget that there ever was a different partition
layout.

Really, though, figuring out how to split compacted topics is beyond the
scope of KIP-253, so I'm not sure #9 really even needs to be in this KIP...
We do need in-order delivery during partition expansion. It would be fine
by me to say that you *cannot* expand partitions of a log-compacted topic
and call it a day. I think it would be better to tackle that in another KIP.


Point 2:
Regarding whether the consumer re-shuffles its inputs, this is always on
the table; any consumer who wants to re-shuffle its input is free to do so.
But this is currently not required. It's just that the current high-level
story with Kafka encourages the use of partitions as a unit of concurrency.
As long as consumers are single-threaded, they can happily consume a single
partition without concurrency control of any kind. This is a key aspect to
this system that lets folks design high-throughput systems on top of it
surprisingly easily. If all consumers were instead encouraged/required to
implement a repartition of their own, then the consumer becomes
significantly more complex, requiring either the consumer to first produce
to its own intermediate repartition topic or to ensure that consumer
threads have a reliable, high-bandwidth channel of communication with every
other consumer thread.

Either of those tradeoffs may be reasonable for a particular user of Kafka,
but I don't know if we're in a position to say that they are reasonable for
*every* user of Kafka.


Point 3:
Regarding Jun's point about this use case, "(3) stateful and maintaining the
states in a local store", I agree that they may use a framework *like*
Kafka Streams, but that is not the same as using Kafka Streams. This is why
I think it's better to solve it in Core: because it is then solved for
KStreams and also for everything else that facilitates local state
maintenance. To me, Streams is a member of the category of "stream
processing frameworks", which is itself a subcategory of "things requiring
local state maintenance". I'm not sure if it makes sense to assert that
Streams is a sufficient and practical replacement for everything in "things
requiring local state maintenance".

But, yes, I do agree that per-key ordering is an absolute requirement,
therefore I think that KIP-253 itself is a necessary step. Regarding the
coupling of the state store partitioning to the topic partitioning, yes,
this is an issue we are discussing solutions to right now. We may go ahead
and introduce an overpartition layer on our inputs to solve it, but then
again, if we get the ability to split partitions with backfill, we may not
need to!


Point 4:
On this:

Regarding thought 2: If we don't care about the stream use-case, then the
current KIP probably has already addressed the problem without requiring
consumer to know the partition function. If we care about the stream
use-case, we already need coordination across producers of different
topics, i.e. the same partition function needs to be used by producers of
topics A and B in order to join topics A and B. Thus, it might be
reasonable to extend coordination a bit and say we need coordination across
clients (i.e. producer and consumer), such that consumer knows the
partition function used by producer. If we do so, then we can let consumer
re-copy data for the change log topic using the same partition function as
producer. This approach has lower overhead as compared to having producer
re-copy data of the input topic.
Also, producer currently does not need to know the data already produced to
the topic. If we let producer split/merge partition, it would require
producer to consume the existing data, which intuitively is the task of
consumer.

I think we do care about use cases *like* Streams, I just don't think we
should rely on Streams to implement a feature of Core like partition
expansion.

Note, though, that we (Streams) do not require coordination across
producers. If two topics are certified to be co-partitioned, then Streams
apps can make use of that knowledge to optimize their topology (skipping a
repartition). But if they don't know whether they are co-partitioned, then
they'd better go ahead and repartition within the topology. This is the
current state.

A huge selling point of Kafka is enabling different parts of loosely
coupled organizations to produce and consume data independently. Some
coordination between producers and consumers is necessary, like
coordinating on the names of topics and their schemas. But Kafka's value
proposition w.r.t. ESBs, etc. is inversely proportional to the amount of
coordination required. I think it behooves us to be extremely skeptical
about introducing any coordination beyond correctness protocols.

Asking producers and consumers, or even two different producers, to share
code like the partition function is a pretty huge ask. What if they are
using different languages?

Comparing organizational overhead vs computational overhead, there are
maybe two orders of magnitude difference between them. In other words, I
would happily take on the (linear) overhead of having the producer re-copy
the data once during a re-partition in order to save the organizational
overhead of tying all the producers and consumers together across multiple
boundaries.

On that last paragraph: note that the producer *did* know the data it
already produced. It handled it the first time around. Asking it to
re-produce it into a new partition layout is squarely within its scope of
capabilities. Contrast this with the alternative, asking the consumer to
re-partition the data. I think this is even less intuitive, when the
partition function belongs to the producer.


Point 5:
Dong asked this:

For stream use-case that needs to increase consumer number, the
existing consumer can backfill the existing data in the change log topic to
the same change log topic with the new partition number, before the new set
of consumers bootstrap state from the new partitions of the change log
topic, right?

In this sense, the "consumer" is actually the producer of the changelog
topic, so if we support partition expansion + backfill as a producer/broker
operation, then it would be very straightforward for Streams to split a
state store. As you say, they would simply instruct the broker to split the
changelog topic's partitions, then backfill. Once the backfill is ready,
they can create a new crop of StandbyTasks to bootstrap the more granular
state stores and finally switch over to them when they are ready.

But this actually seems to be an argument in favor of split+backfill, so
maybe I missed the point.

You also asked me to explain why copying the "input" topic is better than
copying the "changelog" topic. I think they are totally independent,
actually. For one thing, you can't depend on the existence of a "changelog"
topic in general, only within Streams, but Kafka's user base clearly
exceeds Streams's user base. Plus, you actually also can't depend on the
existence of a changelog topic within Streams, since that is an optional
feature of *some* state store implementations. Even in the situation where
you do have a changelog topic in Streams, there may be use cases where it
makes sense to expand the partitions of just the input, or just the
changelog.

The ask for a Core feature of split+backfill is really about supporting the
use case of splitting partitions in log-compacted topics, regardless of
whether that topic is an "input" or a "changelog" or anything else for that
matter.


Point 6:
On the concern about the performance overhead of copying data between the
brokers, I think it's actually a bit overestimated. Splitting a topic's
partition is probably rare, certainly rarer in general than bootstrapping
new consumers on that topic. If "bootstrapping new consumers" means that
they have to re-shuffle the data before they consume it, then you wind up
copying the same record multiple times:

(broker: input topic) -> (initial consumer) -> (broker: repartition topic)
-> (real consumer)

That's 3x, and it's also 3x for every new record after the split as well,
since you don't get to stop repartitioning/reshuffling once you start.

Whereas if you do a backfill in something like the procedure I outlined,
you only copy the prefix of the partition before the split, and you send it
once to the producer and then once to the new generation partition. Plus,
assuming we're splitting the partition for the benefit of consumers,
there's no reason we can't co-locate the post-split partitions on the same
host as the pre-split partition, making the second copy a local filesystem
operation.

Even if you follow these two copies up with bootstrapping a new consumer,
it's still rare for this to occur, so you get to amortize these copies over
the lifetime of the topic, whereas a reshuffle just keeps making copies for
every new event.
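
As a rough back-of-the-envelope model of the comparison above (the record
volumes are invented, and counting one "transfer" per hop is a simplifying
assumption rather than a measurement):

```python
def reshuffle_transfers(existing_records, new_records):
    # Every record, old or new, makes three hops for as long as the topic lives:
    # input topic -> initial consumer -> repartition topic -> real consumer.
    return 3 * (existing_records + new_records)


def backfill_transfers(existing_records, new_records):
    # Only the pre-split prefix is copied twice (to the producer, then to the
    # new-generation partitions); afterwards every record is consumed once.
    backfill = 2 * existing_records
    consumption = existing_records + new_records
    return backfill + consumption


existing, new = 1_000_000, 10_000_000  # hypothetical volumes
print(reshuffle_transfers(existing, new))  # 33000000
print(backfill_transfers(existing, new))   # 13000000
```

The backfill cost is a one-time term, so its share shrinks as the topic
keeps receiving new records, while the reshuffle cost scales with them.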

And finally, I really do think that regardless of any performance concerns
about this operation, if it preserves loose organizational coupling, it is
certainly worth it.


In conclusion:
It might actually be a good idea for us to clarify the scope of KIP-253. If
we're all agreed that it's a good algorithm for allowing in-order message
delivery during partition expansion, then we can continue this discussion
as a new KIP, something like "backfill with partition expansion". This
would let Dong proceed with KIP-253. On the other hand, if it seems like
this conversation may alter the design of KIP-253, then maybe we *should*
just finish working it out.

For my part, my only concern about KIP-253 is the one I raised earlier.

Thanks again, all, for considering these points,
-John


On Tue, Mar 27, 2018 at 2:10 AM, Dong Lin <lindon...@gmail.com> wrote:

On Tue, Mar 27, 2018 at 12:04 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jan,

Thanks for the enthusiasm in improving Kafka's design. Now that I have
read through your discussion with Jun, here are my thoughts:

- The latest proposal should work with log compacted topics by properly
deleting old messages after a new message with the same key is produced. So
it is probably not a concern anymore. Could you comment if there is still
an issue?

- I wrote SEP-5 and I am pretty familiar with its motivation and design.
SEP-5 is probably orthogonal to the motivation of this KIP. The goal of
SEP-5 is to allow users to increase the task number of an existing Samza
job. But if we increase the partition number of input topics, messages may
still be consumed out-of-order by tasks in Samza, which causes incorrect
results. Similarly, the approach you proposed does not seem to ensure that
the messages can be delivered in order, even if we can make sure that each
consumer instance is assigned the set of new partitions covering the same
set of keys.

Let me correct this comment. The approach of copying data to a new topic
can ensure in-order message delivery, provided we properly migrate offsets
from the old topic to the new topic.


- I am trying to understand why it is better to copy the data instead of
copying the change log topic for streaming use-case. For core Kafka
use-case, and for the stream use-case that does not need to increase
consumers, the current KIP already supports in-order delivery without the
overhead of copying the data. For stream use-case that needs to increase
consumer number, the existing consumer can backfill the existing data in
the change log topic to the same change log topic with the new partition
number, before the new set of consumers bootstrap state from the new
partitions of the change log topic. If this solution works, then could you
summarize the advantage of copying the data of input topic as compared to
copying the change log topic? For example, does it enable more use-case,
simplify the implementation of Kafka library, or reduce the operation
overhead etc?

Thanks,
Dong


On Wed, Mar 21, 2018 at 6:57 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi Jun,

I was really seeing progress in our conversation, but your latest reply is
just devastating.
I thought we were getting close to being on the same page; now it feels
like we are in different libraries.

I will just quickly slam my answers in here. If they are too brief, I am
sorry; give me a ping and I will try to go into more detail.
I just want to show that your pro/con listing is broken.

Best Jan

and want to get rid of this horrible compromise


On 19.03.2018 05:48, Jun Rao wrote:

Hi, Jan,

Thanks for the discussion. Great points.

Let me try to summarize the approach that you are proposing. On the broker
side, we reshuffle the existing data in a topic from current partitions to
the new partitions. Once the reshuffle fully catches up, switch the
consumers to start consuming from the new partitions. If a consumer needs
to rebuild its local state (due to partition changes), let the consumer
rebuild its state by reading all existing data from the new partitions.
Once all consumers have switched over, cut over the producer to the new
partitions.

The pros for this approach are that :
1. There is just one way to rebuild the local state, which is simpler.

true thanks

The cons for this approach are:
1. Need to copy existing data.

Very unfair and not correct. It does not require you to copy over
existing data. It _allows_ you to copy all existing data.

2. The cutover of the producer is a bit complicated since it needs to
coordinate with all consumer groups.

Also not true. I explicitly tried to make clear that coordination is
required with only one special consumer (in the case of actually copying
data).

3. The rebuilding of the state in the consumer is from the input topic,
which can be more expensive than rebuilding from the existing state.

true, but rebuilding state is only required if you want to increase
processing power, so we assume this is at hand.

4. The broker potentially has to know the partitioning function. If this
needs to be customized at the topic level, it can be a bit messy.

I would argue against having the operation performed by the broker.
This was not discussed yet, but if you see my original email, I suggested
otherwise from the beginning.

Here is an alternative approach by applying your idea not in the broker,
but in the consumer. When new partitions are added, we don't move existing
data. In KStreams, we first reshuffle the new input data to a new topic T1
with the old number of partitions and feed T1's data to the rest of the
pipeline. In the meantime, KStreams reshuffles all existing data of the
change capture topic to another topic C1 with the new number of partitions.
We can then build the state of the new tasks from C1. Once the new states
have been fully built, we can cut over the consumption to the input topic
and delete T1. This approach works with compacted topic too. If an
application reads from the beginning of a compacted topic, the consumer
will reshuffle the portion of the input when the number of partitions
doesn't match the number of tasks.

We should all wipe this idea from our heads instantly. Mixing ideas from
both sides of an argument is not a resolution strategy;
it just leads to horrible, horrible software.


The pros of this approach are:
1. No need to copy existing data.
2. Each consumer group can cut over to the new partitions independently.
3. The state is rebuilt from the change capture topic, which is cheaper
than rebuilding from the input topic.
4. Only the KStreams job needs to know the partitioning function.

The cons of this approach are:
1. Potentially the same input topic needs to be reshuffled more than once
in different consumer groups during the transition phase.

What do you think?

Thanks,

Jun



On Thu, Mar 15, 2018 at 1:04 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi Jun,
thank you for following me on these thoughts. It was important to me to
feel that kind of understanding for my arguments.

What I was hoping for (I mentioned this earlier) is that we can model the
case where we do not want to copy the data the exact same way as the case
when we do copy the data. Maybe you can peek into the earlier mails to see
more details on this.

This means we have the same mechanism to transfer consumer groups to
switch topic. The offset mapping that would be generated would even be
simpler: end offset of the old topic => offset 0 of all the partitions of
the new topic. Then we could model the transition of a non-copy expansion
the exact same way as a copy-expansion.
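
A minimal sketch of what such a mapping could look like for a non-copy
expansion. The function name is hypothetical, and pairing new partition p
with old partition (p % old_count) is an added assumption that presumes a
hash-modulo partitioner and growth by an integer factor.

```python
def non_copy_offset_mapping(old_end_offsets, factor):
    """Map each old partition's end offset to offset 0 of the new partitions derived from it."""
    old_count = len(old_end_offsets)
    new_count = old_count * factor
    mapping = {}
    for old_p, end_offset in enumerate(old_end_offsets):
        # Assumed relationship: new partition p descends from old partition p % old_count.
        children = [p for p in range(new_count) if p % old_count == old_p]
        mapping[(old_p, end_offset)] = [(new_p, 0) for new_p in children]
    return mapping


# Hypothetical topic with two partitions, doubled to four.
mapping = non_copy_offset_mapping([120, 95], factor=2)
for source, targets in mapping.items():
    print(source, "=>", targets)
```

A copy-expansion would use the same shape of mapping, only with non-zero
target offsets pointing past the copied data.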

I know this only works when the topic grows by a factor. But the benefits
of only growing by a factor are too strong anyway. See Clemens's hint and
remember that state reshuffling is not needed at all if one doesn't want
to grow processing power.

I think these benefits should be clear, and that there is basically no
downside compared to what is currently at hand; it just makes everything
easy.

One thing you need to know is that if you do not offer rebuilding a log
compacted topic like I suggest, then even if you have consumer state
reshuffling, the topic is broken and cannot be used to bootstrap new
consumers. They don't know whether they need to apply a key from an old
partition or not. This is a horrible downside that I haven't seen a
solution for in the email conversation.

I argue to:

Only ever grow a topic by a factor.
Have the "no copy consumer" transition as the trivial case of the "copy
consumer" transition.
If processors need to be scaled, let them rebuild from the new topic and
leave the old ones running in the meantime.
Do not implement key shuffling in Streams.

I hope I can convince you, especially with how I want to handle the
consumer transition. I think you didn't quite understand me there before.
I think the term "new topic" intimidated you a little.
How we solve this on disk doesn't really matter, whether the data goes into
the same dir or a different dir or anything else. I do think that it needs
to involve at least rolling a new segment for the existing partitions.
But most of the transitions should work without restarting consumers
(newer consumers with support for this). By "new topic" I just meant the
topic that now has a different partition count. There are plenty of ways to
handle that (versions, aliases).

Hope I can further get my idea across.

Best Jan






On 14.03.2018 02:45, Jun Rao wrote:

Hi, Jan,
Thanks for sharing your view.

I agree with you that recopying the data potentially makes the state
management easier since the consumer can just rebuild its state from
scratch (i.e., no need for state reshuffling).

On the flip side, I saw a few disadvantages of the approach that you
suggested. (1) Building the state from the input topic from scratch is in
general less efficient than state reshuffling. Let's say one computes a
count per key from an input topic. The former requires reading all existing
records in the input topic whereas the latter only requires reading data
proportional to the number of unique keys. (2) The switching of the topic
needs modification to the application. If there are many applications on a
topic, coordinating such an effort may not be easy. Also, it's not clear
how to enforce exactly-once semantics during the switch. (3) If a topic
doesn't need any state management, recopying the data seems wasteful. In
that case, in-place partition expansion seems more desirable.
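
The count-per-key example in (1) can be made concrete with a toy input. The
records below are invented, and "reads" simply counts how many entries each
approach has to touch.

```python
from collections import Counter

# Hypothetical input topic: six records over three unique keys.
records = [("a", 1), ("b", 1), ("a", 1), ("c", 1), ("a", 1), ("b", 1)]

# Rebuilding from the input topic: every existing record is read again.
rebuilt = Counter()
reads_rebuild = 0
for key, _ in records:
    rebuilt[key] += 1
    reads_rebuild += 1

# State reshuffling: only the current per-key state entries are moved.
existing_state = Counter(key for key, _ in records)
reshuffled = {}
reads_reshuffle = 0
for key, count in existing_state.items():
    reshuffled[key] = count
    reads_reshuffle += 1

print(reads_rebuild, reads_reshuffle)  # 6 3
```

Both paths end with the same counts; the difference is only in how much
data is read, which grows with the update rate per key.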

I understand your concern about adding complexity in KStreams. But perhaps
we could iterate on that a bit more to see if it can be simplified.

Jun


On Mon, Mar 12, 2018 at 11:21 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi Jun,

I will focus on point 61, as I think it's _the_ fundamental part that I
can't get across at the moment.

Kafka is the platform to have state materialized multiple times from one
input. I emphasize this: It is the building block in architectures that
allow you to have your state maintained multiple times. You put a message
in once, and you have it pop out as often as you like. I believe you
understand this.

Now! The path of thinking goes as follows: I am using Apache Kafka and I
_want_ my state multiple times. What am I going to do?

A) Am I going to take my state that I built up, plunge some sort of RPC
layer on top of it, and use that RPC layer to throw my records across
instances?
B) Am I just going to read the damn message twice?

Approach A is fundamentally flawed and a violation of all that is good and
holy in Kafka deployments. I cannot understand how this idea could come up
in the first place.
(I do understand: IQ in Streams. They have polluted the Kafka Streams
codebase really badly already. It is not funny! I think it is just as
flawed as A.)

I say we do what Kafka is good at. We repartition the topic once. We
switch the consumers.
(Those that need more partitions are going to rebuild their state in
multiple partitions by reading the new topic; those that don't just assign
the new partitions properly.)
We switch producers. Done!

The best thing! It is trivial; a hipster stream processor will have an easy
time with that as well. It's so super simple. And simple IS good!
It is what Kafka was built to do. It is how we do it today. All I am saying
is that a little broker help doing the producer swap is super useful.
For everyone interested in why Kafka is so powerful with approach B,
please watch https://youtu.be/bEbeZPVo98c?t=1633
I already looked up a good point in time. I think after 5 minutes the
"state" topic is handled and you should be able to understand me
an inch better.

Please do not do A to the project, it deserves better!

Best Jan



On 13.03.2018 02:40, Jun Rao wrote:

Hi, Jan,

Thanks for the reply. A few more comments below.

50. Ok, we can think a bit harder for supporting compacted topics.

51. This is a fundamental design question. In the more common case, the
reason why someone wants to increase the number of partitions is that the
consumer application is slow and one wants to run more consumer instances
to increase the degree of parallelism. So, fixing the number of running
consumer instances when expanding the partitions won't help this case. If
we do need to increase the number of consumer instances, we need to somehow
reshuffle the state of the consumer across instances. What we have been
discussing in this KIP is whether we can do this more effectively through
the KStream library (e.g. through a 2-phase partition expansion). This will
add some complexity, but it's probably better than everyone doing this in
the application space. The recopying approach that you mentioned doesn't
seem to address the consumer state management issue when the consumer
switches from an old to a new topic.

52. As for your example, it depends on whether the join key is the same
between (A,B) and (B,C). If the join key is the same, we can do a 2-phase
partition expansion of A, B, and C together. If the join keys are
different, one would need to repartition the data on a different key for
the second join, then the partition expansion can be done independently
between (A,B) and (B,C).

53. If you always fix the number of consumer instances, what you described
works. However, as I mentioned in #51, I am not sure how your proposal
deals with consumer states when the number of consumer instances grows.
Also, it just seems that it's better to avoid re-copying the existing
data.

60. "just want to throw in my question from the longer email in the other
Thread here. How will the bloom filter help a new consumer to decide to
apply the key or not?" Not sure that I fully understood your question. The
consumer just reads whatever key is in the log. The bloom filter just helps
clean up the old keys.

61. "Why can we afford having a topic where its apparently not possible to
start a new application on? I think this is an overall flaw of the
discussed idea here. Not playing attention to the overall architecture."
Could you explain a bit more when one can't start a new application?

Jun



On Sat, Mar 10, 2018 at 1:40 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi Jun, thanks for your mail.

Thank you for your questions!
I think they are really good and tackle the core of the problem I see.

I will answer inline, mostly, but still want to set the tone here.

The core strength of Kafka is what Martin once called the
kappa-Architecture. How does this work?
You have everything as a log, as in Kafka. When you need to change
something, you create the new version of your application and leave it
running in parallel.
Once the new version is good, you switch your users to use the new
application.

The online reshuffling effectively breaks this architecture, and I think
the switch in thinking here is more harmful than any details about the
partitioning function to allow such a change. I feel with my suggestion we
are the closest to the original and battle-proven architecture, and I can
only warn against moving away from it.

I might have forgotten something; sometimes it's hard for me to get all
the thoughts captured in a mail, but I hope the comments inline will
further make my concern clear, and put some emphasis on why I prefer my
solution ;)

One thing we should all be aware of when discussing this, and I think Dong
should have mentioned it (maybe he did):
We are not discussing all of this out of thin air; there is an effort
in the Samza project.

https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams
https://issues.apache.org/jira/browse/SAMZA-1293

To be clear: I think SEP-5 (state of last week, I don't know if it has
adapted to this discussion) is on a way better path than KIP-253, and I
can't really explain why.

Best Jan,

nice weekend everyone


On 09.03.2018 03:36, Jun Rao wrote:

Hi, Jan,

Thanks for the feedback. Just some comments on the earlier points that you
mentioned.

50. You brought up the question of whether existing data needs to be copied
during partition expansion. My understanding of your view is that avoiding
copying existing data will be more efficient, but it doesn't work well with
compacted topics since some keys in the original partitions will never be
cleaned. It would be useful to understand your use case of compacted topics
a bit more. In the common use case, the data volume in a compacted topic
may not be large. So, I am not sure if there is a strong need to expand
partitions in a compacted topic, at least initially.

I do agree. State is usually smaller. Update rates might also be
competitively high.
Doing log compaction (even being very efficient and configurable) is also
a more expensive operation than just discarding old segments. Further, if
you want to use more consumers processing the events, you also have to grow
the number of partitions. Especially for the use-cases we do (KIP-213), a
tiny stateful table might be very expensive to process if it joins against
a huge table.

I can just say we have been in the spot of needing to grow log compacted
topics, mainly for the processing power we can bring to the table.

Further, I am not at all concerned about the extra space used by "garbage
keys". I am more concerned about the correctness of innocent consumers. The
logic becomes complicated. Say for Streams, one would need to load the
record into state but not forward it through the topology (to have it
available for shuffling). I would rather have it simple and the topic
clean, regardless of whether it still has its old partition count.
Especially with multiple partition growths, I think it becomes insanely
hard to do this shuffle correctly. Maybe Streams and Samza can do it.
Especially if you do "hipster stream processing"
<https://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/>.
This makes Kafka way too complicated. With my approach I think it's way
simpler because the topic has no "history" in terms of partitioning but is
always clean.


51. "Growing the topic by an integer factor does not require any state
redistribution at all." Could you clarify this a bit more? Let's say you
have a consumer app that computes the windowed count per key. If you
double the number of partitions from 1 to 2 and grow the consumer
instances from 1 to 2, we would need to redistribute some of the counts
to the new consumer instance. Regarding linear hashing, it's true that
it won't solve the problem with compacted topics. The main benefit is
that it redistributes the keys in one partition to no more than two
partitions, which could help redistribute the state.

You don't need to spin up a new consumer in this case. Every consumer
would just read every partition with the (partition % num_task) task. It
will still have the previous data right there and can go on.

This sounds contradictory to what I said before, but please bear with
me.

52. Good point on coordinating the expansion of 2 topics that need to be
joined together. This is where the 2-phase partition expansion could
potentially help. In the first phase, we could add new partitions to the
2 topics one at a time but without publishing to the new partitions.
Then, we can add new consumer instances to pick up the new partitions.
In this transition phase, no reshuffling is needed since no data is
coming from the new partitions. Finally, we can enable the publishing to
the new partitions.

I think it's even worse than you think. I would like to introduce the
term transitive copartitioning. Imagine 2 streams applications. One
joins (A,B), the other (B,C); then there is a transitive copartition
requirement for (A,C) to be copartitioned as well. This can spread
significantly and require many consumers to adapt at the same time.

It is also not entirely clear to me how you would not need reshuffling
in this case. If A has a record that never gets updated after the
expansion and the corresponding B record moves to a new partition, how
shall they meet w/o a shuffle?
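
To make the transitive requirement concrete, here is a minimal sketch
that groups topics which must keep the same partition count. The class
name CopartitionGroups and its methods are made up for illustration and
are not part of Kafka Streams; it only assumes that every join forces
its two input topics to be copartitioned.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class CopartitionGroups {
    // Union-find forest: each topic points towards the representative of its group.
    private final Map<String, String> parent = new HashMap<>();

    // Find the representative topic of the group containing `topic`.
    private String find(String topic) {
        parent.putIfAbsent(topic, topic);
        String root = topic;
        while (!parent.get(root).equals(root)) {
            root = parent.get(root);
        }
        parent.put(topic, root); // shorten the path for later lookups
        return root;
    }

    // Record that some application joins `left` with `right`.
    void addJoin(String left, String right) {
        parent.put(find(left), find(right));
    }

    // All topics that must keep the same partition count as `topic`.
    Set<String> groupOf(String topic) {
        String root = find(topic);
        Set<String> group = new TreeSet<>();
        for (String candidate : new TreeSet<>(parent.keySet())) {
            if (find(candidate).equals(root)) {
                group.add(candidate);
            }
        }
        return group;
    }

    public static void main(String[] args) {
        CopartitionGroups groups = new CopartitionGroups();
        groups.addJoin("A", "B"); // first application
        groups.addJoin("B", "C"); // second application
        System.out.println(groups.groupOf("A")); // [A, B, C]
    }
}
```

Growing A therefore means growing B and C as well, even though no single
application reads both A and C.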

53. "Migrating consumer is a step that might be made completely
unnecessary if - for example streams - takes the gcd as partitioning
scheme instead of enforcing 1 to 1." Not sure that I fully understand
this. I think you mean that a consumer application can run more
instances than the number of partitions. In that case, the consumer can
just repartition the input data according to the number of instances.
This is possible, but just has the overhead of reshuffling the data.

No, what I meant (that is also your question, I think, Mathias) is that
if you grow a topic by a factor, then even if your processor is stateful
you can just assign all the multiples of the previous partition to this
consumer, and the state needed to keep processing correctly will be
present w/o any shuffling.

Say you have an assignment
Stateful consumer => partition
0 => 0
1 => 1
2 => 2

and you grow your topic by a factor of 4, you get

0 => 0,3,6,9
1 => 1,4,7,10
2 => 2,5,8,11

Say your hashcode is 8. 8%3 => 2 before, so the consumer for partition 2
has it.
Now you have 12 partitions, so 8%12 => 8, and it goes into partition 8,
which is assigned to the same consumer who had 2 before and therefore
knows the key.
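
The arithmetic above can be checked mechanically. This is a minimal
sketch, assuming keys are placed with a plain hash % numPartitions and
partitions are assigned to consumers by partition % numConsumers. The
class and method names are made up for illustration, and Kafka's real
default partitioner hashes the serialized key with murmur2 first.

```java
import java.util.ArrayList;
import java.util.List;

final class IntegerFactorGrowth {
    // Partition chosen for a hash under plain modulo partitioning.
    static int partitionFor(int hash, int numPartitions) {
        return Math.floorMod(hash, numPartitions);
    }

    // Consumer (task) that owns a partition when partitions are assigned by
    // partition % numConsumers, as in the assignment table above.
    static int ownerOf(int partition, int numConsumers) {
        return partition % numConsumers;
    }

    // Partitions owned by one consumer after the topic has grown.
    static List<Integer> partitionsOwnedBy(int consumer, int numConsumers, int numPartitions) {
        List<Integer> owned = new ArrayList<>();
        for (int p = consumer; p < numPartitions; p += numConsumers) {
            owned.add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        int oldCount = 3;
        int newCount = oldCount * 4; // grown by an integer factor
        for (int hash = 0; hash < 1000; hash++) {
            int before = ownerOf(partitionFor(hash, oldCount), oldCount);
            int after = ownerOf(partitionFor(hash, newCount), oldCount);
            if (before != after) {
                throw new AssertionError("key moved to another consumer: " + hash);
            }
        }
        System.out.println(partitionsOwnedBy(2, 3, 12)); // [2, 5, 8, 11]
        System.out.println(partitionFor(8, 12));          // 8
    }
}
```

The loop never throws because (hash % 12) % 3 equals hash % 3 whenever
the new count is a multiple of the old one.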

Userland reshuffling is there as an option, and it does exactly what I
suggest. I think it's the perfect strategy. All I am suggesting is
broker-side support to switch the producers to the newly partitioned
topic. Then the old topic (with too few partitions) can go away.
Remember the list of steps at the beginning of this thread. If one has
broker support for all the steps where it's required, and streams
support for those where it isn't necessarily required, then one has
solved the problem.
I repeat it because I think it's important. I am really happy that you
brought that up, because it's 100% what I want, just with the difference
of having an option to discard the too-small topic later (after all
consumers have adapted), and of having the order correct there. I need
broker support for managing the copy process and the produces, and for
fencing them against each other. I also repeat: the copy process can run
for weeks in the worst case. Copying the data is not the longest task;
migrating consumers might very well be. Once all consumers have switched
and the copying is really up to date (think ISR-like up to date), only
then do we stop the producer, wait for the copy to finish, and use the
new topic for producing.

After this the topic is in perfect shape, and no one needs to worry
about complicated stuff (old keys hanging around that might arrive in
some other topic later...). I can only imagine how many tricky bugs are
going to arrive after someone has grown and shrunk their topic 10 times.






54. "The other thing I wanted to mention is that I believe the current
suggestion (without copying data over) can be implemented in pure
userland with a custom partitioner and a small feedback loop from
ProduceResponse => Partitioner in cooperation with a change management
system." I am not sure a customized partitioner itself solves the
problem. We probably need some broker-side support to enforce when the
new partitions can be used. We also need some support on the
consumer/kstream side to preserve the per-key ordering and potentially
migrate the processing state. This is not trivial and I am not sure if
it's ideal to fully push to the application space.

Broker support is definitely the preferred way here. I have nothing
against broker support.
I tried to say that for what I would prefer (copying the data over, at
least for log-compacted topics) I would require more broker support than
the KIP currently offers.



Jun

On Tue, Mar 6, 2018 at 10:33 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

Hi Dong,

are you actually reading my emails, or are you just using the thread I
started for general announcements regarding the KIP?

I tried to argue really hard against linear hashing. Growing the topic
by an integer factor does not require any state redistribution at all. I
completely fail to see where linear hashing helps on log-compacted
topics.

If you are not willing to explain to me what I might be overlooking,
that is fine. But then I ask you not to reply to my emails. Please
understand my frustration with this.

Best Jan



On 06.03.2018 19:38, Dong Lin wrote:

Hi everyone,

Thanks for all the comments! It appears that everyone prefers linear
hashing because it reduces the amount of state that needs to be moved
between consumers (for stream processing). The KIP has been updated to
use linear hashing.

Regarding the migration endeavor: it seems that migrating the producer
library to use linear hashing should be pretty straightforward, without
much operational endeavor. If we don't upgrade the client library to use
this KIP, we cannot support in-order delivery after the partition count
is changed anyway. Suppose we upgrade the client library to use this
KIP: if the partition number is not changed, the key -> partition
mapping will be exactly the same as it is now, because it is still
determined using murmur_hash(key) % original_partition_num. In other
words, this change is backward compatible.

Regarding the load distribution: if we use linear hashing, the load may
be unevenly distributed, because those partitions which are not split
may receive twice as much traffic as other partitions that are split.
This issue can be mitigated by creating topics with a number of
partitions that is several times the number of consumers. And there will
be no imbalance if the partition number is always doubled. So this
imbalance seems acceptable.
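
The imbalance described here can be seen in a small sketch of textbook
linear hashing. This is not necessarily the exact scheme in KIP-253; the
class LinearHashingSketch, its method names, and the use of a raw integer
in place of murmur_hash(key) are assumptions for illustration.

```java
final class LinearHashingSketch {
    // Textbook linear hashing: partitions are split one at a time, in order,
    // starting from partition 0. `originalCount` is the partition count at
    // topic creation, `currentCount` the count after expansion.
    static int partitionFor(int hash, int originalCount, int currentCount) {
        int h = hash & 0x7fffffff; // force a non-negative hash
        int level = originalCount;
        while (level * 2 <= currentCount) {
            level *= 2; // largest originalCount * 2^k that fits
        }
        int candidate = h % (level * 2);
        // The candidate partition may not exist yet; fall back to its parent.
        return candidate < currentCount ? candidate : h % level;
    }

    // Number of hashes in [0, samples) that land in each partition.
    static int[] load(int originalCount, int currentCount, int samples) {
        int[] counts = new int[currentCount];
        for (int hash = 0; hash < samples; hash++) {
            counts[partitionFor(hash, originalCount, currentCount)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Grow from 2 to 3 partitions: only partition 0 has been split.
        int[] counts = load(2, 3, 1200);
        System.out.println(java.util.Arrays.toString(counts)); // [300, 600, 300]
    }
}
```

With an unchanged partition count the function reduces to hash %
originalCount, and with a doubled count every partition receives the
same share, matching the two claims above.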

Regarding storing the partition strategy as per-topic config: it seems
not necessary, since we can still use murmur_hash as the default hash
function and additionally apply the linear hashing algorithm if the
partition number has increased. Not sure if there is any use case for
the producer to use a different hash function. Jason, can you check if
there is some use case that I missed for using the per-topic partition
strategy?

Regarding how to reduce latency (due to state store/load) in the stream
processing consumer when the partition number changes: I need to read
the Kafka Streams code to understand how Kafka Streams currently
migrates state between consumers when an application instance is
added/removed for a given job. I will reply after I finish reading the
documentation and code.


Thanks,
Dong


On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <ja...@confluent.io>
wrote:

Great discussion. I think I'm wondering whether we can continue to leave
Kafka agnostic to the partitioning strategy. The challenge is
communicating the partitioning logic from producers to consumers so that
the dependencies between each epoch can be determined. For the sake of
discussion, imagine you did something like the following:

1. The name (and perhaps version) of a partitioning strategy is stored
in topic configuration when a topic is created.
2. The producer looks up the partitioning strategy before writing to a
topic and includes it in the produce request (for fencing). If it
doesn't have an implementation for the configured strategy, it fails.
3. The consumer also looks up the partitioning strategy and uses it to
determine dependencies when reading a new epoch. It could either fail or
make the most conservative dependency assumptions if it doesn't know how
to implement the partitioning strategy. For the consumer, the new
interface might look something like this:

// Return the partition dependencies following an epoch bump
Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
                                         int numPartitionsAfterEpochBump)

The unordered case then is just a particular implementation which never
has any epoch dependencies. To implement this, we would need some way
for the consumer to find out how many partitions there were in each
epoch, but maybe that's not too unreasonable.
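
As a rough illustration of what one implementation of such an interface
could look like, here is a sketch for a plain hash % numPartitions
strategy. It assumes the returned map is keyed by the partition in the
new epoch and lists the partitions of the previous epoch that may hold
earlier records for the same keys; the direction of the map and the
class name ModuloDependencies are assumptions, not something the
proposal above specifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ModuloDependencies {
    static int gcd(int a, int b) {
        return b == 0 ? a : gcd(b, a % b);
    }

    // For each partition of the new epoch, list the partitions of the previous
    // epoch that may hold earlier records for the same keys, assuming keys are
    // placed with hash % numPartitions in both epochs.
    static Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
                                                    int numPartitionsAfterEpochBump) {
        int step = gcd(numPartitionsBeforeEpochBump, numPartitionsAfterEpochBump);
        Map<Integer, List<Integer>> result = new TreeMap<>();
        for (int after = 0; after < numPartitionsAfterEpochBump; after++) {
            List<Integer> sources = new ArrayList<>();
            // A hash can map to both partitions only if they agree modulo the gcd.
            for (int before = after % step; before < numPartitionsBeforeEpochBump; before += step) {
                sources.add(before);
            }
            result.put(after, sources);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(dependencies(3, 6)); // {0=[0], 1=[1], 2=[2], 3=[0], 4=[1], 5=[2]}
        System.out.println(dependencies(2, 3)); // {0=[0, 1], 1=[0, 1], 2=[0, 1]}
    }
}
```

Growing by an integer factor gives every new partition exactly one
predecessor, while growing from 2 to 3 makes every new partition depend
on every old one, which is the most conservative case.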

Thanks,
Jason


On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

Hi Dong

thank you very much for your questions.

regarding the time spent copying data across:
It is correct that copying data from a topic with one partition mapping
to a topic with a different partition mapping takes way longer than we
can stop producers. Tens of minutes is a very optimistic estimate here.
Many people cannot afford to copy at full steam and will therefore have
some rate limiting in place; this can bump the timespan into days. The
good part is that the vast majority of the data can be copied while the
producers are still going. One can then piggyback the consumers on top
of this timeframe, by the method mentioned (provide them a mapping from
their old offsets to new offsets in their repartitioned topics). In that
way we separate migration of consumers from migration of producers
(decoupling these is what Kafka is strongest at). The time to actually
swap over the producers should be kept minimal by ensuring that, when a
swap attempt is started, the consumer copying over is very close to the
log end and is expected to finish within the next fetch. The operation
should have a time-out and should be "reattemptable".

Importance of log compaction:
If a producer produces key A to partition 0, it is going to be there
forever, unless it gets deleted. The record might sit in there for
years. A new producer started with the new partitions will fail to
delete the record in the correct partition. The record will be there
forever and one cannot reliably bootstrap new consumers. I cannot see
how linear hashing can solve this.
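
A toy model of this failure, assuming a compacted partition can be
approximated by a map holding the latest value per key and that a
tombstone only affects the partition it is written to. The class
CompactedTopicSketch and the modulo partitioner are made up for
illustration and do not use Kafka's APIs; the partition numbers differ
from the example above but the effect is the same.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CompactedTopicSketch {
    // One map per partition: the state a compacted partition converges to,
    // i.e. the latest value per key, with tombstoned keys removed.
    private final List<Map<String, String>> partitions = new ArrayList<>();

    CompactedTopicSketch(int numPartitions) {
        addPartitions(numPartitions);
    }

    // Expand the topic in place without copying existing records.
    void addPartitions(int count) {
        for (int i = 0; i < count; i++) {
            partitions.add(new HashMap<>());
        }
    }

    // Toy partitioner: key hash modulo the current partition count.
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), partitions.size());
    }

    // Produce a record; a null value is a tombstone that deletes the key,
    // but only within the partition it is written to.
    void produce(String key, String value) {
        Map<String, String> partition = partitions.get(partitionFor(key));
        if (value == null) {
            partition.remove(key);
        } else {
            partition.put(key, value);
        }
    }

    // What a newly bootstrapped consumer sees when reading every partition.
    Map<String, String> bootstrap() {
        Map<String, String> view = new HashMap<>();
        for (Map<String, String> partition : partitions) {
            view.putAll(partition);
        }
        return view;
    }

    public static void main(String[] args) {
        CompactedTopicSketch topic = new CompactedTopicSketch(2);
        topic.produce("A", "v1");  // "A".hashCode() is 65, so 65 % 2 = partition 1
        topic.addPartitions(1);    // now 3 partitions, 65 % 3 = partition 2
        topic.produce("A", null);  // tombstone lands in partition 2
        System.out.println(topic.bootstrap()); // {A=v1}: the deleted key is still there
    }
}
```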

Regarding your skipping of userland copying:

100%, copying the data across in userland is, as far as I can see, only
a use case for log-compacted topics. Even for log compaction +
retention it should only be opt-in. Why did I bring it up? I think log
compaction is a very important feature to really embrace Kafka as a
"data platform". The point I also want to make is that copying data this
way is completely in line with the Kafka architecture: it only consists
of reading and writing to topics.

I hope it clarifies more why I think we should aim for more than the
current KIP. I fear that once the KIP is done, not much more effort will
be made.

On 04.03.2018 02:28, Dong Lin wrote:
Hey Jan,

In the current proposal, the consumer will be blocked waiting for other
consumers of the group to consume up to a given offset. In most cases,
all consumers should be close to the LEO of the partitions when the
partition expansion happens. Thus the time spent waiting should not be
long, e.g. on the order of seconds. On the other hand, it may take a
long time to wait for the entire partition to be copied -- the amount of
time is proportional to the amount of existing data in the partition,
which can take tens of minutes. So the amount of time that we stop
consumers may not be on the same order of magnitude.

If we can implement this suggestion without copying data over in pure
userland, it will be much more valuable. Do you have ideas on how this
can be done?

Not sure why the current KIP would not help people who depend on log
compaction. Could you elaborate more on this point?

Thanks,

Dong
On Wed, Feb 28, 2018 at 10:55 PM, Jan Filipiak <Jan.Filipiak@trivago.com>
wrote:

Hi Dong,

I tried to focus on what steps one can currently perform to expand or
shrink a keyed topic while maintaining top-notch semantics.
I can understand that there might be confusion about "stopping the
consumer". It is exactly the same as proposed in the KIP: there needs to
be a time when the producers agree on the new partitioning. The extra
semantics I want to put in there is that we have the possibility to wait
until all the existing data is copied over into the new partitioning
scheme. When I say stopping, I think more of having a memory barrier
that ensures the ordering. I am still aiming for latencies on the scale
of leader failovers.

Consumers have to explicitly adapt to the new partitioning scheme in the
above scenario. The reason is that in these cases where you are
dependent on a particular partitioning scheme, you also frequently have
other topics that have co-partition enforcements or the like. Therefore
all your other input topics might need to grow accordingly.
What I was suggesting was to streamline all these operations as best as
possible to have "real" partition growth and shrinkage going on.
Migrating the producers to a new partitioning scheme can be much more
streamlined with proper broker support for this. Migrating consumers is
a step that might be made completely unnecessary if - for example
streams - takes the gcd as partitioning scheme instead of enforcing 1 to
1. Connect consumers and other consumers should be fine anyway.
I hope this makes clearer what I was aiming at. The rest needs to be
figured out. The only danger I see is that when we introduce this
feature as proposed in the KIP, it won't help any people depending on
log compaction.

The other thing I wanted to mention is that I believe the current
suggestion (without copying data over) can be implemented in pure
userland with a custom partitioner and a small feedback loop from
ProduceResponse => Partitioner in cooperation with a change management
system.
Best Jan
Best Jan








On 28.02.2018 07:13, Dong Lin wrote:

Hey Jan,

I am not sure if it is acceptable for producers to be stopped for a
while, particularly for online applications which require low latency. I
am also not sure how consumers can switch to a new topic. Does the user
application need to explicitly specify a different topic for the
producer/consumer to subscribe to? It will be helpful for the discussion
if you can provide more detail on the interface change for this
solution.

Thanks,
Dong

On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
wrote:

Hi,
just want to throw my thoughts in. In general the functionality is very
useful; we should, though, not try too hard to find the architecture
while implementing.

The manual steps would be to

create a new topic
then mirror-make from the old topic to the new topic
wait for mirror making to catch up
then put the consumers onto the new topic
            (having MirrorMaker spit out a mapping from old offsets to
            new offsets:
                if the topic is increased by a factor X, there is going
                to be a clean mapping from 1 offset in the old topic to
                X offsets in the new topic;
                if there is no factor, then there is no chance to
                generate a mapping that can reasonably be used for
                continuing)
            make consumers stop at appropriate points and continue
            consumption with offsets from the mapping
have the producers stop for a minimal time
wait for MirrorMaker to finish
let the producers produce with the new metadata
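
The offset mapping mentioned in the steps above could be built along
these lines. This is a sketch under several assumptions: the copy
preserves record order, the new partitions start empty, keys are placed
with hash % numPartitions, and each record is represented only by its key
hash. The class OffsetMappingSketch is hypothetical and is not something
MirrorMaker provides.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class OffsetMappingSketch {
    // Copies one old partition into the partitions it fans out to after growing
    // the topic by `factor`, and records, for every old offset, the offsets at
    // which a consumer would resume in each of those new partitions.
    // recordHashes.get(i) is the key hash of the record at old offset i; every
    // hash must satisfy hash % oldCount == oldPartition.
    static List<int[]> buildMapping(List<Integer> recordHashes, int oldPartition,
                                    int oldCount, int factor) {
        int newCount = oldCount * factor;
        int[] nextOffset = new int[factor]; // next offset per child partition
        List<int[]> mapping = new ArrayList<>();
        for (int hash : recordHashes) {
            mapping.add(nextOffset.clone()); // resume points before this record
            int newPartition = Math.floorMod(hash, newCount);
            int child = (newPartition - oldPartition) / oldCount;
            nextOffset[child]++;
        }
        mapping.add(nextOffset.clone()); // resume points at the old log end
        return mapping;
    }

    public static void main(String[] args) {
        // Old topic: 1 partition. New topic: 2 partitions (factor 2).
        List<Integer> hashes = Arrays.asList(4, 7, 2, 9);
        List<int[]> mapping = buildMapping(hashes, 0, 1, 2);
        // A consumer that had committed old offset 3 resumes at these offsets
        // in new partitions 0 and 1.
        System.out.println(Arrays.toString(mapping.get(3))); // [2, 1]
    }
}
```

One old offset maps to exactly X new offsets (here X = 2), which is the
"clean mapping" that only exists when the topic grows by a factor.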


Instead of implementing the approach suggested in the KIP, which will
leave log-compacted topics completely crumbled and unusable, I would
much rather try to build infrastructure to support the above-mentioned
operations more smoothly.
Especially having producers stop and use another topic is difficult, and
it would be nice if one could trigger "invalid metadata" exceptions for
them, and if one could give topics aliases so that their produces to the
old topic will arrive in the new topic.

The downsides are obvious, I guess (having the same data twice for the
transition period, but Kafka tends to scale well with data size). So
it's a nicer fit into the architecture.

I further want to argue that the functionality proposed by the KIP can
be implemented completely in "userland" with a custom partitioner that
handles the transition as needed. I would appreciate it if someone could
point out what a custom partitioner couldn't handle in this case.
With the above approach, shrinking a topic becomes the same steps,
without losing keys in the discontinued partitions.

Would love to hear what everyone thinks.

Best Jan


















On 11.02.2018 00:35, Dong Lin wrote:

Hi all,

I have created KIP-253: Support in-order message delivery with partition
expansion. See
https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
.

This KIP provides a way to allow messages of the same key from the same
producer to be consumed in the same order they are produced, even if we
expand the partitions of the topic.

Thanks,
Dong







