This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 495c9176ae27562a4e01647ed8c19c2950cf26fc
Author: LinChen <[email protected]>
AuthorDate: Tue Dec 6 09:54:03 2022 +0800

    [fix][client] For exclusive subscriptions, if two consumers are created 
repeatedly, the second consumer will block (#18633)
    
    Co-authored-by: lordcheng10 <[email protected]>
    (cherry picked from commit 0029deb3ae9d0af81a265370d2a0ddffb0eb5381)
---
 .../pulsar/client/impl/TopicsConsumerImplTest.java | 30 ++++++++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       | 11 ++++++--
 2 files changed, 39 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index cc153b249de..06c0535e3a4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -65,6 +65,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.testng.Assert.assertEquals;
@@ -481,6 +482,35 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
         }).get();
     }
 
+    @Test(timeOut = 30000)
+    public void testExclusiveSubscribe() throws Exception {
+        final String topicName = 
"persistent://tenant1/namespace1/testTopicNameValid";
+        TenantInfoImpl tenantInfo = createDefaultTenantInfo();
+        admin.tenants().createTenant("tenant1", tenantInfo);
+        admin.namespaces().createNamespace("tenant1/namespace1");
+        admin.topics().createPartitionedTopic(topicName, 3);
+
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName("subscriptionName")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscribe();
+
+        try {
+            pulsarClient.newConsumer()
+                    .topics(IntStream.range(0, 3).mapToObj(i -> topicName + 
"-partition-" + i)
+                    .collect(Collectors.toList()))
+                    .subscriptionName("subscriptionName")
+                    .subscriptionType(SubscriptionType.Exclusive)
+                    .subscribe();
+            fail("should fail");
+        } catch (PulsarClientException e) {
+            String errorLog = e.getMessage();
+            assertTrue(errorLog.contains("Exclusive consumer is already 
connected"));
+        }
+        consumer1.close();
+    }
+
     @Test
     public void testSubscribeUnsubscribeSingleTopic() throws Exception {
         String key = "TopicsConsumerSubscribeUnsubscribeSingleTopicTest";
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index d0d88b068aa..99ace67a4d3 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1065,7 +1065,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer 
{}", topic, topicName, error.getMessage());
         client.externalExecutorProvider().getExecutor().submit(() -> {
             AtomicInteger toCloseNum = new AtomicInteger(0);
-            consumers.values().stream().filter(consumer1 -> {
+            List<ConsumerImpl> filterConsumers = 
consumers.values().stream().filter(consumer1 -> {
                 String consumerTopicName = consumer1.getTopic();
                 if 
(TopicName.get(consumerTopicName).getPartitionedTopicName().equals(TopicName.get(topicName).getPartitionedTopicName()))
 {
                     toCloseNum.incrementAndGet();
@@ -1073,7 +1073,14 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 } else {
                     return false;
                 }
-            }).collect(Collectors.toList()).forEach(consumer2 -> {
+            }).collect(Collectors.toList());
+
+            if (filterConsumers.isEmpty()) {
+                subscribeFuture.completeExceptionally(error);
+                return;
+            }
+
+            filterConsumers.forEach(consumer2 -> {
                 consumer2.closeAsync().whenComplete((r, ex) -> {
                     consumer2.subscribeFuture().completeExceptionally(error);
                     allTopicPartitionsNumber.decrementAndGet();

Reply via email to