Great discussion. I think I'm wondering whether we can continue to leave
Kafka agnostic to the partitioning strategy. The challenge is communicating
the partitioning logic from producers to consumers so that the dependencies
between each epoch can be determined. For the sake of discussion, imagine
you did something like the following:
1. The name (and perhaps version) of a partitioning strategy is stored in
topic configuration when a topic is created.
2. The producer looks up the partitioning strategy before writing to a
topic and includes it in the produce request (for fencing). If it doesn't
have an implementation for the configured strategy, it fails.
3. The consumer also looks up the partitioning strategy and uses it to
determine dependencies when reading a new epoch. It could either fail or
make the most conservative dependency assumptions if it doesn't know how to
implement the partitioning strategy. For the consumer, the new interface
might look something like this:
// Return the partition dependencies following an epoch bump
Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
                                         int numPartitionsAfterEpochBump)
The unordered case then is just a particular implementation which never has
any epoch dependencies. To implement this, we would need some way for the
consumer to find out how many partitions there were in each epoch, but
maybe that's not too unreasonable.
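To make that concrete, here is a minimal sketch of what an implementation of the unordered strategy might look like on the consumer side (the wrapper class is illustrative only, not an existing Kafka API):

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: a strategy whose epoch bumps never create dependencies,
// so consumers can read new partitions without waiting on old ones.
public class UnorderedPartitioningStrategy {

    // Return the partition dependencies following an epoch bump
    public Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
                                                    int numPartitionsAfterEpochBump) {
        Map<Integer, List<Integer>> deps = new HashMap<>();
        for (int p = 0; p < numPartitionsAfterEpochBump; p++) {
            deps.put(p, Collections.emptyList()); // no partition depends on any other
        }
        return deps;
    }
}

A key-range or linear-hashing style strategy would instead return, for each partition after the bump, the partition(s) before the bump whose records may need to be consumed first.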
Thanks,
Jason
On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <jan.filip...@trivago.com>
wrote:
Hi Dong
thank you very much for your questions.
Regarding the time spent copying data across:
It is correct that copying data from a topic with one partition mapping to a topic with a different partition mapping takes way longer than we can stop producers. Tens of minutes is a very optimistic estimate here. Many people cannot afford to copy at full steam and therefore will have some rate limiting in place; this can push the timespan into days. The good part is that the vast majority of the data can be copied while the producers are still going. One can then piggyback the consumers on top of this timeframe, by the method mentioned (providing them a mapping from their old offsets to the new offsets in their repartitioned topics). In that way we separate the migration of consumers from the migration of producers (decoupling these is what Kafka is strongest at). The time to actually swap over the producers should be kept minimal by ensuring that when a swap attempt is started, the consumer doing the copying is already very close to the log end and is expected to finish within the next fetch. The operation should have a timeout and should be retryable.
Importance of log compaction:
If a producer produces key A to partition 0, it is going to stay there forever unless it gets deleted. The record might sit there for years. A new producer started with the new partitions will fail to delete the record in the correct partition. The record will be there forever and one cannot reliably bootstrap new consumers. I cannot see how linear hashing can solve this.
Regarding your skipping of userland copying:
100%, copying the data across in userland is, as far as I can see, only a use case for log-compacted topics. Even for log compaction + retention it should only be opt-in. Why did I bring it up? I think log compaction is a very important feature to really embrace Kafka as a "data platform". The point I also want to make is that copying data this way is completely in line with the Kafka architecture: it only consists of reading from and writing to topics.
I hope this clarifies more why I think we should aim for more than the current KIP. I fear that once the KIP is done, not much more effort will be made.
On 04.03.2018 02:28, Dong Lin wrote:
Hey Jan,
In the current proposal, the consumer will be blocked waiting for other consumers of the group to consume up to a given offset. In most cases, all consumers should be close to the LEO of the partitions when the partition expansion happens. Thus the waiting time should not be long, e.g. on the order of seconds. On the other hand, it may take a long time to wait for the entire partition to be copied -- the amount of time is proportional to the amount of existing data in the partition, which can take tens of minutes. So the amount of time that we stop consumers may not be on the same order of magnitude.
If we can implement this suggestion without copying data over in pure userland, it will be much more valuable. Do you have ideas on how this can be done?
Not sure why the current KIP does not help people who depend on log compaction. Could you elaborate more on this point?
Thanks,
Dong
On Wed, Feb 28, 2018 at 10:55 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
Hi Dong,
I tried to focus on what steps one can currently perform to expand or shrink a keyed topic while maintaining top-notch semantics.
I can understand that there might be confusion about "stopping the consumer". It is exactly the same as proposed in the KIP: there needs to be a point in time at which the producers agree on the new partitioning. The extra semantics I want to put in there is that we have a possibility to wait until all the existing data is copied over into the new partitioning scheme. When I say stopping, I think more of having a memory barrier that ensures the ordering. I am still aiming for latencies on the scale of leader failovers.
Consumers have to explicitly adopt the new partitioning scheme in the above scenario. The reason is that in the cases where you are dependent on a particular partitioning scheme, you frequently also have other topics with co-partitioning enforcement or the like. Therefore all your other input topics might need to grow accordingly.
What I was suggesting was to streamline all these operations as best as possible to have "real" partition growth and shrinkage going on. Migrating the producers to a new partitioning scheme can be much more streamlined with proper broker support for this. Migrating the consumers is a step that might be made completely unnecessary if, for example, Streams takes the gcd as the partitioning scheme instead of enforcing 1:1 (see the sketch below). Connect consumers and other consumers should be fine anyway.
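To illustrate the gcd idea (just a sketch of the reasoning, not an existing Streams feature): if both topics assign keys by hash(key) % numPartitions and partitions are grouped into tasks by partition % gcd, then records with the same key from both topics always end up on the same task, even though the partition counts differ:

import java.math.BigInteger;

// Sketch: co-locating two topics with different partition counts by grouping
// their partitions into gcd(pA, pB) tasks instead of enforcing identical counts.
public class GcdCoPartitioning {

    public static void main(String[] args) {
        int pA = 6, pB = 4;
        int tasks = BigInteger.valueOf(pA).gcd(BigInteger.valueOf(pB)).intValue(); // 2

        // For any key hash h: the key sits in partition h % pA of topic A and
        // partition h % pB of topic B. Because 'tasks' divides both counts,
        // (h % pA) % tasks == (h % pB) % tasks, so both records map to the same task.
        for (int h = 0; h < 1000; h++) {
            int taskA = (h % pA) % tasks;
            int taskB = (h % pB) % tasks;
            if (taskA != taskB)
                throw new AssertionError("keys would diverge across tasks");
        }
        System.out.println("keys from both topics stay co-located across " + tasks + " tasks");
    }
}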
I hope this makes clearer what I was aiming at. The rest needs to be figured out. The only danger I see is that if we introduce this feature as proposed in the KIP, it won't help any people depending on log compaction.
The other thing I wanted to mention is that I believe the current suggestion (without copying data over) can be implemented in pure userland with a custom partitioner and a small feedback loop from ProduceResponse => Partitioner in cooperation with a change-management system.
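A rough sketch of such a partitioner (the config key and the switch-over mechanism are hypothetical placeholders for the feedback loop; the Partitioner interface, Cluster and Utils calls are the regular client API):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Sketch only: keeps hashing against the pre-expansion partition count until an
// external change-management system flips the switch, then uses the current count.
public class TransitionAwarePartitioner implements Partitioner {

    private volatile boolean useNewScheme = false;
    private volatile int oldPartitionCount;

    @Override
    public void configure(Map<String, ?> configs) {
        // "old.partition.count" is a made-up config key for this sketch.
        oldPartitionCount = Integer.parseInt(String.valueOf(configs.get("old.partition.count")));
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Sketch assumes keyed records (keyBytes != null).
        int currentCount = cluster.partitionCountForTopic(topic);
        int effectiveCount = useNewScheme ? currentCount : oldPartitionCount;
        // Same hashing as the default partitioner, only the modulus differs.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % effectiveCount;
    }

    // Called by the change-management integration (e.g. reacting to produce
    // responses or a config service) once all producers agree on the swap.
    public void switchToNewScheme() {
        useNewScheme = true;
    }

    @Override
    public void close() {}
}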
Best Jan
On 28.02.2018 07:13, Dong Lin wrote:
Hey Jan,
I am not sure if it is acceptable for producers to be stopped for a while, particularly for online applications which require low latency. I am also not sure how consumers can switch to a new topic. Does the user application need to explicitly specify a different topic for the producer/consumer to subscribe to? It will be helpful for the discussion if you can provide more detail on the interface changes for this solution.
Thanks,
Dong
On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
Hi,
just want to throw my thoughts in. In general the functionality is very useful; we should, though, not try to fight the architecture too hard while implementing it.
The manual steps would be to:
create a new topic
then mirrormaker from the old topic to the new topic
wait for the mirroring to catch up
then put the consumers onto the new topic
(having mirrormaker spit out a mapping from old offsets to new offsets: if the topic is increased by factor X there is going to be a clean mapping from 1 offset in the old topic to X offsets in the new topic; if there is no such factor then there is no chance to generate a mapping that can reasonably be used for continuing -- see the sketch right after this list)
make consumers stop at appropriate points and continue consumption with offsets from the mapping
have the producers stop for a minimal time
wait for mirrormaker to finish
let producers produce with the new metadata.
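Sketch for the factor-X case referenced above (assuming both the old and new topics use the usual hash-modulo key assignment): records from old partition p can only land in the X new partitions congruent to p modulo the old partition count, which is why the mapping stays clean:

// Sketch: with P old partitions and X*P new partitions under hash-modulo
// assignment, a key in old partition p = h % P lands in new partition
// h % (X*P), which is always congruent to p modulo P.
public class FactorExpansionMapping {

    public static void main(String[] args) {
        int oldPartitions = 4;
        int factor = 3;
        int newPartitions = oldPartitions * factor;

        for (int h = 0; h < 1000; h++) {
            int oldP = h % oldPartitions;
            int newP = h % newPartitions;
            if (newP % oldPartitions != oldP)
                throw new AssertionError("mapping would not be clean");
        }
        System.out.println("records from old partition p only land in new partitions congruent to p mod " + oldPartitions);
    }
}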
Instead of implementing the approach suggested in the KIP, which will leave log-compacted topics completely crumbled and unusable, I would much rather try to build infrastructure to support the operations mentioned above more smoothly.
Especially having producers stop and use another topic is difficult, and it would be nice if one could trigger "invalid metadata" exceptions for them and if one could give topics aliases so that their produces with the old topic name will arrive in the new topic.
The downsides are obvious, I guess (having the same data twice for the transition period, but Kafka tends to scale well with data size). Still, it is a nicer fit into the architecture.
I further want to argue that the functionality of the KIP can be completely implemented in "userland" with a custom partitioner that handles the transition as needed. I would appreciate it if someone could point out what a custom partitioner couldn't handle in this case.
With the above approach, shrinking a topic becomes the same set of steps, without losing keys in the discontinued partitions.
Would love to hear what everyone thinks.
Best Jan
On 11.02.2018 00:35, Dong Lin wrote:
Hi all,
I have created KIP-253: Support in-order message delivery with
partition
expansion. See
https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion .
This KIP provides a way to allow messages of the same key from the same producer to be consumed in the same order they are produced even if we expand the partitions of the topic.
Thanks,
Dong