Hi David,

Yes, I will continue working on this.

DJ02: As Kirk noted earlier, since the topic hash calculation is done on the
server side (not on the client side), we can safely introduce Guava as a
dependency.
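
For readers following the thread, here is a minimal server-side sketch of the
layout discussed under DJ08, hash(topicName; numPartitions; [partitionId;
sorted racks]) with a leading magic byte, plus an order-independent combination
of per-topic hashes as discussed under DJ06. It is not the code in the PR. The
class name, method names, and the magic value are hypothetical, CRC32 is only a
JDK stand-in for Murmur3, and plain addition is only a stand-in for Guava's
Hashing#combineUnordered, so that the sketch runs without extra dependencies.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

public class TopicHashSketch {
    // Hypothetical format version, written as the first byte of the input.
    static final byte MAGIC = 0;

    // Serializes magic byte, topic name, partition count, then each partition id
    // followed by its racks in sorted order, and hashes the resulting bytes.
    // CRC32 is an assumption made for this sketch; the thread proposes Murmur3.
    static long topicHash(String topic, Map<Integer, List<String>> racksByPartition) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(MAGIC);
            out.writeUTF(topic);
            out.writeInt(racksByPartition.size());
            // TreeMap gives a stable partition order regardless of the input map.
            for (Map.Entry<Integer, List<String>> e : new TreeMap<>(racksByPartition).entrySet()) {
                List<String> sortedRacks = new ArrayList<>(e.getValue());
                Collections.sort(sortedRacks);
                out.writeInt(e.getKey());
                out.writeInt(sortedRacks.size());
                for (String rack : sortedRacks) {
                    out.writeUTF(rack);
                }
            }
        } catch (IOException ex) {
            // Not expected for an in-memory stream.
            throw new IllegalStateException(ex);
        }
        CRC32 crc = new CRC32();
        crc.update(bytes.toByteArray());
        return crc.getValue();
    }

    // Combines per-topic hashes so that the iteration order of the subscribed
    // topics does not matter. Addition is commutative, which is all this
    // sketch relies on.
    static long combineUnordered(Iterable<Long> topicHashes) {
        long combined = 0L;
        for (long hash : topicHashes) {
            combined += hash;
        }
        return combined;
    }
}
```

With this shape, a group-level hash can be rebuilt from cached per-topic values
by calling combineUnordered over the subscribed topics, and bumping MAGIC
changes every topic hash, which matches the expectation in the thread that a
format change triggers a rebalance.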

If there is no further discussion, we can start voting in
https://lists.apache.org/thread/j90htmphjk783p697jjfg1xt8thmy33p. We can
discuss implementation details in PR
https://github.com/apache/kafka/pull/17444. I will resolve the merge conflicts
by March 14th.

Thanks,
PoAn

> On Mar 13, 2025, at 12:43 AM, David Jacot <david.ja...@gmail.com> wrote:
> 
> Hi PoAn,
> 
> As we are getting closer to releasing 4.0, I think that we can resume
> working on this one if you are still interested. Are you? I would like
> to have it in 4.1.
> 
> Best,
> David
> 
> On Wed, Jan 15, 2025 at 1:26 AM Kirk True <k...@kirktrue.pro> wrote:
>> 
>> Hi all,
>> 
>> Hopefully a quick question...
>> 
>> KT01. Will clients calculate the topic hash on the client? Based on the 
>> current state of the KIP and PR, I would have thought "no", but I ask based 
>> on the discussion around the possible use of Guava on client.
>> 
>> Thanks,
>> Kirk
>> 
>> On Mon, Jan 6, 2025, at 9:11 AM, David Jacot wrote:
>>> Hi PoAn,
>>> 
>>> Thanks for the update. I haven't read the updated KIP yet.
>>> 
>>> DJ02: I am not sure about using Guava as a dependency. I mentioned it more
>>> as an inspiration/reference. I suppose that we could use it on the server
>>> but we should definitely not use it on the client. I am not sure how others
>>> feel about it.
>>> 
>>> Best,
>>> David
>>> 
>>> On Mon, Jan 6, 2025 at 5:21 AM PoAn Yang <yangp...@gmail.com> wrote:
>>> 
>>>> Hi Chia-Ping / David / Lucas,
>>>> 
>>>> Happy new year and thanks for the review.
>>>> 
>>>> DJ02: Thanks for the suggestion. I updated the PR to use Guava.
>>>> 
>>>> DJ03: Yes, I updated the description to mention ISR changes,
>>>> added the partition reassignment case, and noted that
>>>> unrelated topic changes don’t trigger a rebalance.
>>>> DJ03.1: Yes, I will keep using ModernGroup#requestMetadataRefresh
>>>> to notify groups.
>>>> 
>>>> DJ06: Updated the PR to use Guava's Hashing#combineUnordered
>>>> function to combine topic hashes.
>>>> 
>>>> DJ07: Renamed it to MetadataHash.
>>>> 
>>>> DJ08: Added a sample hash function to the KIP and used the first byte as
>>>> a magic byte. This is also included in the latest PR.
>>>> 
>>>> DJ09: Added two paragraphs about upgrades and downgrades.
>>>> 
>>>> DJ10: Following Lucas’s comment, I added the StreamsGroupMetadataValue
>>>> update to this KIP.
>>>> 
>>>> Thanks,
>>>> PoAn
>>>> 
>>>> 
>>>>> On Dec 20, 2024, at 3:58 PM, Chia-Ping Tsai <chia7...@gmail.com> wrote:
>>>>> 
>>>>>> because assignors are sticky.
>>>>> 
>>>>> I forgot about that spec again :(
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> David Jacot <david.ja...@gmail.com> wrote on Fri, Dec 20, 2024 at 3:41 PM:
>>>>> 
>>>>>> Hi Chia-Ping,
>>>>>> 
>>>>>> DJ08: In my opinion, changing the format will be rare so it is
>>>>>> acceptable if rebalances are triggered in this case on
>>>>>> upgrade/downgrade. It is also what will happen if a cluster is
>>>>>> downgraded from 4.1 (with this KIP) to 4.0. The rebalance won't change
>>>>>> anything if the topology of the group is the same because assignors
>>>>>> are sticky. The default ones are and we recommend custom ones to also
>>>>>> be.
>>>>>> 
>>>>>> Best,
>>>>>> David
>>>>>> 
>>>>>> On Fri, Dec 20, 2024 at 2:11 AM Chia-Ping Tsai <chia7...@apache.org>
>>>>>> wrote:
>>>>>>> 
>>>>>>> ummm, it does not work for downgrade as the old coordinator has no idea
>>>>>> about new format :(
>>>>>>> 
>>>>>>> 
>>>>>>> On 2024/12/20 00:57:27 Chia-Ping Tsai wrote:
>>>>>>>> hi David
>>>>>>>> 
>>>>>>>>> DJ08:
>>>>>>>> 
>>>>>>>> That's a good question. If the "hash" lacks version control, it could
>>>>>> trigger a series of unnecessary rebalances. However, adding additional
>>>>>> information ("magic") to the hash does not help the upgraded coordinator
>>>>>> determine the "version." This means that the upgraded coordinator would
>>>>>> still trigger unnecessary rebalances because it has no way to know which
>>>>>> format to use when comparing the hash.
>>>>>>>> 
>>>>>>>> Perhaps we can add a new field to ConsumerGroupMetadataValue to
>>>>>> indicate the version of the "hash." This would allow the coordinator,
>>>> when
>>>>>> handling subscription metadata, to compute the old hash and determine
>>>>>> whether an epoch bump is necessary. Additionally, the coordinator can
>>>>>> generate a new record to upgrade the hash without requiring an epoch
>>>> bump.
>>>>>>>> 
>>>>>>>> Another issue is whether the coordinator should cache all versions of
>>>>>> the hash. I believe this is necessary; otherwise, during an upgrade,
>>>> there
>>>>>> would be extensive recomputing of old hashes.
>>>>>>>> 
>>>>>>>> I believe this idea should also work for downgrades, and that's just
>>>>>> my two cents.
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Chia-Ping
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On 2024/12/19 14:39:41 David Jacot wrote:
>>>>>>>>> Hi PoAn and Chia-Ping,
>>>>>>>>> 
>>>>>>>>> Thanks for your responses.
>>>>>>>>> 
>>>>>>>>> DJ02: Sorry, I was not clear. I was wondering whether we could compute
>>>>>>>>> the hash without having to convert to bytes first. Guava has a nice
>>>>>>>>> interface for this that allows incrementally adding primitive types to
>>>>>>>>> the hash. We can discuss this in the PR as it is an implementation
>>>>>>>>> detail.
>>>>>>>>> 
>>>>>>>>> DJ03: Thanks. I don't think that the replicas are updated when a
>>>>>> broker
>>>>>>>>> shuts down. What you said applies to the ISR. I suppose that we can
>>>>>> rely on
>>>>>>>>> the ISR changes to trigger updates. It is also worth noting
>>>>>>>>> that TopicsDelta#changedTopics is updated for every change (e.g. ISR
>>>>>>>>> change, leader change, replicas change, etc.). I suppose that it is
>>>>>> OK but
>>>>>>>>> it seems that it will trigger refreshes which are not necessary.
>>>>>> However, a
>>>>>>>>> rebalance won't be triggered because the hash won't change.
>>>>>>>>> DJ03.1: I suppose that we will continue to rely on
>>>>>>>>> ModernGroup#requestMetadataRefresh to notify groups that must
>>>>>> refresh their
>>>>>>>>> hashes. Is my understanding correct?
>>>>>>>>> 
>>>>>>>>> DJ05: Fair enough.
>>>>>>>>> 
>>>>>>>>> DJ06: You mention in two places that you would like to combine hashes
>>>>>>>>> by adding them. I wonder if this is a good practice. Intuitively, I
>>>>>>>>> would have used XOR or hashed the hashes. Guava has a method for
>>>>>>>>> combining hashes. It may be worth looking into the algorithm used.
>>>>>>>>> 
>>>>>>>>> DJ07: I would rename "AllTopicHash" to "MetadataHash" in order to be
>>>>>> more
>>>>>>>>> generic.
>>>>>>>>> 
>>>>>>>>> DJ08: Regarding the per topic hash, I wonder whether we should specify
>>>>>>>>> in the KIP how we will compute it. I had the following in mind:
>>>>>>>>> hash(topicName; numPartitions; [partitionId;sorted racks]). We could
>>>>>>>>> also add a magic byte as the first element as a sort of version. I am
>>>>>>>>> not sure whether it is needed though. I was thinking about this while
>>>>>>>>> imagining how we would handle changing the format in the future.
>>>>>>>>> 
>>>>>>>>> DJ09: It would be great if we could provide more details about
>>>>>> backward
>>>>>>>>> compatibility. What happens when the cluster is upgraded or
>>>>>> downgraded?
>>>>>>>>> 
>>>>>>>>> DJ10: We should update KIP-1071. It may be worth pinging them in the
>>>>>>>>> discussion thread of KIP-1071.
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> David
>>>>>>>>> 
>>>>>>>>> On Tue, Dec 17, 2024 at 9:25 AM PoAn Yang <yangp...@gmail.com>
>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Chia-Ping / David / Andrew,
>>>>>>>>>> 
>>>>>>>>>> Thanks for the review and suggestions.
>>>>>>>>>> 
>>>>>>>>>> DJ01: Removed all implementation details.
>>>>>>>>>> 
>>>>>>>>>> DJ02: Does “incrementally” mean that we only calculate the parts
>>>>>>>>>> that differ?
>>>>>>>>>> For example, if the number of partitions changes, we only calculate
>>>>>>>>>> the hash of the number of partitions and rebuild the topic hash
>>>>>>>>>> from it.
>>>>>>>>>> IMO, we only calculate each topic hash once. With a cache mechanism,
>>>>>>>>>> the value can be reused by different groups on the same broker.
>>>>>>>>>> The CPU usage for this part is not very high.
>>>>>>>>>> 
>>>>>>>>>> DJ03: Added the update path to KIP for both cases.
>>>>>>>>>> 
>>>>>>>>>> DJ04: Yes, it’s a good idea. With a cache mechanism and a single hash
>>>>>>>>>> per group, we can balance CPU and disk usage.
>>>>>>>>>> 
>>>>>>>>>> DJ05: Currently, the topic hash is only used in the coordinator.
>>>>>>>>>> However, the metadata image is used in many different places.
>>>>>>>>>> How about we move the hash to the metadata image when we find more
>>>>>>>>>> use cases?
>>>>>>>>>> 
>>>>>>>>>> AS1, AS2: Thanks for the reminder. I will simply delete
>>>>>>>>>> ShareGroupPartitionMetadataKey/Value and add a new field to
>>>>>>>>>> ShareGroupMetadataValue.
>>>>>>>>>> 
>>>>>>>>>> Best,
>>>>>>>>>> PoAn
>>>>>>>>>> 
>>>>>>>>>>> On Dec 17, 2024, at 5:50 AM, Andrew Schofield <
>>>>>>>>>> andrew_schofield_j...@outlook.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi PoAn,
>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>> 
>>>>>>>>>>> AS1: From the point of view of share groups, the API and record
>>>>>> schema
>>>>>>>>>>> definitions are unstable in AK 4.0. In AK 4.1, we will start
>>>>>> supporting
>>>>>>>>>> proper
>>>>>>>>>>> versioning. As a result, I think you do not need to deprecate
>>>>>> the fields
>>>>>>>>>> in the
>>>>>>>>>>> ShareGroupPartitionMetadataValue. Just include the schema for
>>>>>> the fields
>>>>>>>>>>> which are actually needed, and I'll update the schema in the
>>>>>> code when
>>>>>>>>>>> the KIP is implemented.
>>>>>>>>>>> 
>>>>>>>>>>> AS2: In the event that DJ04 actually removes the need for
>>>>>>>>>>> ConsumerGroupPartitionMetadataKey/Value entirely, I would simply
>>>>>>>>>>> delete ShareGroupPartitionMetadataKey/Value, assuming that it is
>>>>>>>>>>> accepted in time for AK 4.1.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Andrew
>>>>>>>>>>> ________________________________________
>>>>>>>>>>> From: Chia-Ping Tsai <chia7...@apache.org>
>>>>>>>>>>> Sent: 16 December 2024 16:27
>>>>>>>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-1101: Trigger rebalance on rack
>>>>>> topology
>>>>>>>>>> changes
>>>>>>>>>>> 
>>>>>>>>>>> hi David
>>>>>>>>>>> 
>>>>>>>>>>>> DJ05
>>>>>>>>>>> 
>>>>>>>>>>> One of the benefits of having a single hash per group (DJ04) is
>>>>>> the
>>>>>>>>>> reduction in the size of stored data. Additionally, the cost of
>>>>>>>>>> re-computing can be minimized thanks to caching. So I'm + 1 to
>>>>>> DJ04.
>>>>>>>>>> However, the advantage of storing the topic cache in the metadata
>>>>>> image is
>>>>>>>>>> somewhat unclear to me. Could you please provide more details on
>>>>>> what you
>>>>>>>>>> mean by "tight"? Furthermore, since the metadata image is a
>>>>>> thread-safe
>>>>>>>>>> object, we need to ensure that the lazy initialization is also
>>>>>> thread-safe.
>>>>>>>>>> If no other components require the cache, it would be better to
>>>>>> keep the
>>>>>>>>>> caches within the coordinator.
>>>>>>>>>>> 
>>>>>>>>>>> Best,
>>>>>>>>>>> Chia-Ping
>>>>>>>>>>> 
>>>>>>>>>>> On 2024/12/16 14:01:35 David Jacot wrote:
>>>>>>>>>>>> Hi PoAn,
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks for the KIP. I have some comments about it.
>>>>>>>>>>>> 
>>>>>>>>>>>> DJ01: Please, remove all the code from the KIP. We only care
>>>>>> about
>>>>>>>>>> public
>>>>>>>>>>>> interface changes, not about implementation details.
>>>>>>>>>>>> DJ02: Regarding the hash computation, I agree that we should use
>>>>>>>>>> Murmur3.
>>>>>>>>>>>> However, I don't quite like the implementation that you shared.
>>>>>> I
>>>>>>>>>> wonder if
>>>>>>>>>>>> we could make it work incrementally instead of computing a hash
>>>>>> of
>>>>>>>>>>>> everything and combining them.
>>>>>>>>>>>> DJ03: Regarding the cache, my understanding is that the cache is
>>>>>>>>>> populated
>>>>>>>>>>>> when a topic without hash is seen in a HB request and the cache
>>>>>> is
>>>>>>>>>> cleaned
>>>>>>>>>>>> up when topics are deleted based on the metadata image.
>>>>>> However, the
>>>>>>>>>> update
>>>>>>>>>>>> path is not clear. Let's say that a partition is added to a
>>>>>> topic, how
>>>>>>>>>> does
>>>>>>>>>>>> it detect it? Let's also imagine that the racks of a partition
>>>>>> have
>>>>>>>>>>>> changed, how does it detect it? In the KIP, it would be nice to
>>>>>> be clear
>>>>>>>>>>>> about those.
>>>>>>>>>>>> DJ04: I wonder whether we should go with a single hash per group. Your
>>>>>>>>>>>> argument against it is that it would require re-computing the hash of
>>>>>>>>>>>> all the topics when it needs to be computed. In my opinion, we could
>>>>>>>>>> leverage
>>>>>>>>>>>> the cached hash per topic to compute the hash of all the
>>>>>> subscribed
>>>>>>>>>> ones.
>>>>>>>>>>>> We could basically combine all the hashes without having to
>>>>>> compute all
>>>>>>>>>> of
>>>>>>>>>>>> them. This approach has a few benefits. 1) We could get rid of
>>>>>>>>>>>> the ConsumerGroupPartitionMetadata record as we could store the
>>>>>> hash
>>>>>>>>>> with
>>>>>>>>>>>> the group epoch. 2) We could get rid of the Map that we keep in
>>>>>> each
>>>>>>>>>> group
>>>>>>>>>>>> to store the hashes corresponding to the subscribed topics.
>>>>>>>>>>>> DJ05: Regarding the cache again, I wonder if we should actually
>>>>>> store
>>>>>>>>>> the
>>>>>>>>>>>> hash in the metadata image instead of maintaining it somewhere
>>>>>> else. We
>>>>>>>>>>>> could still lazily compute it. The benefit is that the value
>>>>>> would be
>>>>>>>>>> tight
>>>>>>>>>>>> to the topic. I have not really looked into it. Would it be an
>>>>>> option?
>>>>>>>>>>>> 
>>>>>>>>>>>> I'll be away for two weeks starting from Saturday. I kindly ask
>>>>>> you to
>>>>>>>>>> wait
>>>>>>>>>>>> on me if we cannot conclude this week.
>>>>>>>>>>>> 
>>>>>>>>>>>> Best,
>>>>>>>>>>>> David
>>>>>>>>>>>> 
>>>>>>>>>>>> On Tue, Nov 5, 2024 at 1:43 PM Frank Yang <yangp...@gmail.com>
>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Chia-Ping,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for the review and suggestions.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Q0: Added how racks change and how that affects topic partitions.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Q1: Added why we need a balance algorithm to the Motivation section.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Q2: After checking again, we don’t need to update cache when
>>>>>> we replay
>>>>>>>>>>>>> records. We only need to renew it in consumer heartbeat.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Q3: Added a new section “Topic Hash Function”.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>> PoAn
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Nov 1, 2024, at 4:39 PM, Chia-Ping Tsai <
>>>>>> chia7...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> hi PoAn
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks for this KIP!
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Q0: Could you add more details about `A topic partition has
>>>>>> rack
>>>>>>>>>> change`?
>>>>>>>>>>>>>> IIRC, the "rack change" includes both follower and leader,
>>>>>> right?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Q1: Could you please add the 'concerns' we discussed to the
>>>>>> Motivation
>>>>>>>>>>>>>> section? This should include topics like 'computations' and
>>>>>> 'space
>>>>>>>>>>>>> usage'.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Q2: `The group coordinator can leverage it to add a new topic
>>>>>>>>>> hash.` This
>>>>>>>>>>>>>> description seems a bit off to me. Why do we need to update
>>>>>> the cache
>>>>>>>>>> at
>>>>>>>>>>>>>> this phase? The cache is intended to prevent duplicate
>>>>>> computations
>>>>>>>>>>>>> caused
>>>>>>>>>>>>>> by heartbeat requests that occur between two metadata change
>>>>>> events.
>>>>>>>>>>>>>> Therefore, we could even remove the changed topics from
>>>>>> caches on a
>>>>>>>>>>>>>> metadata change, as the first heartbeat request would update
>>>>>> the
>>>>>>>>>> caches
>>>>>>>>>>>>> for
>>>>>>>>>>>>>> all changed topics.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Q3: Could you please include a section about the choice of
>>>>>> hash
>>>>>>>>>>>>>> implementation? The hash implementation must be consistent
>>>>>> across
>>>>>>>>>>>>> different
>>>>>>>>>>>>>> JDKs, so we use Murmur3 to generate the hash value.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Chia-Ping
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Frank Yang <yangp...@gmail.com> wrote on Fri, Nov 1, 2024 at 3:57 PM:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I would like to start a discussion thread on KIP-1101.
>>>>>> Trigger
>>>>>>>>>> rebalance
>>>>>>>>>>>>>>> on rack topology changes. In this KIP, we aim to use less
>>>>>> memory /
>>>>>>>>>> disk
>>>>>>>>>>>>>>> resources to detect rack changes in the new coordinator.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1101%3A+Trigger+rebalance+on+rack+topology+changes
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Please take a look and feel free to share any thoughts.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>> PoAn
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>> 
>>>> 
>>> 
