Hi PoAn, As we are getting closer to releasing 4.0, I think that we can resume working on this one if you are still interested. Are you? I would like to have it in 4.1.
Best, David On Wed, Jan 15, 2025 at 1:26 AM Kirk True <k...@kirktrue.pro> wrote: > > Hi all, > > Hopefully a quick question... > > KT01. Will clients calculate the topic hash on the client? Based on the > current state of the KIP and PR, I would have thought "no", but I ask based > on the discussion around the possible use of Guava on client. > > Thanks, > Kirk > > On Mon, Jan 6, 2025, at 9:11 AM, David Jacot wrote: > > Hi PoAn, > > > > Thanks for the update. I haven't read the updated KIP yet. > > > > DJ02: I am not sure about using Guava as a dependency. I mentioned it more > > as an inspiration/reference. I suppose that we could use it on the server > > but we should definitely not use it on the client. I am not sure how others > > feel about it. > > > > Best, > > David > > > > On Mon, Jan 6, 2025 at 5:21 AM PoAn Yang <yangp...@gmail.com> wrote: > > > > > Hi Chia-Ping / David / Lucas, > > > > > > Happy new year and thanks for the review. > > > > > > DJ02: Thanks for the suggestion. I updated the PR to use Guava. > > > > > > DJ03: Yes, I updated the description to mention ISR change, > > > add altering partition reassignment case, and mention that > > > non-related topic change doesn’t trigger a rebalance. > > > DJ03.1: Yes, I will keep using ModernGroup#requestMetadataRefresh > > > to notify group. > > > > > > DJ06: Updated the PR to use Guava Hashing#combineUnordered > > > function to combine topic hash. > > > > > > DJ07: Renamed it to MetadataHash. > > > > > > DJ08: Added a sample hash function to the KIP and use first byte as magic > > > byte. This is also included in latest PR. > > > > > > DJ09: Added two paragraphs about upgraded and downgraded. > > > > > > DJ10: According to Lucas’s comment, I add StreamsGroupMetadataValue update > > > to this KIP. > > > > > > Thanks, > > > PoAn > > > > > > > > > > On Dec 20, 2024, at 3:58 PM, Chia-Ping Tsai <chia7...@gmail.com> wrote: > > > > > > > >> because assignors are sticky. > > > > > > > > I forgot about that spec again :( > > > > > > > > > > > > > > > > > > > > David Jacot <david.ja...@gmail.com> 於 2024年12月20日 週五 下午3:41寫道: > > > > > > > >> Hi Chia-Ping, > > > >> > > > >> DJ08: In my opinion, changing the format will be rare so it is > > > >> acceptable if rebalances are triggered in this case on > > > >> upgrade/downgrade. It is also what will happen if a cluster is > > > >> downgraded from 4.1 (with this KIP) to 4.0. The rebalance won't change > > > >> anything if the topology of the group is the same because assignors > > > >> are sticky. The default ones are and we recommend custom ones to also > > > >> be. > > > >> > > > >> Best, > > > >> David > > > >> > > > >> On Fri, Dec 20, 2024 at 2:11 AM Chia-Ping Tsai <chia7...@apache.org> > > > >> wrote: > > > >>> > > > >>> ummm, it does not work for downgrade as the old coordinator has no > > > >>> idea > > > >> about new format :( > > > >>> > > > >>> > > > >>> On 2024/12/20 00:57:27 Chia-Ping Tsai wrote: > > > >>>> hi David > > > >>>> > > > >>>>> DJ08: > > > >>>> > > > >>>> That's a good question. If the "hash" lacks version control, it could > > > >> trigger a series of unnecessary rebalances. However, adding additional > > > >> information ("magic") to the hash does not help the upgraded > > > >> coordinator > > > >> determine the "version." This means that the upgraded coordinator would > > > >> still trigger unnecessary rebalances because it has no way to know > > > >> which > > > >> format to use when comparing the hash. 
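A rough, illustrative sketch of the per-topic hash discussed in DJ08, assuming Guava's Murmur3 as suggested in DJ02/DJ06; the class, parameter names, and exact field order are hypothetical and not the format adopted by the KIP:

    import com.google.common.hash.HashCode;
    import com.google.common.hash.Hasher;
    import com.google.common.hash.Hashing;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;

    // Hypothetical sketch of hash(magic; topicName; numPartitions; [partitionId; sorted racks]).
    final class TopicHashSketch {
        private static final byte MAGIC = 0; // format version, per the DJ08 discussion

        static HashCode topicHash(String topicName,
                                  int numPartitions,
                                  Map<Integer, List<String>> sortedRacksByPartition) {
            Hasher hasher = Hashing.murmur3_128().newHasher()
                .putByte(MAGIC)                                   // version the input format up front
                .putString(topicName, StandardCharsets.UTF_8)
                .putInt(numPartitions);
            for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
                hasher.putInt(partitionId);
                for (String rack : sortedRacksByPartition.getOrDefault(partitionId, List.of())) {
                    hasher.putString(rack, StandardCharsets.UTF_8); // racks pre-sorted for determinism
                }
            }
            return hasher.hash();
        }
    }

Note that in a sketch like this the magic byte is hashed into the value rather than stored next to it, so a coordinator cannot read the version back out of the hash, which is the concern raised above.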
> > > >>>> > > > >>>> Perhaps we can add a new field to ConsumerGroupMetadataValue to > > > >> indicate the version of the "hash." This would allow the coordinator, > > > when > > > >> handling subscription metadata, to compute the old hash and determine > > > >> whether an epoch bump is necessary. Additionally, the coordinator can > > > >> generate a new record to upgrade the hash without requiring an epoch > > > bump. > > > >>>> > > > >>>> Another issue is whether the coordinator should cache all versions of > > > >> the hash. I believe this is necessary; otherwise, during an upgrade, > > > there > > > >> would be extensive recomputing of old hashes. > > > >>>> > > > >>>> I believe this idea should also work for downgrades, and that's just > > > >> my two cents. > > > >>>> > > > >>>> Best, > > > >>>> Chia-Ping > > > >>>> > > > >>>> > > > >>>> On 2024/12/19 14:39:41 David Jacot wrote: > > > >>>>> Hi PoAn and Chia-Ping, > > > >>>>> > > > >>>>> Thanks for your responses. > > > >>>>> > > > >>>>> DJ02: Sorry, I was not clear. I was wondering whether we could > > > >> compute the > > > >>>>> hash without having to convert to bytes before. Guava has a nice > > > >> interface > > > >>>>> for this allowing us to incrementally add primitive types to the hash. > > > >> We can > > > >>>>> discuss this in the PR as it is an implementation detail. > > > >>>>> > > > >>>>> DJ03: Thanks. I don't think that the replicas are updated when a > > > >> broker > > > >>>>> shuts down. What you said applies to the ISR. I suppose that we can > > > >> rely on > > > >>>>> the ISR changes to trigger updates. It is also worth noting > > > >>>>> that TopicsDelta#changedTopics is updated for every change (e.g. ISR > > > >>>>> change, leader change, replicas change, etc.). I suppose that it is > > > >> OK but > > > >>>>> it seems that it will trigger refreshes which are not necessary. > > > >> However, a > > > >>>>> rebalance won't be triggered because the hash won't change. > > > >>>>> DJ03.1: I suppose that we will continue to rely on > > > >>>>> ModernGroup#requestMetadataRefresh to notify groups that must > > > >> refresh their > > > >>>>> hashes. Is my understanding correct? > > > >>>>> > > > >>>>> DJ05: Fair enough. > > > >>>>> > > > >>>>> DJ06: You mention in two places that you would like to combine > > > >> hashes by > > > >>>>> adding them. I wonder if this is a good practice. Intuitively, > > > >> I would > > > >>>>> have used XOR or hashed the hashes. Guava has a method for combining > > > >>>>> hashes. It may be worth looking into the algorithm used. > > > >>>>> > > > >>>>> DJ07: I would rename "AllTopicHash" to "MetadataHash" in order to be > > > >> more > > > >>>>> generic. > > > >>>>> > > > >>>>> DJ08: Regarding the per topic hash, I wonder whether we should > > > >> specify in > > > >>>>> the KIP how we will compute it. I had the following in mind: > > > >>>>> hash(topicName; numPartitions; [partitionId;sorted racks]). We could > > > >> also > > > >>>>> add a magic byte as the first element, as a sort of version. I am not > > > >> sure > > > >>>>> whether it is needed though. I was thinking about this while > > > >> imagining how > > > >>>>> we would handle changing the format in the future. > > > >>>>> > > > >>>>> DJ09: It would be great if we could provide more details about > > > >> backward > > > >>>>> compatibility. What happens when the cluster is upgraded or > > > >> downgraded? > > > >>>>> > > > >>>>> DJ10: We should update KIP-1071.
It may be worth pinging them in the > > > >>>>> discussion thread of KIP-1071. > > > >>>>> > > > >>>>> Best, > > > >>>>> David > > > >>>>> > > > >>>>> On Tue, Dec 17, 2024 at 9:25 AM PoAn Yang <yangp...@gmail.com> > > > >> wrote: > > > >>>>> > > > >>>>>> Hi Chia-Ping / David / Andrew, > > > >>>>>> > > > >>>>>> Thanks for the review and suggestions. > > > >>>>>> > > > >>>>>> DJ01: Removed all implementation details. > > > >>>>>> > > > >>>>>> DJ02: Does “incrementally” mean that we only calculate the > > > >> changed > > > >>>>>> parts? > > > >>>>>> For example, if the number of partitions changes, we only calculate > > > >> the hash > > > >>>>>> of the partition count and use it to reconstruct the topic hash. > > > >>>>>> IMO, we only calculate the topic hash once. With the cache mechanism, > > > >> the > > > >>>>>> value can be reused by different groups on the same broker. > > > >>>>>> The CPU usage for this part is not very high. > > > >>>>>> > > > >>>>>> DJ03: Added the update path to the KIP for both cases. > > > >>>>>> > > > >>>>>> DJ04: Yes, it’s a good idea. With the cache mechanism and a single hash > > > >> per > > > >>>>>> group, we can balance CPU and disk usage. > > > >>>>>> > > > >>>>>> DJ05: Currently, the topic hash is only used in the coordinator. > > > >> However, the > > > >>>>>> metadata image is used in many different places. > > > >>>>>> How about we move the hash to the metadata image when we find more use > > > >> cases? > > > >>>>>> > > > >>>>>> AS1, AS2: Thanks for the reminder. I will simply delete > > > >>>>>> ShareGroupPartitionMetadataKey/Value and add a new field to > > > >>>>>> ShareGroupMetadataValue. > > > >>>>>> > > > >>>>>> Best, > > > >>>>>> PoAn > > > >>>>>> > > > >>>>>>> On Dec 17, 2024, at 5:50 AM, Andrew Schofield < > > > >>>>>> andrew_schofield_j...@outlook.com> wrote: > > > >>>>>>> > > > >>>>>>> Hi PoAn, > > > >>>>>>> Thanks for the KIP. > > > >>>>>>> > > > >>>>>>> AS1: From the point of view of share groups, the API and record > > > >> schema > > > >>>>>>> definitions are unstable in AK 4.0. In AK 4.1, we will start > > > >> supporting > > > >>>>>> proper > > > >>>>>>> versioning. As a result, I think you do not need to deprecate > > > >> the fields > > > >>>>>> in the > > > >>>>>>> ShareGroupPartitionMetadataValue. Just include the schema for > > > >> the fields > > > >>>>>>> which are actually needed, and I'll update the schema in the > > > >> code when > > > >>>>>>> the KIP is implemented. > > > >>>>>>> > > > >>>>>>> AS2: In the event that DJ04 actually removes the need for > > > >>>>>>> ConsumerGroupPartitionMetadataKey/Value entirely, I would simply > > > >>>>>>> delete ShareGroupPartitionMetadataKey/Value, assuming that it is > > > >>>>>>> accepted in time for AK 4.1. > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> Thanks, > > > >>>>>>> Andrew > > > >>>>>>> ________________________________________ > > > >>>>>>> From: Chia-Ping Tsai <chia7...@apache.org> > > > >>>>>>> Sent: 16 December 2024 16:27 > > > >>>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org> > > > >>>>>>> Subject: Re: [DISCUSS] KIP-1101: Trigger rebalance on rack > > > >> topology > > > >>>>>> changes > > > >>>>>>> > > > >>>>>>> hi David > > > >>>>>>> > > > >>>>>>>> DJ05 > > > >>>>>>> > > > >>>>>>> One of the benefits of having a single hash per group (DJ04) is > > > >> the > > > >>>>>> reduction in the size of stored data. Additionally, the cost of > > > >>>>>> re-computing can be minimized thanks to caching. So I'm +1 to > > > >> DJ04.
> > > >>>>>> However, the advantage of storing the topic cache in the metadata > > > >> image is > > > >>>>>> somewhat unclear to me. Could you please provide more details on > > > >> what you > > > >>>>>> mean by "tight"? Furthermore, since the metadata image is a > > > >> thread-safe > > > >>>>>> object, we need to ensure that the lazy initialization is also > > > >> thread-safe. > > > >>>>>> If no other components require the cache, it would be better to > > > >> keep the > > > >>>>>> caches within the coordinator. > > > >>>>>>> > > > >>>>>>> Best, > > > >>>>>>> Chia-Ping > > > >>>>>>> > > > >>>>>>> On 2024/12/16 14:01:35 David Jacot wrote: > > > >>>>>>>> Hi PoAn, > > > >>>>>>>> > > > >>>>>>>> Thanks for the KIP. I have some comments about it. > > > >>>>>>>> > > > >>>>>>>> DJ01: Please, remove all the code from the KIP. We only care > > > >> about > > > >>>>>> public > > > >>>>>>>> interface changes, not about implementation details. > > > >>>>>>>> DJ02: Regarding the hash computation, I agree that we should use > > > >>>>>> Murmur3. > > > >>>>>>>> However, I don't quite like the implementation that you shared. > > > >> I > > > >>>>>> wonder if > > > >>>>>>>> we could make it work incrementally instead of computing a hash > > > >> of > > > >>>>>>>> everything and combining them. > > > >>>>>>>> DJ03: Regarding the cache, my understanding is that the cache is > > > >>>>>> populated > > > >>>>>>>> when a topic without a hash is seen in an HB request and the cache > > > >> is > > > >>>>>> cleaned > > > >>>>>>>> up when topics are deleted based on the metadata image. > > > >> However, the > > > >>>>>> update > > > >>>>>>>> path is not clear. Let's say that a partition is added to a > > > >> topic, how > > > >>>>>> does > > > >>>>>>>> it detect it? Let's also imagine that the racks of a partition > > > >> have > > > >>>>>>>> changed, how does it detect it? In the KIP, it would be nice to > > > >> be clear > > > >>>>>>>> about those. > > > >>>>>>>> DJ04: I wonder whether we should go with a single hash per > > > >> group. Your > > > >>>>>>>> argument against it is that it would require re-computing the > > > >> hash of > > > >>>>>> all > > > >>>>>>>> the topics when it needs to be computed. In my opinion, we could > > > >>>>>> leverage > > > >>>>>>>> the cached hash per topic to compute the hash of all the > > > >> subscribed > > > >>>>>> ones. > > > >>>>>>>> We could basically combine all the hashes without having to > > > >> compute all > > > >>>>>> of > > > >>>>>>>> them. This approach has a few benefits. 1) We could get rid of > > > >>>>>>>> the ConsumerGroupPartitionMetadata record as we could store the > > > >> hash > > > >>>>>> with > > > >>>>>>>> the group epoch. 2) We could get rid of the Map that we keep in > > > >> each > > > >>>>>> group > > > >>>>>>>> to store the hashes corresponding to the subscribed topics. > > > >>>>>>>> DJ05: Regarding the cache again, I wonder if we should actually > > > >> store > > > >>>>>> the > > > >>>>>>>> hash in the metadata image instead of maintaining it somewhere > > > >> else. We > > > >>>>>>>> could still lazily compute it. The benefit is that the value > > > >> would be > > > >>>>>> tight > > > >>>>>>>> to the topic. I have not really looked into it. Would it be an > > > >> option? > > > >>>>>>>> > > > >>>>>>>> I'll be away for two weeks starting from Saturday. I kindly ask > > > >> you to > > > >>>>>> wait > > > >>>>>>>> on me if we cannot conclude this week.
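To make the single-hash-per-group idea in DJ04 concrete, here is a minimal sketch of how a group-level MetadataHash could be derived from already-cached per-topic hashes using Guava's combineUnordered (DJ06); the names are illustrative and the per-topic cache is assumed to be populated already:

    import com.google.common.hash.HashCode;
    import com.google.common.hash.Hashing;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    // Hypothetical sketch: combine cached per-topic hashes into one group hash
    // without recomputing any individual topic hash.
    final class GroupMetadataHashSketch {
        static HashCode metadataHash(Set<String> subscribedTopics,
                                     Map<String, HashCode> cachedTopicHashes) {
            List<HashCode> hashes = subscribedTopics.stream()
                .map(cachedTopicHashes::get) // assumes every subscribed topic already has a cached hash
                .collect(Collectors.toList());
            // combineUnordered ignores ordering, so the iteration order of the
            // subscription set cannot change the resulting group hash.
            // (A real implementation would also handle an empty subscription.)
            return Hashing.combineUnordered(hashes);
        }
    }

Since only the combined value would need to be persisted with the group epoch, this is what would allow the ConsumerGroupPartitionMetadata record and the per-group map of topic hashes to go away, as described in DJ04.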
> > > >>>>>>>> > > > >>>>>>>> Best, > > > >>>>>>>> David > > > >>>>>>>> > > > >>>>>>>> On Tue, Nov 5, 2024 at 1:43 PM Frank Yang <yangp...@gmail.com> > > > >> wrote: > > > >>>>>>>> > > > >>>>>>>>> Hi Chia-Ping, > > > >>>>>>>>> > > > >>>>>>>>> Thanks for the review and suggestions. > > > >>>>>>>>> > > > >>>>>>>>> Q0: Added what a rack change is and how it affects topic partitions. > > > >>>>>>>>> > > > >>>>>>>>> Q1: Added why we need a balance algorithm to the Motivation section. > > > >>>>>>>>> > > > >>>>>>>>> Q2: After checking again, we don’t need to update the cache when > > > >> we replay > > > >>>>>>>>> records. We only need to renew it in the consumer heartbeat. > > > >>>>>>>>> > > > >>>>>>>>> Q3: Added a new section “Topic Hash Function”. > > > >>>>>>>>> > > > >>>>>>>>> Thanks. > > > >>>>>>>>> PoAn > > > >>>>>>>>> > > > >>>>>>>>>> On Nov 1, 2024, at 4:39 PM, Chia-Ping Tsai < > > > >> chia7...@gmail.com> > > > >>>>>> wrote: > > > >>>>>>>>>> > > > >>>>>>>>>> hi PoAn > > > >>>>>>>>>> > > > >>>>>>>>>> Thanks for this KIP! > > > >>>>>>>>>> > > > >>>>>>>>>> Q0: Could you add more details about `A topic partition has > > > >> rack > > > >>>>>> change`? > > > >>>>>>>>>> IIRC, the "rack change" includes both follower and leader, > > > >> right? > > > >>>>>>>>>> > > > >>>>>>>>>> Q1: Could you please add the 'concerns' we discussed to the > > > >> Motivation > > > >>>>>>>>>> section? This should include topics like 'computations' and > > > >> 'space > > > >>>>>>>>> usage'. > > > >>>>>>>>>> > > > >>>>>>>>>> Q2: `The group coordinator can leverage it to add a new topic > > > >>>>>> hash.` This > > > >>>>>>>>>> description seems a bit off to me. Why do we need to update > > > >> the cache > > > >>>>>> at > > > >>>>>>>>>> this phase? The cache is intended to prevent duplicate > > > >> computations > > > >>>>>>>>> caused > > > >>>>>>>>>> by heartbeat requests that occur between two metadata change > > > >> events. > > > >>>>>>>>>> Therefore, we could even remove the changed topics from > > > >> caches on a > > > >>>>>>>>>> metadata change, as the first heartbeat request would update > > > >> the > > > >>>>>> caches > > > >>>>>>>>> for > > > >>>>>>>>>> all changed topics. > > > >>>>>>>>>> > > > >>>>>>>>>> Q3: Could you please include a section about the choice of > > > >> hash > > > >>>>>>>>>> implementation? The hash implementation must be consistent > > > >> across > > > >>>>>>>>> different > > > >>>>>>>>>> JDKs, so we use Murmur3 to generate the hash value. > > > >>>>>>>>>> > > > >>>>>>>>>> Best, > > > >>>>>>>>>> Chia-Ping > > > >>>>>>>>>> > > > >>>>>>>>>> > > > >>>>>>>>>> Frank Yang <yangp...@gmail.com> 於 2024年11月1日 週五 下午3:57寫道: > > > >>>>>>>>>> > > > >>>>>>>>>>> Hi all, > > > >>>>>>>>>>> > > > >>>>>>>>>>> I would like to start a discussion thread on KIP-1101: Trigger > > > >>>>>> rebalance > > > >>>>>>>>>>> on rack topology changes. In this KIP, we aim to use less > > > >> memory / > > > >>>>>> disk > > > >>>>>>>>>>> resources to detect rack changes in the new coordinator. > > > >>>>>>>>>>> > > > >>>>>>>>>>> > > > >>>>>>>>>>> > > > >>>>>>>>> > > > >>>>>> > > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1101%3A+Trigger+rebalance+on+rack+topology+changes > > > >>>>>>>>>>> > > > >>>>>>>>>>> Please take a look and feel free to share any thoughts. > > > >>>>>>>>>>> > > > >>>>>>>>>>> Thanks. > > > >>>>>>>>>>> PoAn > > > >>>>>>>>> > > > >>>>>>>>> > > > >>>>>>>> > > > >>>>>> > > > >>>>>> > > > >>>>> > > > >>>> > > > >> > > > > > > > >
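As a closing illustration of the cache behaviour described in Q2 above (entries are created lazily on the heartbeat path, and only the changed or deleted topics are evicted on a metadata change), here is a minimal sketch with hypothetical names; it is not the coordinator's actual data structure:

    import com.google.common.hash.HashCode;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    // Hypothetical per-broker cache of topic hashes, shared by all groups on the broker.
    final class TopicHashCacheSketch {
        private final Map<String, HashCode> cache = new ConcurrentHashMap<>();

        // Heartbeat path: compute the topic hash only if it is missing from the cache.
        HashCode getOrCompute(String topicName, Function<String, HashCode> computeHash) {
            return cache.computeIfAbsent(topicName, computeHash);
        }

        // Metadata change path: evict only the topics that changed or were deleted;
        // the next heartbeat that references them repopulates the entries.
        void onMetadataChange(Set<String> changedOrDeletedTopics) {
            changedOrDeletedTopics.forEach(cache::remove);
        }
    }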