This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 0edb8a93470 [fix] [client] Unclear error message when creating a consumer with two same topics (#22255) 0edb8a93470 is described below commit 0edb8a934704ede1cc134983a84016e611ac8cec Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Tue Mar 19 17:23:13 2024 +0800 [fix] [client] Unclear error message when creating a consumer with two same topics (#22255) --- .../pulsar/client/api/MultiTopicsConsumerTest.java | 27 ++++++++++++++++++++++ .../client/impl/MultiTopicsConsumerImpl.java | 21 +++++++++-------- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java index bb8bab29ad9..7a12acd47ed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java @@ -26,9 +26,11 @@ import static org.mockito.Mockito.verify; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import com.google.common.collect.Lists; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -42,6 +44,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.ConsumerImpl; @@ -50,6 +53,7 @@ import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.mockito.AdditionalAnswers; import org.mockito.Mockito; @@ -372,6 +376,29 @@ public class MultiTopicsConsumerTest extends ProducerConsumerBase { assertTrue(consumer.isConnected()); } + @Test + public void testSameTopics() throws Exception { + final String topic1 = BrokerTestUtil.newUniqueName("public/default/tp"); + final String topic2 = "persistent://" + topic1; + admin.topics().createNonPartitionedTopic(topic2); + // Create consumer with two same topics. + try { + pulsarClient.newConsumer(Schema.INT32).topics(Arrays.asList(topic1, topic2)) + .subscriptionName("s1").subscribe(); + fail("Do not allow use two same topics."); + } catch (Exception e) { + if (e instanceof PulsarClientException && e.getCause() != null) { + e = (Exception) e.getCause(); + } + Throwable unwrapEx = FutureUtil.unwrapCompletionException(e); + assertTrue(unwrapEx instanceof IllegalArgumentException); + assertTrue(e.getMessage().contains( "Subscription topics include duplicate items" + + " or invalid names")); + } + // cleanup. + admin.topics().delete(topic2); + } + @Test(timeOut = 30000) public void testSubscriptionNotFound() throws PulsarAdminException, PulsarClientException { final var topic1 = newTopicName(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index d18af475d61..20fd03d6a28 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -165,7 +165,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { return; } - checkArgument(topicNamesValid(conf.getTopicNames()), "Topics is invalid."); + checkArgument(topicNamesValid(conf.getTopicNames()), "Subscription topics include duplicate items" + + " or invalid names."); List<CompletableFuture<Void>> futures = conf.getTopicNames().stream() .map(t -> subscribeAsync(t, createTopicIfDoesNotExist)) @@ -202,21 +203,21 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { checkState(topics != null && topics.size() >= 1, "topics should contain more than 1 topic"); - Optional<String> result = topics.stream() - .filter(topic -> !TopicName.isValid(topic)) - .findFirst(); + Set<TopicName> topicNames = new HashSet<>(); - if (result.isPresent()) { - log.warn("Received invalid topic name: {}", result.get()); - return false; + for (String topic : topics) { + if (!TopicName.isValid(topic)) { + log.warn("Received invalid topic name: {}", topic); + return false; + } + topicNames.add(TopicName.get(topic)); } // check topic names are unique - HashSet<String> set = new HashSet<>(topics); - if (set.size() == topics.size()) { + if (topicNames.size() == topics.size()) { return true; } else { - log.warn("Topic names not unique. unique/all : {}/{}", set.size(), topics.size()); + log.warn("Topic names not unique. unique/all : {}/{}", topicNames.size(), topics.size()); return false; } }