This is an automated email from the ASF dual-hosted git repository.
linlin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1cd8658 Fix ensure single-topic consumer can be closed (#9849)
1cd8658 is described below
commit 1cd8658fbd07267e567a77636aa4fa2c5363eca1
Author: Binbin Guo <[email protected]>
AuthorDate: Thu Mar 11 10:47:18 2021 +0800
Fix ensure single-topic consumer can be closed (#9849)
It is found that there is a logic problem in creating a consumer, which may
cause the consumer to be unable to close
---
.../pulsar/client/impl/PulsarClientImpl.java | 2 +-
.../pulsar/client/impl/PulsarClientImplTest.java | 111 ++++++++++++++++++++-
2 files changed, 107 insertions(+), 6 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index b0e1409..0790e75 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -429,8 +429,8 @@ public class PulsarClientImpl implements PulsarClient {
consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this,
topic, conf, externalExecutorProvider, partitionIndex, false,
consumerSubscribedFuture,null, schema, interceptors,
true /* createTopicIfDoesNotExist */);
- consumers.add(consumer);
}
+ consumers.add(consumer);
}).exceptionally(ex -> {
log.warn("[{}] Failed to get partitioned topic metadata", topic,
ex);
consumerSubscribedFuture.completeExceptionally(ex);
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index 2d114d0..0133ec3 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -18,31 +18,132 @@
*/
package org.apache.pulsar.client.impl;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotSame;
+import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.mockito.internal.util.reflection.FieldSetter;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
* PulsarClientImpl unit tests.
*/
public class PulsarClientImplTest {
+ private PulsarClientImpl clientImpl;
+ private EventLoopGroup eventLoopGroup;
- @Test
- public void testIsClosed() throws Exception {
+ @BeforeMethod
+ public void setup() throws PulsarClientException {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("pulsar://localhost:6650");
ThreadFactory threadFactory = new
DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
- EventLoopGroup eventLoopGroup =
EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
- PulsarClientImpl clientImpl = new PulsarClientImpl(conf,
eventLoopGroup);
+ eventLoopGroup =
EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
+ clientImpl = new PulsarClientImpl(conf, eventLoopGroup);
+ }
+
+ @AfterMethod
+ public void teardown() throws Exception {
+ clientImpl.close();
+ eventLoopGroup.shutdownGracefully().get();
+ }
+
+ @Test
+ public void testIsClosed() throws Exception {
assertFalse(clientImpl.isClosed());
clientImpl.close();
assertTrue(clientImpl.isClosed());
- eventLoopGroup.shutdownGracefully().get();
+ }
+
+ @Test
+ public void testConsumerIsClosed() throws Exception {
+ // mock client connection
+ LookupService lookup = mock(LookupService.class);
+ when(lookup.getTopicsUnderNamespace(
+ any(NamespaceName.class),
+ any(CommandGetTopicsOfNamespace.Mode.class)))
+
.thenReturn(CompletableFuture.completedFuture(Collections.emptyList()));
+ when(lookup.getPartitionedTopicMetadata(any(TopicName.class)))
+ .thenReturn(CompletableFuture.completedFuture(new
PartitionedTopicMetadata()));
+ when(lookup.getBroker(any(TopicName.class)))
+ .thenReturn(CompletableFuture.completedFuture(
+ Pair.of(mock(InetSocketAddress.class),
mock(InetSocketAddress.class))));
+ ConnectionPool pool = mock(ConnectionPool.class);
+ ClientCnx cnx = mock(ClientCnx.class);
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ Channel channel = mock(Channel.class);
+ when(channel.remoteAddress()).thenReturn(mock(SocketAddress.class));
+ when(ctx.channel()).thenReturn(channel);
+ when(ctx.writeAndFlush(any(),
any(ChannelPromise.class))).thenReturn(mock(ChannelFuture.class));
+ when(ctx.voidPromise()).thenReturn(mock(ChannelPromise.class));
+ when(cnx.channel()).thenReturn(channel);
+ when(cnx.ctx()).thenReturn(ctx);
+ when(cnx.sendRequestWithId(any(ByteBuf.class), anyLong()))
+
.thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class)));
+ when(pool.getConnection(any(InetSocketAddress.class),
any(InetSocketAddress.class)))
+ .thenReturn(CompletableFuture.completedFuture(cnx));
+ FieldSetter.setField(clientImpl,
clientImpl.getClass().getDeclaredField("cnxPool"), pool);
+ FieldSetter.setField(clientImpl,
clientImpl.getClass().getDeclaredField("lookup"), lookup);
+
+ List<ConsumerBase<byte[]>> consumers = new ArrayList<>();
+ /**
+ * {@link
org.apache.pulsar.client.impl.PulsarClientImpl#patternTopicSubscribeAsync}
+ */
+ ConsumerConfigurationData<byte[]> consumerConf0 = new
ConsumerConfigurationData<>();
+ consumerConf0.setSubscriptionName("test-subscription0");
+ consumerConf0.setTopicsPattern(Pattern.compile("test-topic"));
+ consumers.add((ConsumerBase)
clientImpl.subscribeAsync(consumerConf0).get());
+ /**
+ * {@link
org.apache.pulsar.client.impl.PulsarClientImpl#singleTopicSubscribeAsync}
+ */
+ ConsumerConfigurationData<byte[]> consumerConf1 = new
ConsumerConfigurationData<>();
+ consumerConf1.setSubscriptionName("test-subscription1");
+ consumerConf1.setTopicNames(Collections.singleton("test-topic"));
+ consumers.add((ConsumerBase)
clientImpl.subscribeAsync(consumerConf1).get());
+ /**
+ * {@link
org.apache.pulsar.client.impl.PulsarClientImpl#multiTopicSubscribeAsync}
+ */
+ ConsumerConfigurationData<byte[]> consumerConf2 = new
ConsumerConfigurationData<>();
+ consumerConf2.setSubscriptionName("test-subscription2");
+ consumers.add((ConsumerBase)
clientImpl.subscribeAsync(consumerConf2).get());
+
+ consumers.forEach(consumer ->
+ assertNotSame(consumer.getState(), HandlerState.State.Closed));
+ clientImpl.close();
+ consumers.forEach(consumer ->
+ assertSame(consumer.getState(), HandlerState.State.Closed));
}
}