Hi Dong,


On Tue, Feb 27, 2018 at 10:07 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Allen,
>
> Thanks for the comments.
>
> On Mon, Feb 26, 2018 at 9:27 PM, Allen Wang <aw...@netflix.com.invalid>
> wrote:
>
> > Hi Dong,
> >
> > Please see my comments inline.
> >
> > Thanks,
> > Allen
> >
> > On Sun, Feb 25, 2018 at 3:33 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Allen,
> > >
> > > Thanks for your comment. I will comment inline.
> > >
> > > On Thu, Feb 22, 2018 at 3:05 PM, Allen Wang <aw...@netflix.com.invalid
> >
> > > wrote:
> > >
> > > > Overall this is a very useful feature. With this we can finally scale
> > > keyed
> > > > messages.
> > > >
> > > > +1 on the ability to remove partitions. This will greatly increase
> > > Kafka's
> > > > scalability in cloud.
> > > >
> > > > For example, when there is traffic increase, we can add brokers and
> > > assign
> > > > new partitions to the new brokers. When traffic decreases, we can
> mark
> > > > these new partitions as read only and remove them afterwards,
> together
> > > with
> > > > the brokers that host these partitions. This will be a light-weight
> > > > approach to scale a Kafka cluster compared to partition reassignment
> > > where
> > > > you will always have to move data.
> > > >
> > > > I have some suggestions:
> > > >
> > > > - The KIP described each step in detail which is great. However, it
> > lacks
> > > > the "why" part to explain the high level goal we want to achieve with
> > > each
> > > > step. For example, the purpose of step 5 may be described as "Make
> sure
> > > > consumers always first finish consuming all data prior to partition
> > > > expansion to enforce message ordering".
> > > >
> > >
> > > Yeah I think this is useful. This is a non-trivial KIP and it is useful
> > to
> > > explain the motivation of each change to help reading. I will added
> > > motivation for each change in the KIP. Please let me know if there is
> > > anything else that can make the KIP more readable.
> > >
> > >
> > > >
> > > > - The rejection of produce request at partition expansion should be
> > > > configurable because it does not matter for non-keyed messages. Same
> > with
> > > > the consumer behavior for step 5. This will ensure that for non-keyed
> > > > messages, partition expansion does not add the cost of possible
> message
> > > > drop on producer or message latency on the consumer.
> > > >
> > >
> > > Ideally we would like to avoid adding extra configs to keep the
> interface
> > > simple. I think the current overhead in the producer is actually very
> > > small. Partition expansion or deletion should happen very infrequently.
> > > Note that our producer today needs to refresh metadata whenever there
> is
> > > leadership movement, i.e. producer will receive
> > > NotLeaderForPartitionException from the old leader and keep refreshing
> > > metadata until it gets the new leader of the partition, which happens
> > much
> > > more frequently than Partition expansion or deletion. So I am not sure
> we
> > > should add a config to optimize this.
> > >
> >
> > I was concerned that at high message rate, rejecting requests could lead
> to
> > producer side buffer full and lead to unnecessary message drop on
> producer
> > side for non-keyed messages.
> >
> > What about the delay on consumer? It could be significant when one
> consumer
> > is lagging for certain partitions and all consumers in the same group
> have
> > to wait. This delay could be significant and again unnecessary for
> messages
> > where the order does not matter.
> >
>
> I agree this may increase delay on the producer side. Consumer is not
> impacted directly and any extra delay in consumer all comes from the extra
> delay in producer.
>
> Note that when broker has leadership change, if producer's metadata is
> still using the old leader, producer will see
> NotLeaderForPartitionException and will have to repeatedly update
> metadadata until the metadata uses the new leader. Do you think the
> metadata update after partition expansion, as introduced in this KIP, is
> any worse than the metadata update required during leadership change? If
> not, given that our user already needs to handle or tolerate the extra
> delay during leadership change, I think the extra delay after partition
> expansion should be fine.
>

I think the difference here is that when you expand partitions, almost all
produce requests will be rejected at that time. So this will have a
significant impact on producer buffer. Usually the leadership changes only
affect a few partitions so the impact is small.

