Thank you for the clarification, Andrew. I’d initially made the changes to the KIP along the same lines after your suggestion. I think I just got a little ahead of myself, thinking about doing without a setListener method and instead creating an interceptor, as we do for the consumer. The KIP did not reflect this idea; I was just floating it.
I think the KIP currently reflects all your suggestions; please take a look when you can. It includes the explicit marking for removal of the subscribe methods and the new setConsumerRebalanceListener method on Consumer. I’ve renamed the view to ConsumerRebalanceAdapter - it’s not really just a view because we do allow write-like ops like commit/seek.

Best,
Aditya

> On Apr 13, 2026, at 10:12, Andrew Schofield wrote:
>
> Hi Aditya,
> I think I might have confused you. I didn't write clearly enough so
> apologies. I'll try again.
>
> Today, we have:
>
> public interface ConsumerRebalanceListener {
>
>     void onPartitionsAssigned(Collection<TopicPartition> partitions);
>
>     void onPartitionsRevoked(Collection<TopicPartition> partitions);
>
>     default void onPartitionsLost(Collection<TopicPartition> partitions) {
>         onPartitionsRevoked(partitions);
>     }
> }
>
> I think what you end up with is:
>
> public interface ConsumerRebalanceListener {
>
>     void onPartitionsAssigned(Collection<TopicPartition> partitions);
>
>     default void onPartitionsAssigned(Collection<TopicPartition> partitions,
>             ConsumerRebalanceAdapter consumerAdapter) {
>         onPartitionsAssigned(partitions);
>     }
>
>     void onPartitionsRevoked(Collection<TopicPartition> partitions);
>
>     default void onPartitionsRevoked(Collection<TopicPartition> partitions,
>             ConsumerRebalanceAdapter consumerAdapter) {
>         onPartitionsRevoked(partitions);
>     }
>
>     default void onPartitionsLost(Collection<TopicPartition> partitions) {
>         onPartitionsRevoked(partitions);
>     }
>
>     default void onPartitionsLost(Collection<TopicPartition> partitions,
>             ConsumerRebalanceAdapter consumerAdapter) {
>         onPartitionsRevoked(partitions, consumerAdapter);
>     }
> }
>
> ConsumerRebalanceView is also fine for the cut-down interface to the
> Consumer. I do think it would be best to start the name with
> ConsumerRebalance... .
>
> And then the changes to Consumer are:
>
> * Add setConsumerRebalanceListener(ConsumerRebalanceListener listener)
> * Deprecate all of the subscribe variants which take a
>   ConsumerRebalanceListener parameter
>
>
> I do take the point about having a list of rebalance listeners like the
> interceptor classes, but I wouldn't wrap that into the same KIP personally.
> People tend to implement these things in lambdas.
>
> Thanks,
> Andrew
>
>> On 2026/04/08 05:30:51 Aditya Kousik wrote:
>> Hello Andrew,
>>
>> I dwelt on this a bit more. I think supporting both listeners until AK 5.0
>> may not be as irksome as I initially feared. We already do this for
>> classic/async consumers with ConsumerDelegate.
>>
>> I was curious why subscribe() alone took a client-constructed object like
>> ConsumerRebalanceListener whereas all other hooks were instantiated by the
>> client code via Configurable. We can support both but amply call out via
>> documentation and a log info line which one will be activated at
>> runtime. This can be opt-in at launch and eventually made the de facto
>> pattern in the next major release.
>>
>> This would also tie in nicely with subscribe only supporting topic
>> collections + regex and move the callback within the client code like other
>> interceptors already.
>>
>> I also wanted to call out that this could be an opportunity to support a
>> list of ConsumerRebalanceInterceptor like the others. Currently in my code,
>> I wrap the outer one as a CompositeRebalanceListener with
>> List<ConsumerRebalanceListener> invoked serially. We already do this as I
>> mentioned earlier with producer/consumerInterceptors handling exceptions
>> within each call in a loop by logging a warn.
>>
>> So rebalance.interceptors with a LIST of fully qualified class names
>> instantiated within the constructor is my current favourite approach. We
>> support exactly one API for all rebalances, indicating which one at runtime.
>>
>> Lmk your thoughts on this.
>>
>> Thanks,
>> Aditya Kousik
>>
>>>> On Apr 5, 2026, at 12:37, Aditya Kousik wrote:
>>>
>>> Hi Andrew,
>>>
>>> I see what you’re saying.
>>> With AS1,2 the flow becomes clearer for the subscribe interaction: we only
>>> change the subscription state for topics and leave rebalance events to a
>>> separate mechanism uncoupled from the subscribe() call.
>>>
>>> To keep in line with other Kafka client classes, can we follow the same
>>> convention of using ConsumerConfig to handle this? A new
>>> `ConsumerRebalanceInterceptor` with the same signature I’d proposed.
>>> Instantiated with Utils#newConfiguredInstance and make the class
>>> Configurable. Naming and instantiating makes it closer to existing
>>> interceptor classes.
>>>
>>> My only worry is that as long as ConsumerRebalanceListener exists, this can
>>> be a source of confusion for which interface to use for rebalance events.
>>> Unless we deprecate it, we bear the burden of invoking both, even if we
>>> state that only oneOf(ConsumerRebalanceListener,
>>> ConsumerRebalanceInterceptor) will be invoked during rebalances.
>>>
>>> Would love to hear your thoughts on this.
>>>
>>> -Aditya
>>>
>>>> On Apr 5, 2026, at 09:39, Andrew Schofield wrote:
>>>>
>>>> Hi Aditya,
>>>> I agree that using the existing ConsumerRebalanceListener gives a lower
>>>> adoption burden.
>>>>
>>>> AS1: To be more concrete with what I mean here, we could:
>>>> * Deprecate Consumer.subscribe(Collection<String>,
>>>>   ConsumerRebalanceListener) for removal in AK 5.0
>>>> * Introduce
>>>>   Consumer.setConsumerRebalanceListener(ConsumerRebalanceListener)
>>>>
>>>> AS2: Given that we already have an interface called
>>>> ConsumerRebalanceListener, I suggest that ConsumerRebalanceXXX would be a
>>>> better naming choice for your new interface in terms of consistency.
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>>>> On 2026/04/04 23:48:28 Aditya Kousik wrote:
>>>>> Hi Andrew, thank you for the quick feedback. It turned out to be pivotal.
>>>>>
>>>>> One of the rejected alternatives was to add default methods to
>>>>> ConsumerRebalanceListener.
>>>>> I was ambivalent on this approach with the hopes that a new method and
>>>>> new interface would create the least friction.
>>>>>
>>>>> You’re right about the state change w.r.t. subscribe() variants. With the
>>>>> Classic consumer, we directly update SubscriptionType with
>>>>> setSubscriptionType and similarly a more complex setup for the async
>>>>> consumer. So your setRebalanceHandler suggestion seems to follow existing
>>>>> patterns.
>>>>>
>>>>> However, looking at the places I’d need to pipe RebalanceHandler through,
>>>>> it’s going to add a burden to the plumbing and subscription state.
>>>>>
>>>>> I’m falling squarely in the camp of extending the existing
>>>>> ConsumerRebalanceListener with new default methods. This also allows
>>>>> existing frameworks like Spring and SmallRye to hook directly into the
>>>>> new method with minimal change.
>>>>>
>>>>> I’ve renamed/updated the KIP to reflect this. (I can see why people use
>>>>> shareable URLs for the Confluence docs)
>>>>>
>>>>> https://cwiki.apache.org/confluence/x/9ZU8G
>>>>>
>>>>> -Aditya
>>>>>
>>>>>
>>>>>>> On Apr 2, 2026, at 05:50, Andrew Schofield wrote:
>>>>>>
>>>>>> Hi Aditya,
>>>>>> Thanks for the KIP. I've only taken a quick look so far, but here's an
>>>>>> initial comment.
>>>>>>
>>>>>> AS1: One of the mistakes in the Kafka consumer API today is that the
>>>>>> `subscribe(Collection<String>, ConsumerRebalanceListener)` does two
>>>>>> jobs. First, it replaces the rebalance listener (when you might assume
>>>>>> that the listener applies only to rebalance changes resulting from this
>>>>>> call to subscribe). Second, it changes the subscription. If the second
>>>>>> of these throws an exception, the first will still occur. It's a bit of
>>>>>> a mess. I suggest you have a
>>>>>> `Consumer.setRebalanceHandler(RebalanceHandler)` method and do not add a
>>>>>> new override for `Consumer.subscribe`.
>>>>>>
>>>>>> Thanks,
>>>>>> Andrew
>>>>>>
>>>>>> On 2026/04/01 15:16:36 Aditya Kousik wrote:
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I'd like to start a discussion on KIP-1306: RebalanceHandler:
>>>>>>> Consumer-Aware Rebalance Callback.
>>>>>>>
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1306%3A+RebalanceHandler%3A+Consumer-Aware+Rebalance+Callback
>>>>>>>
>>>>>>> Spring Kafka, SmallRye, and Micronaut all pass the consumer into
>>>>>>> rebalance callbacks; the client doesn't. The standard workaround of
>>>>>>> constructor-injecting a full Consumer reference allows dangerous
>>>>>>> operations like poll() and close() inside a callback. This KIP proposes
>>>>>>> RebalanceHandler, with a RebalanceConsumerView that exposes only safe
>>>>>>> operations, making misuse a compile error.
>>>>>>>
>>>>>>> Looking forward to your feedback.
>>>>>>>
>>>>>>> Thanks
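
For anyone following the thread, here is a minimal, self-contained sketch of how the default-method overloads discussed in the Apr 13 message keep existing listeners working. It is an illustration under stated assumptions, not the KIP's implementation: TopicPartition is a local stand-in for the Kafka class, ConsumerRebalanceAdapter is reduced to a single hypothetical seek method, and RebalanceListenerSketch is just a made-up driver.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Stand-in for org.apache.kafka.common.TopicPartition so the sketch compiles on its own.
final class TopicPartition {
    private final String topic;
    private final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Hypothetical cut-down consumer handle; the single seek method is an assumption.
interface ConsumerRebalanceAdapter {
    void seek(TopicPartition partition, long offset);
}

// Same shape as the interface quoted in the thread.
interface ConsumerRebalanceListener {
    void onPartitionsAssigned(Collection<TopicPartition> partitions);

    default void onPartitionsAssigned(Collection<TopicPartition> partitions,
            ConsumerRebalanceAdapter consumerAdapter) {
        onPartitionsAssigned(partitions);
    }

    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    default void onPartitionsRevoked(Collection<TopicPartition> partitions,
            ConsumerRebalanceAdapter consumerAdapter) {
        onPartitionsRevoked(partitions);
    }

    default void onPartitionsLost(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }

    default void onPartitionsLost(Collection<TopicPartition> partitions,
            ConsumerRebalanceAdapter consumerAdapter) {
        onPartitionsRevoked(partitions, consumerAdapter);
    }
}

final class RebalanceListenerSketch {
    private static final Collection<TopicPartition> PARTS =
            Collections.singletonList(new TopicPartition("orders", 0));

    // A listener written before the change: only the one-argument methods exist.
    static List<String> legacyEvents() {
        final List<String> events = new ArrayList<>();
        ConsumerRebalanceListener legacy = new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                events.add("assigned " + partitions);
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                events.add("revoked " + partitions);
            }
        };
        ConsumerRebalanceAdapter adapter = (tp, offset) -> { };
        // The client would call the two-argument variants; defaults fall back to the old methods.
        legacy.onPartitionsAssigned(PARTS, adapter);
        legacy.onPartitionsRevoked(PARTS, adapter);
        legacy.onPartitionsLost(PARTS, adapter);
        return events;
    }

    // A listener that opts in to the adapter by overriding one two-argument method.
    static List<String> adapterAwareEvents() {
        final List<String> events = new ArrayList<>();
        ConsumerRebalanceListener aware = new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Still required, because the one-argument method stays abstract.
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions,
                    ConsumerRebalanceAdapter consumerAdapter) {
                for (TopicPartition tp : partitions) {
                    consumerAdapter.seek(tp, 0L);
                }
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                events.add("revoked " + partitions);
            }
        };
        ConsumerRebalanceAdapter adapter = (tp, offset) -> events.add("seek " + tp + " to " + offset);
        aware.onPartitionsAssigned(PARTS, adapter);
        aware.onPartitionsLost(PARTS, adapter);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(legacyEvents());
        System.out.println(adapterAwareEvents());
    }
}
```

One thing the sketch makes visible: because the one-argument onPartitionsAssigned and onPartitionsRevoked remain abstract, a listener that only cares about the adapter variants still has to provide (possibly empty) one-argument implementations.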

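
The CompositeRebalanceListener mentioned in the Apr 8 message (a list of listeners invoked serially, with failures logged as warnings) could look roughly like the sketch below. This is a guess at the shape, not Aditya's actual code: TopicPartition and ConsumerRebalanceListener are local stand-ins for the Kafka types, the warnings list stands in for a logger, and CompositeSketch is a made-up driver. Whether swallowing exceptions is appropriate for rebalance callbacks, as opposed to interceptors, is an assumption the sketch does not settle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Stand-in for org.apache.kafka.common.TopicPartition so the sketch compiles on its own.
final class TopicPartition {
    private final String topic;
    private final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Stand-in with the same shape as today's listener interface.
interface ConsumerRebalanceListener {
    void onPartitionsAssigned(Collection<TopicPartition> partitions);

    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    default void onPartitionsLost(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }
}

// Hypothetical composite: calls each delegate in order and records failures instead of rethrowing.
final class CompositeRebalanceListener implements ConsumerRebalanceListener {
    private final List<ConsumerRebalanceListener> delegates;
    private final List<String> warnings = new ArrayList<>(); // stand-in for log.warn(...)

    CompositeRebalanceListener(List<ConsumerRebalanceListener> delegates) {
        this.delegates = new ArrayList<>(delegates);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        invokeAll("onPartitionsAssigned", l -> l.onPartitionsAssigned(partitions));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        invokeAll("onPartitionsRevoked", l -> l.onPartitionsRevoked(partitions));
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        invokeAll("onPartitionsLost", l -> l.onPartitionsLost(partitions));
    }

    private void invokeAll(String callback, Consumer<ConsumerRebalanceListener> call) {
        for (ConsumerRebalanceListener delegate : delegates) {
            try {
                call.accept(delegate);
            } catch (RuntimeException e) {
                // One failing delegate must not stop the remaining ones.
                warnings.add(callback + " failed: " + e.getMessage());
            }
        }
    }

    List<String> warnings() {
        return Collections.unmodifiableList(warnings);
    }
}

final class CompositeSketch {
    static List<String> demo() {
        final List<String> events = new ArrayList<>();
        ConsumerRebalanceListener failing = new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                throw new IllegalStateException("boom");
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                events.add("first revoked " + partitions);
            }
        };
        ConsumerRebalanceListener recording = new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                events.add("second assigned " + partitions);
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                events.add("second revoked " + partitions);
            }
        };
        CompositeRebalanceListener composite =
                new CompositeRebalanceListener(Arrays.asList(failing, recording));
        Collection<TopicPartition> parts =
                Collections.singletonList(new TopicPartition("orders", 0));
        composite.onPartitionsAssigned(parts);
        composite.onPartitionsLost(parts);
        events.addAll(composite.warnings());
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```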