Hi Aditya,
I think I might have confused you. Apologies, I didn't write clearly enough.
I'll try again.
Today, we have:
public interface ConsumerRebalanceListener {
    void onPartitionsAssigned(Collection<TopicPartition> partitions);

    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    default void onPartitionsLost(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }
}
I think what you end up with is:
public interface ConsumerRebalanceListener {
    void onPartitionsAssigned(Collection<TopicPartition> partitions);

    default void onPartitionsAssigned(Collection<TopicPartition> partitions,
                                      ConsumerRebalanceAdapter consumerAdapter) {
        onPartitionsAssigned(partitions);
    }

    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    default void onPartitionsRevoked(Collection<TopicPartition> partitions,
                                     ConsumerRebalanceAdapter consumerAdapter) {
        onPartitionsRevoked(partitions);
    }

    default void onPartitionsLost(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }

    default void onPartitionsLost(Collection<TopicPartition> partitions,
                                  ConsumerRebalanceAdapter consumerAdapter) {
        onPartitionsRevoked(partitions, consumerAdapter);
    }
}
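
To make the delegation concrete, here is a minimal, self-contained sketch of
how those defaults would behave. TopicPartition and ConsumerRebalanceAdapter
are local stand-ins declared only so the example compiles without the Kafka
client on the classpath; the adapter's final shape is not settled in this
thread, so it is left empty here. LegacyListener and AdapterAwareListener are
hypothetical names used purely for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Stand-in for org.apache.kafka.common.TopicPartition (assumption: only what the sketch needs).
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Hypothetical cut-down view of the consumer; its methods are undecided, so it is empty.
interface ConsumerRebalanceAdapter { }

// The interface shape proposed in this mail.
interface ConsumerRebalanceListener {
    void onPartitionsAssigned(Collection<TopicPartition> partitions);

    default void onPartitionsAssigned(Collection<TopicPartition> partitions,
                                      ConsumerRebalanceAdapter consumerAdapter) {
        onPartitionsAssigned(partitions);
    }

    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    default void onPartitionsRevoked(Collection<TopicPartition> partitions,
                                     ConsumerRebalanceAdapter consumerAdapter) {
        onPartitionsRevoked(partitions);
    }

    default void onPartitionsLost(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }

    default void onPartitionsLost(Collection<TopicPartition> partitions,
                                  ConsumerRebalanceAdapter consumerAdapter) {
        onPartitionsRevoked(partitions, consumerAdapter);
    }
}

// A listener written against today's interface: it never mentions the adapter.
class LegacyListener implements ConsumerRebalanceListener {
    final List<String> calls = new ArrayList<>();

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        calls.add("assigned " + partitions);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        calls.add("revoked " + partitions);
    }
}

// A newer listener that overrides only the adapter-aware revoke callback.
class AdapterAwareListener extends LegacyListener {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions,
                                    ConsumerRebalanceAdapter consumerAdapter) {
        calls.add("revoked-with-adapter " + partitions);
    }
}
```

If the client only ever invoked the two-argument variants, LegacyListener
would keep working unchanged through the defaults, and AdapterAwareListener
would see both revocations and losses arrive in its adapter-aware override.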
ConsumerRebalanceView would also be a fine name for the cut-down interface to
the Consumer. I do think the name should start with ConsumerRebalance.
And then the changes to Consumer are:
* Add setConsumerRebalanceListener(ConsumerRebalanceListener listener)
* Deprecate all of the subscribe variants which take a
ConsumerRebalanceListener parameter
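
As a rough illustration of why the two changes belong together, the toy model
below contrasts the two-job subscribe variant with a separate setter.
SketchConsumer is a hypothetical class, not the real KafkaConsumer, and its
blank-topic check is only an assumed stand-in for whatever can make a
subscription change fail.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Marker stand-in for the real listener interface; callbacks omitted for brevity.
interface ConsumerRebalanceListener { }

// Toy model of a consumer (assumption: NOT the real client implementation).
class SketchConsumer {
    private ConsumerRebalanceListener listener;
    private final Set<String> subscription = new HashSet<>();

    // Proposed: install the listener independently of any subscription change.
    void setConsumerRebalanceListener(ConsumerRebalanceListener listener) {
        this.listener = listener;
    }

    // Proposed end state: subscribe() only changes the subscription.
    void subscribe(Collection<String> topics) {
        // Validate everything first so a failure leaves the subscription untouched.
        for (String topic : topics) {
            if (topic == null || topic.trim().isEmpty()) {
                throw new IllegalArgumentException("Topic name cannot be null or blank");
            }
        }
        subscription.clear();
        subscription.addAll(topics);
    }

    // Models the existing two-job variant: the listener is replaced first,
    // then the subscription change is attempted and may throw.
    @Deprecated
    void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
        this.listener = listener;
        subscribe(topics);
    }

    ConsumerRebalanceListener listener() {
        return listener;
    }

    Set<String> subscription() {
        return subscription;
    }
}
```

In this model, a failed call to the deprecated variant still swaps the
listener while leaving the old subscription in place, which is the surprising
coupling the setter avoids.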
I do take the point about having a list of rebalance listeners like the
interceptor classes, but I wouldn't wrap that into the same KIP personally.
People tend to implement these things in lambdas.
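
For reference, the list-of-listeners idea can already be approximated in
application code along the lines Aditya describes. The sketch below is one
possible shape, not his actual implementation: TopicPartition is a local
stand-in, the listener interface is today's shape reduced to what the example
needs, and the choice to log and continue on failure is an assumption that
mirrors how the thread describes interceptor handling.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer; // java.util.function.Consumer, not the Kafka Consumer
import java.util.logging.Level;
import java.util.logging.Logger;

// Stand-in for org.apache.kafka.common.TopicPartition.
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

// Today's listener interface, redeclared so the sketch is self-contained.
interface ConsumerRebalanceListener {
    void onPartitionsAssigned(Collection<TopicPartition> partitions);

    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    default void onPartitionsLost(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }
}

// Invokes each delegate in order; a failing delegate is logged and skipped.
class CompositeRebalanceListener implements ConsumerRebalanceListener {
    private static final Logger LOG =
            Logger.getLogger(CompositeRebalanceListener.class.getName());

    private final List<ConsumerRebalanceListener> delegates;

    CompositeRebalanceListener(List<ConsumerRebalanceListener> delegates) {
        this.delegates = new ArrayList<>(delegates); // defensive copy
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        forEachDelegate(l -> l.onPartitionsAssigned(partitions));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        forEachDelegate(l -> l.onPartitionsRevoked(partitions));
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        forEachDelegate(l -> l.onPartitionsLost(partitions));
    }

    private void forEachDelegate(Consumer<ConsumerRebalanceListener> call) {
        for (ConsumerRebalanceListener delegate : delegates) {
            try {
                call.accept(delegate);
            } catch (RuntimeException e) {
                // Assumption: swallow and warn so later delegates still run.
                LOG.log(Level.WARNING, "Rebalance listener failed; continuing", e);
            }
        }
    }
}
```

Note that swallowing exceptions changes what the caller of the listener sees,
so whether that is acceptable for rebalance callbacks would need its own
discussion.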
Thanks,
Andrew
On 2026/04/08 05:30:51 Aditya Kousik wrote:
> Hello Andrew,
>
> I dwelt on this a bit more. I think supporting both listeners until AK5.0 may
> not be as irksome as I initially feared. We already do this for classic/async
> consumers with ConsumerDelegate.
>
> I was curious why subscribe() alone took a client-constructed object like
> ConsumerRebalanceListener whereas all other hooks were instantiated by the
> client code via Configurable. We can support both but amply call out via
> documentation and a log info line about which one will be activated at
> runtime. This can be opt-in at launch and eventually made the de facto
> pattern in the next major release.
>
> This would also tie in nicely with subscribe only supporting topic
> collections + regex and move the callback within the client code like other
> interceptors already.
>
> I also wanted to call out that this could be an opportunity to support a list
> of ConsumerRebalanceInterceptor like the others. Currently in my code, I wrap
> the outer one as a CompositeRebalanceListener with
> List<ConsumerRebalanceListener> invoked serially. We already do this as I
> mentioned earlier with producer/consumerInterceptors handling exceptions
> within each call in a loop by logging a warn.
>
> So rebalance.interceptors with a LIST of fully qualified class names instantiated within
> the constructor is my current favourite approach. We support one API exactly
> for all rebalances indicating which one at runtime.
>
> Lmk your thoughts on this.
>
> Thanks,
> Aditya Kousik
>
> > On Apr 5, 2026, at 12:37, Aditya Kousik <[email protected]> wrote:
> >
> > Hi Andrew,
> >
> > I see what you’re saying.
> > With AS1,2 the flow becomes clearer for the subscribe interaction: we only
> > change the subscription state for topics and leave rebalance events to a
> > separate mechanism uncoupled from the subscribe() call.
> >
> > To keep in line with other Kafka client classes, can we follow the same
> > convention of using ConsumerConfig to handle this? A new
> > `ConsumerRebalanceInterceptor` with the same signature I’d proposed,
> > instantiated with Utils#newConfiguredInstance, with the class made
> > Configurable. Naming and instantiating it this way keeps it closer to the
> > existing interceptor classes.
> >
> > My only worry is that as long as ConsumerRebalanceListener exists, this can
> > be a source of confusion for which interface to use for rebalance events.
> > Unless we deprecate it, we bear the burden of invoking both, even if we
> > state that only oneOf(ConsumerRebalanceListener,
> > ConsumerRebalanceInterceptor) will be invoked during rebalances.
> >
> > Would love to hear your thoughts on this.
> >
> > -Aditya
> >
> >> On Apr 5, 2026, at 09:39, Andrew Schofield <[email protected]> wrote:
> >>
> >> Hi Aditya,
> >> I agree that using the existing ConsumerRebalanceListener gives a lower
> >> adoption burden.
> >>
> >> AS1: To be more concrete with what I mean here, we could:
> >> * Deprecate Consumer.subscribe(Collection<String>,
> >> ConsumerRebalanceListener) for removal in AK 5.0
> >> * Introduce
> >> Consumer.setConsumerRebalanceListener(ConsumerRebalanceListener)
> >>
> >> AS2: Given that we already have an interface called
> >> ConsumerRebalanceListener, I suggest that ConsumerRebalanceXXX would be a
> >> better naming choice for naming your new interface in terms of consistency.
> >>
> >> Thanks,
> >> Andrew
> >>
> >>>> On 2026/04/04 23:48:28 Aditya Kousik wrote:
> >>> Hi Andrew, thank you for the quick feedback. It turned out to be pivotal.
> >>>
> >>> One of the rejected alternatives was to Add default methods to
> >>> ConsumerRebalanceListener.
> >>> I was ambivalent on this approach with the hopes that a new method and
> >>> new interface would create the least friction.
> >>>
> >>> You’re right about the state change w.r.t subscribe() variants. With the
> >>> Classic consumer, we directly update SubscriptionType with
> >>> setSubscriptionType and similarly a more complex setup for the async
> >>> consumer. So your setRebalanceHandler suggestion seems to follow existing
> >>> patterns.
> >>>
> >>> However, looking at the places I’d need to pipe RebalanceHandler through,
> >>> it’s going to add a burden to the plumbing and subscription state.
> >>>
> >>> I’m falling squarely in the camp of extending the existing
> >>> ConsumerRebalanceListener with new default methods. This also allows
> >>> existing frameworks like Spring and SmallRye to hook directly into the
> >>> new method with minimal change.
> >>>
> >>> I’ve renamed/updated the KIP to reflect this. (I can see why people use
> >>> shareable URLs for the confluence docs)
> >>>
> >>> https://cwiki.apache.org/confluence/x/9ZU8G
> >>>
> >>> -Aditya
> >>>
> >>>
> >>>>> On Apr 2, 2026, at 05:50, Andrew Schofield <[email protected]>
> >>>>> wrote:
> >>>>
> >>>> Hi Aditya,
> >>>> Thanks for the KIP. I've only taken a quick look so far, but here's an
> >>>> initial comment.
> >>>>
> >>>> AS1: One of the mistakes in the Kafka consumer API today is that the
> >>>> `subscribe(Collection<String>, ConsumerRebalanceListener)` does two
> >>>> jobs. First, it replaces the rebalance listener (when you might assume
> >>>> that the listener applies only to rebalance changes resulting from this
> >>>> call to subscribe). Second, it changes the subscription. If the second
> >>>> of these throws an exception, the first will still occur. It's a bit of
> >>>> a mess. I suggest you have a
> >>>> `Consumer.setRebalanceHandler(RebalanceHandler)` method and do not add a
> >>>> new override for `Consumer.subscribe`.
> >>>>
> >>>> Thanks,
> >>>> Andrew
> >>>>
> >>>> On 2026/04/01 15:16:36 Aditya Kousik wrote:
> >>>>> Hi all,
> >>>>>
> >>>>> I'd like to start a discussion on KIP-1306: RebalanceHandler:
> >>>>> Consumer-Aware Rebalance Callback.
> >>>>>
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1306%3A+RebalanceHandler%3A+Consumer-Aware+Rebalance+Callback
> >>>>>
> >>>>> Spring Kafka, SmallRye, and Micronaut all pass the consumer into
> >>>>> rebalance callbacks; the client doesn't. The standard workaround of
> >>>>> constructor-injecting a full Consumer reference allows dangerous
> >>>>> operations like poll() and close() inside a callback. This KIP proposes
> >>>>> RebalanceHandler, with a RebalanceConsumerView that exposes only safe
> >>>>> operations, making misuse a compile error.
> >>>>>
> >>>>> Looking forward to your feedback.
> >>>>>
> >>>>> Thanks
> >>>>>
> >>>
> >>>
>