Hi PoAn, I'm late to the discussion, but I have a couple of questions.
JK01. I foresee scenarios where the cache leaves stale topic hashes around when a group leaves. Should we maintain a counter for the number of groups subscribed to the topic (hash) to only keep active topics? JK02. Can you help me understand when we would actually be computing group-level hashes, is it on every heartbeat? Thanks, Jeff On Wed, Mar 12, 2025 at 11:33 PM PoAn Yang <yangp...@gmail.com> wrote: > Hi David, > > Yes, I will continue working on this. > > DJ02: As Kirk noted earlier, since the topic hash calculation is on server > side (not client-side), we can safely introduce Guava as a dependency. > > If there is no other discussion, we can start voting in > https://lists.apache.org/thread/j90htmphjk783p697jjfg1xt8thmy33p. We can > discuss implementation details in PR > https://github.com/apache/kafka/pull/17444. I will resolve merge > conflicts by March 14th. > > Thanks, > PoAn > > > On Mar 13, 2025, at 12:43 AM, David Jacot <david.ja...@gmail.com> wrote: > > > > Hi PoAn, > > > > As we are getting closer to releasing 4.0, I think that we can resume > > working on this one if you are still interested. Are you? I would like > > to have it in 4.1. > > > > Best, > > David > > > > On Wed, Jan 15, 2025 at 1:26 AM Kirk True <k...@kirktrue.pro> wrote: > >> > >> Hi all, > >> > >> Hopefully a quick question... > >> > >> KT01. Will clients calculate the topic hash on the client? Based on the > current state of the KIP and PR, I would have thought "no", but I ask based > on the discussion around the possible use of Guava on client. > >> > >> Thanks, > >> Kirk > >> > >> On Mon, Jan 6, 2025, at 9:11 AM, David Jacot wrote: > >>> Hi PoAn, > >>> > >>> Thanks for the update. I haven't read the updated KIP yet. > >>> > >>> DJ02: I am not sure about using Guava as a dependency. I mentioned it > more > >>> as an inspiration/reference. I suppose that we could use it on the > server > >>> but we should definitely not use it on the client. I am not sure how > others > >>> feel about it. > >>> > >>> Best, > >>> David > >>> > >>> On Mon, Jan 6, 2025 at 5:21 AM PoAn Yang <yangp...@gmail.com> wrote: > >>> > >>>> Hi Chia-Ping / David / Lucas, > >>>> > >>>> Happy new year and thanks for the review. > >>>> > >>>> DJ02: Thanks for the suggestion. I updated the PR to use Guava. > >>>> > >>>> DJ03: Yes, I updated the description to mention ISR change, > >>>> add altering partition reassignment case, and mention that > >>>> non-related topic change doesn’t trigger a rebalance. > >>>> DJ03.1: Yes, I will keep using ModernGroup#requestMetadataRefresh > >>>> to notify group. > >>>> > >>>> DJ06: Updated the PR to use Guava Hashing#combineUnordered > >>>> function to combine topic hash. > >>>> > >>>> DJ07: Renamed it to MetadataHash. > >>>> > >>>> DJ08: Added a sample hash function to the KIP and use first byte as > magic > >>>> byte. This is also included in latest PR. > >>>> > >>>> DJ09: Added two paragraphs about upgraded and downgraded. > >>>> > >>>> DJ10: According to Lucas’s comment, I add StreamsGroupMetadataValue > update > >>>> to this KIP. > >>>> > >>>> Thanks, > >>>> PoAn > >>>> > >>>> > >>>>> On Dec 20, 2024, at 3:58 PM, Chia-Ping Tsai <chia7...@gmail.com> > wrote: > >>>>> > >>>>>> because assignors are sticky. 
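Coming back to JK01 above: one way to avoid stale entries would be to keep a per-topic group count next to the cached hash and drop the entry once the last subscribed group goes away. A minimal sketch, with hypothetical class and method names (not the PR's code), assuming Guava's HashCode as the cached value:

import com.google.common.hash.HashCode;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical reference-counted cache of per-topic hashes (JK01).
// An entry is removed once no group subscribes to the topic anymore,
// so stale hashes do not linger after groups leave.
public final class TopicHashCache {

    private static final class Entry {
        final HashCode hash;
        int groupCount;

        Entry(HashCode hash) {
            this.hash = hash;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    // Called when a group starts subscribing to the topic.
    public synchronized HashCode acquire(String topicName, Supplier<HashCode> computeIfAbsent) {
        Entry entry = entries.computeIfAbsent(topicName, t -> new Entry(computeIfAbsent.get()));
        entry.groupCount++;
        return entry.hash;
    }

    // Called when a group unsubscribes from the topic or is deleted.
    public synchronized void release(String topicName) {
        Entry entry = entries.get(topicName);
        if (entry != null && --entry.groupCount == 0) {
            entries.remove(topicName);
        }
    }
}

The alternative discussed in the thread (cleaning entries for deleted or changed topics based on the metadata image) avoids the bookkeeping but can leave hashes around for topics no group subscribes to anymore, so it is mostly a memory-versus-bookkeeping trade-off.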
> >>>>> > >>>>> I forgot about that spec again :( > >>>>> > >>>>> > >>>>> > >>>>> > >>>>> David Jacot <david.ja...@gmail.com> 於 2024年12月20日 週五 下午3:41寫道: > >>>>> > >>>>>> Hi Chia-Ping, > >>>>>> > >>>>>> DJ08: In my opinion, changing the format will be rare so it is > >>>>>> acceptable if rebalances are triggered in this case on > >>>>>> upgrade/downgrade. It is also what will happen if a cluster is > >>>>>> downgraded from 4.1 (with this KIP) to 4.0. The rebalance won't > change > >>>>>> anything if the topology of the group is the same because assignors > >>>>>> are sticky. The default ones are and we recommend custom ones to > also > >>>>>> be. > >>>>>> > >>>>>> Best, > >>>>>> David > >>>>>> > >>>>>> On Fri, Dec 20, 2024 at 2:11 AM Chia-Ping Tsai <chia7...@apache.org > > > >>>>>> wrote: > >>>>>>> > >>>>>>> ummm, it does not work for downgrade as the old coordinator has no > idea > >>>>>> about new format :( > >>>>>>> > >>>>>>> > >>>>>>> On 2024/12/20 00:57:27 Chia-Ping Tsai wrote: > >>>>>>>> hi David > >>>>>>>> > >>>>>>>>> DJ08: > >>>>>>>> > >>>>>>>> That's a good question. If the "hash" lacks version control, it > could > >>>>>> trigger a series of unnecessary rebalances. However, adding > additional > >>>>>> information ("magic") to the hash does not help the upgraded > coordinator > >>>>>> determine the "version." This means that the upgraded coordinator > would > >>>>>> still trigger unnecessary rebalances because it has no way to know > which > >>>>>> format to use when comparing the hash. > >>>>>>>> > >>>>>>>> Perhaps we can add a new field to ConsumerGroupMetadataValue to > >>>>>> indicate the version of the "hash." This would allow the > coordinator, > >>>> when > >>>>>> handling subscription metadata, to compute the old hash and > determine > >>>>>> whether an epoch bump is necessary. Additionally, the coordinator > can > >>>>>> generate a new record to upgrade the hash without requiring an epoch > >>>> bump. > >>>>>>>> > >>>>>>>> Another issue is whether the coordinator should cache all > versions of > >>>>>> the hash. I believe this is necessary; otherwise, during an upgrade, > >>>> there > >>>>>> would be extensive recomputing of old hashes. > >>>>>>>> > >>>>>>>> I believe this idea should also work for downgrades, and that's > just > >>>>>> my two cents. > >>>>>>>> > >>>>>>>> Best, > >>>>>>>> Chia-Ping > >>>>>>>> > >>>>>>>> > >>>>>>>> On 2024/12/19 14:39:41 David Jacot wrote: > >>>>>>>>> Hi PoAn and Chia-Ping, > >>>>>>>>> > >>>>>>>>> Thanks for your responses. > >>>>>>>>> > >>>>>>>>> DJ02: Sorry, I was not clear. I was wondering whether we could > >>>>>> compute the > >>>>>>>>> hash without having to convert to bytes before. Guava has a nice > >>>>>> interface > >>>>>>>>> for this allowing to incrementally add primitive types to the > hash. > >>>>>> We can > >>>>>>>>> discuss this in the PR as it is an implementation detail. > >>>>>>>>> > >>>>>>>>> DJ03: Thanks. I don't think that the replicas are updated when a > >>>>>> broker > >>>>>>>>> shuts down. What you said applies to the ISR. I suppose that we > can > >>>>>> rely on > >>>>>>>>> the ISR changes to trigger updates. It is also worth noting > >>>>>>>>> that TopicsDelta#changedTopics is updated for every change (e.g. > ISR > >>>>>>>>> change, leader change, replicas change, etc.). I suppose that it > is > >>>>>> OK but > >>>>>>>>> it seems that it will trigger refreshes which are not necessary. > >>>>>> However, a > >>>>>>>>> rebalance won't be triggered because the hash won't change. 
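As a concrete illustration of DJ02: Guava's Hasher accepts primitive values directly, so the coordinator can feed fields into the hash incrementally without first serializing everything to a byte array. A rough sketch only (the fields shown are illustrative; the actual layout is the DJ08 discussion below):

import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;

public final class IncrementalHashingExample {

    // Illustrative only: values are fed into the hasher incrementally,
    // so no intermediate byte-array serialization is needed (DJ02).
    static HashCode hashTopicBasics(String topicName, int numPartitions) {
        return Hashing.murmur3_128().newHasher()
            .putString(topicName, StandardCharsets.UTF_8)
            .putInt(numPartitions)
            .hash();
    }
}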
> >>>>>>>>> DJ03.1: I suppose that we will continue to rely on > >>>>>>>>> ModernGroup#requestMetadataRefresh to notify groups that must > >>>>>> refresh their > >>>>>>>>> hashes. Is my understanding correct? > >>>>>>>>> > >>>>>>>>> DJ05: Fair enough. > >>>>>>>>> > >>>>>>>>> DJ06: You mention in two places that you would like to combine > >>>>>> hashes by > >>>>>>>>> additioning them. I wonder if this is a good practice. > Intuitively, > >>>>>> I would > >>>>>>>>> have used XOR or hashed the hashed. Guava has a method for > combining > >>>>>>>>> hashes. It may be worth looking into the algorithm used. > >>>>>>>>> > >>>>>>>>> DJ07: I would rename "AllTopicHash" to "MetadataHash" in order > to be > >>>>>> more > >>>>>>>>> generic. > >>>>>>>>> > >>>>>>>>> DJ08: Regarding the per topic hash, I wonder whether we should > >>>>>> precise in > >>>>>>>>> the KIP how we will compute it. I had the following in mind: > >>>>>>>>> hash(topicName; numPartitions; [partitionId;sorted racks]). We > could > >>>>>> also > >>>>>>>>> add a magic byte at the first element as a sort of version. I am > not > >>>>>> sure > >>>>>>>>> whether it is needed though. I was thinking about this while > >>>>>> imagining how > >>>>>>>>> we would handle changing the format in the future. > >>>>>>>>> > >>>>>>>>> DJ09: It would be great if we could provide more details about > >>>>>> backward > >>>>>>>>> compatibility. What happens when the cluster is upgraded or > >>>>>> downgraded? > >>>>>>>>> > >>>>>>>>> DJ10: We should update KIP-1071. It may be worth pigging them in > the > >>>>>>>>> discussion thread of KIP-1071. > >>>>>>>>> > >>>>>>>>> Best, > >>>>>>>>> David > >>>>>>>>> > >>>>>>>>> On Tue, Dec 17, 2024 at 9:25 AM PoAn Yang <yangp...@gmail.com> > >>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hi Chia-Ping / David / Andrew, > >>>>>>>>>> > >>>>>>>>>> Thanks for the review and suggestions. > >>>>>>>>>> > >>>>>>>>>> DJ01: Removed all implementation details. > >>>>>>>>>> > >>>>>>>>>> DJ02: Does the “incrementally” mean that we only calculate the > >>>>>> difference > >>>>>>>>>> parts? > >>>>>>>>>> For example, if the number of partition change, we only > calculate > >>>>>> the hash > >>>>>>>>>> of number of partition and reconstruct it to the topic hash. > >>>>>>>>>> IMO, we only calculate topic hash one time. With cache > mechanism, > >>>>>> the > >>>>>>>>>> value can be reused in different groups on a same broker. > >>>>>>>>>> The CPU usage for this part is not very high. > >>>>>>>>>> > >>>>>>>>>> DJ03: Added the update path to KIP for both cases. > >>>>>>>>>> > >>>>>>>>>> DJ04: Yes, it’s a good idea. With cache mechanism and single > hash > >>>>>> per > >>>>>>>>>> group, we can balance cpu and disk usage. > >>>>>>>>>> > >>>>>>>>>> DJ05: Currently, the topic hash is only used in coordinator. > >>>>>> However, the > >>>>>>>>>> metadata image is used in many different places. > >>>>>>>>>> How about we move the hash to metadata image when we find more > use > >>>>>> cases? > >>>>>>>>>> > >>>>>>>>>> AS1, AS2: Thanks for the reminder. I will simply delete > >>>>>>>>>> ShareGroupPartitionMetadataKey/Value and add a new field to > >>>>>>>>>> ShareGroupMetadataValue. > >>>>>>>>>> > >>>>>>>>>> Best, > >>>>>>>>>> PoAn > >>>>>>>>>> > >>>>>>>>>>> On Dec 17, 2024, at 5:50 AM, Andrew Schofield < > >>>>>>>>>> andrew_schofield_j...@outlook.com> wrote: > >>>>>>>>>>> > >>>>>>>>>>> Hi PoAn, > >>>>>>>>>>> Thanks for the KIP. 
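To make DJ06/DJ08 concrete, here is a sketch of the per-topic hash layout hash(magic; topicName; numPartitions; [partitionId; sorted racks]) and of combining the cached per-topic hashes into the group-level MetadataHash with Guava. The exact format, magic value, and names are still up to the KIP/PR; this is only an illustration:

import com.google.common.hash.HashCode;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.SortedMap;

public final class TopicHashExample {

    // Illustrative magic byte acting as a format version of the per-topic hash (DJ08).
    private static final byte TOPIC_HASH_MAGIC = 0;

    // hash(magic; topicName; numPartitions; [partitionId; sorted racks]).
    // Assumes partitions are iterated in ascending order and racks are pre-sorted,
    // so the result is stable for the same topology.
    static HashCode topicHash(String topicName, SortedMap<Integer, List<String>> sortedRacksByPartition) {
        Hasher hasher = Hashing.murmur3_128().newHasher()
            .putByte(TOPIC_HASH_MAGIC)
            .putString(topicName, StandardCharsets.UTF_8)
            .putInt(sortedRacksByPartition.size());
        sortedRacksByPartition.forEach((partitionId, racks) -> {
            hasher.putInt(partitionId);
            racks.forEach(rack -> hasher.putString(rack, StandardCharsets.UTF_8));
        });
        return hasher.hash();
    }

    // Combine the cached per-topic hashes into a single group-level MetadataHash
    // (DJ04/DJ07). combineUnordered makes the result independent of the order in
    // which the subscribed topics are visited.
    static HashCode metadataHash(Collection<HashCode> subscribedTopicHashes) {
        return Hashing.combineUnordered(subscribedTopicHashes);
    }
}

Note that combineUnordered requires all inputs to have the same bit length, which holds as long as every topic hash uses the same hash function.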
> >>>>>>>>>>> > >>>>>>>>>>> AS1: From the point of view of share groups, the API and record > >>>>>> schema > >>>>>>>>>>> definitions are unstable in AK 4.0. In AK 4.1, we will start > >>>>>> supporting > >>>>>>>>>> proper > >>>>>>>>>>> versioning. As a result, I think you do not need to deprecate > >>>>>> the fields > >>>>>>>>>> in the > >>>>>>>>>>> ShareGroupPartitionMetadataValue. Just include the schema for > >>>>>> the fields > >>>>>>>>>>> which are actually needed, and I'll update the schema in the > >>>>>> code when > >>>>>>>>>>> the KIP is implemented. > >>>>>>>>>>> > >>>>>>>>>>> AS2: In the event that DJ04 actually removes the need for > >>>>>>>>>>> ConsumerGroupPartitionMetadataKey/Value entirely, I would > simply > >>>>>>>>>>> delete ShareGroupPartitionMetadataKey/Value, assuming that it > is > >>>>>>>>>>> accepted in time for AK 4.1. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Thanks, > >>>>>>>>>>> Andrew > >>>>>>>>>>> ________________________________________ > >>>>>>>>>>> From: Chia-Ping Tsai <chia7...@apache.org> > >>>>>>>>>>> Sent: 16 December 2024 16:27 > >>>>>>>>>>> To: dev@kafka.apache.org <dev@kafka.apache.org> > >>>>>>>>>>> Subject: Re: [DISCUSS] KIP-1101: Trigger rebalance on rack > >>>>>> topology > >>>>>>>>>> changes > >>>>>>>>>>> > >>>>>>>>>>> hi David > >>>>>>>>>>> > >>>>>>>>>>>> DJ05 > >>>>>>>>>>> > >>>>>>>>>>> One of the benefits of having a single hash per group (DJ04) is > >>>>>> the > >>>>>>>>>> reduction in the size of stored data. Additionally, the cost of > >>>>>>>>>> re-computing can be minimized thanks to caching. So I'm + 1 to > >>>>>> DJ04. > >>>>>>>>>> However, the advantage of storing the topic cache in the > metadata > >>>>>> image is > >>>>>>>>>> somewhat unclear to me. Could you please provide more details on > >>>>>> what you > >>>>>>>>>> mean by "tight"?Furthermore, since the metadata image is a > >>>>>> thread-safe > >>>>>>>>>> object, we need to ensure that the lazy initialization is also > >>>>>> thread-safe. > >>>>>>>>>> If no other components require the cache, it would be better to > >>>>>> keep the > >>>>>>>>>> caches within the coordinator. > >>>>>>>>>>> > >>>>>>>>>>> Best, > >>>>>>>>>>> Chia-Ping > >>>>>>>>>>> > >>>>>>>>>>> On 2024/12/16 14:01:35 David Jacot wrote: > >>>>>>>>>>>> Hi PoAn, > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks for the KIP. I have some comments about it. > >>>>>>>>>>>> > >>>>>>>>>>>> DJ01: Please, remove all the code from the KIP. We only care > >>>>>> about > >>>>>>>>>> public > >>>>>>>>>>>> interface changes, not about implementation details. > >>>>>>>>>>>> DJ02: Regarding the hash computation, I agree that we should > use > >>>>>>>>>> Murmur3. > >>>>>>>>>>>> However, I don't quite like the implementation that you > shared. > >>>>>> I > >>>>>>>>>> wonder if > >>>>>>>>>>>> we could make it work incrementally instead of computing a > hash > >>>>>> of > >>>>>>>>>>>> everything and combining them. > >>>>>>>>>>>> DJ03: Regarding the cache, my understanding is that the cache > is > >>>>>>>>>> populated > >>>>>>>>>>>> when a topic without hash is seen in a HB request and the > cache > >>>>>> is > >>>>>>>>>> cleaned > >>>>>>>>>>>> up when topics are deleted based on the metadata image. > >>>>>> However, the > >>>>>>>>>> update > >>>>>>>>>>>> path is not clear. Let's say that a partition is added to a > >>>>>> topic, how > >>>>>>>>>> does > >>>>>>>>>>>> it detect it? Let's also imagine that the racks of a partition > >>>>>> have > >>>>>>>>>>>> changed, how does it detect it? 
In the KIP, it would be nice > to > >>>>>> be clear > >>>>>>>>>>>> about those. > >>>>>>>>>>>> DJ04: I wonder whether we should go with a single hash per > >>>>>> group. Your > >>>>>>>>>>>> argument against it is that it would require to re-compute the > >>>>>> hash of > >>>>>>>>>> all > >>>>>>>>>>>> the topics when it needs to be computed. In my opinion, we > could > >>>>>>>>>> leverage > >>>>>>>>>>>> the cached hash per topic to compute the hash of all the > >>>>>> subscribed > >>>>>>>>>> ones. > >>>>>>>>>>>> We could basically combine all the hashes without having to > >>>>>> compute all > >>>>>>>>>> of > >>>>>>>>>>>> them. This approach has a few benefits. 1) We could get rid of > >>>>>>>>>>>> the ConsumerGroupPartitionMetadata record as we could store > the > >>>>>> hash > >>>>>>>>>> with > >>>>>>>>>>>> the group epoch. 2) We could get rid of the Map that we keep > in > >>>>>> each > >>>>>>>>>> group > >>>>>>>>>>>> to store the hashed corresponding to the subscribed topics. > >>>>>>>>>>>> DJ05: Regarding the cache again, I wonder if we should > actually > >>>>>> store > >>>>>>>>>> the > >>>>>>>>>>>> hash in the metadata image instead of maintaining it somewhere > >>>>>> else. We > >>>>>>>>>>>> could still lazily compute it. The benefit is that the value > >>>>>> would be > >>>>>>>>>> tight > >>>>>>>>>>>> to the topic. I have not really looked into it. Would it be an > >>>>>> option? > >>>>>>>>>>>> > >>>>>>>>>>>> I'll be away for two weeks starting from Saturday. I kindly > ask > >>>>>> you to > >>>>>>>>>> wait > >>>>>>>>>>>> on me if we cannot conclude this week. > >>>>>>>>>>>> > >>>>>>>>>>>> Best, > >>>>>>>>>>>> David > >>>>>>>>>>>> > >>>>>>>>>>>> On Tue, Nov 5, 2024 at 1:43 PM Frank Yang <yangp...@gmail.com > > > >>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Hi Chia-Ping, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks for the review and suggestions. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Q0: Add how rack change and how it affects topic partition. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Q1: Add why we need a balance algorithm to Motivation > section. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Q2: After checking again, we don’t need to update cache when > >>>>>> we replay > >>>>>>>>>>>>> records. We only need to renew it in consumer heartbeat. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Q3: Add a new section “Topic Hash Function”. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks. > >>>>>>>>>>>>> PoAn > >>>>>>>>>>>>> > >>>>>>>>>>>>>> On Nov 1, 2024, at 4:39 PM, Chia-Ping Tsai < > >>>>>> chia7...@gmail.com> > >>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> hi PoAn > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Thanks for for this KIP! > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Q0: Could you add more details about `A topic partition has > >>>>>> rack > >>>>>>>>>> change`? > >>>>>>>>>>>>>> IIRC, the "rack change" includes both follower and leader, > >>>>>> right? > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Q1: Could you please add the 'concerns' we discussed to the > >>>>>> Motivation > >>>>>>>>>>>>>> section? This should include topics like 'computations' and > >>>>>> 'space > >>>>>>>>>>>>> usage'. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Q2: `The group coordinator can leverage it to add a new > topic > >>>>>>>>>> hash.`This > >>>>>>>>>>>>>> description seems a bit off to me. Why do we need to update > >>>>>> the cache > >>>>>>>>>> at > >>>>>>>>>>>>>> this phase? The cache is intended to prevent duplicate > >>>>>> computations > >>>>>>>>>>>>> caused > >>>>>>>>>>>>>> by heartbeat requests that occur between two metadata change > >>>>>> events. 
> >>>>>>>>>>>>>> Therefore, we could even remove the changed topics from > >>>>>> caches on a > >>>>>>>>>>>>>> metadata change, as the first heartbeat request would update > >>>>>> the > >>>>>>>>>> caches > >>>>>>>>>>>>> for > >>>>>>>>>>>>>> all changed topics. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Q3: Could you please include a section about the choice of > >>>>>> hash > >>>>>>>>>>>>>> implementation? The hash implementation must be consistent > >>>>>> across > >>>>>>>>>>>>> different > >>>>>>>>>>>>>> JDKs, so we use Murmur3 to generate the hash value. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>> Chia-Ping > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Frank Yang <yangp...@gmail.com> 於 2024年11月1日 週五 下午3:57寫道: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Hi all, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> I would like to start a discussion thread on KIP-1101. > >>>>>> Trigger > >>>>>>>>>> rebalance > >>>>>>>>>>>>>>> on rack topology changes. In this KIP, we aim to use less > >>>>>> memory / > >>>>>>>>>> disk > >>>>>>>>>>>>>>> resources to detect rack changes in the new coordinator. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>> > >>>>>> > >>>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1101%3A+Trigger+rebalance+on+rack+topology+changes > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Please take a look and feel free to share any thoughts. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thanks. > >>>>>>>>>>>>>>> PoAn > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>> > >>>> > >>>> > >>> > >
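Finally, tying Q2 and JK02 together: one way to read the thread is that per-topic hashes are invalidated when the metadata changes and recomputed lazily by the next heartbeat that needs them, so a heartbeat normally only combines cached values rather than recomputing topic hashes every time. A self-contained sketch with hypothetical names, not the actual coordinator code:

import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical update path for the per-topic hash cache (Q2 / JK02):
// entries are dropped when the metadata image changes and recomputed lazily
// on the next heartbeat that needs them, not on every heartbeat.
public final class MetadataHashUpdatePath {

    private final Map<String, HashCode> topicHashCache = new ConcurrentHashMap<>();

    // Called when replaying a metadata delta: invalidate changed or deleted topics.
    public void onTopicsChangedOrDeleted(Set<String> topicNames) {
        topicNames.forEach(topicHashCache::remove);
    }

    // Called on a consumer group heartbeat: only topics missing from the cache
    // are recomputed, so the common case is a cheap lookup plus a combine.
    public HashCode groupMetadataHash(List<String> subscribedTopics,
                                      Function<String, HashCode> computeTopicHash) {
        List<HashCode> hashes = subscribedTopics.stream()
            .map(topic -> topicHashCache.computeIfAbsent(topic, computeTopicHash))
            .collect(Collectors.toList());
        return Hashing.combineUnordered(hashes);
    }
}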