sijie commented on a change in pull request #1156: Introduce ConsumerGroupListener for realizing if a consumer is active in a failover subscription group URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167388965
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java ########## @@ -81,20 +82,33 @@ public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int part protected abstract void cancelPendingRead(); - protected void pickAndScheduleActiveConsumer() { + protected void notifyConsumerGroupChanged(Consumer activeConsumer) { + consumers.forEach(consumer -> + consumer.notifyConsumerGroupChange(activeConsumer.consumerId())); + } + + /** + * @return the previous active consumer if the consumer is changed, otherwise null. + */ + protected boolean pickAndScheduleActiveConsumer() { checkArgument(!consumers.isEmpty()); consumers.sort((c1, c2) -> c1.consumerName().compareTo(c2.consumerName())); int index = partitionIndex % consumers.size(); Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index)); - if (prevConsumer == ACTIVE_CONSUMER_UPDATER.get(this)) { + Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this); + if (prevConsumer == activeConsumer) { // Active consumer did not change. Do nothing at this point - return; + return false; + } else { + // If the active consumer is changed, send notification. + notifyConsumerGroupChanged(activeConsumer); Review comment: @merlimat I have changed to notify active consumer changes only after the cursor is rewinded. please review it again. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services