This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new ffe7b62 Issue 2077: SubscriptionInitialPosition doesn't work with multiple-topic type subscription (#2100) ffe7b62 is described below commit ffe7b62fc26cb3d577717439b9cdd4048a36dcf1 Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Sat Jul 7 13:59:03 2018 -0700 Issue 2077: SubscriptionInitialPosition doesn't work with multiple-topic type subscription (#2100) *Motivation* Fixes #2077. The problem is the consumer configuration is copied to an internal consumer configuration for multi-topic style subscription, but we missed copying SubscriptionInitialPosition. *Solution* Use `clone` for propagating configuration to internal configuration. Added a test case to cover it. --- .../client/impl/MultiTopicsConsumerImpl.java | 21 +------ .../tests/integration/semantics/SemanticsTest.java | 68 ++++++++++++++++++++++ 2 files changed, 69 insertions(+), 20 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 5221b70..fc91eed 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -475,28 +475,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } private ConsumerConfigurationData<T> getInternalConsumerConfig() { - ConsumerConfigurationData<T> internalConsumerConfig = new ConsumerConfigurationData<>(); + ConsumerConfigurationData<T> internalConsumerConfig = conf.clone(); internalConsumerConfig.setSubscriptionName(subscription); - internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize()); - internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType()); internalConsumerConfig.setConsumerName(consumerName); - 
internalConsumerConfig.setAcknowledgementsGroupTimeMicros(conf.getAcknowledgementsGroupTimeMicros()); - internalConsumerConfig.setPriorityLevel(conf.getPriorityLevel()); - internalConsumerConfig.setProperties(conf.getProperties()); - internalConsumerConfig.setReadCompacted(conf.isReadCompacted()); - internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction()); - - if (null != conf.getConsumerEventListener()) { - internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener()); - } - - if (conf.getCryptoKeyReader() != null) { - internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader()); - } - if (conf.getAckTimeoutMillis() != 0) { - internalConsumerConfig.setAckTimeoutMillis(conf.getAckTimeoutMillis()); - } - return internalConsumerConfig; } diff --git a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java index 701b38e..c325e14 100644 --- a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java +++ b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java @@ -21,20 +21,26 @@ package org.apache.pulsar.tests.integration.semantics; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import java.util.HashMap; +import java.util.Map; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import 
org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.tests.topologies.PulsarClusterTestBase; import org.testng.annotations.Test; import org.testng.collections.Lists; @@ -192,6 +198,68 @@ public class SemanticsTest extends PulsarClusterTestBase { receiveAndAssertMessage(consumer, 2L, "message-2"); } + @Test + public void testSubscriptionInitialPositionOneTopic() throws Exception { + testSubscriptionInitialPosition(1); + } + + @Test + public void testSubscriptionInitialPositionTwoTopics() throws Exception { + testSubscriptionInitialPosition(2); + } + + private void testSubscriptionInitialPosition(int numTopics) throws Exception { + String topicName = generateTopicName("test-subscription-initial-pos", true); + + int numMessages = 10; + + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) + .build()) { + + for (int t = 0; t < numTopics; t++) { + try (Producer<String> producer = client.newProducer(Schema.STRING) + .topic(topicName + "-" + t) + .create()) { + + for (int i = 0; i < numMessages; i++) { + producer.send("sip-topic-" + t + "-message-" + i); + } + } + } + + String[] topics = new String[numTopics]; + Map<Integer, AtomicInteger> topicCounters = new HashMap<>(numTopics); + for (int i = 0; i < numTopics; i++) { + topics[i] = topicName + "-" + i; + topicCounters.put(i, new AtomicInteger(0)); + } + + try (Consumer<String> consumer = client.newConsumer(Schema.STRING) + .topic(topics) + .subscriptionName("my-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe()) { + + for (int i = 0; i < numTopics * numMessages; i++) { + Message<String> m = consumer.receive(); + int topicIdx; + if (numTopics 
> 1) { + String topic = ((TopicMessageIdImpl) m.getMessageId()).getTopicName(); + + String[] topicParts = StringUtils.split(topic, '-'); + topicIdx = Integer.parseInt(topicParts[topicParts.length - 1]); + } else { + topicIdx = 0; + } + int topicSeq = topicCounters.get(topicIdx).getAndIncrement(); + + assertEquals("sip-topic-" + topicIdx + "-message-" + topicSeq, m.getValue()); + } + } + } + } + @Test(dataProvider = "ServiceUrls") public void testBatchProducing(String serviceUrl) throws Exception { String topicName = generateTopicName("testbatchproducing", true);