Hmm, on reflection it does not work for downgrades, because the old coordinator has no idea about the new format :(
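A rough sketch of the problem, in Python only for brevity. Everything here is an assumption for illustration: SHA-256 stands in for Murmur3, the two format versions are hypothetical, and the function names are made up; none of this is what the KIP specifies. The per-topic layout loosely follows the DJ08 proposal quoted below, hash(magic; topicName; numPartitions; [partitionId; sorted racks]).

```python
import hashlib
import struct


def topic_hash(version, name, partitions):
    """Hash one topic. `partitions` maps partition id -> iterable of rack names.

    SHA-256 is only a stand-in for Murmur3; versions 0 and 1 are hypothetical.
    """
    h = hashlib.sha256()
    h.update(struct.pack(">B", version))          # magic byte goes first
    name_bytes = name.encode("utf-8")
    h.update(struct.pack(">I", len(name_bytes)))
    h.update(name_bytes)
    h.update(struct.pack(">I", len(partitions)))  # numPartitions
    for pid in sorted(partitions):
        racks = sorted(partitions[pid])           # sorted, so rack order is irrelevant
        h.update(struct.pack(">I", pid))
        if version >= 1:
            # Hypothetical change introduced by format v1: also hash the rack count.
            h.update(struct.pack(">I", len(racks)))
        for rack in racks:
            rack_bytes = rack.encode("utf-8")
            h.update(struct.pack(">I", len(rack_bytes)))
            h.update(rack_bytes)
    return h.hexdigest()


def upgraded_coordinator_needs_bump(stored, name, partitions):
    """New coordinator: reads the stored version and recomputes in that format."""
    stored_version, stored_hash = stored
    return topic_hash(stored_version, name, partitions) != stored_hash


def old_coordinator_needs_bump(stored, name, partitions):
    """Old coordinator after a downgrade: only knows format v0, ignores the version."""
    _, stored_hash = stored
    return topic_hash(0, name, partitions) != stored_hash
```

With a record written as `(1, topic_hash(1, ...))` and an unchanged topology, the upgraded coordinator sees no difference, while the old coordinator recomputes a v0 hash, sees a mismatch, and would bump the epoch even though nothing changed.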
On 2024/12/20 00:57:27 Chia-Ping Tsai wrote:
> hi David
>
> > DJ08:
>
> That's a good question. If the "hash" lacks version control, it could trigger
> a series of unnecessary rebalances. However, adding additional information
> ("magic") to the hash does not help the upgraded coordinator determine the
> "version." This means that the upgraded coordinator would still trigger
> unnecessary rebalances because it has no way to know which format to use when
> comparing the hash.
>
> Perhaps we can add a new field to ConsumerGroupMetadataValue to indicate the
> version of the "hash." This would allow the coordinator, when handling
> subscription metadata, to compute the old hash and determine whether an epoch
> bump is necessary. Additionally, the coordinator can generate a new record to
> upgrade the hash without requiring an epoch bump.
>
> Another issue is whether the coordinator should cache all versions of the
> hash. I believe this is necessary; otherwise, during an upgrade, there would
> be extensive recomputing of old hashes.
>
> I believe this idea should also work for downgrades, and that's just my two
> cents.
>
> Best,
> Chia-Ping
>
>
> On 2024/12/19 14:39:41 David Jacot wrote:
> > Hi PoAn and Chia-Ping,
> >
> > Thanks for your responses.
> >
> > DJ02: Sorry, I was not clear. I was wondering whether we could compute the
> > hash without having to convert to bytes first. Guava has a nice interface
> > for this that allows incrementally adding primitive types to the hash. We can
> > discuss this in the PR as it is an implementation detail.
> >
> > DJ03: Thanks. I don't think that the replicas are updated when a broker
> > shuts down. What you said applies to the ISR. I suppose that we can rely on
> > the ISR changes to trigger updates. It is also worth noting
> > that TopicsDelta#changedTopics is updated for every change (e.g. ISR
> > change, leader change, replicas change, etc.).
> > I suppose that it is OK, but it seems that it will trigger refreshes which
> > are not necessary. However, a rebalance won't be triggered because the hash
> > won't change.
> > DJ03.1: I suppose that we will continue to rely on
> > ModernGroup#requestMetadataRefresh to notify groups that must refresh their
> > hashes. Is my understanding correct?
> >
> > DJ05: Fair enough.
> >
> > DJ06: You mention in two places that you would like to combine hashes by
> > adding them. I wonder if this is a good practice. Intuitively, I would
> > have used XOR or hashed the hashes. Guava has a method for combining
> > hashes. It may be worth looking into the algorithm used.
> >
> > DJ07: I would rename "AllTopicHash" to "MetadataHash" in order to be more
> > generic.
> >
> > DJ08: Regarding the per topic hash, I wonder whether we should specify in
> > the KIP how we will compute it. I had the following in mind:
> > hash(topicName; numPartitions; [partitionId;sorted racks]). We could also
> > add a magic byte as the first element as a sort of version. I am not sure
> > whether it is needed though. I was thinking about this while imagining how
> > we would handle changing the format in the future.
> >
> > DJ09: It would be great if we could provide more details about backward
> > compatibility. What happens when the cluster is upgraded or downgraded?
> >
> > DJ10: We should update KIP-1071. It may be worth pinging them in the
> > discussion thread of KIP-1071.
> >
> > Best,
> > David
> >
> > On Tue, Dec 17, 2024 at 9:25 AM PoAn Yang <yangp...@gmail.com> wrote:
> >
> > > Hi Chia-Ping / David / Andrew,
> > >
> > > Thanks for the review and suggestions.
> > >
> > > DJ01: Removed all implementation details.
> > >
> > > DJ02: Does “incrementally” mean that we only recalculate the parts that
> > > changed?
> > > For example, if the number of partitions changes, we only calculate the hash
> > > of the number of partitions and reconstruct the topic hash from it.
> > > IMO, we only calculate the topic hash once. With the cache mechanism, the
> > > value can be reused by different groups on the same broker.
> > > The CPU usage for this part is not very high.
> > >
> > > DJ03: Added the update path to the KIP for both cases.
> > >
> > > DJ04: Yes, it’s a good idea. With the cache mechanism and a single hash per
> > > group, we can balance CPU and disk usage.
> > >
> > > DJ05: Currently, the topic hash is only used in the coordinator. However, the
> > > metadata image is used in many different places.
> > > How about we move the hash to the metadata image when we find more use cases?
> > >
> > > AS1, AS2: Thanks for the reminder. I will simply delete
> > > ShareGroupPartitionMetadataKey/Value and add a new field to
> > > ShareGroupMetadataValue.
> > >
> > > Best,
> > > PoAn
> > >
> > > > On Dec 17, 2024, at 5:50 AM, Andrew Schofield <andrew_schofield_j...@outlook.com> wrote:
> > > >
> > > > Hi PoAn,
> > > > Thanks for the KIP.
> > > >
> > > > AS1: From the point of view of share groups, the API and record schema
> > > > definitions are unstable in AK 4.0. In AK 4.1, we will start supporting
> > > > proper versioning. As a result, I think you do not need to deprecate the
> > > > fields in the ShareGroupPartitionMetadataValue. Just include the schema for
> > > > the fields which are actually needed, and I'll update the schema in the code
> > > > when the KIP is implemented.
> > > >
> > > > AS2: In the event that DJ04 actually removes the need for
> > > > ConsumerGroupPartitionMetadataKey/Value entirely, I would simply
> > > > delete ShareGroupPartitionMetadataKey/Value, assuming that it is
> > > > accepted in time for AK 4.1.
> > > >
> > > > Thanks,
> > > > Andrew
> > > > ________________________________________
> > > > From: Chia-Ping Tsai <chia7...@apache.org>
> > > > Sent: 16 December 2024 16:27
> > > > To: dev@kafka.apache.org <dev@kafka.apache.org>
> > > > Subject: Re: [DISCUSS] KIP-1101: Trigger rebalance on rack topology changes
> > > >
> > > > hi David
> > > >
> > > >> DJ05
> > > >
> > > > One of the benefits of having a single hash per group (DJ04) is the
> > > > reduction in the size of stored data. Additionally, the cost of
> > > > re-computing can be minimized thanks to caching. So I'm +1 to DJ04.
> > > > However, the advantage of storing the topic cache in the metadata image is
> > > > somewhat unclear to me. Could you please provide more details on what you
> > > > mean by "tight"? Furthermore, since the metadata image is a thread-safe
> > > > object, we need to ensure that the lazy initialization is also
> > > > thread-safe.
> > > > If no other components require the cache, it would be better to keep the
> > > > caches within the coordinator.
> > > >
> > > > Best,
> > > > Chia-Ping
> > > >
> > > > On 2024/12/16 14:01:35 David Jacot wrote:
> > > >> Hi PoAn,
> > > >>
> > > >> Thanks for the KIP. I have some comments about it.
> > > >>
> > > >> DJ01: Please, remove all the code from the KIP. We only care about public
> > > >> interface changes, not about implementation details.
> > > >> DJ02: Regarding the hash computation, I agree that we should use Murmur3.
> > > >> However, I don't quite like the implementation that you shared. I wonder if
> > > >> we could make it work incrementally instead of computing a hash of
> > > >> everything and combining them.
> > > >> DJ03: Regarding the cache, my understanding is that the cache is populated
> > > >> when a topic without hash is seen in a HB request and the cache is cleaned
> > > >> up when topics are deleted based on the metadata image. However, the update
> > > >> path is not clear.
> > > >> Let's say that a partition is added to a topic, how does it detect it?
> > > >> Let's also imagine that the racks of a partition have changed, how does
> > > >> it detect it? In the KIP, it would be nice to be clear about those.
> > > >> DJ04: I wonder whether we should go with a single hash per group. Your
> > > >> argument against it is that it would require re-computing the hash of all
> > > >> the topics when it needs to be computed. In my opinion, we could leverage
> > > >> the cached hash per topic to compute the hash of all the subscribed ones.
> > > >> We could basically combine all the hashes without having to compute all of
> > > >> them. This approach has a few benefits. 1) We could get rid of
> > > >> the ConsumerGroupPartitionMetadata record as we could store the hash with
> > > >> the group epoch. 2) We could get rid of the Map that we keep in each group
> > > >> to store the hashes corresponding to the subscribed topics.
> > > >> DJ05: Regarding the cache again, I wonder if we should actually store the
> > > >> hash in the metadata image instead of maintaining it somewhere else. We
> > > >> could still lazily compute it. The benefit is that the value would be tight
> > > >> to the topic. I have not really looked into it. Would it be an option?
> > > >>
> > > >> I'll be away for two weeks starting from Saturday. I kindly ask you to wait
> > > >> on me if we cannot conclude this week.
> > > >>
> > > >> Best,
> > > >> David
> > > >>
> > > >> On Tue, Nov 5, 2024 at 1:43 PM Frank Yang <yangp...@gmail.com> wrote:
> > > >>
> > > >>> Hi Chia-Ping,
> > > >>>
> > > >>> Thanks for the review and suggestions.
> > > >>>
> > > >>> Q0: Add how a rack change happens and how it affects topic partitions.
> > > >>>
> > > >>> Q1: Add why we need a balance algorithm to the Motivation section.
> > > >>>
> > > >>> Q2: After checking again, we don’t need to update the cache when we replay
> > > >>> records.
> > > >>> We only need to renew it in the consumer heartbeat.
> > > >>>
> > > >>> Q3: Add a new section “Topic Hash Function”.
> > > >>>
> > > >>> Thanks.
> > > >>> PoAn
> > > >>>
> > > >>>> On Nov 1, 2024, at 4:39 PM, Chia-Ping Tsai <chia7...@gmail.com> wrote:
> > > >>>>
> > > >>>> hi PoAn
> > > >>>>
> > > >>>> Thanks for this KIP!
> > > >>>>
> > > >>>> Q0: Could you add more details about `A topic partition has rack change`?
> > > >>>> IIRC, the "rack change" includes both follower and leader, right?
> > > >>>>
> > > >>>> Q1: Could you please add the 'concerns' we discussed to the Motivation
> > > >>>> section? This should include topics like 'computations' and 'space usage'.
> > > >>>>
> > > >>>> Q2: `The group coordinator can leverage it to add a new topic hash.` This
> > > >>>> description seems a bit off to me. Why do we need to update the cache at
> > > >>>> this phase? The cache is intended to prevent duplicate computations caused
> > > >>>> by heartbeat requests that occur between two metadata change events.
> > > >>>> Therefore, we could even remove the changed topics from caches on a
> > > >>>> metadata change, as the first heartbeat request would update the caches for
> > > >>>> all changed topics.
> > > >>>>
> > > >>>> Q3: Could you please include a section about the choice of hash
> > > >>>> implementation? The hash implementation must be consistent across different
> > > >>>> JDKs, so we use Murmur3 to generate the hash value.
> > > >>>>
> > > >>>> Best,
> > > >>>> Chia-Ping
> > > >>>>
> > > >>>>
> > > >>>> On Fri, Nov 1, 2024 at 3:57 PM Frank Yang <yangp...@gmail.com> wrote:
> > > >>>>
> > > >>>>> Hi all,
> > > >>>>>
> > > >>>>> I would like to start a discussion thread on KIP-1101: Trigger rebalance
> > > >>>>> on rack topology changes. In this KIP, we aim to use less memory / disk
> > > >>>>> resources to detect rack changes in the new coordinator.
> > > >>>>>
> > > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1101%3A+Trigger+rebalance+on+rack+topology+changes
> > > >>>>>
> > > >>>>> Please take a look and feel free to share any thoughts.
> > > >>>>>
> > > >>>>> Thanks.
> > > >>>>> PoAn
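
PS: on the DJ06 question quoted above (adding the per-topic hashes vs. XOR vs. hashing the hashes), here is a small sketch of the three options so they can be compared side by side. It is Python for brevity and purely illustrative: the 64-bit width, the SHA-256 stand-in for Murmur3, and all function names are assumptions, not anything from the KIP or from Guava.

```python
import hashlib

MASK64 = (1 << 64) - 1


def h64(data: bytes) -> int:
    """64-bit hash of some bytes (SHA-256 truncated; a stand-in for Murmur3)."""
    return int.from_bytes(hashlib.sha256(data).digest()[:8], "big")


def combine_add(hashes):
    """Sum modulo 2^64: order-independent, duplicates do not cancel."""
    total = 0
    for h in hashes:
        total = (total + h) & MASK64
    return total


def combine_xor(hashes):
    """XOR: order-independent, but any hash that appears twice cancels out."""
    acc = 0
    for h in hashes:
        acc ^= h
    return acc


def combine_rehash(hashes):
    """Hash of the sorted hashes: order-independent via the explicit sort."""
    buf = b"".join(h.to_bytes(8, "big") for h in sorted(hashes))
    return h64(buf)
```

All three give the same result regardless of the order in which the subscribed topics are visited. The visible difference is in edge cases: with XOR, two identical inputs cancel to 0, which is the same value as an empty input, whereas hashing the sorted hashes keeps them distinct. Whether that matters for a set of distinct topics is a separate question.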