This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 48923ce1811ea4bbba1d26798b036c6c052d25f1 Author: Zixuan Liu <[email protected]> AuthorDate: Mon Jun 13 15:16:39 2022 +0800 [fix][client] Remove producer when close producer command is received (#16028) (cherry picked from commit 5ef895af7d8dec851167e56cdf3e8bec11080f8d) --- .../org/apache/pulsar/client/impl/ClientCnx.java | 5 +++-- .../apache/pulsar/client/impl/ClientCnxTest.java | 24 +++++++++++++++++++--- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 2bad4e7ef2d..750c42a620b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -110,7 +110,8 @@ public class ClientCnx extends PulsarHandler { // LookupRequests that waiting in client side. private final Queue<Pair<Long, Pair<ByteBuf, TimedCompletableFuture<LookupDataResult>>>> waitingLookupRequests; - private final ConcurrentLongHashMap<ProducerImpl<?>> producers = + @VisibleForTesting + final ConcurrentLongHashMap<ProducerImpl<?>> producers = ConcurrentLongHashMap.<ProducerImpl<?>>newBuilder() .expectedItems(16) .concurrencyLevel(1) @@ -711,7 +712,7 @@ public class ClientCnx extends PulsarHandler { protected void handleCloseProducer(CommandCloseProducer closeProducer) { log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId()); final long producerId = closeProducer.getProducerId(); - ProducerImpl<?> producer = producers.get(producerId); + ProducerImpl<?> producer = producers.remove(producerId); if (producer != null) { producer.connectionClosed(this); } else { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index a3a00b1b70e..6ce4afecd02 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.api.proto.CommandCloseConsumer; +import org.apache.pulsar.common.api.proto.CommandCloseProducer; import org.apache.pulsar.common.api.proto.CommandError; import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.protocol.Commands; @@ -156,7 +157,7 @@ public class ClientCnxTest { @Test public void testHandleCloseConsumer() { - ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState"); + ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseConsumer"); EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory); ClientConfigurationData conf = new ClientConfigurationData(); ClientCnx cnx = new ClientCnx(conf, eventLoop); @@ -165,11 +166,28 @@ public class ClientCnxTest { cnx.registerConsumer(consumerId, mock(ConsumerImpl.class)); assertEquals(cnx.consumers.size(), 1); - CommandCloseConsumer closeConsumer = new CommandCloseConsumer() - .setConsumerId(1); + CommandCloseConsumer closeConsumer = new CommandCloseConsumer().setConsumerId(consumerId); cnx.handleCloseConsumer(closeConsumer); assertEquals(cnx.consumers.size(), 0); eventLoop.shutdownGracefully(); } + + @Test + public void testHandleCloseProducer() { + ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseProducer"); + EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory); + ClientConfigurationData conf = new ClientConfigurationData(); + ClientCnx cnx = new ClientCnx(conf, eventLoop); + + long producerId = 1; + cnx.registerProducer(producerId, mock(ProducerImpl.class)); + assertEquals(cnx.producers.size(), 1); + + CommandCloseProducer closeProducerCmd = new CommandCloseProducer().setProducerId(producerId); + cnx.handleCloseProducer(closeProducerCmd); + assertEquals(cnx.producers.size(), 0); + + eventLoop.shutdownGracefully(); + } }
