This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bc24013  [client] Set actual topic name to partitioned consumer (#4064)
bc24013 is described below

commit bc24013d7f09a9a1ff9aa6e7d81ea24ede400ef5
Author: massakam <[email protected]>
AuthorDate: Thu Apr 18 07:09:50 2019 +0900

    [client] Set actual topic name to partitioned consumer (#4064)
    
    * Set actual topic name to partitioned consumer
    
    * Change dummy topic name prefix
---
 .../client/api/PartitionedProducerConsumerTest.java |  1 +
 .../client/impl/PatternTopicsConsumerImplTest.java  |  1 +
 .../pulsar/client/impl/TopicsConsumerImplTest.java  |  1 +
 .../pulsar/client/impl/MultiTopicsConsumerImpl.java | 21 ++++++++++++++++-----
 4 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index 55952b7..0a88963 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -87,6 +87,7 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase {
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName.toString())
                 .subscriptionName("my-partitioned-subscriber").subscribe();
+        assertEquals(consumer.getTopic(), topicName.toString());
 
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 9a09ef7..4228fdb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -172,6 +172,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
             .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
             .receiverQueueSize(4)
             .subscribe();
+        assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
 
         // 4. verify consumer get methods, to get right number of partitions and topics.
         assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 440ea32..6ff41b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -128,6 +128,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
             .receiverQueueSize(4)
             .subscribe();
         assertTrue(consumer instanceof MultiTopicsConsumerImpl);
+        assertTrue(consumer.getTopic().startsWith(MultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
 
         List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics();
         List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 69dd385..8ebf1b9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -64,6 +64,8 @@ import org.slf4j.LoggerFactory;
 
 public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
+    public static final String DUMMY_TOPIC_NAME_PREFIX = "MultiTopicsConsumer-";
+
     // All topics should be in same namespace
     protected NamespaceName namespaceName;
 
@@ -94,10 +96,18 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     private final UnAckedMessageTracker unAckedMessageTracker;
     private final ConsumerConfigurationData<T> internalConfig;
 
-    MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor,
-                            CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
-        super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), conf,
-                Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture, schema, interceptors);
+    MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf,
+            ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
+            ConsumerInterceptors<T> interceptors) {
+        this(client, DUMMY_TOPIC_NAME_PREFIX + ConsumerName.generateRandomName(), conf, listenerExecutor,
+                subscribeFuture, schema, interceptors);
+    }
+
+    MultiTopicsConsumerImpl(PulsarClientImpl client, String singleTopic, ConsumerConfigurationData<T> conf,
+            ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
+            ConsumerInterceptors<T> interceptors) {
+        super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture,
+                schema, interceptors);
 
         checkArgument(conf.getReceiverQueueSize() > 0,
             "Receiver queue size needs to be greater than 0 for Topics Consumer");
@@ -693,7 +703,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         cloneConf.getTopicNames().remove(topicName);
 
         CompletableFuture<Consumer> future = new CompletableFuture<>();
-        MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, cloneConf, listenerExecutor, future, schema, interceptors);
+        MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, topicName, cloneConf, listenerExecutor,
+                future, schema, interceptors);
 
         future.thenCompose(c -> ((MultiTopicsConsumerImpl)c).subscribeAsync(topicName, numPartitions))
             .thenRun(()-> subscribeFuture.complete(consumer))

Reply via email to