This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e80d8a05b553cd3291b5f8fac7785cd485f108fb
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon Jun 13 09:01:16 2022 +0800

    [fix][client] Remove consumer when close consumer command is received 
(#15761)
    
    (cherry picked from commit 5246c8e1cc44b96db6ba684e0ce64914cfd05a61)
---
 .../org/apache/pulsar/client/impl/ClientCnx.java     |  6 ++++--
 .../org/apache/pulsar/client/impl/ClientCnxTest.java | 20 ++++++++++++++++++++
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index bb686553d78..2bad4e7ef2d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
@@ -114,7 +115,8 @@ public class ClientCnx extends PulsarHandler {
                     .expectedItems(16)
                     .concurrencyLevel(1)
                     .build();
-    private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers =
+    @VisibleForTesting
+    final ConcurrentLongHashMap<ConsumerImpl<?>> consumers =
             ConcurrentLongHashMap.<ConsumerImpl<?>>newBuilder()
                     .expectedItems(16)
                     .concurrencyLevel(1)
@@ -721,7 +723,7 @@ public class ClientCnx extends PulsarHandler {
     protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
         log.info("[{}] Broker notification of Closed consumer: {}", 
remoteAddress, closeConsumer.getConsumerId());
         final long consumerId = closeConsumer.getConsumerId();
-        ConsumerImpl<?> consumer = consumers.get(consumerId);
+        ConsumerImpl<?> consumer = consumers.remove(consumerId);
         if (consumer != null) {
             consumer.connectionClosed(this);
         } else {
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index 558c0bfa13f..a3a00b1b70e 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ThreadFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.CommandError;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.protocol.Commands;
@@ -152,4 +153,23 @@ public class ClientCnxTest {
 
         eventLoop.shutdownGracefully();
     }
+
+    @Test
+    public void testHandleCloseConsumer() {
+        ThreadFactory threadFactory = new 
DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, 
threadFactory);
+        ClientConfigurationData conf = new ClientConfigurationData();
+        ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+        long consumerId = 1;
+        cnx.registerConsumer(consumerId, mock(ConsumerImpl.class));
+        assertEquals(cnx.consumers.size(), 1);
+
+        CommandCloseConsumer closeConsumer = new CommandCloseConsumer()
+                .setConsumerId(1);
+        cnx.handleCloseConsumer(closeConsumer);
+        assertEquals(cnx.consumers.size(), 0);
+
+        eventLoop.shutdownGracefully();
+    }
 }

Reply via email to