merlimat closed pull request #1156: Introduce ActiveConsumerListener for 
realizing if a consumer is active in a failover subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff after the merge, so the complete diff is reproduced below:

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 5181df177..1b37f8946 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -26,9 +26,9 @@
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import org.apache.pulsar.broker.service.BrokerServiceException;
-import org.apache.pulsar.broker.service.Consumer;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
+import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
 import org.slf4j.Logger;
@@ -72,7 +72,17 @@ public AbstractDispatcherSingleActiveConsumer(SubType 
subscriptionType, int part
 
     protected abstract boolean isConsumersExceededOnSubscription();
 
-    protected void pickAndScheduleActiveConsumer() {
+    protected void notifyActiveConsumerChanged(Consumer activeConsumer) {
+        if (null != activeConsumer && subscriptionType == SubType.Failover) {
+            consumers.forEach(consumer ->
+                consumer.notifyActiveConsumerChange(activeConsumer));
+        }
+    }
+
+    /**
+     * @return true if the active consumer was changed, 
otherwise false.
+     */
+    protected boolean pickAndScheduleActiveConsumer() {
         checkArgument(!consumers.isEmpty());
 
         consumers.sort((c1, c2) -> 
c1.consumerName().compareTo(c2.consumerName()));
@@ -80,12 +90,15 @@ protected void pickAndScheduleActiveConsumer() {
         int index = partitionIndex % consumers.size();
         Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, 
consumers.get(index));
 
-        if (prevConsumer == ACTIVE_CONSUMER_UPDATER.get(this)) {
+        Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+        if (prevConsumer == activeConsumer) {
             // Active consumer did not change. Do nothing at this point
-            return;
+            return false;
+        } else {
+            // If the active consumer is changed, send notification.
+            scheduleReadOnActiveConsumer();
+            return true;
         }
-
-        scheduleReadOnActiveConsumer();
     }
 
     public synchronized void addConsumer(Consumer consumer) throws 
BrokerServiceException {
@@ -109,8 +122,17 @@ public synchronized void addConsumer(Consumer consumer) 
throws BrokerServiceExce
 
         consumers.add(consumer);
 
-        // Pick an active consumer and start it
-        pickAndScheduleActiveConsumer();
+        if (!pickAndScheduleActiveConsumer()) {
+            // the active consumer is not changed
+            Consumer currentActiveConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+            if (null == currentActiveConsumer) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Current active consumer disappears while adding 
consumer {}", consumer);
+                }
+            } else {
+                consumer.notifyActiveConsumerChange(currentActiveConsumer);
+            }
+        }
 
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index fd77ef290..0bb30cfb6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -152,6 +152,21 @@ public String consumerName() {
         return consumerName;
     }
 
+    void notifyActiveConsumerChange(Consumer activeConsumer) {
+        if 
(!Commands.peerSupportsActiveConsumerListener(cnx.getRemoteEndpointProtocolVersion()))
 {
+            // if the client is older than `v12`, we don't need to send 
consumer group changes.
+            return;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("notify consumer {} - that [{}] for subscription {} has 
new active consumer : {}",
+                consumerId, topicName, subscription.getName(), activeConsumer);
+        }
+        cnx.ctx().writeAndFlush(
+            Commands.newActiveConsumerChange(consumerId, this == 
activeConsumer),
+            cnx.ctx().voidPromise());
+    }
+
     public boolean readCompacted() {
         return readCompacted;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 716e3325e..40678f839 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -85,7 +85,10 @@ protected void scheduleReadOnActiveConsumer() {
                 log.debug("[{}] Rewind cursor and read more entries without 
delay", name);
             }
             cursor.rewind();
-            readMoreEntries(ACTIVE_CONSUMER_UPDATER.get(this));
+
+            Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+            notifyActiveConsumerChanged(activeConsumer);
+            readMoreEntries(activeConsumer);
             return;
         }
 
@@ -102,7 +105,10 @@ protected void scheduleReadOnActiveConsumer() {
                         
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
             }
             cursor.rewind();
-            readMoreEntries(ACTIVE_CONSUMER_UPDATER.get(this));
+
+            Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+            notifyActiveConsumerChanged(activeConsumer);
+            readMoreEntries(activeConsumer);
             readOnActiveConsumerTask = null;
         }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), 
TimeUnit.MILLISECONDS);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index a68165d5f..922b35bc5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -22,23 +22,29 @@
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.matches;
+import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertTrue;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
 import java.lang.reflect.Field;
-import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -57,17 +63,18 @@
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.Consumer;
-import org.apache.pulsar.broker.service.ServerCnx;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.ZooKeeper;
 import org.mockito.invocation.InvocationOnMock;
@@ -83,9 +90,12 @@
     private BrokerService brokerService;
     private ManagedLedgerFactory mlFactoryMock;
     private ServerCnx serverCnx;
+    private ServerCnx serverCnxWithOldVersion;
     private ManagedLedger ledgerMock;
     private ManagedCursor cursorMock;
     private ConfigurationCacheService configCacheService;
+    private ChannelHandlerContext channelCtx;
+    private LinkedBlockingQueue<CommandActiveConsumerChange> consumerChanges;
 
     final String successTopicName = 
"persistent://part-perf/global/perf.t1/ptopic";
     final String failTopicName = 