On the consumer side, as Jason pointed out, all consumers will have to
synchronize on leader epoch. Let's say a consumer is lagging behind in a
partition which exists prior to expansion, and it takes an hour to catch
up. During this hour, all consumers in the same group will have to wait.
This is completely waste of time if ordering is not important.



>

> >
> >
> > >
> > >
> > >
> > > > - Since we now allow adding partitions for keyed messages while
> > > preserving
> > > > the message ordering on the consumer side, the default producer
> > > partitioner
> > > > seems to be inadequate as it rehashes all keys. As part of this KIP,
> > > should
> > > > we also include a partitioner that better handles partition changes,
> > for
> > > > example, with consistent hashing?
> > > >
> >
> > I am not sure I understand the problem with the default partitioner. Can
> > > you explain a bit more why default producer partitioner is inadequate
> > with
> > > this KIP? And why consistent hashing can be helpful?
> > >
> > >
> > >
> > The default partitioner use this algorithm for keyed messages:
> >
> > Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
> >
> > As you can see, when the number of partitions are changed, there will be
> > complete rehash of keys to partitions. This causes a lot of changes to
> key
> > -> partition mapping and could be costly for stateful consumers. This is
> > not an issue before since we cannot change number of partitions for keyed
> > messages anyway (to avoid impact on message orders).
> >
> > If we introduce a partitioner with consistent hashing, the changes of key
> > to partition mapping will be minimized and I think it will help
> consumers.
> >
>
> The goal of this KIP is to ensure that consume can consume keyed messages
> in-order even if we expand partition of a topic. I agree that consistent
> hashing can reduce the changes of key-to-partition mapping. But can you be
> more specific how that helps consumers? Does this simply the design of this
> KIP or make the consumer faster?
>
> This should mitigate the issue that Matthias mentioned for log-compacted
> topics. But I am not sure if it is worthwhile to change the default hashing
> algorithm for this issue. Note that if we change the hash algorithm, then
> messages with the same key will be mapped to a different partition and then
> they maybe consumed by different consumer out-of-order.
>
>
>
Using consistent hashing will reduce the load of consumer when they have to
reload the state for new keys (from new partitions assigned).

Let's assume keys A and B is mapped to partition P1 before expansion, and
one consumer is processing partition P1. Also assume that the consumer is
using the StickyAssignor to minimize the partition changes on consumer
rebalance.

Using conventional hashing, after partitions are expanded, it is likely
that for partition P1, it now has keys C and D because all keys have been
rehashed to the partitions. With StickyAssignor, consumer will still be
assigned P1 so the consumer would have to load state for key C and D.

If we use consistent hashing, we can be pretty sure that either A or B will
remain in P1. With StickyAssignor, the consumer will keep partition P1. The
consumer may be assigned an extra partition and therefore some new keys.
But because at least one key (A or B) remains in P1, comparing with
conventional hashing, the consumer has less new states to load and
therefore it has less impact to disk/network traffic.

This is my understanding and hopefully some Kafka Stream experts can chime
in and comment.

What I suggest is that we add a new Partitioner that does the consistent
hashing and keep the default partitioner do the conventional hashing for
compatibility reason.



