This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 0edb8a93470 [fix] [client] Unclear error message when creating a
consumer with two same topics (#22255)
0edb8a93470 is described below
commit 0edb8a934704ede1cc134983a84016e611ac8cec
Author: fengyubiao <[email protected]>
AuthorDate: Tue Mar 19 17:23:13 2024 +0800
[fix] [client] Unclear error message when creating a consumer with two same
topics (#22255)
---
.../pulsar/client/api/MultiTopicsConsumerTest.java | 27 ++++++++++++++++++++++
.../client/impl/MultiTopicsConsumerImpl.java | 21 +++++++++--------
2 files changed, 38 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index bb8bab29ad9..7a12acd47ed 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -26,9 +26,11 @@ import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -42,6 +44,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -50,6 +53,7 @@ import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
@@ -372,6 +376,29 @@ public class MultiTopicsConsumerTest extends
ProducerConsumerBase {
assertTrue(consumer.isConnected());
}
+ @Test
+ public void testSameTopics() throws Exception {
+ final String topic1 =
BrokerTestUtil.newUniqueName("public/default/tp");
+ final String topic2 = "persistent://" + topic1;
+ admin.topics().createNonPartitionedTopic(topic2);
+ // Create consumer with two same topics.
+ try {
+
pulsarClient.newConsumer(Schema.INT32).topics(Arrays.asList(topic1, topic2))
+ .subscriptionName("s1").subscribe();
+ fail("Do not allow use two same topics.");
+ } catch (Exception e) {
+ if (e instanceof PulsarClientException && e.getCause() != null) {
+ e = (Exception) e.getCause();
+ }
+ Throwable unwrapEx = FutureUtil.unwrapCompletionException(e);
+ assertTrue(unwrapEx instanceof IllegalArgumentException);
+ assertTrue(e.getMessage().contains( "Subscription topics include
duplicate items"
+ + " or invalid names"));
+ }
+ // cleanup.
+ admin.topics().delete(topic2);
+ }
+
@Test(timeOut = 30000)
public void testSubscriptionNotFound() throws PulsarAdminException,
PulsarClientException {
final var topic1 = newTopicName();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index d18af475d61..20fd03d6a28 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -165,7 +165,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
return;
}
- checkArgument(topicNamesValid(conf.getTopicNames()), "Topics is
invalid.");
+ checkArgument(topicNamesValid(conf.getTopicNames()), "Subscription
topics include duplicate items"
+ + " or invalid names.");
List<CompletableFuture<Void>> futures = conf.getTopicNames().stream()
.map(t -> subscribeAsync(t, createTopicIfDoesNotExist))
@@ -202,21 +203,21 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
checkState(topics != null && topics.size() >= 1,
"topics should contain more than 1 topic");
- Optional<String> result = topics.stream()
- .filter(topic -> !TopicName.isValid(topic))
- .findFirst();
+ Set<TopicName> topicNames = new HashSet<>();
- if (result.isPresent()) {
- log.warn("Received invalid topic name: {}", result.get());
- return false;
+ for (String topic : topics) {
+ if (!TopicName.isValid(topic)) {
+ log.warn("Received invalid topic name: {}", topic);
+ return false;
+ }
+ topicNames.add(TopicName.get(topic));
}
// check topic names are unique
- HashSet<String> set = new HashSet<>(topics);
- if (set.size() == topics.size()) {
+ if (topicNames.size() == topics.size()) {
return true;
} else {
- log.warn("Topic names not unique. unique/all : {}/{}", set.size(),
topics.size());
+ log.warn("Topic names not unique. unique/all : {}/{}",
topicNames.size(), topics.size());
return false;
}
}