This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 0edb8a93470 [fix] [client] Unclear error message when creating a 
consumer with two same topics (#22255)
0edb8a93470 is described below

commit 0edb8a934704ede1cc134983a84016e611ac8cec
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Tue Mar 19 17:23:13 2024 +0800

    [fix] [client] Unclear error message when creating a consumer with two same 
topics (#22255)
---
 .../pulsar/client/api/MultiTopicsConsumerTest.java | 27 ++++++++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       | 21 +++++++++--------
 2 files changed, 38 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index bb8bab29ad9..7a12acd47ed 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -26,9 +26,11 @@ import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -42,6 +44,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -50,6 +53,7 @@ import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.mockito.AdditionalAnswers;
 import org.mockito.Mockito;
@@ -372,6 +376,29 @@ public class MultiTopicsConsumerTest extends 
ProducerConsumerBase {
         assertTrue(consumer.isConnected());
     }
 
+    @Test
+    public void testSameTopics() throws Exception {
+        final String topic1 = 
BrokerTestUtil.newUniqueName("public/default/tp");
+        final String topic2 = "persistent://" + topic1;
+        admin.topics().createNonPartitionedTopic(topic2);
+        // Create consumer with two same topics.
+        try {
+            
pulsarClient.newConsumer(Schema.INT32).topics(Arrays.asList(topic1, topic2))
+                    .subscriptionName("s1").subscribe();
+            fail("Do not allow use two same topics.");
+        } catch (Exception e) {
+            if (e instanceof PulsarClientException && e.getCause() != null) {
+                e = (Exception) e.getCause();
+            }
+            Throwable unwrapEx = FutureUtil.unwrapCompletionException(e);
+            assertTrue(unwrapEx instanceof IllegalArgumentException);
+            assertTrue(e.getMessage().contains( "Subscription topics include 
duplicate items"
+                    + " or invalid names"));
+        }
+        // cleanup.
+        admin.topics().delete(topic2);
+    }
+
     @Test(timeOut = 30000)
     public void testSubscriptionNotFound() throws PulsarAdminException, 
PulsarClientException {
         final var topic1 = newTopicName();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index d18af475d61..20fd03d6a28 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -165,7 +165,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             return;
         }
 
-        checkArgument(topicNamesValid(conf.getTopicNames()), "Topics is 
invalid.");
+        checkArgument(topicNamesValid(conf.getTopicNames()), "Subscription 
topics include duplicate items"
+                + " or invalid names.");
 
         List<CompletableFuture<Void>> futures = conf.getTopicNames().stream()
                 .map(t -> subscribeAsync(t, createTopicIfDoesNotExist))
@@ -202,21 +203,21 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         checkState(topics != null && topics.size() >= 1,
             "topics should contain more than 1 topic");
 
-        Optional<String> result = topics.stream()
-                .filter(topic -> !TopicName.isValid(topic))
-                .findFirst();
+        Set<TopicName> topicNames = new HashSet<>();
 
-        if (result.isPresent()) {
-            log.warn("Received invalid topic name: {}", result.get());
-            return false;
+        for (String topic : topics) {
+            if (!TopicName.isValid(topic)) {
+                log.warn("Received invalid topic name: {}", topic);
+                return false;
+            }
+            topicNames.add(TopicName.get(topic));
         }
 
         // check topic names are unique
-        HashSet<String> set = new HashSet<>(topics);
-        if (set.size() == topics.size()) {
+        if (topicNames.size() == topics.size()) {
             return true;
         } else {
-            log.warn("Topic names not unique. unique/all : {}/{}", set.size(), 
topics.size());
+            log.warn("Topic names not unique. unique/all : {}/{}", 
topicNames.size(), topics.size());
             return false;
         }
     }

Reply via email to