>
> > >
> > > > Thanks,
> > > > Allen
> > > >
> > > >
> > > > On Thu, Feb 22, 2018 at 11:52 AM, Jun Rao <j...@confluent.io> wrote:
> > > >
> > > > > Hi, Dong,
> > > > >
> > > > > Regarding deleting partitions, Gwen's point is right on. In some of
> > the
> > > > > usage of Kafka, the traffic can be bursty. When the traffic goes
> up,
> > > > adding
> > > > > partitions is a quick way of shifting some traffic to the newly
> added
> > > > > brokers. Once the traffic goes down, the newly added brokers will
> be
> > > > > reclaimed (potentially by moving replicas off those brokers).
> > However,
> > > if
> > > > > one can only add partitions without removing, eventually, one will
> > hit
> > > > the
> > > > > limit.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Wed, Feb 21, 2018 at 12:23 PM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hey Jun,
> > > > > >
> > > > > > Thanks much for your comments.
> > > > > >
> > > > > > On Wed, Feb 21, 2018 at 10:17 AM, Jun Rao <j...@confluent.io>
> > wrote:
> > > > > >
> > > > > > > Hi, Dong,
> > > > > > >
> > > > > > > Thanks for the KIP. At the high level, this makes sense. A few
> > > > comments
> > > > > > > below.
> > > > > > >
> > > > > > > 1. It would be useful to support removing partitions as well.
> The
> > > > > general
> > > > > > > idea could be bumping the leader epoch for the remaining
> > > partitions.
> > > > > For
> > > > > > > the partitions to be removed, we can make them read-only and
> > remove
> > > > > them
> > > > > > > after the retention time.
> > > > > > >
> > > > > >
> > > > > > I think we should be able to find a way to delete partitions of
> an
> > > > > existing
> > > > > > topic. But it will also add complexity to our broker and client
> > > > > > implementation. I am just not sure whether this feature is worth
> > the
> > > > > > complexity. Could you explain a bit more why user would want to
> > > delete
> > > > > > partitions of an existing topic? Is it to handle the human error
> > > where
> > > > a
> > > > > > topic is created with too many partitions by mistake?
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > 2. If we support removing partitions, I am not sure if it's
> > enough
> > > to
> > > > > > fence
> > > > > > > off the producer using total partition number since the total
> > > > partition
> > > > > > > number may remain the same after adding and then removing
> > > partitions.
> > > > > > > Perhaps we need some notion of partition epoch.
> > > > > > >
> > > > > > > 3. In step 5) of the Proposed Changes, I am not sure that we
> can
> > > > always
> > > > > > > rely upon position 0 for dealing with the new partitions. A
> > > consumer
> > > > > will
> > > > > > > start consuming the new partition when some of the existing
> > records
> > > > > have
> > > > > > > been removed due to retention.
> > > > > > >
> > > > > >
> > > > > >
> > > > > > You are right. I have updated the KIP to compare the
> startPosition
> > > with
> > > > > the
> > > > > > earliest offset of the partition. If the startPosition > earliest
> > > > offset,
> > > > > > then the consumer can consume messages from the given partition
> > > > directly.
> > > > > > This should handle the case where some of the existing records
> have
> > > > been
> > > > > > removed before consumer starts consumption.
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > 4. When the consumer is allowed to read messages after the
> > > partition
> > > > > > > expansion point, a key may be moved from one consumer instance
> to
> > > > > > another.
> > > > > > > In this case, similar to consumer rebalance, it's useful to
> > inform
> > > > the
> > > > > > > application about this so that the consumer can save and reload
> > the
> > > > per
> > > > > > key
> > > > > > > state. So, we need to either add some new callbacks or reuse
> the
> > > > > existing
> > > > > > > rebalance callbacks.
> > > > > > >
> > > > > >
> > > > > >
> > > > > > Good point. I will add the callback later after we discuss the
> need
> > > for
> > > > > > partition deletion.
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > 5. There is some subtlety in assigning partitions. Currently,
> the
> > > > > > consumer
> > > > > > > assigns partitions without needing to know the consumption
> > offset.
> > > > This
> > > > > > > could mean that a particular consumer may be assigned some new
> > > > > partitions
> > > > > > > that are not consumable yet, which could lead to imbalanced
> load
> > > > > > > temporarily. Not sure if this is super important to address
> > though.
> > > > > > >
> > > > > >
> > > > > > Personally I think it is not worth adding more complexity just to
> > > > > optimize
> > > > > > this scenario. This imbalance should exist only for a short
> period
> > of
> > > > > time.
> > > > > > If it is important I can think more about how to handle it.
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Sat, Feb 10, 2018 at 3:35 PM, Dong Lin <lindon...@gmail.com
> >
> > > > wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I have created KIP-253: Support in-order message delivery
> with
> > > > > > partition
> > > > > > > > expansion. See
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > > > 253%3A+Support+in-order+message+delivery+with+
> > > partition+expansion
> > > > > > > > .
> > > > > > > >
> > > > > > > > This KIP provides a way to allow messages of the same key
> from
> > > the
> > > > > same
> > > > > > > > producer to be consumed in the same order they are produced
> > even
> > > if
> > > > > we
> > > > > > > > expand partition of the topic.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Dong
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to