This is an automated email from the ASF dual-hosted git repository.

linlin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cd8658  Fix ensure single-topic consumer can be closed (#9849)
1cd8658 is described below

commit 1cd8658fbd07267e567a77636aa4fa2c5363eca1
Author: Binbin Guo <[email protected]>
AuthorDate: Thu Mar 11 10:47:18 2021 +0800

    Fix ensure single-topic consumer can be closed (#9849)
    
    A logic problem was found in the consumer-creation path that may leave
    the consumer unable to be closed.
---
 .../pulsar/client/impl/PulsarClientImpl.java       |   2 +-
 .../pulsar/client/impl/PulsarClientImplTest.java   | 111 ++++++++++++++++++++-
 2 files changed, 107 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index b0e1409..0790e75 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -429,8 +429,8 @@ public class PulsarClientImpl implements PulsarClient {
                 consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, 
topic, conf, externalExecutorProvider, partitionIndex, false,
                         consumerSubscribedFuture,null, schema, interceptors,
                         true /* createTopicIfDoesNotExist */);
-                consumers.add(consumer);
             }
+            consumers.add(consumer);
         }).exceptionally(ex -> {
             log.warn("[{}] Failed to get partitioned topic metadata", topic, 
ex);
             consumerSubscribedFuture.completeExceptionally(ex);
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index 2d114d0..0133ec3 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -18,31 +18,132 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotSame;
+import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadFactory;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.mockito.internal.util.reflection.FieldSetter;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 /**
  * PulsarClientImpl unit tests.
  */
 public class PulsarClientImplTest {
+    private PulsarClientImpl clientImpl;
+    private EventLoopGroup eventLoopGroup;
 
-    @Test
-    public void testIsClosed() throws Exception {
+    @BeforeMethod
+    public void setup() throws PulsarClientException {
         ClientConfigurationData conf = new ClientConfigurationData();
         conf.setServiceUrl("pulsar://localhost:6650");
         ThreadFactory threadFactory = new 
DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
-        EventLoopGroup eventLoopGroup = 
EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
-        PulsarClientImpl clientImpl = new PulsarClientImpl(conf, 
eventLoopGroup);
+        eventLoopGroup = 
EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
+        clientImpl = new PulsarClientImpl(conf, eventLoopGroup);
+    }
+
+    @AfterMethod
+    public void teardown() throws Exception {
+        clientImpl.close();
+        eventLoopGroup.shutdownGracefully().get();
+    }
+
+    @Test
+    public void testIsClosed() throws Exception {
         assertFalse(clientImpl.isClosed());
         clientImpl.close();
         assertTrue(clientImpl.isClosed());
-        eventLoopGroup.shutdownGracefully().get();
+    }
+
+    @Test
+    public void testConsumerIsClosed() throws Exception {
+        // mock client connection
+        LookupService lookup = mock(LookupService.class);
+        when(lookup.getTopicsUnderNamespace(
+                any(NamespaceName.class),
+                any(CommandGetTopicsOfNamespace.Mode.class)))
+                
.thenReturn(CompletableFuture.completedFuture(Collections.emptyList()));
+        when(lookup.getPartitionedTopicMetadata(any(TopicName.class)))
+                .thenReturn(CompletableFuture.completedFuture(new 
PartitionedTopicMetadata()));
+        when(lookup.getBroker(any(TopicName.class)))
+                .thenReturn(CompletableFuture.completedFuture(
+                        Pair.of(mock(InetSocketAddress.class), 
mock(InetSocketAddress.class))));
+        ConnectionPool pool = mock(ConnectionPool.class);
+        ClientCnx cnx = mock(ClientCnx.class);
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        Channel channel = mock(Channel.class);
+        when(channel.remoteAddress()).thenReturn(mock(SocketAddress.class));
+        when(ctx.channel()).thenReturn(channel);
+        when(ctx.writeAndFlush(any(), 
any(ChannelPromise.class))).thenReturn(mock(ChannelFuture.class));
+        when(ctx.voidPromise()).thenReturn(mock(ChannelPromise.class));
+        when(cnx.channel()).thenReturn(channel);
+        when(cnx.ctx()).thenReturn(ctx);
+        when(cnx.sendRequestWithId(any(ByteBuf.class), anyLong()))
+                
.thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class)));
+        when(pool.getConnection(any(InetSocketAddress.class), 
any(InetSocketAddress.class)))
+                .thenReturn(CompletableFuture.completedFuture(cnx));
+        FieldSetter.setField(clientImpl, 
clientImpl.getClass().getDeclaredField("cnxPool"), pool);
+        FieldSetter.setField(clientImpl, 
clientImpl.getClass().getDeclaredField("lookup"), lookup);
+
+        List<ConsumerBase<byte[]>> consumers = new ArrayList<>();
+        /**
+         * {@link 
org.apache.pulsar.client.impl.PulsarClientImpl#patternTopicSubscribeAsync}
+         */
+        ConsumerConfigurationData<byte[]> consumerConf0 = new 
ConsumerConfigurationData<>();
+        consumerConf0.setSubscriptionName("test-subscription0");
+        consumerConf0.setTopicsPattern(Pattern.compile("test-topic"));
+        consumers.add((ConsumerBase) 
clientImpl.subscribeAsync(consumerConf0).get());
+        /**
+         * {@link 
org.apache.pulsar.client.impl.PulsarClientImpl#singleTopicSubscribeAsync}
+         */
+        ConsumerConfigurationData<byte[]> consumerConf1 = new 
ConsumerConfigurationData<>();
+        consumerConf1.setSubscriptionName("test-subscription1");
+        consumerConf1.setTopicNames(Collections.singleton("test-topic"));
+        consumers.add((ConsumerBase) 
clientImpl.subscribeAsync(consumerConf1).get());
+        /**
+         * {@link 
org.apache.pulsar.client.impl.PulsarClientImpl#multiTopicSubscribeAsync}
+         */
+        ConsumerConfigurationData<byte[]> consumerConf2 = new 
ConsumerConfigurationData<>();
+        consumerConf2.setSubscriptionName("test-subscription2");
+        consumers.add((ConsumerBase) 
clientImpl.subscribeAsync(consumerConf2).get());
+
+        consumers.forEach(consumer ->
+                assertNotSame(consumer.getState(), HandlerState.State.Closed));
+        clientImpl.close();
+        consumers.forEach(consumer ->
+                assertSame(consumer.getState(), HandlerState.State.Closed));
     }
 
 }

Reply via email to