Hi Jun,

thank you for following me on these thoughts. It was important to me to feel that my arguments are understood.

What I was hoping for (I mentioned this earlier) is that we can model the case where we do not want to copy the data in exactly the same way as the case where we do copy the data. Maybe you can peek into the earlier mails for more details on this.

This means we have the same mechanism to transfer consumer groups when they switch topics. The offset mapping that would be generated would be even simpler: end offset of the old topic => offset 0 of all the partitions of the new topic. Then we could model the transition of a non-copy expansion in exactly the same way as a copy expansion.
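A minimal sketch of that trivial mapping, assuming plain modulo partitioning and growth by an integer factor. The class and method names are hypothetical and not part of Kafka or KIP-253: a consumer that has reached the end offset of an old partition would resume at offset 0 of every new partition that can receive that partition's keys.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not an existing Kafka API.
final class NoCopyOffsetMapping {

    /**
     * Non-copy expansion: once a consumer has read an old partition up to its
     * end offset, it continues at offset 0 of each new partition congruent to
     * the old partition modulo the old partition count.
     */
    static Map<Integer, Long> resumePositions(int oldPartition, int oldCount, int factor) {
        if (oldCount <= 0 || factor <= 0 || oldPartition < 0 || oldPartition >= oldCount) {
            throw new IllegalArgumentException("invalid partition layout");
        }
        Map<Integer, Long> positions = new LinkedHashMap<>();
        for (int k = 0; k < factor; k++) {
            // new partition number -> offset to start reading from
            positions.put(oldPartition + k * oldCount, 0L);
        }
        return positions;
    }
}
```

For example, `resumePositions(2, 3, 4)` yields offset 0 for the new partitions 2, 5, 8 and 11. In a copy expansion the same map would simply carry non-zero offsets.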

I know this only works when the topic grows by a factor. But the benefits of only growing by a factor are too strong anyway. See Clemens's hint, and remember that state reshuffling is not needed at all if one doesn't want to grow processing power.

I think these benefits should be clear, and that there is basically no downside compared to what is currently at hand; it just makes everything easier.

One thing you need to know is that if you do not offer rebuilding a log-compacted topic as I suggest, then even if you have consumer state reshuffling, the topic is broken and cannot be used to bootstrap new consumers. They don't know whether they need to apply a key from an old partition or not. This is a horrible downside that I haven't seen a solution for in the email conversation.

I argue to:

Always grow a topic by a factor only.
Have the "no copy consumer" transition as the trivial case of the "copy consumer" transition. If processors need to be scaled, let them rebuild from the new topic and leave the old ones running in the meantime.
Do not implement key shuffling in Streams.

I hope I can convince you, especially with how I want to handle the consumer transition. I think you didn't quite understand me there before. I think the term "new topic" intimidated you a little. How we solve this on disk doesn't really matter, whether the data goes into the same directory or a different one. I do think that it needs to involve at least rolling a new segment for the existing partitions. But most of the transitions should work without restarting consumers (newer consumers with support for this). By "new topic" I just meant the topic that now has a different partition count. There are plenty of ways to handle that (versions, aliases).

Hope I can further get my idea across.

Best Jan





On 14.03.2018 02:45, Jun Rao wrote:
Hi, Jan,

Thanks for sharing your view.

I agree with you that recopying the data potentially makes the state
management easier since the consumer can just rebuild its state from
scratch (i.e., no need for state reshuffling).

On the flip side, I see a few disadvantages of the approach that you
suggested. (1) Building the state from the input topic from scratch is in
general less efficient than state reshuffling. Let's say one computes a
count per key from an input topic. The former requires reading all existing
records in the input topic whereas the latter only requires reading data
proportional to the number of unique keys. (2) The switching of the topic
needs modification to the application. If there are many applications on a
topic, coordinating such an effort may not be easy. Also, it's not clear
how to enforce exactly-once semantics during the switch. (3) If a topic
doesn't need any state management, recopying the data seems wasteful. In
that case, in-place partition expansion seems more desirable.

I understand your concern about adding complexity in KStreams. But, perhaps
we could iterate on that a bit more to see if it can be simplified.

Jun


On Mon, Mar 12, 2018 at 11:21 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

Hi Jun,

I will focus on point 61 as I think it's _the_ fundamental part that I can't
get across at the moment.

Kafka is the platform to have state materialized multiple times from one
input. I emphasize this: It is the building block in architectures that
allow you to
have your state maintained multiple times. You put a message in once, and
you have it pop out as often as you like. I believe you understand this.

Now! The line of thinking goes as follows: I am using Apache Kafka and
I _want_ my state multiple times. What am I going to do?

A) Am I going to take the state that I built up, put some sort of RPC
layer on top of it, and use that RPC layer to throw my records across
instances?
B) Am I just going to read the damn message twice?

Approach A is fundamentally flawed and a violation of all that is good and
holy in Kafka deployments. I cannot understand how this idea could come up
in the first place.
(I do understand: IQ in Streams. It has already polluted the Kafka Streams
codebase really badly. It is not funny! I think it is as flawed as A.)

I say, we do what Kafka is good at. We repartition the topic once. We
switch the consumers.
(Those that need more partitions are going to rebuild their state in
multiple partitions by reading the new topic, those that don't just assign
the new partitions properly)
We switch producers. Done!

The best thing! It is trivial; a hipster stream processor will have an easy
time with that as well. It's so super simple. And simple IS good!
It is what Kafka was built to do. It is how we do it today. All I am saying
is that a little broker help with the producer swap is super useful.

For everyone interested in why Kafka is so powerful with approach B,
please watch https://youtu.be/bEbeZPVo98c?t=1633
I already looked up a good point in time; I think after 5 minutes the
"state" topic is handled and you should be able to understand me
an inch better.

Please do not do A to the project, it deserves better!

Best Jan



On 13.03.2018 02:40, Jun Rao wrote:

Hi, Jan,

Thanks for the reply. A few more comments below.

50. Ok, we can think a bit harder about supporting compacted topics.

51. This is a fundamental design question. In the more common case, the
reason why someone wants to increase the number of partitions is that the
consumer application is slow and one wants to run more consumer instances
to increase the degree of parallelism. So, fixing the number of running
consumer instances when expanding the partitions won't help this case. If
we do need to increase the number of consumer instances, we need to somehow
reshuffle the state of the consumer across instances. What we have been
discussing in this KIP is whether we can do this more effectively through
the KStream library (e.g. through a 2-phase partition expansion). This will
add some complexity, but it's probably better than everyone doing this in
the application space. The recopying approach that you mentioned doesn't
seem to address the consumer state management issue when the consumer
switches from an old to a new topic.

52. As for your example, it depends on whether the join key is the same
between (A,B) and (B,C). If the join key is the same, we can do a 2-phase
partition expansion of A, B, and C together. If the join keys are
different, one would need to repartition the data on a different key for
the second join, then the partition expansion can be done independently
between (A,B) and (B,C).

53. If you always fix the number of consumer instances, what you described
works. However, as I mentioned in #51, I am not sure how your proposal
deals with consumer states when the number of consumer instances grows.
Also, it just seems that it's better to avoid re-copying the existing
data.

60. "just want to throw in my question from the longer email in the other
Thread here. How will the bloom filter help a new consumer to decide to
apply the key or not?" Not sure that I fully understood your question. The
consumer just reads whatever key is in the log. The bloom filter just helps
clean up the old keys.

61. "Why can we afford having a topic where its apparently not possible to
start a new application on? I think this is an overall flaw of the
discussed idea here. Not playing attention to the overall architecture."
Could you explain a bit more when one can't start a new application?

Jun



On Sat, Mar 10, 2018 at 1:40 AM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

Hi Jun, thanks for your mail.
Thank you for your questions!
I think they are really good and tackle the core of the problem I see.

I will mostly answer inline, but I still want to set the tone here.

The core strength of Kafka is what Martin once called the
kappa architecture. How does this work?
You have everything as a log, as in Kafka. When you need to change
something, you create the new version of your application and leave it
running in parallel. Once the new version is good, you switch your users
to the new application.

The online reshuffling effectively breaks this architecture, and I think
the switch in thinking here is more harmful than any details about the
partitioning function that allows such a change. I feel that with my
suggestion we are closest to the original, battle-proven architecture, and
I can only warn against moving away from it.

I might have forgotten something; sometimes it's hard for me to get all
my thoughts captured in a mail. But I hope the inline comments will make
my concern clearer and put some emphasis on why I prefer my solution ;)

One thing we should all be aware of when discussing this, and I think Dong
should have mentioned it (maybe he did): we are not discussing all of this
out of thin air; there is an effort in the Samza project.

https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams
https://issues.apache.org/jira/browse/SAMZA-1293

To be clear: I think SEP-5 (state of last week, I don't know if it has been
adapted to this discussion) is on a way better path than KIP-253, and I
can't really explain why.

Best Jan,

nice weekend everyone


On 09.03.2018 03:36, Jun Rao wrote:

Hi, Jan,
Thanks for the feedback. Just some comments on the earlier points that you
mentioned.

50. You brought up the question of whether existing data needs to be copied
during partition expansion. My understanding of your view is that avoiding
copying existing data will be more efficient, but it doesn't work well with
compacted topics since some keys in the original partitions will never be
cleaned. It would be useful to understand your use case of compacted topics
a bit more. In the common use case, the data volume in a compacted topic
may not be large. So, I am not sure if there is a strong need to expand
partitions in a compacted topic, at least initially.

I do agree. State is usually smaller. Update rates might also be
comparably high.
Doing log compaction (even though it is very efficient and configurable) is
also a more expensive operation than just discarding old segments. Further,
if you want to use more consumers to process the events, you also have to
grow the number of partitions. Especially for the use cases we have
(KIP-213), a tiny stateful table might be very expensive to process if it
joins against a huge table.

I can just say we have been in the position of needing to grow
log-compacted topics, mainly for the processing power we can bring to the
table.

Further, I am not at all concerned about the extra space used by "garbage
keys". I am more concerned about the correctness of innocent consumers. The
logic becomes complicated. Say, for Streams one would need to load the
record into state but not forward it to the topology (to have it available
for shuffling). I would rather have it simple and the topic clean,
regardless of whether it still has its old partition count. Especially with
multiple partition growths, I think it becomes insanely hard to do this
shuffle correctly. Maybe Streams and Samza can do it. Especially if you do
"hipster stream processing"
<https://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/>.
This makes Kafka way too complicated. With my approach I think it's way
simpler, because the topic has no "history" in terms of partitioning but is
always clean.


51. "Growing the topic by an integer factor does not require any state
redistribution at all." Could you clarify this a bit more? Let's say you
have a consumer app that computes the windowed count per key. If you double
the number of partitions from 1 to 2 and grow the consumer instances from 1
to 2, we would need to redistribute some of the counts to the new consumer
instance. Regarding linear hashing, it's true that it won't solve the
problem with compacted topics. The main benefit is that it redistributes
the keys in one partition to no more than two partitions, which could help
redistribute the state.

You don't need to spin up a new consumer in this case. Every consumer
would just read every partition with the (partition % num_task) task. It
will still have the previous data right there and can go on.

This sounds contradictory to what I said before, but please bear with me.

52. Good point on coordinating the expansion of 2 topics that need to be
joined together. This is where the 2-phase partition expansion could
potentially help. In the first phase, we could add new partitions to the 2
topics one at a time but without publishing to the new partitions. Then, we
can add new consumer instances to pick up the new partitions. In this
transition phase, no reshuffling is needed since no data is coming from the
new partitions. Finally, we can enable the publishing to the new
partitions.

I think it's even worse than you think. I would like to introduce the term
"transitive copartitioning". Imagine two Streams applications. One joins
(A,B), the other (B,C); then there is a transitive copartitioning
requirement for (A,C) to be copartitioned as well. This can spread
significantly and require many consumers to adapt at the same time.

It is also not entirely clear to me how you avoid reshuffling in this
case. If A has a record that never gets updated after the expansion and the
corresponding B record moves to a new partition, how shall they meet
without a shuffle?

53. "Migrating consumer is a step that might be made completely unnecessary
if - for example streams - takes the gcd as partitioning scheme instead of
enforcing 1 to 1." Not sure that I fully understand this. I think you mean
that a consumer application can run more instances than the number of
partitions. In that case, the consumer can just repartition the input
data according to the number of instances. This is possible, but just has
the overhead of reshuffling the data.

No, what I meant (that is also your question, I think, Mathias) is that if
you grow a topic by a factor, then even if your processor is stateful you
can just assign all the multiples of the previous partition to this
consumer, and the state needed to keep processing correctly will be present
without any shuffling.

Say you have an assignment
stateful consumer => partition
0 => 0
1 => 1
2 => 2

and you grow your topic by a factor of 4, you get:

0 => 0,3,6,9
1 => 1,4,7,10
2 => 2,5,8,11

Say your hash code is 8. 8 % 3 => 2 before, so the consumer for partition 2
has it. Now you have 12 partitions, so 8 % 12 => 8, and the key goes into
partition 8, which is assigned to the same consumer that had partition 2
before and therefore knows the key.
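The arithmetic above can be checked with a small sketch. It assumes plain modulo partitioning on an integer hash and a consumer assignment of partition % number of consumers; the class and method names are hypothetical.

```java
// Illustration only: modulo partitioning plus a "partition % consumers"
// assignment, as in the example above. Not an existing Kafka API.
final class FactorGrowth {

    /** Partition a key hash lands in for a topic with numPartitions partitions. */
    static int partitionFor(int hash, int numPartitions) {
        return Math.floorMod(hash, numPartitions);
    }

    /** Consumer that owns a partition when partitions are dealt out round-robin. */
    static int ownerFor(int partition, int numConsumers) {
        return partition % numConsumers;
    }

    /** True if the key stays with the consumer that owned it before the growth. */
    static boolean staysWithOwner(int hash, int oldPartitions, int newPartitions) {
        int oldOwner = ownerFor(partitionFor(hash, oldPartitions), oldPartitions);
        int newOwner = ownerFor(partitionFor(hash, newPartitions), oldPartitions);
        return oldOwner == newOwner;
    }
}
```

With 3 consumers, `staysWithOwner(h, 3, 12)` holds for every hash because (h mod 12) mod 3 equals h mod 3. It fails in general when the new count is not a multiple of the old one, for example `staysWithOwner(5, 3, 5)`.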

Userland reshuffling is there as an option. And it does exactly what I
suggest. And I think it's the perfect strategy. All I am suggesting is
broker-side support to switch the producers to the newly partitioned topic.
Then the old topic (with too few partitions) can go away. Remember the list
of steps at the beginning of this thread. If one has broker support for all
the steps where it is required, and Streams support for those where it
isn't necessarily required, then one has solved the problem.
I repeat it because I think it's important. I am really happy that you
brought that up, because it's 100% what I want, just with the difference of
having an option to discard the too-small topic later (after all consumers
have adapted), and of having the order correct there. I need broker support
for managing the copy process and the producers, and for fencing them
against each other. I also repeat: the copy process can run for weeks in
the worst case. Copying the data is not the longest task; migrating
consumers might very well be.
Only once all consumers have switched and copying is really up to date
(think ISR-like up to date) do we stop the producers, wait for the copy to
finish, and use the new topic for producing.

After this the topic is in perfect shape, and no one needs to worry about
complicated stuff (old keys hanging around might arrive in some other
topic later...). I can only imagine how many tricky bugs are going to
appear after someone has grown and shrunk their topic 10 times.






54. "The other thing I wanted to mention is that I believe the current
suggestion (without copying data over) can be implemented in pure userland
with a custom partitioner and a small feedback loop from ProduceResponse =>
Partitioner in cooperation with a change management system." I am not
sure a customized partitioner itself solves the problem. We probably need
some broker-side support to enforce when the new partitions can be used. We
also need some support on the consumer/KStream side to preserve the per-key
ordering and potentially migrate the processing state. This is not trivial,
and I am not sure if it's ideal to fully push to the application space.

Broker support is definitely the preferred way here. I have nothing against
broker support.
I tried to say that for what I would prefer - copying the data over, at
least for log-compacted topics - I would require more broker support than
the KIP currently offers.



Jun

On Tue, Mar 6, 2018 at 10:33 PM, Jan Filipiak <jan.filip...@trivago.com
wrote:

Hi Dong,

are you actually reading my emails, or are you just using the thread I
started for general announcements regarding the KIP?

I tried to argue really hard against linear hashing. Growing the topic by
an integer factor does not require any state redistribution at all. I fail
completely to see where linear hashing helps on log-compacted topics.

If you are not willing to explain to me what I might be overlooking, that
is fine. But then I ask you not to reply to my emails. Please understand my
frustration with this.

Best Jan



On 06.03.2018 19:38, Dong Lin wrote:

Hi everyone,

Thanks for all the comments! It appears that everyone prefers linear
hashing because it reduces the amount of state that needs to be moved
between consumers (for stream processing). The KIP has been updated to use
linear hashing.

Regarding the migration endeavor: it seems that migrating the producer
library to use linear hashing should be pretty straightforward without
much operational endeavor. If we don't upgrade the client library to use
this KIP, we cannot support in-order delivery after the partition count is
changed anyway. Suppose we upgrade the client library to use this KIP: if
the partition number is not changed, the key -> partition mapping will be
exactly the same as it is now, because it is still determined using
murmur_hash(key) % original_partition_num. In other words, this change is
backward compatible.
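To illustrate the backward-compatibility argument, here is a minimal sketch of textbook linear hashing for a partition count between the original count and twice the original count. This is an assumption for illustration: the exact algorithm in the KIP may differ, the class name is hypothetical, and an already computed integer hash stands in for murmur_hash(key).

```java
// Textbook linear hashing sketch; not taken from the KIP or the Kafka code.
final class LinearHashing {

    /**
     * Maps a key hash to a partition, for
     * originalCount <= currentCount <= 2 * originalCount.
     */
    static int partition(int hash, int originalCount, int currentCount) {
        if (originalCount <= 0 || currentCount < originalCount
                || currentCount > 2 * originalCount) {
            throw new IllegalArgumentException("unsupported partition counts");
        }
        int h = hash & 0x7fffffff;              // force a non-negative value
        int p = h % originalCount;              // same mapping as before any expansion
        int split = currentCount - originalCount; // partitions [0, split) are already split
        if (p < split) {
            // A split partition p shares its keys with partition p + originalCount.
            p = h % (2 * originalCount);
        }
        return p;
    }
}
```

When `currentCount == originalCount` nothing is split and the result is simply hash % originalCount, which is the compatibility property described above. Keys of any one old partition end up in at most two partitions.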

Regarding the load distribution: if we use linear hashing, the load may be
unevenly distributed because those partitions which are not split may
receive twice as much traffic as other partitions that are split. This
issue can be mitigated by creating the topic with a number of partitions
that is several times the number of consumers. And there will be no
imbalance if the partition number is always doubled. So this imbalance
seems acceptable.

Regarding storing the partition strategy as per-topic config: it seems not
necessary, since we can still use murmur_hash as the default hash function
and additionally apply the linear hashing algorithm if the partition number
has increased. Not sure if there is any use-case for the producer to use a
different hash function. Jason, can you check if there is some use-case
that I missed for using the per-topic partition strategy?

Regarding how to reduce latency (due to state store/load) in the stream
processing consumer when the partition number changes: I need to read the
Kafka Streams code to understand how Kafka Streams currently migrates state
between consumers when the application is added/removed for a given job. I
will reply after I finish reading the documentation and code.


Thanks,
Dong


On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <ja...@confluent.io>
wrote:

Great discussion. I think I'm wondering whether we can continue to leave
Kafka agnostic to the partitioning strategy. The challenge is communicating
the partitioning logic from producers to consumers so that the dependencies
between each epoch can be determined. For the sake of discussion, imagine
you did something like the following:

1. The name (and perhaps version) of a partitioning strategy is stored in
topic configuration when a topic is created.
2. The producer looks up the partitioning strategy before writing to a
topic and includes it in the produce request (for fencing). If it doesn't
have an implementation for the configured strategy, it fails.
3. The consumer also looks up the partitioning strategy and uses it to
determine dependencies when reading a new epoch. It could either fail or
make the most conservative dependency assumptions if it doesn't know how to
implement the partitioning strategy. For the consumer, the new interface
might look something like this:

// Return the partition dependencies following an epoch bump
Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
                                         int numPartitionsAfterEpochBump);

The unordered case then is just a particular implementation which never has
any epoch dependencies. To implement this, we would need some way for the
consumer to find out how many partitions there were in each epoch, but
maybe that's not too unreasonable.
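As a rough sketch of two such implementations: the first assumes modulo partitioning with growth by an integer factor, the second is the unordered case. The reading of the return value (new partition => old partitions that must be fully consumed first) is an assumption, as are the class and method names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical strategies for the interface sketched above; not a Kafka API.
final class EpochDependencies {

    /**
     * Modulo partitioning, partition count grown by an integer factor:
     * keys now in partition p previously lived in partition p % before.
     */
    static Map<Integer, List<Integer>> moduloFactor(int before, int after) {
        if (before <= 0 || after < before || after % before != 0) {
            throw new IllegalArgumentException("after must be a multiple of before");
        }
        Map<Integer, List<Integer>> deps = new HashMap<>();
        for (int p = 0; p < after; p++) {
            deps.put(p, Collections.singletonList(p % before));
        }
        return deps;
    }

    /** Unordered case: no partition of the new epoch depends on an old one. */
    static Map<Integer, List<Integer>> unordered(int before, int after) {
        Map<Integer, List<Integer>> deps = new HashMap<>();
        for (int p = 0; p < after; p++) {
            deps.put(p, Collections.emptyList());
        }
        return deps;
    }
}
```

For a bump from 3 to 12 partitions, `moduloFactor(3, 12)` makes partition 8 depend only on old partition 2, while `unordered(3, 12)` reports no dependencies at all.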

Thanks,
Jason


On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

Hi Dong,

thank you very much for your questions.
Regarding the time spent copying data across:
It is correct that copying data from a topic with one partition mapping to
a topic with a different partition mapping takes way longer than we can
stop producers. Tens of minutes is a very optimistic estimate here. Many
people cannot afford to copy at full steam and will therefore have some
rate limiting in place; this can bump the timespan into days. The good part
is that the vast majority of the data can be copied while the producers are
still going. One can then piggyback the consumers on top of this timeframe
by the method mentioned (provide them a mapping from their old offsets to
new offsets in their repartitioned topics). In that way we separate the
migration of consumers from the migration of producers (decoupling these is
what Kafka is strongest at). The time to actually swap over the producers
should be kept minimal by ensuring that when a swap attempt is started, the
consumer copying over is very close to the log end and is expected to
finish within the next fetch. The operation should have a time-out and
should be "reattemptable".

Importance of log compaction:
If a producer produces key A to partition 0, it is going to be there
forever unless it gets deleted. The record might sit in there for years. A
new producer started with the new partitions will fail to delete the record
in the correct partition. The record will be there forever, and one cannot
reliably bootstrap new consumers. I cannot see how linear hashing can solve
this.

Regarding your skipping of userland copying:
100%, copying the data across in userland is, as far as I can see, only a
use case for log-compacted topics. Even for log compaction + retention it
should only be opt-in. Why did I bring it up? I think log compaction is a
very important feature to really embrace Kafka as a "data platform". The
point I also want to make is that copying data this way is completely in
line with the Kafka architecture. It only consists of reading from and
writing to topics.

I hope this clarifies why I think we should aim for more than the
current KIP. I fear that once the KIP is done, not much more effort will be
taken.



On 04.03.2018 02:28, Dong Lin wrote:

Hey Jan,

In the current proposal, the consumer will be blocked on waiting for other
consumers of the group to consume up to a given offset. In most cases, all
consumers should be close to the LEO of the partitions when the partition
expansion happens. Thus the time waiting should not be long, e.g. on the
order of seconds. On the other hand, it may take a long time to wait for
the entire partition to be copied -- the amount of time is proportional to
the amount of existing data in the partition, which can take tens of
minutes. So the amount of time that we stop consumers may not be on the
same order of magnitude.

If we can implement this suggestion without copying data over in pure
userland, it will be much more valuable. Do you have ideas on how this can
be done?

Not sure why the current KIP would not help people who depend on log
compaction. Could you elaborate more on this point?

Thanks,
Dong

On Wed, Feb 28, 2018 at 10:55 PM, Jan Filipiak <Jan.Filipiak@trivago.com>
wrote:

Hi Dong,

I tried to focus on the steps one can currently perform to expand
or shrink a keyed topic while maintaining top-notch semantics.
I can understand that there might be confusion about "stopping the
consumer". It is exactly the same as proposed in the KIP: there needs to be
a time when the producers agree on the new partitioning. The extra
semantics I want to put in there is that we have a possibility to wait
until all the existing data is copied over into the new partitioning
scheme. When I say stopping, I think more of having a memory barrier that
ensures the ordering. I am still aiming for latencies on the scale of
leader failovers.

Consumers have to explicitly adopt the new partitioning scheme in the
above scenario. The reason is that in these cases where you are dependent
on a particular partitioning scheme, you also frequently have other topics
that have co-partitioning enforcements or the like. Therefore all your
other input topics might need to grow accordingly.

What I was suggesting was to streamline all these operations as well as
possible to have "real" partition growth and shrinkage going on. Migrating
the producers to a new partitioning scheme can be much more streamlined
with proper broker support for this. Migrating consumers is a step that
might be made completely unnecessary if - for example - Streams takes the
gcd as partitioning scheme instead of enforcing 1 to 1. Connect consumers
and other consumers should be fine anyway.
I hope this makes clearer what I was aiming at. The rest needs to be
figured out. The only danger I see is that if we introduce this feature as
proposed in the KIP, it won't help any people depending on log compaction.
The other thing I wanted to mention is that I believe the current
suggestion (without copying data over) can be implemented in pure userland
with a custom partitioner and a small feedback loop from ProduceResponse =>
Partitioner in cooperation with a change management system.

Best Jan








On 28.02.2018 07:13, Dong Lin wrote:

Hey Jan,

I am not sure if it is acceptable for the producer to be stopped for a
while, particularly for online applications which require low latency. I am
also not sure how consumers can switch to a new topic. Does the user
application need to explicitly specify a different topic for the
producer/consumer to subscribe to? It will be helpful for discussion if you
can provide more detail on the interface change for this solution.
Thanks,

Dong
On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
wrote:
Hi,

just want to throw my thoughts in. In general the functionality is very
useful, though we should not try to find the architecture too hard while
implementing.

The manual steps would be to

create a new topic
then mirror-make from the old topic to the new topic
wait for mirror making to catch up.
then put the consumers onto the new topic
          (having MirrorMaker spit out a mapping from old offsets to new
offsets:
              if the topic is increased by factor X there is going to be a
clean mapping from 1 offset in the old topic to X offsets in the new topic,
              if there is no factor then there is no chance to generate a
mapping that can reasonably be used for continuing)
          make consumers stop at appropriate points and continue
consumption with offsets from the mapping.
have the producers stop for a minimal time.
wait for MirrorMaker to finish
let producers produce with the new metadata.
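The offset mapping in the steps above could be produced roughly as follows. This is a minimal sketch, not a MirrorMaker feature: the class is hypothetical, it tracks a single old partition whose offsets are assumed to be contiguous from 0, and it assumes the copier reports, for each record in old-offset order, which of the X target partitions the record went to.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical bookkeeping for one old partition during the copy.
final class OffsetMappingBuilder {
    private final long[] nextNewOffset;                    // next offset per target partition
    private final List<long[]> snapshots = new ArrayList<>(); // index = old offset

    OffsetMappingBuilder(int factor) {
        this.nextNewOffset = new long[factor];
    }

    /** Called once per copied record, in old-offset order. */
    void recordCopied(int targetIndex) {
        // Positions a consumer needs if it has NOT yet consumed this old record.
        snapshots.add(nextNewOffset.clone());
        nextNewOffset[targetIndex]++;
    }

    /** One old offset maps to X new offsets, one per target partition. */
    long[] resumeOffsets(long oldOffset) {
        if (oldOffset < 0 || oldOffset > snapshots.size()) {
            throw new IllegalArgumentException("old offset not copied yet");
        }
        return oldOffset < snapshots.size()
                ? snapshots.get((int) oldOffset).clone()
                : nextNewOffset.clone();
    }
}
```

A consumer that stopped at old offset n would then continue at `resumeOffsets(n)[i]` in the i-th target partition. Without growth by a factor there is no fixed set of target partitions per old partition, which is why no such mapping exists in that case.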


Instead of implementing the approach suggested in the KIP, which will leave
log-compacted topics completely crumbled and unusable,
I would much rather try to build infrastructure to support the
above-mentioned operations more smoothly.
Especially having producers stop and use another topic is difficult, and
it would be nice if one could trigger "invalid metadata" exceptions for
them, and if one could give topics aliases so that their produces to the
old topic will arrive in the new topic.

The downsides are obvious, I guess (having the same data twice for the
transition period, but Kafka tends to scale well with data size). So it's a
nicer fit into the architecture.
I further want to argue that the functionality of the KIP can be
implemented completely in "userland" with a custom partitioner that
handles the transition as needed. I would appreciate it if someone could
point out what a custom partitioner couldn't handle in this case.
With the above approach, shrinking a topic becomes the same steps, without
losing keys in the discontinued partitions.
Would love to hear what everyone thinks.

Best Jan


















On 11.02.2018 00:35, Dong Lin wrote:

Hi all,

I have created KIP-253: Support in-order message delivery with partition
expansion. See
https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion

This KIP provides a way to allow messages of the same key from the same
producer to be consumed in the same order they are produced even if we
expand partition of the topic.
Thanks,
Dong