"persistent://part-perf/global/perf.t1/pfailTopic";
@@ -115,10 +125,50 @@ public void setup() throws Exception {
         brokerService = spy(new BrokerService(pulsar));
         doReturn(brokerService).when(pulsar).getBrokerService();
 
+        consumerChanges = new LinkedBlockingQueue<>();
+        this.channelCtx = mock(ChannelHandlerContext.class);
+        doAnswer(invocationOnMock -> {
+            ByteBuf buf = invocationOnMock.getArgumentAt(0, ByteBuf.class);
+
+            ByteBuf cmdBuf = buf.retainedSlice(4, buf.writerIndex() - 4);
+            try {
+                int cmdSize = (int) cmdBuf.readUnsignedInt();
+                int writerIndex = cmdBuf.writerIndex();
+                cmdBuf.writerIndex(cmdBuf.readerIndex() + cmdSize);
+                ByteBufCodedInputStream cmdInputStream = 
ByteBufCodedInputStream.get(cmdBuf);
+
+                BaseCommand.Builder cmdBuilder = BaseCommand.newBuilder();
+                BaseCommand cmd = cmdBuilder.mergeFrom(cmdInputStream, 
null).build();
+                cmdBuilder.recycle();
+                cmdBuf.writerIndex(writerIndex);
+                cmdInputStream.recycle();
+
+                if (cmd.hasActiveConsumerChange()) {
+                    consumerChanges.put(cmd.getActiveConsumerChange());
+                }
+                cmd.recycle();
+            } finally {
+                cmdBuf.release();
+            }
+
+            return null;
+        }).when(channelCtx).writeAndFlush(any(), any());
+
         serverCnx = spy(new ServerCnx(brokerService));
         doReturn(true).when(serverCnx).isActive();
         doReturn(true).when(serverCnx).isWritable();
         doReturn(new InetSocketAddress("localhost", 
1234)).when(serverCnx).clientAddress();
+        
when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getNumber());
+        when(serverCnx.ctx()).thenReturn(channelCtx);
+
+        serverCnxWithOldVersion = spy(new ServerCnx(brokerService));
+        doReturn(true).when(serverCnxWithOldVersion).isActive();
+        doReturn(true).when(serverCnxWithOldVersion).isWritable();
+        doReturn(new InetSocketAddress("localhost", 1234))
+            .when(serverCnxWithOldVersion).clientAddress();
+        when(serverCnxWithOldVersion.getRemoteEndpointProtocolVersion())
+            .thenReturn(ProtocolVersion.v11.getNumber());
+        when(serverCnxWithOldVersion.ctx()).thenReturn(channelCtx);
 
         NamespaceService nsSvc = mock(NamespaceService.class);
         doReturn(nsSvc).when(pulsar).getNamespaceService();
@@ -193,6 +243,51 @@ public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
         }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), 
any(DeleteCursorCallback.class), anyObject());
     }
 
+    private void verifyActiveConsumerChange(CommandActiveConsumerChange change,
+                                            long consumerId,
+                                            boolean isActive) {
+        assertEquals(consumerId, change.getConsumerId());
+        assertEquals(isActive, change.getIsActive());
+        change.recycle();
+    }
+
+    @Test
+    public void testConsumerGroupChangesWithOldNewConsumers() throws Exception 
{
+        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        PersistentSubscription sub = new PersistentSubscription(topic, 
"sub-1", cursorMock);
+
+        int partitionIndex = 0;
+        PersistentDispatcherSingleActiveConsumer pdfc = new 
PersistentDispatcherSingleActiveConsumer(cursorMock,
+                SubType.Failover, partitionIndex, topic);
+
+        // 1. Verify no consumers connected
+        assertFalse(pdfc.isConsumerConnected());
+
+        // 2. Add old consumer
+        Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0,
+                "Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, 
"myrole-1", Collections.emptyMap(), false);
+        pdfc.addConsumer(consumer1);
+        List<Consumer> consumers = pdfc.getConsumers();
+        assertTrue(consumers.get(0).consumerName() == 
consumer1.consumerName());
+        assertEquals(1, consumers.size());
+        assertNull(consumerChanges.poll());
+
+        verify(channelCtx, times(0)).write(any());
+
+        // 3. Add new consumer
+        Consumer consumer2 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 2 /* consumer id */, 0,
+                "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(), false);
+        pdfc.addConsumer(consumer2);
+        consumers = pdfc.getConsumers();
+        assertTrue(consumers.get(0).consumerName() == 
consumer1.consumerName());
+        assertEquals(2, consumers.size());
+
+        CommandActiveConsumerChange change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 2, false);
+
+        verify(channelCtx, times(1)).writeAndFlush(any(), any());
+    }
+
     @Test
     public void testAddRemoveConsumer() throws Exception {
         log.info("--- Starting 
PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
@@ -208,13 +303,16 @@ public void testAddRemoveConsumer() throws Exception {
         assertFalse(pdfc.isConsumerConnected());
 
         // 2. Add consumer
-        Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0,
+        Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0,
                 "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */);
+                false /* read compacted */));
         pdfc.addConsumer(consumer1);
         List<Consumer> consumers = pdfc.getConsumers();
         assertTrue(consumers.get(0).consumerName() == 
consumer1.consumerName());
         assertEquals(1, consumers.size());
+        CommandActiveConsumerChange change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 1, true);
+        verify(consumer1, 
times(1)).notifyActiveConsumerChange(same(consumer1));
 
         // 3. Add again, duplicate allowed
         pdfc.addConsumer(consumer1);
@@ -224,31 +322,57 @@ public void testAddRemoveConsumer() throws Exception {
 
         // 4. Verify active consumer
         assertTrue(pdfc.getActiveConsumer().consumerName() == 
consumer1.consumerName());
+        // get notified of who the leader is
+        change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 1, true);
+        verify(consumer1, 
times(2)).notifyActiveConsumerChange(same(consumer1));
 
         // 5. Add another consumer which does not change active consumer
-        Consumer consumer2 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
-                50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */);
+        Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive, 
topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
+                50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */));
         pdfc.addConsumer(consumer2);
         consumers = pdfc.getConsumers();
         assertTrue(pdfc.getActiveConsumer().consumerName() == 
consumer1.consumerName());
         assertEquals(3, consumers.size());
