sijie commented on a change in pull request #1156: Introduce 
ConsumerGroupListener for realizing if a consumer is active in a failover 
subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167388965
 
 

 ##########
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 ##########
 @@ -81,20 +82,33 @@ public AbstractDispatcherSingleActiveConsumer(SubType 
subscriptionType, int part
 
     protected abstract void cancelPendingRead();
 
-    protected void pickAndScheduleActiveConsumer() {
+    protected void notifyConsumerGroupChanged(Consumer activeConsumer) {
+        consumers.forEach(consumer ->
+            consumer.notifyConsumerGroupChange(activeConsumer.consumerId()));
+    }
+
+    /**
+     * @return the previous active consumer if the consumer is changed, 
otherwise null.
+     */
+    protected boolean pickAndScheduleActiveConsumer() {
         checkArgument(!consumers.isEmpty());
 
         consumers.sort((c1, c2) -> 
c1.consumerName().compareTo(c2.consumerName()));
 
         int index = partitionIndex % consumers.size();
         Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, 
consumers.get(index));
 
-        if (prevConsumer == ACTIVE_CONSUMER_UPDATER.get(this)) {
+        Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+        if (prevConsumer == activeConsumer) {
             // Active consumer did not change. Do nothing at this point
-            return;
+            return false;
+        } else {
+            // If the active consumer is changed, send notification.
+            notifyConsumerGroupChanged(activeConsumer);
 
 Review comment:
   @merlimat I have changed to notify active consumer changes only after the 
cursor is rewinded. please review it again.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to