Hi David,

Yes, I will continue working on this.

DJ02: As Kirk noted earlier, the topic hash calculation happens on the
server side (not the client side), so we can safely introduce Guava as a
dependency.
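To make this concrete, here is roughly the shape of the per-topic
computation we have been discussing under DJ08 — a simplified sketch only;
the class name and field layout here are illustrative, and the exact format
is what we are pinning down in the PR:

    import com.google.common.hash.HashCode;
    import com.google.common.hash.HashFunction;
    import com.google.common.hash.Hasher;
    import com.google.common.hash.Hashing;

    import java.nio.charset.StandardCharsets;
    import java.util.List;

    public final class TopicHashExample {

        // Per-topic hash, following the format sketched in DJ08:
        // hash(magicByte; topicName; numPartitions; [partitionId; sorted racks]).
        // The leading magic byte leaves room to version the format later.
        public static HashCode topicHash(String topicName,
                                         int numPartitions,
                                         List<List<String>> racksPerPartition) {
            HashFunction murmur = Hashing.murmur3_128();
            Hasher hasher = murmur.newHasher()
                    .putByte((byte) 0) // magic byte: format version 0
                    .putString(topicName, StandardCharsets.UTF_8)
                    .putInt(numPartitions);
            for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
                hasher.putInt(partitionId);
                // Sort the racks so replica ordering does not change the hash.
                racksPerPartition.get(partitionId).stream().sorted()
                        .forEach(rack -> hasher.putString(rack, StandardCharsets.UTF_8));
            }
            return hasher.hash();
        }
    }

Because the Hasher accepts primitives directly, nothing has to be serialized
into an intermediate byte array first, which was the point of David's
earlier DJ02 comment about Guava's incremental interface.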
If there is no other discussion, we can start voting in
https://lists.apache.org/thread/j90htmphjk783p697jjfg1xt8thmy33p. We can
discuss implementation details in PR
https://github.com/apache/kafka/pull/17444. I will resolve merge conflicts
by March 14th.

Thanks,
PoAn

> On Mar 13, 2025, at 12:43 AM, David Jacot <david.ja...@gmail.com> wrote:
> 
> Hi PoAn,
> 
> As we are getting closer to releasing 4.0, I think that we can resume
> working on this one if you are still interested. Are you? I would like
> to have it in 4.1.
> 
> Best,
> David
> 
> On Wed, Jan 15, 2025 at 1:26 AM Kirk True <k...@kirktrue.pro> wrote:
>> 
>> Hi all,
>> 
>> Hopefully a quick question...
>> 
>> KT01. Will clients calculate the topic hash on the client? Based on the
>> current state of the KIP and PR, I would have thought "no", but I ask
>> based on the discussion around the possible use of Guava on the client.
>> 
>> Thanks,
>> Kirk
>> 
>> On Mon, Jan 6, 2025, at 9:11 AM, David Jacot wrote:
>>> Hi PoAn,
>>> 
>>> Thanks for the update. I haven't read the updated KIP yet.
>>> 
>>> DJ02: I am not sure about using Guava as a dependency. I mentioned it
>>> more as an inspiration/reference. I suppose that we could use it on
>>> the server, but we should definitely not use it on the client. I am
>>> not sure how others feel about it.
>>> 
>>> Best,
>>> David
>>> 
>>> On Mon, Jan 6, 2025 at 5:21 AM PoAn Yang <yangp...@gmail.com> wrote:
>>> 
>>>> Hi Chia-Ping / David / Lucas,
>>>> 
>>>> Happy new year and thanks for the review.
>>>> 
>>>> DJ02: Thanks for the suggestion. I updated the PR to use Guava.
>>>> 
>>>> DJ03: Yes, I updated the description to mention ISR changes, added
>>>> the altering-partition-reassignment case, and noted that an
>>>> unrelated topic change doesn't trigger a rebalance.
>>>> DJ03.1: Yes, I will keep using ModernGroup#requestMetadataRefresh
>>>> to notify groups.
>>>> 
>>>> DJ06: Updated the PR to use Guava's Hashing#combineUnordered
>>>> function to combine topic hashes.
>>>> 
>>>> DJ07: Renamed it to MetadataHash.
>>>> 
>>>> DJ08: Added a sample hash function to the KIP, using the first byte
>>>> as a magic byte. This is also included in the latest PR.
>>>> 
>>>> DJ09: Added two paragraphs about upgrades and downgrades.
>>>> 
>>>> DJ10: Following Lucas's comment, I added the StreamsGroupMetadataValue
>>>> update to this KIP.
>>>> 
>>>> Thanks,
>>>> PoAn
>>>> 
>>>> 
>>>>> On Dec 20, 2024, at 3:58 PM, Chia-Ping Tsai <chia7...@gmail.com> wrote:
>>>>> 
>>>>>> because assignors are sticky.
>>>>> 
>>>>> I forgot about that spec again :(
>>>>> 
>>>>> 
>>>>> On Fri, Dec 20, 2024 at 3:41 PM, David Jacot <david.ja...@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi Chia-Ping,
>>>>>> 
>>>>>> DJ08: In my opinion, changing the format will be rare, so it is
>>>>>> acceptable if rebalances are triggered in this case on
>>>>>> upgrade/downgrade. It is also what will happen if a cluster is
>>>>>> downgraded from 4.1 (with this KIP) to 4.0. The rebalance won't
>>>>>> change anything if the topology of the group is the same, because
>>>>>> assignors are sticky. The default ones are, and we recommend custom
>>>>>> ones to also be.
>>>>>> 
>>>>>> Best,
>>>>>> David
>>>>>> 
>>>>>> On Fri, Dec 20, 2024 at 2:11 AM Chia-Ping Tsai <chia7...@apache.org>
>>>>>> wrote:
>>>>>>> 
>>>>>>> ummm, it does not work for downgrade, as the old coordinator has
>>>>>>> no idea about the new format :(
>>>>>>> 
>>>>>>> 
>>>>>>> On 2024/12/20 00:57:27 Chia-Ping Tsai wrote:
>>>>>>>> hi David
>>>>>>>> 
>>>>>>>>> DJ08:
>>>>>>>> 
>>>>>>>> That's a good question. If the "hash" lacks version control, it
>>>>>>>> could trigger a series of unnecessary rebalances. However, adding
>>>>>>>> additional information ("magic") to the hash does not help the
>>>>>>>> upgraded coordinator determine the "version". This means that the
>>>>>>>> upgraded coordinator would still trigger unnecessary rebalances
>>>>>>>> because it has no way to know which format to use when comparing
>>>>>>>> the hash.
>>>>>>>> 
>>>>>>>> Perhaps we can add a new field to ConsumerGroupMetadataValue to
>>>>>>>> indicate the version of the "hash". This would allow the
>>>>>>>> coordinator, when handling subscription metadata, to compute the
>>>>>>>> old hash and determine whether an epoch bump is necessary.
>>>>>>>> Additionally, the coordinator can generate a new record to
>>>>>>>> upgrade the hash without requiring an epoch bump.
>>>>>>>> 
>>>>>>>> Another issue is whether the coordinator should cache all
>>>>>>>> versions of the hash. I believe this is necessary; otherwise,
>>>>>>>> during an upgrade, there would be extensive recomputation of old
>>>>>>>> hashes.
>>>>>>>> 
>>>>>>>> I believe this idea should also work for downgrades, and that's
>>>>>>>> just my two cents.
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Chia-Ping
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On 2024/12/19 14:39:41 David Jacot wrote:
>>>>>>>>> Hi PoAn and Chia-Ping,
>>>>>>>>> 
>>>>>>>>> Thanks for your responses.
>>>>>>>>> 
>>>>>>>>> DJ02: Sorry, I was not clear. I was wondering whether we could
>>>>>>>>> compute the hash without having to convert to bytes first. Guava
>>>>>>>>> has a nice interface for this, allowing us to incrementally add
>>>>>>>>> primitive types to the hash. We can discuss this in the PR as it
>>>>>>>>> is an implementation detail.
>>>>>>>>> 
>>>>>>>>> DJ03: Thanks. I don't think that the replicas are updated when a
>>>>>>>>> broker shuts down. What you said applies to the ISR. I suppose
>>>>>>>>> that we can rely on ISR changes to trigger updates. It is also
>>>>>>>>> worth noting that TopicsDelta#changedTopics is updated for every
>>>>>>>>> change (e.g. ISR change, leader change, replicas change, etc.).
>>>>>>>>> I suppose that it is OK, but it seems that it will trigger
>>>>>>>>> refreshes which are not necessary. However, a rebalance won't be
>>>>>>>>> triggered because the hash won't change.
>>>>>>>>> DJ03.1: I suppose that we will continue to rely on
>>>>>>>>> ModernGroup#requestMetadataRefresh to notify groups that must
>>>>>>>>> refresh their hashes. Is my understanding correct?
>>>>>>>>> 
>>>>>>>>> DJ05: Fair enough.
>>>>>>>>> 
>>>>>>>>> DJ06: You mention in two places that you would like to combine
>>>>>>>>> hashes by adding them. I wonder if this is a good practice.
>>>>>>>>> Intuitively, I would have used XOR or hashed the hashes. Guava
>>>>>>>>> has a method for combining hashes. It may be worth looking into
>>>>>>>>> the algorithm used.
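For reference, the Guava method in question is Hashing#combineUnordered
(there is also an ordered variant). A minimal sketch of how it would be
used, assuming the per-topic hashes are already available; the class name is
illustrative:

    import com.google.common.hash.HashCode;
    import com.google.common.hash.Hashing;

    import java.util.Collection;

    public final class CombineHashesExample {

        // Combine per-topic hashes into a single value. combineUnordered is
        // independent of iteration order, which fits a set of subscribed
        // topics; all inputs must have the same bit length.
        public static HashCode combineTopicHashes(Collection<HashCode> topicHashes) {
            return Hashing.combineUnordered(topicHashes);
        }
    }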
>>>>>>>>> 
>>>>>>>>> DJ07: I would rename "AllTopicHash" to "MetadataHash" in order
>>>>>>>>> to be more generic.
>>>>>>>>> 
>>>>>>>>> DJ08: Regarding the per-topic hash, I wonder whether we should
>>>>>>>>> specify in the KIP how we will compute it. I had the following
>>>>>>>>> in mind: hash(topicName; numPartitions; [partitionId; sorted
>>>>>>>>> racks]). We could also add a magic byte as the first element as
>>>>>>>>> a sort of version. I am not sure whether it is needed though. I
>>>>>>>>> was thinking about this while imagining how we would handle
>>>>>>>>> changing the format in the future.
>>>>>>>>> 
>>>>>>>>> DJ09: It would be great if we could provide more details about
>>>>>>>>> backward compatibility. What happens when the cluster is
>>>>>>>>> upgraded or downgraded?
>>>>>>>>> 
>>>>>>>>> DJ10: We should update KIP-1071. It may be worth pinging them in
>>>>>>>>> the discussion thread of KIP-1071.
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> David
>>>>>>>>> 
>>>>>>>>> On Tue, Dec 17, 2024 at 9:25 AM PoAn Yang <yangp...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Chia-Ping / David / Andrew,
>>>>>>>>>> 
>>>>>>>>>> Thanks for the review and suggestions.
>>>>>>>>>> 
>>>>>>>>>> DJ01: Removed all implementation details.
>>>>>>>>>> 
>>>>>>>>>> DJ02: Does "incrementally" mean that we only recalculate the
>>>>>>>>>> parts that changed?
>>>>>>>>>> For example, if the number of partitions changes, we only
>>>>>>>>>> calculate the hash of the number of partitions and fold it back
>>>>>>>>>> into the topic hash.
>>>>>>>>>> IMO, we only calculate the topic hash once. With the cache
>>>>>>>>>> mechanism, the value can be reused by different groups on the
>>>>>>>>>> same broker.
>>>>>>>>>> The CPU usage for this part is not very high.
>>>>>>>>>> 
>>>>>>>>>> DJ03: Added the update path to the KIP for both cases.
>>>>>>>>>> 
>>>>>>>>>> DJ04: Yes, it's a good idea. With the cache mechanism and a
>>>>>>>>>> single hash per group, we can balance CPU and disk usage.
>>>>>>>>>> 
>>>>>>>>>> DJ05: Currently, the topic hash is only used in the
>>>>>>>>>> coordinator. However, the metadata image is used in many
>>>>>>>>>> different places.
>>>>>>>>>> How about we move the hash to the metadata image when we find
>>>>>>>>>> more use cases?
>>>>>>>>>> 
>>>>>>>>>> AS1, AS2: Thanks for the reminder. I will simply delete
>>>>>>>>>> ShareGroupPartitionMetadataKey/Value and add a new field to
>>>>>>>>>> ShareGroupMetadataValue.
>>>>>>>>>> 
>>>>>>>>>> Best,
>>>>>>>>>> PoAn
>>>>>>>>>> 
>>>>>>>>>>> On Dec 17, 2024, at 5:50 AM, Andrew Schofield
>>>>>>>>>>> <andrew_schofield_j...@outlook.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi PoAn,
>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>> 
>>>>>>>>>>> AS1: From the point of view of share groups, the API and
>>>>>>>>>>> record schema definitions are unstable in AK 4.0. In AK 4.1,
>>>>>>>>>>> we will start supporting proper versioning. As a result, I
>>>>>>>>>>> think you do not need to deprecate the fields in the
>>>>>>>>>>> ShareGroupPartitionMetadataValue. Just include the schema for
>>>>>>>>>>> the fields which are actually needed, and I'll update the
>>>>>>>>>>> schema in the code when the KIP is implemented.
>>>>>>>>>>> 
>>>>>>>>>>> AS2: In the event that DJ04 actually removes the need for
>>>>>>>>>>> ConsumerGroupPartitionMetadataKey/Value entirely, I would
>>>>>>>>>>> simply delete ShareGroupPartitionMetadataKey/Value, assuming
>>>>>>>>>>> that it is accepted in time for AK 4.1.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Andrew
>>>>>>>>>>> ________________________________________
>>>>>>>>>>> From: Chia-Ping Tsai <chia7...@apache.org>
>>>>>>>>>>> Sent: 16 December 2024 16:27
>>>>>>>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-1101: Trigger rebalance on rack
>>>>>>>>>>> topology changes
>>>>>>>>>>> 
>>>>>>>>>>> hi David
>>>>>>>>>>> 
>>>>>>>>>>>> DJ05
>>>>>>>>>>> 
>>>>>>>>>>> One of the benefits of having a single hash per group (DJ04)
>>>>>>>>>>> is the reduction in the size of stored data. Additionally, the
>>>>>>>>>>> cost of re-computing can be minimized thanks to caching. So
>>>>>>>>>>> I'm +1 on DJ04. However, the advantage of storing the topic
>>>>>>>>>>> cache in the metadata image is somewhat unclear to me. Could
>>>>>>>>>>> you please provide more details on what you mean by "tied"?
>>>>>>>>>>> Furthermore, since the metadata image is a thread-safe object,
>>>>>>>>>>> we need to ensure that the lazy initialization is also
>>>>>>>>>>> thread-safe. If no other components require the cache, it
>>>>>>>>>>> would be better to keep the caches within the coordinator.
>>>>>>>>>>> 
>>>>>>>>>>> Best,
>>>>>>>>>>> Chia-Ping
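A minimal sketch of a coordinator-local cache with thread-safe lazy
initialization, along the lines suggested here — the class and method names
are illustrative, not the real coordinator API:

    import com.google.common.hash.HashCode;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    public final class TopicHashCache {

        // topic name -> lazily computed topic hash, kept inside the
        // coordinator rather than in the metadata image.
        private final Map<String, HashCode> cache = new ConcurrentHashMap<>();
        private final Function<String, HashCode> hashComputer;

        public TopicHashCache(Function<String, HashCode> hashComputer) {
            this.hashComputer = hashComputer;
        }

        // computeIfAbsent makes the lazy initialization thread-safe.
        public HashCode topicHash(String topicName) {
            return cache.computeIfAbsent(topicName, hashComputer);
        }

        // Invalidate when the metadata image reports a topic change or
        // deletion, so the next heartbeat recomputes the hash.
        public void invalidate(String topicName) {
            cache.remove(topicName);
        }
    }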
>>>>>>>>>>> 
>>>>>>>>>>> On 2024/12/16 14:01:35 David Jacot wrote:
>>>>>>>>>>>> Hi PoAn,
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks for the KIP. I have some comments about it.
>>>>>>>>>>>> 
>>>>>>>>>>>> DJ01: Please, remove all the code from the KIP. We only care
>>>>>>>>>>>> about public interface changes, not about implementation
>>>>>>>>>>>> details.
>>>>>>>>>>>> DJ02: Regarding the hash computation, I agree that we should
>>>>>>>>>>>> use Murmur3. However, I don't quite like the implementation
>>>>>>>>>>>> that you shared. I wonder if we could make it work
>>>>>>>>>>>> incrementally instead of computing a hash of everything and
>>>>>>>>>>>> combining them.
>>>>>>>>>>>> DJ03: Regarding the cache, my understanding is that the cache
>>>>>>>>>>>> is populated when a topic without a hash is seen in a HB
>>>>>>>>>>>> request, and the cache is cleaned up when topics are deleted
>>>>>>>>>>>> based on the metadata image. However, the update path is not
>>>>>>>>>>>> clear. Let's say that a partition is added to a topic; how
>>>>>>>>>>>> does it detect it? Let's also imagine that the racks of a
>>>>>>>>>>>> partition have changed; how does it detect it? It would be
>>>>>>>>>>>> nice for the KIP to be clear about those.
>>>>>>>>>>>> DJ04: I wonder whether we should go with a single hash per
>>>>>>>>>>>> group. Your argument against it is that it would require
>>>>>>>>>>>> re-computing the hashes of all the topics whenever it needs
>>>>>>>>>>>> to be computed. In my opinion, we could leverage the cached
>>>>>>>>>>>> hash per topic to compute the hash of all the subscribed
>>>>>>>>>>>> ones. We could basically combine all the hashes without
>>>>>>>>>>>> having to compute all of them. This approach has a few
>>>>>>>>>>>> benefits. 1) We could get rid of the
>>>>>>>>>>>> ConsumerGroupPartitionMetadata record, as we could store the
>>>>>>>>>>>> hash with the group epoch. 2) We could get rid of the Map
>>>>>>>>>>>> that we keep in each group to store the hashes corresponding
>>>>>>>>>>>> to the subscribed topics.
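Putting DJ04 together with the cache sketched above, the heartbeat path
could look roughly like this — again with illustrative names: `GroupState`
stands in for the real group object, `TopicHashCache` is the earlier sketch,
and at least one subscribed topic is assumed:

    import com.google.common.hash.HashCode;
    import com.google.common.hash.Hashing;

    import java.util.Set;
    import java.util.stream.Collectors;

    public final class GroupHashExample {

        // Illustrative stand-in for the group state kept by the coordinator.
        interface GroupState {
            HashCode metadataHash();
            void setMetadataHash(HashCode hash);
            void bumpGroupEpoch(); // triggers a rebalance
        }

        // Combine the cached per-topic hashes of the subscribed topics and
        // bump the group epoch only if the combined hash actually changed.
        public static void maybeBumpEpoch(GroupState group,
                                          Set<String> subscribedTopics,
                                          TopicHashCache cache) {
            HashCode combined = Hashing.combineUnordered(
                    subscribedTopics.stream()
                            .map(cache::topicHash)
                            .collect(Collectors.toList()));
            if (!combined.equals(group.metadataHash())) {
                group.setMetadataHash(combined);
                group.bumpGroupEpoch();
            }
        }
    }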
>>>>>>>>>>>> DJ05: Regarding the cache again, I wonder if we should
>>>>>>>>>>>> actually store the hash in the metadata image instead of
>>>>>>>>>>>> maintaining it somewhere else. We could still lazily compute
>>>>>>>>>>>> it. The benefit is that the value would be tied to the topic.
>>>>>>>>>>>> I have not really looked into it. Would it be an option?
>>>>>>>>>>>> 
>>>>>>>>>>>> I'll be away for two weeks starting from Saturday. I kindly
>>>>>>>>>>>> ask you to wait for me if we cannot conclude this week.
>>>>>>>>>>>> 
>>>>>>>>>>>> Best,
>>>>>>>>>>>> David
>>>>>>>>>>>> 
>>>>>>>>>>>> On Tue, Nov 5, 2024 at 1:43 PM Frank Yang <yangp...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Chia-Ping,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for the review and suggestions.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Q0: Added how a rack change happens and how it affects topic
>>>>>>>>>>>>> partitions.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Q1: Added why we need a balance algorithm to the Motivation
>>>>>>>>>>>>> section.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Q2: After checking again, we don't need to update the cache
>>>>>>>>>>>>> when we replay records. We only need to renew it in the
>>>>>>>>>>>>> consumer heartbeat.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Q3: Added a new section, "Topic Hash Function".
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>> PoAn
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Nov 1, 2024, at 4:39 PM, Chia-Ping Tsai
>>>>>>>>>>>>>> <chia7...@gmail.com> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> hi PoAn
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks for this KIP!
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Q0: Could you add more details about `A topic partition has
>>>>>>>>>>>>>> rack change`? IIRC, the "rack change" includes both follower
>>>>>>>>>>>>>> and leader, right?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Q1: Could you please add the 'concerns' we discussed to the
>>>>>>>>>>>>>> Motivation section? This should include topics like
>>>>>>>>>>>>>> 'computations' and 'space usage'.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Q2: `The group coordinator can leverage it to add a new
>>>>>>>>>>>>>> topic hash.` This description seems a bit off to me. Why do
>>>>>>>>>>>>>> we need to update the cache at this phase? The cache is
>>>>>>>>>>>>>> intended to prevent duplicate computations caused by
>>>>>>>>>>>>>> heartbeat requests that occur between two metadata change
>>>>>>>>>>>>>> events. Therefore, we could even remove the changed topics
>>>>>>>>>>>>>> from the caches on a metadata change, as the first heartbeat
>>>>>>>>>>>>>> request would update the caches for all changed topics.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Q3: Could you please include a section about the choice of
>>>>>>>>>>>>>> hash implementation? The hash implementation must be
>>>>>>>>>>>>>> consistent across different JDKs, so we use Murmur3 to
>>>>>>>>>>>>>> generate the hash value.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Chia-Ping
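A small illustration of the Q3 point — the Murmur3 value below is identical
on every JDK vendor and version, which is not true of, say,
Object.hashCode(); the topic name is arbitrary:

    import com.google.common.hash.Hashing;

    import java.nio.charset.StandardCharsets;

    public final class DeterministicHashExample {

        public static void main(String[] args) {
            // Murmur3 output is fully specified by the algorithm, so this
            // value is stable across JVMs, unlike Object.hashCode() or
            // HashMap iteration order.
            long hash = Hashing.murmur3_128()
                    .hashString("topic-a", StandardCharsets.UTF_8)
                    .asLong();
            System.out.println(hash);
        }
    }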
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Fri, Nov 1, 2024 at 3:57 PM, Frank Yang
>>>>>>>>>>>>>> <yangp...@gmail.com> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I would like to start a discussion thread on KIP-1101:
>>>>>>>>>>>>>>> Trigger rebalance on rack topology changes. In this KIP, we
>>>>>>>>>>>>>>> aim to use less memory / disk resources to detect rack
>>>>>>>>>>>>>>> changes in the new coordinator.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1101%3A+Trigger+rebalance+on+rack+topology+changes
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Please take a look and feel free to share any thoughts.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>> PoAn