This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ffe7b62  Issue 2077: SubscriptionInitialPosition doesn't work with 
multiple-topic type subscription (#2100)
ffe7b62 is described below

commit ffe7b62fc26cb3d577717439b9cdd4048a36dcf1
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Sat Jul 7 13:59:03 2018 -0700

    Issue 2077: SubscriptionInitialPosition doesn't work with multiple-topic 
type subscription (#2100)
    
    *Motivation*
    
    Fixes #2077.
    
    The problem is that the consumer configuration is copied to an internal consumer 
configuration for multi-topic style subscriptions, but we missed copying 
SubscriptionInitialPosition.
    
    *Solution*
    
    Use `clone` for propagating the configuration to the internal configuration.
    
    Added a test case to cover it.
---
 .../client/impl/MultiTopicsConsumerImpl.java       | 21 +------
 .../tests/integration/semantics/SemanticsTest.java | 68 ++++++++++++++++++++++
 2 files changed, 69 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 5221b70..fc91eed 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -475,28 +475,9 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     }
 
     private ConsumerConfigurationData<T> getInternalConsumerConfig() {
-        ConsumerConfigurationData<T> internalConsumerConfig = new 
ConsumerConfigurationData<>();
+        ConsumerConfigurationData<T> internalConsumerConfig = conf.clone();
         internalConsumerConfig.setSubscriptionName(subscription);
-        
internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
-        internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
         internalConsumerConfig.setConsumerName(consumerName);
-        
internalConsumerConfig.setAcknowledgementsGroupTimeMicros(conf.getAcknowledgementsGroupTimeMicros());
-        internalConsumerConfig.setPriorityLevel(conf.getPriorityLevel());
-        internalConsumerConfig.setProperties(conf.getProperties());
-        internalConsumerConfig.setReadCompacted(conf.isReadCompacted());
-        
internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
-        
-        if (null != conf.getConsumerEventListener()) {
-            
internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener());
-        }
-
-        if (conf.getCryptoKeyReader() != null) {
-            
internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader());
-        }
-        if (conf.getAckTimeoutMillis() != 0) {
-            
internalConsumerConfig.setAckTimeoutMillis(conf.getAckTimeoutMillis());
-        }
-
         return internalConsumerConfig;
     }
 
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
index 701b38e..c325e14 100644
--- 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
@@ -21,20 +21,26 @@ package org.apache.pulsar.tests.integration.semantics;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
@@ -192,6 +198,68 @@ public class SemanticsTest extends PulsarClusterTestBase {
         receiveAndAssertMessage(consumer, 2L, "message-2");
     }
 
+    @Test
+    public void testSubscriptionInitialPositionOneTopic() throws Exception {
+        testSubscriptionInitialPosition(1);
+    }
+
+    @Test
+    public void testSubscriptionInitialPositionTwoTopics() throws Exception {
+        testSubscriptionInitialPosition(2);
+    }
+
+    private void testSubscriptionInitialPosition(int numTopics) throws 
Exception {
+        String topicName = generateTopicName("test-subscription-initial-pos", 
true);
+
+        int numMessages = 10;
+
+        try (PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build()) {
+
+            for (int t = 0; t < numTopics; t++) {
+                try (Producer<String> producer = 
client.newProducer(Schema.STRING)
+                    .topic(topicName + "-" + t)
+                    .create()) {
+
+                    for (int i = 0; i < numMessages; i++) {
+                        producer.send("sip-topic-" + t + "-message-" + i);
+                    }
+                }
+            }
+
+            String[] topics = new String[numTopics];
+            Map<Integer, AtomicInteger> topicCounters = new 
HashMap<>(numTopics);
+            for (int i = 0; i < numTopics; i++) {
+                topics[i] = topicName + "-" + i;
+                topicCounters.put(i, new AtomicInteger(0));
+            }
+
+            try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topics)
+                .subscriptionName("my-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe()) {
+
+                for (int i = 0; i < numTopics * numMessages; i++) {
+                    Message<String> m = consumer.receive();
+                    int topicIdx;
+                    if (numTopics > 1) {
+                        String topic = ((TopicMessageIdImpl) 
m.getMessageId()).getTopicName();
+
+                        String[] topicParts = StringUtils.split(topic, '-');
+                        topicIdx = 
Integer.parseInt(topicParts[topicParts.length - 1]);
+                    } else {
+                        topicIdx = 0;
+                    }
+                    int topicSeq = 
topicCounters.get(topicIdx).getAndIncrement();
+
+                    assertEquals("sip-topic-" + topicIdx + "-message-" + 
topicSeq, m.getValue());
+                }
+            }
+        }
+    }
+
     @Test(dataProvider = "ServiceUrls")
     public void testBatchProducing(String serviceUrl) throws Exception {
         String topicName = generateTopicName("testbatchproducing", true);

Reply via email to