Linear hashing (or partition splitting) vs. rehashing every key: the benefit
of the former is that it reduces the number of partitions to which keys from
an existing partition are redistributed, which potentially reduces the
overhead of rebuilding the state in a consumer. The downside is that the
load may not be distributed evenly across partitions unless the number of
partitions is doubled. When the number of existing partitions is already
large, one may not want to always double the partitions.
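To make the trade-off above concrete, here is a minimal sketch comparing the two schemes for a 100 -> 110 expansion. Everything in it is an assumption for illustration: the function names are hypothetical, the MD5-based hash is a stand-in rather than the murmur2 hash Kafka's default partitioner uses, and nothing here is part of the KIP.

```python
import hashlib
from collections import Counter


def key_hash(key: bytes) -> int:
    # Stand-in hash for illustration only (assumption, not Kafka's hash).
    return int.from_bytes(hashlib.md5(key).digest()[:8], "big")


def rehash_partition(key: bytes, n: int) -> int:
    # "Rehash every key": partition is the hash modulo the current count.
    return key_hash(key) % n


def linear_hash_partition(key: bytes, n: int, base: int) -> int:
    # Linear hashing: `base` is the partition count at the start of the
    # current doubling round (base <= n <= 2 * base). Partitions
    # [0, n - base) have already been split; the rest are untouched.
    h = key_hash(key)
    p = h % base
    if p < n - base:
        p = h % (2 * base)  # key either stays at p or moves to p + base
    return p


keys = [f"key-{i}".encode() for i in range(20000)]

# Count keys whose partition changes when going from 100 to 110 partitions.
moved_rehash = sum(
    rehash_partition(k, 100) != rehash_partition(k, 110) for k in keys
)
moved_linear = sum(
    linear_hash_partition(k, 100, 100) != linear_hash_partition(k, 110, 100)
    for k in keys
)

# Per-partition key counts after the linear-hashing expansion to 110.
load = Counter(linear_hash_partition(k, 110, 100) for k in keys)

print(f"rehash every key: {moved_rehash / len(keys):.1%} of keys moved")
print(f"linear hashing:   {moved_linear / len(keys):.1%} of keys moved")
print(f"partition 0 (split): {load[0]} keys, partition 50 (unsplit): {load[50]} keys")
```

With a uniform hash one would expect roughly 10/11 of the keys to move under plain modulo rehashing and roughly 5% under linear hashing. The cost is that the split partitions (0-9 and 100-109) each carry about half the load of the unsplit ones until the count reaches 200.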
Even with linear hashing, certain consumer instances within a consumer
group still need to rebuild the state for some partitions. This can still
affect the overall latency of a streaming job if the processing depends on
data coming from all partitions. Another way to improve this is to add
partitions in two steps. In the first step, new partitions will be added
and exposed to the consumer, but not the producer. After this step, the
consumer can start preparing the state for the new partitions, but won't
need to use them since there is no data in those new partitions yet. In the
second step, the producer can start publishing to the new partitions. At
this point, the consumer needs to process the data in new partitions.
However, if the state for the new partitions is almost ready, the amount
of waiting will be minimal. We can potentially add a new config that
controls the delay between the two steps.
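The two-step idea above can be sketched as a tiny state model. This is only an illustration under stated assumptions: the `Topic` class, its method names, and the `delay_ms` parameter are all hypothetical and do not correspond to any existing Kafka API or config.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Topic:
    producer_partitions: int  # partitions producers may publish to
    consumer_partitions: int  # partitions exposed to consumers
    expansion_started_ms: Optional[int] = None

    def begin_expansion(self, new_count: int, now_ms: int) -> None:
        # Step 1: expose the new partitions to consumers only, so they can
        # start preparing state while the new partitions are still empty.
        self.consumer_partitions = new_count
        self.expansion_started_ms = now_ms

    def maybe_finish_expansion(self, now_ms: int, delay_ms: int) -> bool:
        # Step 2: once the (hypothetical) configured delay has elapsed,
        # let producers publish to the new partitions as well.
        if self.expansion_started_ms is None:
            return False
        if now_ms - self.expansion_started_ms < delay_ms:
            return False
        self.producer_partitions = self.consumer_partitions
        self.expansion_started_ms = None
        return True


topic = Topic(producer_partitions=100, consumer_partitions=100)
topic.begin_expansion(new_count=110, now_ms=0)
print(topic.producer_partitions, topic.consumer_partitions)
topic.maybe_finish_expansion(now_ms=5_000, delay_ms=10_000)   # too early
topic.maybe_finish_expansion(now_ms=10_000, delay_ms=10_000)  # delay elapsed
print(topic.producer_partitions, topic.consumer_partitions)
```

The delay is the knob mentioned above: a longer delay gives consumers more time to prepare state before any data arrives in the new partitions, at the cost of postponing the extra capacity for producers.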
On Fri, Mar 2, 2018 at 1:28 PM, Jay Kreps <j...@confluent.io> wrote:
> Hey Dong,
> Cool. Obviously, for a solution here to be viable, it would need to work
> with Connect and Streams.
> On the linear hashing thing, what I am talking about is something
> different. I am talking about splitting existing partitions incrementally.
> E.g. say you have 100 partitions and want to move to 110. A naive
> approach that simply added partitions would require you to reshuffle all
> data, since the hashing of all data would change. A linear hashing-like
> scheme lets you split individual partitions one at a time and so avoid
> reshuffling much data. This approach has the benefit that at any time you
> have a fixed number of partitions and all data is fully partitioned under
> whatever partition count you choose, and it also lets you dynamically
> scale the partition count up or down. This seems like it simplifies things
> like log compaction etc.
> On Sun, Feb 25, 2018 at 3:51 PM, Dong Lin <lindon...@gmail.com> wrote:
> > Hey Jay,
> > Thanks for the comment!
> > I have not specifically thought about how this works with Streams and
> > Connect. The current KIP is written w.r.t. the interface that our producer
> > and consumer expose to the user. It ensures that if there are two messages
> > with the same key produced by the same producer, say messageA and
> > messageB, and suppose messageB is produced after messageA to a different
> > partition than messageA, then we can guarantee that the following sequence
> > can happen in order:
> > - Consumer of messageA can execute callback, in which user can flush the
> > state related to the key of messageA.
> > - messageA is delivered by its consumer to the application
> > - Consumer of messageB can execute callback, in which user can load the
> > state related to the key of messageB.
> > - messageB is delivered by its consumer to the application.
> > So it seems that it should support Streams and Connect properly. But I am
> > not entirely sure because I have not looked into how Streams and Connect
> > work. I can think about it more if you can provide an example where this
> > does not work for Streams and Connect.
> > Regarding the second question, I think the linear hashing approach provides
> > a way to reduce the number of partitions that can "conflict" with a given
> > partition to *log_2(n)*, as compared to *n* in the current KIP, where n is
> > the total number of partitions of the topic. This will be useful when the
> > number of partitions is large and asymptotic complexity matters.
> > I personally don't think this optimization is worth the additional
> > complexity in Kafka. This is because partition expansion or deletion
> > happen infrequently and the largest number of partitions of a single topic
> > today is not that large -- probably 1000 or less. And when the partitions of
> > a topic change, each consumer will likely need to query and wait for the
> > positions of a large percentage of partitions of the topic anyway, even with
> > this optimization. I think this algorithm is kind of orthogonal to this
> > KIP. We can extend the KIP to support this algorithm in the future as
> > Thanks,
> > Dong
> > On Thu, Feb 22, 2018 at 5:19 PM, Jay Kreps <j...@confluent.io> wrote:
> > > Hey Dong,
> > >
> > > Two questions:
> > > 1. How will this work with Streams and Connect?
> > > 2. How does this compare to a solution where we physically split partitions
> > > using a linear hashing approach (the partition number is equivalent to the
> > > hash bucket in a hash table)? https://en.wikipedia.org/wiki/Linear_hashing
> > >
> > > -Jay
> > >
> > > On Sat, Feb 10, 2018 at 3:35 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I have created KIP-253: Support in-order message delivery with partition
> > > > expansion. See
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
> > > > .
> > > >
> > > > This KIP provides a way to allow messages of the same key from the same
> > > > producer to be consumed in the same order they are produced even if we
> > > > expand partitions of the topic.
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > >