Hi PoAn,

I was late to the discussion but I had some questions.

JK01. I foresee scenarios where the cache leaves stale topic hashes around
when a group leaves.
Should we maintain a counter for the number of groups subscribed to the
topic (hash) to only keep active topics?

JK02. Can you help me understand when we would actually be computing
group-level hashes, is it on every
heartbeat?

Thanks,
Jeff

On Wed, Mar 12, 2025 at 11:33 PM PoAn Yang <yangp...@gmail.com> wrote:

> Hi David,
>
> Yes, I will continue working on this.
>
> DJ02: As Kirk noted earlier, since the topic hash calculation is on server
> side (not client-side), we can safely introduce Guava as a dependency.
>
> If there is no other discussion, we can start voting in
> https://lists.apache.org/thread/j90htmphjk783p697jjfg1xt8thmy33p. We can
> discuss implementation details in PR
> https://github.com/apache/kafka/pull/17444. I will resolve merge
> conflicts by March 14th.
>
> Thanks,
> PoAn
>
> > On Mar 13, 2025, at 12:43 AM, David Jacot <david.ja...@gmail.com> wrote:
> >
> > Hi PoAn,
> >
> > As we are getting closer to releasing 4.0, I think that we can resume
> > working on this one if you are still interested. Are you? I would like
> > to have it in 4.1.
> >
> > Best,
> > David
> >
> > On Wed, Jan 15, 2025 at 1:26 AM Kirk True <k...@kirktrue.pro> wrote:
> >>
> >> Hi all,
> >>
> >> Hopefully a quick question...
> >>
> >> KT01. Will clients calculate the topic hash on the client? Based on the
> current state of the KIP and PR, I would have thought "no", but I ask based
> on the discussion around the possible use of Guava on client.
> >>
> >> Thanks,
> >> Kirk
> >>
> >> On Mon, Jan 6, 2025, at 9:11 AM, David Jacot wrote:
> >>> Hi PoAn,
> >>>
> >>> Thanks for the update. I haven't read the updated KIP yet.
> >>>
> >>> DJ02: I am not sure about using Guava as a dependency. I mentioned it
> more
> >>> as an inspiration/reference. I suppose that we could use it on the
> server
> >>> but we should definitely not use it on the client. I am not sure how
> others
> >>> feel about it.
> >>>
> >>> Best,
> >>> David
> >>>
> >>> On Mon, Jan 6, 2025 at 5:21 AM PoAn Yang <yangp...@gmail.com> wrote:
> >>>
> >>>> Hi Chia-Ping / David / Lucas,
> >>>>
> >>>> Happy new year and thanks for the review.
> >>>>
> >>>> DJ02: Thanks for the suggestion. I updated the PR to use Guava.
> >>>>
> >>>> DJ03: Yes, I updated the description to mention ISR change,
> >>>> add altering partition reassignment case, and mention that
> >>>> non-related topic change doesn’t trigger a rebalance.
> >>>> DJ03.1: Yes, I will keep using ModernGroup#requestMetadataRefresh
> >>>> to notify group.
> >>>>
> >>>> DJ06: Updated the PR to use Guava Hashing#combineUnordered
> >>>> function to combine topic hash.
> >>>>
> >>>> DJ07: Renamed it to MetadataHash.
> >>>>
> >>>> DJ08: Added a sample hash function to the KIP and use first byte as
> magic
> >>>> byte. This is also included in latest PR.
> >>>>
> >>>> DJ09: Added two paragraphs about upgraded and downgraded.
> >>>>
> >>>> DJ10: According to Lucas’s comment, I add StreamsGroupMetadataValue
> update
> >>>> to this KIP.
> >>>>
> >>>> Thanks,
> >>>> PoAn
> >>>>
> >>>>
> >>>>> On Dec 20, 2024, at 3:58 PM, Chia-Ping Tsai <chia7...@gmail.com>
> wrote:
> >>>>>
> >>>>>> because assignors are sticky.
> >>>>>
> >>>>> I forgot about that spec again :(
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> David Jacot <david.ja...@gmail.com> 於 2024年12月20日 週五 下午3:41寫道:
> >>>>>
> >>>>>> Hi Chia-Ping,
> >>>>>>
> >>>>>> DJ08: In my opinion, changing the format will be rare so it is
> >>>>>> acceptable if rebalances are triggered in this case on
> >>>>>> upgrade/downgrade. It is also what will happen if a cluster is
> >>>>>> downgraded from 4.1 (with this KIP) to 4.0. The rebalance won't
> change
> >>>>>> anything if the topology of the group is the same because assignors
> >>>>>> are sticky. The default ones are and we recommend custom ones to
> also
> >>>>>> be.
> >>>>>>
> >>>>>> Best,
> >>>>>> David
> >>>>>>
> >>>>>> On Fri, Dec 20, 2024 at 2:11 AM Chia-Ping Tsai <chia7...@apache.org
> >
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> ummm, it does not work for downgrade as the old coordinator has no
> idea
> >>>>>> about new format :(
> >>>>>>>
> >>>>>>>
> >>>>>>> On 2024/12/20 00:57:27 Chia-Ping Tsai wrote:
> >>>>>>>> hi David
> >>>>>>>>
> >>>>>>>>> DJ08:
> >>>>>>>>
> >>>>>>>> That's a good question. If the "hash" lacks version control, it
> could
> >>>>>> trigger a series of unnecessary rebalances. However, adding
> additional
> >>>>>> information ("magic") to the hash does not help the upgraded
> coordinator
> >>>>>> determine the "version." This means that the upgraded coordinator
> would
> >>>>>> still trigger unnecessary rebalances because it has no way to know
> which
> >>>>>> format to use when comparing the hash.
> >>>>>>>>
> >>>>>>>> Perhaps we can add a new field to ConsumerGroupMetadataValue to
> >>>>>> indicate the version of the "hash." This would allow the
> coordinator,
> >>>> when
> >>>>>> handling subscription metadata, to compute the old hash and
> determine
> >>>>>> whether an epoch bump is necessary. Additionally, the coordinator
> can
> >>>>>> generate a new record to upgrade the hash without requiring an epoch
> >>>> bump.
> >>>>>>>>
> >>>>>>>> Another issue is whether the coordinator should cache all
> versions of
> >>>>>> the hash. I believe this is necessary; otherwise, during an upgrade,
> >>>> there
> >>>>>> would be extensive recomputing of old hashes.
> >>>>>>>>
> >>>>>>>> I believe this idea should also work for downgrades, and that's
> just
> >>>>>> my two cents.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Chia-Ping
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 2024/12/19 14:39:41 David Jacot wrote:
> >>>>>>>>> Hi PoAn and Chia-Ping,
> >>>>>>>>>
> >>>>>>>>> Thanks for your responses.
> >>>>>>>>>
> >>>>>>>>> DJ02: Sorry, I was not clear. I was wondering whether we could
> >>>>>> compute the
> >>>>>>>>> hash without having to convert to bytes before. Guava has a nice
> >>>>>> interface
> >>>>>>>>> for this allowing to incrementally add primitive types to the
> hash.
> >>>>>> We can
> >>>>>>>>> discuss this in the PR as it is an implementation detail.
> >>>>>>>>>
> >>>>>>>>> DJ03: Thanks. I don't think that the replicas are updated when a
> >>>>>> broker
> >>>>>>>>> shuts down. What you said applies to the ISR. I suppose that we
> can
> >>>>>> rely on
> >>>>>>>>> the ISR changes to trigger updates. It is also worth noting
> >>>>>>>>> that TopicsDelta#changedTopics is updated for every change (e.g.
> ISR
> >>>>>>>>> change, leader change, replicas change, etc.). I suppose that it
> is
> >>>>>> OK but
> >>>>>>>>> it seems that it will trigger refreshes which are not necessary.
> >>>>>> However, a
> >>>>>>>>> rebalance won't be triggered because the hash won't change.
> >>>>>>>>> DJ03.1: I suppose that we will continue to rely on
> >>>>>>>>> ModernGroup#requestMetadataRefresh to notify groups that must
> >>>>>> refresh their
> >>>>>>>>> hashes. Is my understanding correct?
> >>>>>>>>>
> >>>>>>>>> DJ05: Fair enough.
> >>>>>>>>>
> >>>>>>>>> DJ06: You mention in two places that you would like to combine
> >>>>>> hashes by
> >>>>>>>>> additioning them. I wonder if this is a good practice.
> Intuitively,
> >>>>>> I would
> >>>>>>>>> have used XOR or hashed the hashed. Guava has a method for
> combining
> >>>>>>>>> hashes. It may be worth looking into the algorithm used.
> >>>>>>>>>
> >>>>>>>>> DJ07: I would rename "AllTopicHash" to "MetadataHash" in order
> to be
> >>>>>> more
> >>>>>>>>> generic.
> >>>>>>>>>
> >>>>>>>>> DJ08: Regarding the per topic hash, I wonder whether we should
> >>>>>> precise in
> >>>>>>>>> the KIP how we will compute it. I had the following in mind:
> >>>>>>>>> hash(topicName; numPartitions; [partitionId;sorted racks]). We
> could
> >>>>>> also
> >>>>>>>>> add a magic byte at the first element as a sort of version. I am
> not
> >>>>>> sure
> >>>>>>>>> whether it is needed though. I was thinking about this while
> >>>>>> imagining how
> >>>>>>>>> we would handle changing the format in the future.
> >>>>>>>>>
> >>>>>>>>> DJ09: It would be great if we could provide more details about
> >>>>>> backward
> >>>>>>>>> compatibility. What happens when the cluster is upgraded or
> >>>>>> downgraded?
> >>>>>>>>>
> >>>>>>>>> DJ10: We should update KIP-1071. It may be worth pigging them in
> the
> >>>>>>>>> discussion thread of KIP-1071.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> David
> >>>>>>>>>
> >>>>>>>>> On Tue, Dec 17, 2024 at 9:25 AM PoAn Yang <yangp...@gmail.com>
> >>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Chia-Ping / David / Andrew,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the review and suggestions.
> >>>>>>>>>>
> >>>>>>>>>> DJ01: Removed all implementation details.
> >>>>>>>>>>
> >>>>>>>>>> DJ02: Does the “incrementally” mean that we only calculate the
> >>>>>> difference
> >>>>>>>>>> parts?
> >>>>>>>>>> For example, if the number of partition change, we only
> calculate
> >>>>>> the hash
> >>>>>>>>>> of number of partition and reconstruct it to the topic hash.
> >>>>>>>>>> IMO, we only calculate topic hash one time. With cache
> mechanism,
> >>>>>> the
> >>>>>>>>>> value can be reused in different groups on a same broker.
> >>>>>>>>>> The CPU usage for this part is not very high.
> >>>>>>>>>>
> >>>>>>>>>> DJ03: Added the update path to KIP for both cases.
> >>>>>>>>>>
> >>>>>>>>>> DJ04: Yes, it’s a good idea. With cache mechanism and single
> hash
> >>>>>> per
> >>>>>>>>>> group, we can balance cpu and disk usage.
> >>>>>>>>>>
> >>>>>>>>>> DJ05: Currently, the topic hash is only used in coordinator.
> >>>>>> However, the
> >>>>>>>>>> metadata image is used in many different places.
> >>>>>>>>>> How about we move the hash to metadata image when we find more
> use
> >>>>>> cases?
> >>>>>>>>>>
> >>>>>>>>>> AS1, AS2: Thanks for the reminder. I will simply delete
> >>>>>>>>>> ShareGroupPartitionMetadataKey/Value and add a new field to
> >>>>>>>>>> ShareGroupMetadataValue.
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> PoAn
> >>>>>>>>>>
> >>>>>>>>>>> On Dec 17, 2024, at 5:50 AM, Andrew Schofield <
> >>>>>>>>>> andrew_schofield_j...@outlook.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi PoAn,
> >>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>
> >>>>>>>>>>> AS1: From the point of view of share groups, the API and record
> >>>>>> schema
> >>>>>>>>>>> definitions are unstable in AK 4.0. In AK 4.1, we will start
> >>>>>> supporting
> >>>>>>>>>> proper
> >>>>>>>>>>> versioning. As a result, I think you do not need to deprecate
> >>>>>> the fields
> >>>>>>>>>> in the
> >>>>>>>>>>> ShareGroupPartitionMetadataValue. Just include the schema for
> >>>>>> the fields
> >>>>>>>>>>> which are actually needed, and I'll update the schema in the
> >>>>>> code when
> >>>>>>>>>>> the KIP is implemented.
> >>>>>>>>>>>
> >>>>>>>>>>> AS2: In the event that DJ04 actually removes the need for
> >>>>>>>>>>> ConsumerGroupPartitionMetadataKey/Value entirely, I would
> simply
> >>>>>>>>>>> delete ShareGroupPartitionMetadataKey/Value, assuming that it
> is
> >>>>>>>>>>> accepted in time for AK 4.1.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> Andrew
> >>>>>>>>>>> ________________________________________
> >>>>>>>>>>> From: Chia-Ping Tsai <chia7...@apache.org>
> >>>>>>>>>>> Sent: 16 December 2024 16:27
> >>>>>>>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
> >>>>>>>>>>> Subject: Re: [DISCUSS] KIP-1101: Trigger rebalance on rack
> >>>>>> topology
> >>>>>>>>>> changes
> >>>>>>>>>>>
> >>>>>>>>>>> hi David
> >>>>>>>>>>>
> >>>>>>>>>>>> DJ05
> >>>>>>>>>>>
> >>>>>>>>>>> One of the benefits of having a single hash per group (DJ04) is
> >>>>>> the
> >>>>>>>>>> reduction in the size of stored data. Additionally, the cost of
> >>>>>>>>>> re-computing can be minimized thanks to caching. So I'm + 1 to
> >>>>>> DJ04.
> >>>>>>>>>> However, the advantage of storing the topic cache in the
> metadata
> >>>>>> image is
> >>>>>>>>>> somewhat unclear to me. Could you please provide more details on
> >>>>>> what you
> >>>>>>>>>> mean by "tight"?Furthermore, since the metadata image is a
> >>>>>> thread-safe
> >>>>>>>>>> object, we need to ensure that the lazy initialization is also
> >>>>>> thread-safe.
> >>>>>>>>>> If no other components require the cache, it would be better to
> >>>>>> keep the
> >>>>>>>>>> caches within the coordinator.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Chia-Ping
> >>>>>>>>>>>
> >>>>>>>>>>> On 2024/12/16 14:01:35 David Jacot wrote:
> >>>>>>>>>>>> Hi PoAn,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the KIP. I have some comments about it.
> >>>>>>>>>>>>
> >>>>>>>>>>>> DJ01: Please, remove all the code from the KIP. We only care
> >>>>>> about
> >>>>>>>>>> public
> >>>>>>>>>>>> interface changes, not about implementation details.
> >>>>>>>>>>>> DJ02: Regarding the hash computation, I agree that we should
> use
> >>>>>>>>>> Murmur3.
> >>>>>>>>>>>> However, I don't quite like the implementation that you
> shared.
> >>>>>> I
> >>>>>>>>>> wonder if
> >>>>>>>>>>>> we could make it work incrementally instead of computing a
> hash
> >>>>>> of
> >>>>>>>>>>>> everything and combining them.
> >>>>>>>>>>>> DJ03: Regarding the cache, my understanding is that the cache
> is
> >>>>>>>>>> populated
> >>>>>>>>>>>> when a topic without hash is seen in a HB request and the
> cache
> >>>>>> is
> >>>>>>>>>> cleaned
> >>>>>>>>>>>> up when topics are deleted based on the metadata image.
> >>>>>> However, the
> >>>>>>>>>> update
> >>>>>>>>>>>> path is not clear. Let's say that a partition is added to a
> >>>>>> topic, how
> >>>>>>>>>> does
> >>>>>>>>>>>> it detect it? Let's also imagine that the racks of a partition
> >>>>>> have
> >>>>>>>>>>>> changed, how does it detect it? In the KIP, it would be nice
> to
> >>>>>> be clear
> >>>>>>>>>>>> about those.
> >>>>>>>>>>>> DJ04: I wonder whether we should go with a single hash per
> >>>>>> group. Your
> >>>>>>>>>>>> argument against it is that it would require to re-compute the
> >>>>>> hash of
> >>>>>>>>>> all
> >>>>>>>>>>>> the topics when it needs to be computed. In my opinion, we
> could
> >>>>>>>>>> leverage
> >>>>>>>>>>>> the cached hash per topic to compute the hash of all the
> >>>>>> subscribed
> >>>>>>>>>> ones.
> >>>>>>>>>>>> We could basically combine all the hashes without having to
> >>>>>> compute all
> >>>>>>>>>> of
> >>>>>>>>>>>> them. This approach has a few benefits. 1) We could get rid of
> >>>>>>>>>>>> the ConsumerGroupPartitionMetadata record as we could store
> the
> >>>>>> hash
> >>>>>>>>>> with
> >>>>>>>>>>>> the group epoch. 2) We could get rid of the Map that we keep
> in
> >>>>>> each
> >>>>>>>>>> group
> >>>>>>>>>>>> to store the hashed corresponding to the subscribed topics.
> >>>>>>>>>>>> DJ05: Regarding the cache again, I wonder if we should
> actually
> >>>>>> store
> >>>>>>>>>> the
> >>>>>>>>>>>> hash in the metadata image instead of maintaining it somewhere
> >>>>>> else. We
> >>>>>>>>>>>> could still lazily compute it. The benefit is that the value
> >>>>>> would be
> >>>>>>>>>> tight
> >>>>>>>>>>>> to the topic. I have not really looked into it. Would it be an
> >>>>>> option?
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'll be away for two weeks starting from Saturday. I kindly
> ask
> >>>>>> you to
> >>>>>>>>>> wait
> >>>>>>>>>>>> on me if we cannot conclude this week.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> David
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, Nov 5, 2024 at 1:43 PM Frank Yang <yangp...@gmail.com
> >
> >>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Chia-Ping,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the review and suggestions.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Q0: Add how rack change and how it affects topic partition.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Q1: Add why we need a balance algorithm to Motivation
> section.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Q2: After checking again, we don’t need to update cache when
> >>>>>> we replay
> >>>>>>>>>>>>> records. We only need to renew it in consumer heartbeat.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Q3: Add a new section “Topic Hash Function”.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks.
> >>>>>>>>>>>>> PoAn
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Nov 1, 2024, at 4:39 PM, Chia-Ping Tsai <
> >>>>>> chia7...@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> hi PoAn
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for for this KIP!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Q0: Could you add more details about `A topic partition has
> >>>>>> rack
> >>>>>>>>>> change`?
> >>>>>>>>>>>>>> IIRC, the "rack change" includes both follower and leader,
> >>>>>> right?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Q1: Could you please add the 'concerns' we discussed to the
> >>>>>> Motivation
> >>>>>>>>>>>>>> section? This should include topics like 'computations' and
> >>>>>> 'space
> >>>>>>>>>>>>> usage'.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Q2: `The group coordinator can leverage it to add a new
> topic
> >>>>>>>>>> hash.`This
> >>>>>>>>>>>>>> description seems a bit off to me. Why do we need to update
> >>>>>> the cache
> >>>>>>>>>> at
> >>>>>>>>>>>>>> this phase? The cache is intended to prevent duplicate
> >>>>>> computations
> >>>>>>>>>>>>> caused
> >>>>>>>>>>>>>> by heartbeat requests that occur between two metadata change
> >>>>>> events.
> >>>>>>>>>>>>>> Therefore, we could even remove the changed topics from
> >>>>>> caches on a
> >>>>>>>>>>>>>> metadata change, as the first heartbeat request would update
> >>>>>> the
> >>>>>>>>>> caches
> >>>>>>>>>>>>> for
> >>>>>>>>>>>>>> all changed topics.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Q3: Could you please include a section about the choice of
> >>>>>> hash
> >>>>>>>>>>>>>> implementation? The hash implementation must be consistent
> >>>>>> across
> >>>>>>>>>>>>> different
> >>>>>>>>>>>>>> JDKs, so we use Murmur3 to generate the hash value.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> Chia-Ping
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Frank Yang <yangp...@gmail.com> 於 2024年11月1日 週五 下午3:57寫道:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I would like to start a discussion thread on KIP-1101.
> >>>>>> Trigger
> >>>>>>>>>> rebalance
> >>>>>>>>>>>>>>> on rack topology changes. In this KIP, we aim to use less
> >>>>>> memory /
> >>>>>>>>>> disk
> >>>>>>>>>>>>>>> resources to detect rack changes in the new coordinator.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>
> >>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1101%3A+Trigger+rebalance+on+rack+topology+changes
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Please take a look and feel free to share any thoughts.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks.
> >>>>>>>>>>>>>>> PoAn
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>>>
> >>>
>
>

Reply via email to