+        // get notified of who the leader is
+        change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 2, false);
+        verify(consumer1, 
times(2)).notifyActiveConsumerChange(same(consumer1));
+        verify(consumer2, 
times(1)).notifyActiveConsumerChange(same(consumer1));
 
         // 6. Add a consumer which changes active consumer
-        Consumer consumer0 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 0 /* consumer id */, 0,
+        Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive, 
topic.getName(), 0 /* consumer id */, 0,
                 "Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */);
+                false /* read compacted */));
         pdfc.addConsumer(consumer0);
         consumers = pdfc.getConsumers();
         assertTrue(pdfc.getActiveConsumer().consumerName() == 
consumer0.consumerName());
         assertEquals(4, consumers.size());
 
+        // all consumers will receive notifications
+        change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 0, true);
+        change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 1, false);
+        change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 1, false);
+        change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 2, false);
+        verify(consumer0, 
times(1)).notifyActiveConsumerChange(same(consumer0));
+        verify(consumer1, 
times(2)).notifyActiveConsumerChange(same(consumer1));
+        verify(consumer1, 
times(2)).notifyActiveConsumerChange(same(consumer0));
+        verify(consumer2, 
times(1)).notifyActiveConsumerChange(same(consumer1));
+        verify(consumer2, 
times(1)).notifyActiveConsumerChange(same(consumer0));
+
         // 7. Remove last consumer
         pdfc.removeConsumer(consumer2);
         consumers = pdfc.getConsumers();
         assertTrue(pdfc.getActiveConsumer().consumerName() == 
consumer0.consumerName());
         assertEquals(3, consumers.size());
+        // no consumer group changes
+        assertNull(consumerChanges.poll());
 
-        // 8. Verify if we can unsubscribe when more than one consumer is 
connected
+        // 8. Verify that we cannot unsubscribe when more than one consumer is 
connected
         assertFalse(pdfc.canUnsubscribe(consumer0));
 
         // 9. Remove active consumer
