This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e80d8a05b553cd3291b5f8fac7785cd485f108fb Author: Zixuan Liu <[email protected]> AuthorDate: Mon Jun 13 09:01:16 2022 +0800 [fix][client] Remove consumer when close consumer command is received (#15761) (cherry picked from commit 5246c8e1cc44b96db6ba684e0ce64914cfd05a61) --- .../org/apache/pulsar/client/impl/ClientCnx.java | 6 ++++-- .../org/apache/pulsar/client/impl/ClientCnxTest.java | 20 ++++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index bb686553d78..2bad4e7ef2d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Queues; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; @@ -114,7 +115,8 @@ public class ClientCnx extends PulsarHandler { .expectedItems(16) .concurrencyLevel(1) .build(); - private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = + @VisibleForTesting + final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = ConcurrentLongHashMap.<ConsumerImpl<?>>newBuilder() .expectedItems(16) .concurrencyLevel(1) @@ -721,7 +723,7 @@ public class ClientCnx extends PulsarHandler { protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) { log.info("[{}] Broker notification of Closed consumer: {}", remoteAddress, closeConsumer.getConsumerId()); final long consumerId = closeConsumer.getConsumerId(); - ConsumerImpl<?> consumer = consumers.get(consumerId); + ConsumerImpl<?> consumer = consumers.remove(consumerId); if (consumer != null) { consumer.connectionClosed(this); } else { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index 558c0bfa13f..a3a00b1b70e 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -37,6 +37,7 @@ import java.util.concurrent.ThreadFactory; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.api.proto.CommandCloseConsumer; import org.apache.pulsar.common.api.proto.CommandError; import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.protocol.Commands; @@ -152,4 +153,23 @@ public class ClientCnxTest { eventLoop.shutdownGracefully(); } + + @Test + public void testHandleCloseConsumer() { + ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState"); + EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory); + ClientConfigurationData conf = new ClientConfigurationData(); + ClientCnx cnx = new ClientCnx(conf, eventLoop); + + long consumerId = 1; + cnx.registerConsumer(consumerId, mock(ConsumerImpl.class)); + assertEquals(cnx.consumers.size(), 1); + + CommandCloseConsumer closeConsumer = new CommandCloseConsumer() + .setConsumerId(1); + cnx.handleCloseConsumer(closeConsumer); + assertEquals(cnx.consumers.size(), 0); + + eventLoop.shutdownGracefully(); + } }
