This is an automated email from the ASF dual-hosted git repository. zhaocong pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 495c9176ae27562a4e01647ed8c19c2950cf26fc Author: LinChen <[email protected]> AuthorDate: Tue Dec 6 09:54:03 2022 +0800 [fix][client] For exclusive subscriptions, if two consumers are created repeatedly, the second consumer will block (#18633) Co-authored-by: lordcheng10 <[email protected]> (cherry picked from commit 0029deb3ae9d0af81a265370d2a0ddffb0eb5381) --- .../pulsar/client/impl/TopicsConsumerImplTest.java | 30 ++++++++++++++++++++++ .../client/impl/MultiTopicsConsumerImpl.java | 11 ++++++-- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index cc153b249de..06c0535e3a4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -65,6 +65,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.testng.Assert.assertEquals; @@ -481,6 +482,35 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { }).get(); } + @Test(timeOut = 30000) + public void testExclusiveSubscribe() throws Exception { + final String topicName = "persistent://tenant1/namespace1/testTopicNameValid"; + TenantInfoImpl tenantInfo = createDefaultTenantInfo(); + admin.tenants().createTenant("tenant1", tenantInfo); + admin.namespaces().createNamespace("tenant1/namespace1"); + admin.topics().createPartitionedTopic(topicName, 3); + + Consumer<byte[]> consumer1 = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName("subscriptionName") + .subscriptionType(SubscriptionType.Exclusive) + .subscribe(); + + try { + pulsarClient.newConsumer() + .topics(IntStream.range(0, 3).mapToObj(i -> topicName + "-partition-" + i) + .collect(Collectors.toList())) + .subscriptionName("subscriptionName") + .subscriptionType(SubscriptionType.Exclusive) + .subscribe(); + fail("should fail"); + } catch (PulsarClientException e) { + String errorLog = e.getMessage(); + assertTrue(errorLog.contains("Exclusive consumer is already connected")); + } + consumer1.close(); + } + @Test public void testSubscribeUnsubscribeSingleTopic() throws Exception { String key = "TopicsConsumerSubscribeUnsubscribeSingleTopicTest"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index d0d88b068aa..99ace67a4d3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -1065,7 +1065,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer {}", topic, topicName, error.getMessage()); client.externalExecutorProvider().getExecutor().submit(() -> { AtomicInteger toCloseNum = new AtomicInteger(0); - consumers.values().stream().filter(consumer1 -> { + List<ConsumerImpl> filterConsumers = consumers.values().stream().filter(consumer1 -> { String consumerTopicName = consumer1.getTopic(); if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(TopicName.get(topicName).getPartitionedTopicName())) { toCloseNum.incrementAndGet(); @@ -1073,7 +1073,14 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } else { return false; } - }).collect(Collectors.toList()).forEach(consumer2 -> { + }).collect(Collectors.toList()); + + if (filterConsumers.isEmpty()) { + subscribeFuture.completeExceptionally(error); + return; + } + + filterConsumers.forEach(consumer2 -> { consumer2.closeAsync().whenComplete((r, ex) -> { consumer2.subscribeFuture().completeExceptionally(error); allTopicPartitionsNumber.decrementAndGet();