@@ -257,6 +381,12 @@ public void testAddRemoveConsumer() throws Exception {
         assertTrue(pdfc.getActiveConsumer().consumerName() == 
consumer1.consumerName());
         assertEquals(2, consumers.size());
 
+        // the remaining consumers will receive notifications
+        change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 1, true);
+        change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 1, true);
+
         // 10. Attempt to remove already removed consumer
         String cause = "";
         try {
@@ -271,10 +401,11 @@ public void testAddRemoveConsumer() throws Exception {
         consumers = pdfc.getConsumers();
         assertTrue(pdfc.getActiveConsumer().consumerName() == 
consumer1.consumerName());
         assertEquals(1, consumers.size());
+        // no consumer group changes
+        assertNull(consumerChanges.poll());
 
         // 11. With only one consumer, unsubscribe is allowed
         assertTrue(pdfc.canUnsubscribe(consumer1));
-
     }
     
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index c172b01a8..7bc314155 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -24,10 +24,12 @@
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Sets;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
@@ -35,6 +37,7 @@
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -42,6 +45,7 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -68,22 +72,84 @@ protected void cleanup() throws Exception {
 
     private static final int CONSUMER_ADD_OR_REMOVE_WAIT_TIME = 2000;
 
+    private static class TestConsumerStateEventListener implements 
ConsumerEventListener {
+
+        final LinkedBlockingQueue<Integer> activeQueue = new 
LinkedBlockingQueue<>();
+        final LinkedBlockingQueue<Integer> inActiveQueue = new 
LinkedBlockingQueue<>();
+
+        @Override
+        public void becameActive(Consumer consumer, int partitionId) {
+            try {
+                activeQueue.put(partitionId);
+            } catch (InterruptedException e) {
+            }
+        }
+
+        @Override
+        public void becameInactive(Consumer consumer, int partitionId) {
+            try {
+                inActiveQueue.put(partitionId);
+            } catch (InterruptedException e) {
+            }
+        }
+    }
+
+    private void 
verifyConsumerNotReceiveAnyStateChanges(TestConsumerStateEventListener 
listener) throws Exception {
+        assertNull(listener.activeQueue.poll());
+        assertNull(listener.inActiveQueue.poll());
+    }
+
+    private void verifyConsumerActive(TestConsumerStateEventListener listener, 
int partitionId) throws Exception {
+        assertEquals(partitionId, listener.activeQueue.take().intValue());
+        assertNull(listener.inActiveQueue.poll());
+    }
+
+    private void verifyConsumerInactive(TestConsumerStateEventListener 
listener, int partitionId) throws Exception {
+        assertEquals(partitionId, listener.inActiveQueue.take().intValue());
+        assertNull(listener.activeQueue.poll());
+    }
+
+    private static class ActiveInactiveListenerEvent implements 
ConsumerEventListener {
+
+        private final Set<Integer> activePtns = Sets.newHashSet();
+        private final Set<Integer> inactivePtns = Sets.newHashSet();
+
+        @Override
+        public synchronized void becameActive(Consumer consumer, int 
partitionId) {
+            activePtns.add(partitionId);
+            inactivePtns.remove(partitionId);
+        }
+
+        @Override
+        public synchronized void becameInactive(Consumer consumer, int 
partitionId) {
+            activePtns.remove(partitionId);
+            inactivePtns.add(partitionId);
+        }
+    }
+
     @Test
     public void testSimpleConsumerEventsWithoutPartition() throws Exception {
         final String topicName = 
"persistent://prop/use/ns-abc/failover-topic1";
         final String subName = "sub1";
         final int numMsgs = 100;
 
+        TestConsumerStateEventListener listener1 = new 
TestConsumerStateEventListener();
         ConsumerConfiguration consumerConf1 = new ConsumerConfiguration();
         consumerConf1.setSubscriptionType(SubscriptionType.Failover);
         consumerConf1.setConsumerName("1");
+        consumerConf1.setConsumerEventListener(listener1);
+
+        TestConsumerStateEventListener listener2 = new 
TestConsumerStateEventListener();
         ConsumerConfiguration consumerConf2 = new ConsumerConfiguration();
         consumerConf2.setSubscriptionType(SubscriptionType.Failover);
         consumerConf2.setConsumerName("2");
+        consumerConf2.setConsumerEventListener(listener2);
 
         // 1. two consumers on the same subscription
         Consumer consumer1 = pulsarClient.subscribe(topicName, subName, 
consumerConf1);
         Consumer consumer2 = pulsarClient.subscribe(topicName, subName, 
consumerConf2);
+        verifyConsumerActive(listener1, -1);
+        verifyConsumerInactive(listener2, -1);
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName);
         PersistentSubscription subRef = topicRef.getSubscription(subName);
@@ -147,6 +213,9 @@ public void testSimpleConsumerEventsWithoutPartition() 
throws Exception {
         }
         consumer1.close();
         Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
+
+        verifyConsumerActive(listener2, -1);
+        verifyConsumerNotReceiveAnyStateChanges(listener1);
         for (int i = 5; i < numMsgs; i++) {
             msg = consumer2.receive(1, TimeUnit.SECONDS);
             Assert.assertNotNull(msg);
@@ -195,9 +264,11 @@ public void testSimpleConsumerEventsWithoutPartition() 
throws Exception {
         futures.clear();
 
         // 7. consumer subscription should not send messages to the new 
consumer if its name is not highest in the list
+        TestConsumerStateEventListener listener3 = new 
TestConsumerStateEventListener();
         ConsumerConfiguration consumerConf3 = new ConsumerConfiguration();
         consumerConf3.setSubscriptionType(SubscriptionType.Failover);
         consumerConf3.setConsumerName("3");
+        consumerConf3.setConsumerEventListener(listener3);
         for (int i = 0; i < 5; i++) {
             msg = consumer1.receive(1, TimeUnit.SECONDS);
             Assert.assertNotNull(msg);
@@ -206,6 +277,9 @@ public void testSimpleConsumerEventsWithoutPartition() 
throws Exception {
         }
         Consumer consumer3 = pulsarClient.subscribe(topicName, subName, 
consumerConf3);
         Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
+
+        verifyConsumerInactive(listener3, -1);
+
         Assert.assertNull(consumer3.receive(1, TimeUnit.SECONDS));
         for (int i = 5; i < numMsgs; i++) {
             msg = consumer1.receive(1, TimeUnit.SECONDS);
@@ -247,7 +321,7 @@ public void testSimpleConsumerEventsWithoutPartition() 
throws Exception {
         admin.persistentTopics().delete(topicName);
     }
 
-    @Test(enabled = false)
+    @Test
     public void testSimpleConsumerEventsWithPartition() throws Exception {
         int numPartitions = 4;
 
@@ -261,12 +335,18 @@ public void testSimpleConsumerEventsWithPartition() 
throws Exception {
 
         ProducerConfiguration producerConf = new ProducerConfiguration();
         
producerConf.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
+        ActiveInactiveListenerEvent listener1 = new 
ActiveInactiveListenerEvent();
         ConsumerConfiguration consumerConf1 = new ConsumerConfiguration();
         consumerConf1.setSubscriptionType(SubscriptionType.Failover);
         consumerConf1.setConsumerName("1");
+        consumerConf1.setConsumerEventListener(listener1);
+
+        ActiveInactiveListenerEvent listener2 = new 
ActiveInactiveListenerEvent();
         ConsumerConfiguration consumerConf2 = new ConsumerConfiguration();
         consumerConf2.setSubscriptionType(SubscriptionType.Failover);
         consumerConf2.setConsumerName("2");
+        consumerConf2.setConsumerEventListener(listener2);
 
         // 1. two consumers on the same subscription
         Consumer consumer1 = pulsarClient.subscribe(topicName, subName, 
consumerConf1);
@@ -298,6 +378,7 @@ public void testSimpleConsumerEventsWithPartition() throws 
Exception {
         // equal distribution between both consumers
         int totalMessages = 0;
         Message msg = null;
+        Set<Integer> receivedPtns = Sets.newHashSet();
         while (true) {
             msg = consumer1.receive(1, TimeUnit.SECONDS);
             if (msg == null) {
@@ -305,8 +386,16 @@ public void testSimpleConsumerEventsWithPartition() throws 
Exception {
             }
             totalMessages++;
             consumer1.acknowledge(msg);
+            MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+            receivedPtns.add(msgId.getPartitionIndex());
         }
+
+        assertTrue(Sets.difference(listener1.activePtns, 
receivedPtns).isEmpty());
+        assertTrue(Sets.difference(listener2.inactivePtns, 
receivedPtns).isEmpty());
+
         Assert.assertEquals(totalMessages, numMsgs / 2);
+
+        receivedPtns = Sets.newHashSet();
         while (true) {
             msg = consumer2.receive(1, TimeUnit.SECONDS);
             if (msg == null) {
@@ -314,7 +403,12 @@ public void testSimpleConsumerEventsWithPartition() throws 
Exception {
             }
             totalMessages++;
             consumer2.acknowledge(msg);
+            MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+            receivedPtns.add(msgId.getPartitionIndex());
         }
+        assertTrue(Sets.difference(listener1.inactivePtns, 
receivedPtns).isEmpty());
+        assertTrue(Sets.difference(listener2.activePtns, 
receivedPtns).isEmpty());
+
         Assert.assertEquals(totalMessages, numMsgs);
         Assert.assertEquals(disp0.getActiveConsumer().consumerName(), 
consumerConf1.getConsumerName());
         Assert.assertEquals(disp1.getActiveConsumer().consumerName(), 
consumerConf2.getConsumerName());
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc 
b/pulsar-client-cpp/lib/ClientConnection.cc
index fc773fedf..018f4ab88 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -921,6 +921,13 @@ void ClientConnection::handleIncomingCommand() {
                     break;
                 }
 
+                case BaseCommand::ACTIVE_CONSUMER_CHANGE: {
+                    LOG_DEBUG(cnxString_ << "Received notification about 
active consumer changes");
+                    // ignore this message for now.
+                    // TODO: 
@link{https://github.com/apache/incubator-pulsar/issues/1240}
+                    break;
+                }
+
                 default: {
                     LOG_WARN(cnxString_ << "Received invalid message from 
server");
                     close();
diff --git a/pulsar-client-cpp/lib/Commands.cc 
b/pulsar-client-cpp/lib/Commands.cc
index 70d61dfa1..e0245c555 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -381,6 +381,9 @@ std::string Commands::messageType(BaseCommand_Type type) {
         case BaseCommand::SEEK:
             return "SEEK";
             break;
+        case BaseCommand::ACTIVE_CONSUMER_CHANGE:
+            return "ACTIVE_CONSUMER_CHANGE";
+            break;
     };
 }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index 76928bd1c..00e4537ad 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -46,6 +46,8 @@
 
     private MessageListener messageListener;
 
+    private ConsumerEventListener consumerEventListener;
+
     private int receiverQueueSize = 1000;
 
     private int maxTotalReceiverQueueSizeAcrossPartitions = 50000;
@@ -131,6 +133,33 @@ public ConsumerConfiguration 
setMessageListener(MessageListener messageListener)
         return this;
     }
 
+    /**
+     * @return the configured {@link ConsumerEventListener} for the consumer.
+     * @see #setConsumerEventListener(ConsumerEventListener)
+     * @since 2.0
+     */
+    public ConsumerEventListener getConsumerEventListener() {
+        return this.consumerEventListener;
+    }
+
+    /**
+     * Sets a {@link ConsumerEventListener} for the consumer.
+     *
+     * <p>The consumer event listener is used for receiving consumer state 
changes in a consumer group for a failover
+     * subscription. Applications can then react to the consumer state changes.
+     *
+     * <p>This change is experimental. It is subject to changes coming in 
release 2.0.
+     *
+     * @param listener the consumer event listener object
+     * @return consumer configuration
+     * @since 2.0
+     */
+    public ConsumerConfiguration 
setConsumerEventListener(ConsumerEventListener listener) {
+        checkNotNull(listener);
+        this.consumerEventListener = listener;
+        return this;
+    }
+
     /**
      * @return the configure receiver queue size value
      */
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
new file mode 100644
index 000000000..aeb8bbbb5
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Listener on the consumer state changes.
+ */
+public interface ConsumerEventListener {
+
+    /**
+     * Notified when the consumer group is changed, and the consumer becomes 
the active consumer.
+     */
+    void becameActive(Consumer consumer, int partitionId);
+
+    /**
+     * Notified when the consumer group is changed, and the consumer is still 
inactive or becomes inactive.
+     */
+    void becameInactive(Consumer consumer, int partitionId);
+
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 58cdedeb6..43c49b2a9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -40,6 +40,7 @@
 import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarHandler;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
@@ -252,6 +253,19 @@ protected void handleMessage(CommandMessage cmdMessage, 
ByteBuf headersAndPayloa
         }
     }
 
+    @Override
+    protected void handleActiveConsumerChange(CommandActiveConsumerChange 
change) {
+        checkArgument(state == State.Ready);
+
+        if (log.isDebugEnabled()) {
+            log.debug("{} Received a consumer group change message from the 
server : {}", ctx.channel(), change);
+        }
+        ConsumerImpl consumer = consumers.get(change.getConsumerId());
+        if (consumer != null) {
+            consumer.activeConsumerChanged(change.getIsActive());
+        }
+    }
+
     @Override
     protected void handleSuccess(CommandSuccess success) {
         checkArgument(state == State.Ready);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index be5e6584b..1886c7683 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -30,6 +30,7 @@
 
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
@@ -54,6 +55,7 @@
     protected final String consumerName;
     protected final CompletableFuture<Consumer> subscribeFuture;
     protected final MessageListener listener;
+    protected final ConsumerEventListener consumerEventListener;
     protected final ExecutorService listenerExecutor;
     final BlockingQueue<Message> incomingMessages;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message>> 
pendingReceives;
@@ -68,6 +70,7 @@ protected ConsumerBase(PulsarClientImpl client, String topic, 
String subscriptio
         this.consumerName = conf.getConsumerName() == null ? 
ConsumerName.generateRandomName() : conf.getConsumerName();
         this.subscribeFuture = subscribeFuture;
         this.listener = conf.getMessageListener();
+        this.consumerEventListener = conf.getConsumerEventListener();
         if (receiverQueueSize <= 1) {
             this.incomingMessages = Queues.newArrayBlockingQueue(1);
         } else {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a43689971..219479f53 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -747,6 +747,20 @@ private void failPendingReceive() {
         }
     }
 
+    void activeConsumerChanged(boolean isActive) {
+        if (consumerEventListener == null) {
+            return;
+        }
+
+        listenerExecutor.submit(() -> {
+            if (isActive) {
+                consumerEventListener.becameActive(this, partitionIndex);
+            } else {
+                consumerEventListener.becameInactive(this, partitionIndex);
+            }
+        });
+    }
+
     void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, 
ClientCnx cnx) {
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] Received message: {}/{}", topic, subscription, 
messageId.getLedgerId(),
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index 60e392804..92cbea2e5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -439,7 +439,9 @@ private ConsumerConfiguration getInternalConsumerConfig() {
         
internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
         internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
         internalConsumerConfig.setConsumerName(consumerName);
-
+        if (null != conf.getConsumerEventListener()) {
+            
internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener());
+        }
         int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
                 conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / 
numPartitions);
         internalConsumerConfig.setReceiverQueueSize(receiverQueueSize);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index f6bd759bb..029f712f3 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -251,6 +251,13 @@ public Consumer subscribe(String topic, String 
subscription, ConsumerConfigurati
                             "Read compacted can only be used with exclusive of 
failover persistent subscriptions"));
         }
 
+        if (conf.getConsumerEventListener() != null
+            && conf.getSubscriptionType() != SubscriptionType.Failover) {
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.InvalidConfigurationException(
+                        "Active consumer listener is only supported for 
failover subscription"));
+        }
+
         CompletableFuture<Consumer> consumerSubscribedFuture = new 
CompletableFuture<>();
 
         getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 2912a737a..a395233cc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -40,6 +40,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
@@ -360,6 +361,19 @@ public static ByteBuf newUnsubscribe(long consumerId, long 
requestId) {
         return res;
     }
 
+    public static ByteBuf newActiveConsumerChange(long consumerId, boolean 
isActive) {
+        CommandActiveConsumerChange.Builder changeBuilder = 
CommandActiveConsumerChange.newBuilder()
+            .setConsumerId(consumerId)
+            .setIsActive(isActive);
+
+        CommandActiveConsumerChange change = changeBuilder.build();
+        ByteBuf res = serializeWithSize(
+            
BaseCommand.newBuilder().setType(Type.ACTIVE_CONSUMER_CHANGE).setActiveConsumerChange(change));
+        changeBuilder.recycle();
+        change.recycle();
+        return res;
+    }
+
     public static ByteBuf newSeek(long consumerId, long requestId, long 
ledgerId, long entryId) {
         CommandSeek.Builder seekBuilder = CommandSeek.newBuilder();
         seekBuilder.setConsumerId(consumerId);
@@ -956,4 +970,8 @@ public static ByteBuf newLookup(String topic, boolean 
authoritative, String orig
     public static boolean peerSupportsGetLastMessageId(int peerVersion) {
         return peerVersion >= ProtocolVersion.v12.getNumber();
     }
+
+    public static boolean peerSupportsActiveConsumerListener(int peerVersion) {
+        return peerVersion >= ProtocolVersion.v12.getNumber();
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
index 8d2389bf6..205619983 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
@@ -23,6 +23,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
@@ -52,7 +53,6 @@
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -266,6 +266,11 @@ public void channelRead(ChannelHandlerContext ctx, Object 
msg) throws Exception
                 
handleGetLastMessageIdSuccess(cmd.getGetLastMessageIdResponse());
                 cmd.getGetLastMessageIdResponse().recycle();
                 break;
+
+            case ACTIVE_CONSUMER_CHANGE:
+                handleActiveConsumerChange(cmd.getActiveConsumerChange());
+                cmd.getActiveConsumerChange().recycle();
+                break;
             }
         } finally {
             if (cmdBuilder != null) {
@@ -350,6 +355,10 @@ protected void handleSeek(CommandSeek seek) {
         throw new UnsupportedOperationException();
     }
 
+    protected void handleActiveConsumerChange(CommandActiveConsumerChange 
change) {
+        throw new UnsupportedOperationException();
+    }
+
     protected void handleSuccess(CommandSuccess success) {
         throw new UnsupportedOperationException();
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 27cc63ae6..80df9b821 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -13624,6 +13624,393 @@ public Builder removeProperties(int index) {
     // @@protoc_insertion_point(class_scope:pulsar.proto.CommandAck)
   }
   
+  public interface CommandActiveConsumerChangeOrBuilder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // required uint64 consumer_id = 1;
+    boolean hasConsumerId();
+    long getConsumerId();
+    
+    // optional bool is_active = 2 [default = false];
+    boolean hasIsActive();
+    boolean getIsActive();
+  }
+  public static final class CommandActiveConsumerChange extends
+      com.google.protobuf.GeneratedMessageLite
+      implements CommandActiveConsumerChangeOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage
  {
+    // Use CommandActiveConsumerChange.newBuilder() to construct.
+    private final io.netty.util.Recycler.Handle<CommandActiveConsumerChange> 
handle;
+    private 
CommandActiveConsumerChange(io.netty.util.Recycler.Handle<CommandActiveConsumerChange>
 handle) {
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<CommandActiveConsumerChange> 
RECYCLER = new io.netty.util.Recycler<CommandActiveConsumerChange>() {
+            protected CommandActiveConsumerChange 
newObject(Handle<CommandActiveConsumerChange> handle) {
+              return new CommandActiveConsumerChange(handle);
+            }
+          };
+        
+        public void recycle() {
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            handle.recycle(this);
+        }
+         
+    private CommandActiveConsumerChange(boolean noInit) {
+        this.handle = null;
+    }
+    
+    private static final CommandActiveConsumerChange defaultInstance;
+    public static CommandActiveConsumerChange getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public CommandActiveConsumerChange getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_;
+    // required uint64 consumer_id = 1;
+    public static final int CONSUMER_ID_FIELD_NUMBER = 1;
+    private long consumerId_;
+    public boolean hasConsumerId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public long getConsumerId() {
+      return consumerId_;
+    }
+    
+    // optional bool is_active = 2 [default = false];
+    public static final int IS_ACTIVE_FIELD_NUMBER = 2;
+    private boolean isActive_;
+    public boolean hasIsActive() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public boolean getIsActive() {
+      return isActive_;
+    }
+    
+    private void initFields() {
+      consumerId_ = 0L;
+      isActive_ = false;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasConsumerId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    public void 
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeUInt64(1, consumerId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBool(2, isActive_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(1, consumerId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(2, isActive_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange
 prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange, 
Builder>
+        implements 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChangeOrBuilder,
 
org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder
  {
+      // Construct using 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.newBuilder()
+      private final io.netty.util.Recycler.Handle<Builder> handle;
+      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new 
io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> 
handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() {
+                clear();
+                handle.recycle(this);
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        consumerId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        isActive_ = false;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
getDefaultInstanceForType() {
+        return 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance();
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
build() {
+        
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange result 
= buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange result 
= buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
buildPartial() {
+        
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange result 
= 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.RECYCLER.get();
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.consumerId_ = consumerId_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.isActive_ = isActive_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange
 other) {
+        if (other == 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance())
 return this;
+        if (other.hasConsumerId()) {
+          setConsumerId(other.getConsumerId());
+        }
+        if (other.hasIsActive()) {
+          setIsActive(other.getIsActive());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasConsumerId()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.CodedInputStream input,
+                              com.google.protobuf.ExtensionRegistryLite 
extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is 
disabled");
+                              }
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              
+              return this;
+            default: {
+              if (!input.skipField(tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              consumerId_ = input.readUInt64();
+              break;
+            }
+            case 16: {
+              bitField0_ |= 0x00000002;
+              isActive_ = input.readBool();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required uint64 consumer_id = 1;
+      private long consumerId_ ;
+      public boolean hasConsumerId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public long getConsumerId() {
+        return consumerId_;
+      }
+      public Builder setConsumerId(long value) {
+        bitField0_ |= 0x00000001;
+        consumerId_ = value;
+        
+        return this;
+      }
+      public Builder clearConsumerId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        consumerId_ = 0L;
+        
+        return this;
+      }
+      
+      // optional bool is_active = 2 [default = false];
+      private boolean isActive_ ;
+      public boolean hasIsActive() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public boolean getIsActive() {
+        return isActive_;
+      }
+      public Builder setIsActive(boolean value) {
+        bitField0_ |= 0x00000002;
+        isActive_ = value;
+        
+        return this;
+      }
+      public Builder clearIsActive() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        isActive_ = false;
+        
+        return this;
+      }
+      
+      // 
@@protoc_insertion_point(builder_scope:pulsar.proto.CommandActiveConsumerChange)
+    }
+    
+    static {
+      defaultInstance = new CommandActiveConsumerChange(true);
+      defaultInstance.initFields();
+    }
+    
+    // 
@@protoc_insertion_point(class_scope:pulsar.proto.CommandActiveConsumerChange)
+  }
+  
   public interface CommandFlowOrBuilder
       extends com.google.protobuf.MessageLiteOrBuilder {
     
@@ -21017,6 +21404,10 @@ public Builder clearRequestId() {
     // optional .pulsar.proto.CommandGetLastMessageIdResponse 
getLastMessageIdResponse = 30;
     boolean hasGetLastMessageIdResponse();
     
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
getGetLastMessageIdResponse();
+    
+    // optional .pulsar.proto.CommandActiveConsumerChange 
active_consumer_change = 31;
+    boolean hasActiveConsumerChange();
+    org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
getActiveConsumerChange();
   }
   public static final class BaseCommand extends
       com.google.protobuf.GeneratedMessageLite
@@ -21085,6 +21476,7 @@ public BaseCommand getDefaultInstanceForType() {
       SEEK(26, 28),
       GET_LAST_MESSAGE_ID(27, 29),
       GET_LAST_MESSAGE_ID_RESPONSE(28, 30),
+      ACTIVE_CONSUMER_CHANGE(29, 31),
       ;
       
       public static final int CONNECT_VALUE = 2;
@@ -21116,6 +21508,7 @@ public BaseCommand getDefaultInstanceForType() {
       public static final int SEEK_VALUE = 28;
       public static final int GET_LAST_MESSAGE_ID_VALUE = 29;
       public static final int GET_LAST_MESSAGE_ID_RESPONSE_VALUE = 30;
+      public static final int ACTIVE_CONSUMER_CHANGE_VALUE = 31;
       
       
       public final int getNumber() { return value; }
@@ -21151,6 +21544,7 @@ public static Type valueOf(int value) {
           case 28: return SEEK;
           case 29: return GET_LAST_MESSAGE_ID;
           case 30: return GET_LAST_MESSAGE_ID_RESPONSE;
+          case 31: return ACTIVE_CONSUMER_CHANGE;
           default: return null;
         }
       }
@@ -21477,6 +21871,16 @@ public boolean hasGetLastMessageIdResponse() {
       return getLastMessageIdResponse_;
     }
     
+    // optional .pulsar.proto.CommandActiveConsumerChange 
active_consumer_change = 31;
+    public static final int ACTIVE_CONSUMER_CHANGE_FIELD_NUMBER = 31;
+    private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
activeConsumerChange_;
+    public boolean hasActiveConsumerChange() {
+      return ((bitField0_ & 0x40000000) == 0x40000000);
+    }
+    public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
getActiveConsumerChange() {
+      return activeConsumerChange_;
+    }
+    
     private void initFields() {
       type_ = 
org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.Type.CONNECT;
       connect_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect.getDefaultInstance();
@@ -21508,6 +21912,7 @@ private void initFields() {
       seek_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek.getDefaultInstance();
       getLastMessageId_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance();
       getLastMessageIdResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance();
+      activeConsumerChange_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -21680,6 +22085,12 @@ public final boolean isInitialized() {
           return false;
         }
       }
+      if (hasActiveConsumerChange()) {
+        if (!getActiveConsumerChange().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -21782,6 +22193,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x20000000) == 0x20000000)) {
         output.writeMessage(30, getLastMessageIdResponse_);
       }
+      if (((bitField0_ & 0x40000000) == 0x40000000)) {
+        output.writeMessage(31, activeConsumerChange_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -21910,6 +22324,10 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(30, getLastMessageIdResponse_);
       }
+      if (((bitField0_ & 0x40000000) == 0x40000000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(31, activeConsumerChange_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -22083,6 +22501,8 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x10000000);
         getLastMessageIdResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance();
         bitField0_ = (bitField0_ & ~0x20000000);
+        activeConsumerChange_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x40000000);
         return this;
       }
       
@@ -22236,6 +22656,10 @@ public Builder clone() {
           to_bitField0_ |= 0x20000000;
         }
         result.getLastMessageIdResponse_ = getLastMessageIdResponse_;
+        if (((from_bitField0_ & 0x40000000) == 0x40000000)) {
+          to_bitField0_ |= 0x40000000;
+        }
+        result.activeConsumerChange_ = activeConsumerChange_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -22332,6 +22756,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.BaseComman
         if (other.hasGetLastMessageIdResponse()) {
           mergeGetLastMessageIdResponse(other.getGetLastMessageIdResponse());
         }
+        if (other.hasActiveConsumerChange()) {
+          mergeActiveConsumerChange(other.getActiveConsumerChange());
+        }
         return this;
       }
       
@@ -22502,6 +22929,12 @@ public final boolean isInitialized() {
             return false;
           }
         }
+        if (hasActiveConsumerChange()) {
+          if (!getActiveConsumerChange().isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
       
@@ -22826,6 +23259,16 @@ public Builder mergeFrom(
               subBuilder.recycle();
               break;
             }
+            case 250: {
+              
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.Builder
 subBuilder = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.newBuilder();
+              if (hasActiveConsumerChange()) {
+                subBuilder.mergeFrom(getActiveConsumerChange());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setActiveConsumerChange(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
           }
         }
       }
@@ -24103,6 +24546,49 @@ public Builder clearGetLastMessageIdResponse() {
         return this;
       }
       
+      // optional .pulsar.proto.CommandActiveConsumerChange 
active_consumer_change = 31;
+      private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
activeConsumerChange_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance();
+      public boolean hasActiveConsumerChange() {
+        return ((bitField0_ & 0x40000000) == 0x40000000);
+      }
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
getActiveConsumerChange() {
+        return activeConsumerChange_;
+      }
+      public Builder 
setActiveConsumerChange(org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange
 value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        activeConsumerChange_ = value;
+        
+        bitField0_ |= 0x40000000;
+        return this;
+      }
+      public Builder setActiveConsumerChange(
+          
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.Builder
 builderForValue) {
+        activeConsumerChange_ = builderForValue.build();
+        
+        bitField0_ |= 0x40000000;
+        return this;
+      }
+      public Builder 
mergeActiveConsumerChange(org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange
 value) {
+        if (((bitField0_ & 0x40000000) == 0x40000000) &&
+            activeConsumerChange_ != 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance())
 {
+          activeConsumerChange_ =
+            
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.newBuilder(activeConsumerChange_).mergeFrom(value).buildPartial();
+        } else {
+          activeConsumerChange_ = value;
+        }
+        
+        bitField0_ |= 0x40000000;
+        return this;
+      }
+      public Builder clearActiveConsumerChange() {
+        activeConsumerChange_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance();
+        
+        bitField0_ = (bitField0_ & ~0x40000000);
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.BaseCommand)
     }
     
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index ca3d2fb3d..4a1112d0a 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -135,7 +135,8 @@ enum ProtocolVersion {
        v9 = 9;  // Added end of topic notification
        v10 = 10;// Added proxy to broker
       v11 = 11;// C++ consumers before this version are not correctly handling the checksum field
-       v12 = 12;//Added get topic's last messageId from broker
+       v12 = 12;// Added get topic's last messageId from broker
+                 // Added CommandActiveConsumerChange
 }
 
 message CommandConnect {
@@ -324,6 +325,12 @@ message CommandAck {
         repeated KeyLongValue properties = 5;
 }
 
+// changes on active consumer
+message CommandActiveConsumerChange {
+        required uint64 consumer_id    = 1;
+        optional bool is_active     = 2 [default = false];
+}
+
 message CommandFlow {
        required uint64 consumer_id       = 1;
 
@@ -500,6 +507,8 @@ message BaseCommand {
 
                GET_LAST_MESSAGE_ID = 29;
                GET_LAST_MESSAGE_ID_RESPONSE = 30;
+
+                ACTIVE_CONSUMER_CHANGE = 31;
        }
 
        required Type type = 1;
@@ -544,5 +553,6 @@ message BaseCommand {
        optional CommandGetLastMessageId getLastMessageId = 29;
        optional CommandGetLastMessageIdResponse getLastMessageIdResponse = 30;
 
+        optional CommandActiveConsumerChange active_consumer_change = 31;
 
 }
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/api/PulsarDecoderTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/api/PulsarDecoderTest.java
new file mode 100644
index 000000000..c57a8c873
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/api/PulsarDecoderTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.api;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test of {@link PulsarDecoder}.
+ */
+public class PulsarDecoderTest {
+
+    private PulsarDecoder decoder;
+
+    @BeforeMethod
+    public void setup() {
+        this.decoder = mock(PulsarDecoder.class, CALLS_REAL_METHODS);
+    }
+
+    @Test
+    public void testChannelRead() throws Exception {
+        long consumerId = 1234L;
+        ByteBuf changeBuf = Commands.newActiveConsumerChange(consumerId, true);
+        ByteBuf cmdBuf = changeBuf.slice(4, changeBuf.writerIndex() - 4);
+
+        doNothing().when(decoder).handleActiveConsumerChange(any(CommandActiveConsumerChange.class));
+        decoder.channelRead(mock(ChannelHandlerContext.class), cmdBuf);
+
+        verify(decoder, times(1))
+            .handleActiveConsumerChange(any(CommandActiveConsumerChange.class));
+    }
+
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to