This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d073a7c PIP-13-1/3: Provide `TopicsConsumer` to consume from several
topics under same namespace (#1103)
d073a7c is described below
commit d073a7c874c391731d703f86ce13849ee7312f0e
Author: Jia Zhai <[email protected]>
AuthorDate: Wed Feb 21 14:15:40 2018 -0800
PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under
same namespace (#1103)
* add TopicsConsumerImpl and some test
* fix license header check, add get method, and fix unit-test fail
* add subscribe and unsubscribe for one topic, add tests
* change to use subscribeAsync(topic) to subscribe topics in init
* change following comments
* rebase master, fix compile issue
---
.../broker/service/PersistentQueueE2ETest.java | 4 +-
.../pulsar/client/impl/TopicsConsumerImplTest.java | 522 +++++++++++++
.../org/apache/pulsar/client/api/PulsarClient.java | 57 ++
.../apache/pulsar/client/impl/ConsumerBase.java | 13 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 11 +-
.../client/impl/PartitionedConsumerImpl.java | 15 +-
.../pulsar/client/impl/PulsarClientImpl.java | 65 ++
.../pulsar/client/impl/TopicMessageIdImpl.java | 49 ++
.../pulsar/client/impl/TopicMessageImpl.java | 101 +++
.../pulsar/client/impl/TopicsConsumerImpl.java | 823 +++++++++++++++++++++
.../pulsar/client/impl/UnAckedMessageTracker.java | 44 +-
.../client/impl/UnAckedTopicMessageTracker.java | 49 ++
12 files changed, 1712 insertions(+), 41 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
index 7a65cbc..f311f4c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
@@ -539,11 +539,11 @@ public class PersistentQueueE2ETest extends
BrokerTestBase {
producer.send(("hello-" + i).getBytes());
}
- Set<MessageIdImpl> c1_receivedMessages = new HashSet<>();
+ Set<MessageId> c1_receivedMessages = new HashSet<>();
// C-1 gets all messages but doesn't ack
for (int i = 0; i < numMsgs; i++) {
- c1_receivedMessages.add((MessageIdImpl)
consumer1.receive().getMessageId());
+ c1_receivedMessages.add(consumer1.receive().getMessageId());
}
// C-2 will not get any message initially, since everything went to
C-1 already
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
new file mode 100644
index 0000000..a2cb6d5
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -0,0 +1,522 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TopicsConsumerImplTest extends ProducerConsumerBase {
+ private static final long testTimeout = 90000; // 1.5 min
+ private static final Logger log =
LoggerFactory.getLogger(TopicsConsumerImplTest.class);
+ private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2);
+
+ @Override
+ @BeforeMethod
+ public void setup() throws Exception {
+ super.internalSetup();
+ }
+
+ @Override
+ @AfterMethod
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ // Verify subscribe topics from different namespace should return error.
+ @Test(timeOut = testTimeout)
+ public void testDifferentTopicsNameSubscribe() throws Exception {
+ String key = "TopicsFromDifferentNamespace";
+ final String subscriptionName = "my-ex-subscription-" + key;
+
+ final String topicName1 = "persistent://prop/use/ns-abc1/topic-1-" +
key;
+ final String topicName2 = "persistent://prop/use/ns-abc2/topic-2-" +
key;
+ final String topicName3 = "persistent://prop/use/ns-abc3/topic-3-" +
key;
+ List<String> topicNames = Lists.newArrayList(topicName1, topicName2,
topicName3);
+
+ admin.properties().createProperty("prop", new PropertyAdmin());
+ admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+ admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+ // 2. Create consumer
+ ConsumerConfiguration conf = new ConsumerConfiguration();
+ conf.setReceiverQueueSize(4);
+ conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+ conf.setSubscriptionType(SubscriptionType.Shared);
+ try {
+ Consumer consumer = pulsarClient.subscribeAsync(topicNames,
subscriptionName, conf).get();
+ fail("subscribe for topics from different namespace should fail.");
+ } catch (IllegalArgumentException e) {
+ // expected for have different namespace
+ }
+ }
+
+
+ @Test(timeOut = testTimeout)
+ public void testGetConsumersAndGetTopics() throws Exception {
+ String key = "TopicsConsumerGet";
+ final String subscriptionName = "my-ex-subscription-" + key;
+
+ final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" +
key;
+ final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" +
key;
+ final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" +
key;
+ List<String> topicNames = Lists.newArrayList(topicName1, topicName2,
topicName3);
+
+ admin.properties().createProperty("prop", new PropertyAdmin());
+ admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+ admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+ // 2. Create consumer
+ ConsumerConfiguration conf = new ConsumerConfiguration();
+ conf.setReceiverQueueSize(4);
+ conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+ conf.setSubscriptionType(SubscriptionType.Shared);
+ Consumer consumer = pulsarClient.subscribeAsync(topicNames,
subscriptionName, conf).get();
+ assertTrue(consumer instanceof TopicsConsumerImpl);
+
+ List<String> topics = ((TopicsConsumerImpl)
consumer).getPartitionedTopics();
+ List<ConsumerImpl> consumers = ((TopicsConsumerImpl)
consumer).getConsumers();
+
+ topics.forEach(topic -> log.info("topic: {}", topic));
+ consumers.forEach(c -> log.info("consumer: {}", c.getTopic()));
+
+ IntStream.range(0, 6).forEach(index ->
+
assertTrue(topics.get(index).equals(consumers.get(index).getTopic())));
+
+ assertTrue(((TopicsConsumerImpl) consumer).getTopics().size() == 3);
+
+ consumer.unsubscribe();
+ consumer.close();
+ }
+
+ @Test(timeOut = testTimeout)
+ public void testSyncProducerAndConsumer() throws Exception {
+ String key = "TopicsConsumerSyncTest";
+ final String subscriptionName = "my-ex-subscription-" + key;
+ final String messagePredicate = "my-message-" + key + "-";
+ final int totalMessages = 30;
+
+ final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" +
key;
+ final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" +
key;
+ final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" +
key;
+ List<String> topicNames = Lists.newArrayList(topicName1, topicName2,
topicName3);
+
+ admin.properties().createProperty("prop", new PropertyAdmin());
+ admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+ admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+ ProducerConfiguration producerConfiguration = new
ProducerConfiguration();
+
producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
+ // 1. producer connect
+ Producer producer1 = pulsarClient.createProducer(topicName1);
+ Producer producer2 = pulsarClient.createProducer(topicName2,
producerConfiguration);
+ Producer producer3 = pulsarClient.createProducer(topicName3,
producerConfiguration);
+
+ // 2. Create consumer
+ ConsumerConfiguration conf = new ConsumerConfiguration();
+ conf.setReceiverQueueSize(4);
+ conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+ conf.setSubscriptionType(SubscriptionType.Shared);
+ Consumer consumer = pulsarClient.subscribeAsync(topicNames,
subscriptionName, conf).get();
+ assertTrue(consumer instanceof TopicsConsumerImpl);
+
+ // 3. producer publish messages
+ for (int i = 0; i < totalMessages / 3; i++) {
+ producer1.send((messagePredicate + "producer1-" + i).getBytes());
+ producer2.send((messagePredicate + "producer2-" + i).getBytes());
+ producer3.send((messagePredicate + "producer3-" + i).getBytes());
+ }
+
+ int messageSet = 0;
+ Message message = consumer.receive();
+ do {
+ assertTrue(message instanceof TopicMessageImpl);
+ messageSet ++;
+ consumer.acknowledge(message);
+ log.debug("Consumer acknowledged : " + new
String(message.getData()));
+ message = consumer.receive(500, TimeUnit.MILLISECONDS);
+ } while (message != null);
+ assertEquals(messageSet, totalMessages);
+
+ consumer.unsubscribe();
+ consumer.close();
+ producer1.close();
+ producer2.close();
+ producer3.close();
+ }
+
+ @Test(timeOut = testTimeout)
+ public void testAsyncConsumer() throws Exception {
+ String key = "TopicsConsumerAsyncTest";
+ final String subscriptionName = "my-ex-subscription-" + key;
+ final String messagePredicate = "my-message-" + key + "-";
+ final int totalMessages = 30;
+
+ final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" +
key;
+ final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" +
key;
+ final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" +
key;
+ List<String> topicNames = Lists.newArrayList(topicName1, topicName2,
topicName3);
+
+ admin.properties().createProperty("prop", new PropertyAdmin());
+ admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+ admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+ ProducerConfiguration producerConfiguration = new
ProducerConfiguration();
+
producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
+ // 1. producer connect
+ Producer producer1 = pulsarClient.createProducer(topicName1);
+ Producer producer2 = pulsarClient.createProducer(topicName2,
producerConfiguration);
+ Producer producer3 = pulsarClient.createProducer(topicName3,
producerConfiguration);
+
+ // 2. Create consumer
+ ConsumerConfiguration conf = new ConsumerConfiguration();
+ conf.setReceiverQueueSize(4);
+ conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+ conf.setSubscriptionType(SubscriptionType.Shared);
+ Consumer consumer = pulsarClient.subscribeAsync(topicNames,
subscriptionName, conf).get();
+ assertTrue(consumer instanceof TopicsConsumerImpl);
+
+ // Asynchronously produce messages
+ List<Future<MessageId>> futures = Lists.newArrayList();
+ for (int i = 0; i < totalMessages / 3; i++) {
+ futures.add(producer1.sendAsync((messagePredicate + "producer1-" +
i).getBytes()));
+ futures.add(producer2.sendAsync((messagePredicate + "producer2-" +
i).getBytes()));
+ futures.add(producer3.sendAsync((messagePredicate + "producer3-" +
i).getBytes()));
+ }
+ log.info("Waiting for async publish to complete : {}", futures.size());
+ for (Future<MessageId> future : futures) {
+ future.get();
+ }
+
+ log.info("start async consume");
+ CountDownLatch latch = new CountDownLatch(totalMessages);
+ ExecutorService executor = Executors.newFixedThreadPool(1);
+ executor.execute(() -> IntStream.range(0, totalMessages).forEach(index
->
+ consumer.receiveAsync()
+ .thenAccept(msg -> {
+ assertTrue(msg instanceof TopicMessageImpl);
+ try {
+ consumer.acknowledge(msg);
+ } catch (PulsarClientException e1) {
+ fail("message acknowledge failed", e1);
+ }
+ latch.countDown();
+ log.info("receive index: {}, latch countDown: {}", index,
latch.getCount());
+ })
+ .exceptionally(ex -> {
+ log.warn("receive index: {}, failed receive message {}",
index, ex.getMessage());
+ ex.printStackTrace();
+ return null;
+ })));
+
+ latch.await();
+ log.info("success latch wait");
+
+ consumer.unsubscribe();
+ consumer.close();
+ producer1.close();
+ producer2.close();
+ producer3.close();
+ }
+
+ @Test(timeOut = testTimeout)
+ public void testConsumerUnackedRedelivery() throws Exception {
+ String key = "TopicsConsumerRedeliveryTest";
+ final String subscriptionName = "my-ex-subscription-" + key;
+ final String messagePredicate = "my-message-" + key + "-";
+ final int totalMessages = 30;
+
+ final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" +
key;
+ final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" +
key;
+ final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" +
key;
+ List<String> topicNames = Lists.newArrayList(topicName1, topicName2,
topicName3);
+
+ admin.properties().createProperty("prop", new PropertyAdmin());
+ admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+ admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+ ProducerConfiguration producerConfiguration = new
ProducerConfiguration();
+
producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
+ // 1. producer connect
+ Producer producer1 = pulsarClient.createProducer(topicName1);
+ Producer producer2 = pulsarClient.createProducer(topicName2,
producerConfiguration);
+ Producer producer3 = pulsarClient.createProducer(topicName3,
producerConfiguration);
+
+ // 2. Create consumer
+ ConsumerConfiguration conf = new ConsumerConfiguration();
+ conf.setReceiverQueueSize(4);
+ conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+ conf.setSubscriptionType(SubscriptionType.Shared);
+ Consumer consumer = pulsarClient.subscribeAsync(topicNames,
subscriptionName, conf).get();
+ assertTrue(consumer instanceof TopicsConsumerImpl);
+
+ // 3. producer publish messages
+ for (int i = 0; i < totalMessages / 3; i++) {
+ producer1.send((messagePredicate + "producer1-" + i).getBytes());
+ producer2.send((messagePredicate + "producer2-" + i).getBytes());
+ producer3.send((messagePredicate + "producer3-" + i).getBytes());
+ }
+
+ // 4. Receiver receives the message, not ack, Unacked Message Tracker
size should be totalMessages.
+ Message message = consumer.receive();
+ while (message != null) {
+ assertTrue(message instanceof TopicMessageImpl);
+ log.debug("Consumer received : " + new String(message.getData()));
+ message = consumer.receive(500, TimeUnit.MILLISECONDS);
+ }
+ long size = ((TopicsConsumerImpl)
consumer).getUnAckedMessageTracker().size();
+ log.debug(key + " Unacked Message Tracker size is " + size);
+ assertEquals(size, totalMessages);
+
+ // 5. Blocking call, redeliver should kick in, after receive and ack,
Unacked Message Tracker size should be 0.
+ message = consumer.receive();
+ HashSet<String> hSet = new HashSet<>();
+ do {
+ assertTrue(message instanceof TopicMessageImpl);
+ hSet.add(new String(message.getData()));
+ consumer.acknowledge(message);
+ log.debug("Consumer acknowledged : " + new
String(message.getData()));
+ message = consumer.receive(500, TimeUnit.MILLISECONDS);
+ } while (message != null);
+
+ size = ((TopicsConsumerImpl)
consumer).getUnAckedMessageTracker().size();
+ log.debug(key + " Unacked Message Tracker size is " + size);
+ assertEquals(size, 0);
+ assertEquals(hSet.size(), totalMessages);
+
+ // 6. producer publish more messages
+ for (int i = 0; i < totalMessages / 3; i++) {
+ producer1.send((messagePredicate + "producer1-round2" +
i).getBytes());
+ producer2.send((messagePredicate + "producer2-round2" +
i).getBytes());
+ producer3.send((messagePredicate + "producer3-round2" +
i).getBytes());
+ }
+
+ // 7. Receiver receives the message, ack them
+ message = consumer.receive();
+ int received = 0;
+ while (message != null) {
+ assertTrue(message instanceof TopicMessageImpl);
+ received++;
+ String data = new String(message.getData());
+ log.debug("Consumer received : " + data);
+ consumer.acknowledge(message);
+ message = consumer.receive(100, TimeUnit.MILLISECONDS);
+ }
+ size = ((TopicsConsumerImpl)
consumer).getUnAckedMessageTracker().size();
+ log.debug(key + " Unacked Message Tracker size is " + size);
+ assertEquals(size, 0);
+ assertEquals(received, totalMessages);
+
+ // 8. Simulate ackTimeout
+ ((TopicsConsumerImpl) consumer).getUnAckedMessageTracker().toggle();
+ ((TopicsConsumerImpl) consumer).getConsumers().forEach(c ->
c.getUnAckedMessageTracker().toggle());
+
+ // 9. producer publish more messages
+ for (int i = 0; i < totalMessages / 3; i++) {
+ producer1.send((messagePredicate + "producer1-round3" +
i).getBytes());
+ producer2.send((messagePredicate + "producer2-round3" +
i).getBytes());
+ producer3.send((messagePredicate + "producer3-round3" +
i).getBytes());
+ }
+
+ // 10. Receiver receives the message, doesn't ack
+ message = consumer.receive();
+ while (message != null) {
+ String data = new String(message.getData());
+ log.debug("Consumer received : " + data);
+ message = consumer.receive(100, TimeUnit.MILLISECONDS);
+ }
+ size = ((TopicsConsumerImpl)
consumer).getUnAckedMessageTracker().size();
+ log.debug(key + " Unacked Message Tracker size is " + size);
+ assertEquals(size, 30);
+
+ Thread.sleep(ackTimeOutMillis);
+
+ // 11. Receiver receives redelivered messages
+ message = consumer.receive();
+ int redelivered = 0;
+ while (message != null) {
+ assertTrue(message instanceof TopicMessageImpl);
+ redelivered++;
+ String data = new String(message.getData());
+ log.debug("Consumer received : " + data);
+ consumer.acknowledge(message);
+ message = consumer.receive(100, TimeUnit.MILLISECONDS);
+ }
+ assertEquals(redelivered, 30);
+ size = ((TopicsConsumerImpl)
consumer).getUnAckedMessageTracker().size();
+ log.info(key + " Unacked Message Tracker size is " + size);
+ assertEquals(size, 0);
+
+ consumer.unsubscribe();
+ consumer.close();
+ producer1.close();
+ producer2.close();
+ producer3.close();
+ }
+
+ @Test
+ public void testSubscribeUnsubscribeSingleTopic() throws Exception {
+ String key = "TopicsConsumerSubscribeUnsubscribeSingleTopicTest";
+ final String subscriptionName = "my-ex-subscription-" + key;
+ final String messagePredicate = "my-message-" + key + "-";
+ final int totalMessages = 30;
+
+ final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" +
key;
+ final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" +
key;
+ final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" +
key;
+ List<String> topicNames = Lists.newArrayList(topicName1, topicName2,
topicName3);
+
+ admin.properties().createProperty("prop", new PropertyAdmin());
+ admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+ admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+ ProducerConfiguration producerConfiguration = new
ProducerConfiguration();
+
producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
+ // 1. producer connect
+ Producer producer1 = pulsarClient.createProducer(topicName1);
+ Producer producer2 = pulsarClient.createProducer(topicName2,
producerConfiguration);
+ Producer producer3 = pulsarClient.createProducer(topicName3,
producerConfiguration);
+
+ // 2. Create consumer
+ ConsumerConfiguration conf = new ConsumerConfiguration();
+ conf.setReceiverQueueSize(4);
+ conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+ conf.setSubscriptionType(SubscriptionType.Shared);
+ Consumer consumer = pulsarClient.subscribeAsync(topicNames,
subscriptionName, conf).get();
+ assertTrue(consumer instanceof TopicsConsumerImpl);
+
+ // 3. producer publish messages
+ for (int i = 0; i < totalMessages / 3; i++) {
+ producer1.send((messagePredicate + "producer1-" + i).getBytes());
+ producer2.send((messagePredicate + "producer2-" + i).getBytes());
+ producer3.send((messagePredicate + "producer3-" + i).getBytes());
+ }
+
+ int messageSet = 0;
+ Message message = consumer.receive();
+ do {
+ assertTrue(message instanceof TopicMessageImpl);
+ messageSet ++;
+ consumer.acknowledge(message);
+ log.debug("Consumer acknowledged : " + new
String(message.getData()));
+ message = consumer.receive(500, TimeUnit.MILLISECONDS);
+ } while (message != null);
+ assertEquals(messageSet, totalMessages);
+
+ // 4, unsubscribe topic3
+ CompletableFuture<Void> unsubFuture =
((TopicsConsumerImpl)consumer).unsubscribeAsync(topicName3);
+ unsubFuture.get();
+
+ // 5. producer publish messages
+ for (int i = 0; i < totalMessages / 3; i++) {
+ producer1.send((messagePredicate + "producer1-round2" +
i).getBytes());
+ producer2.send((messagePredicate + "producer2-round2" +
i).getBytes());
+ producer3.send((messagePredicate + "producer3-round2" +
i).getBytes());
+ }
+
+ // 6. should not receive messages from topic3, verify get 2/3 of all
messages
+ messageSet = 0;
+ message = consumer.receive();
+ do {
+ assertTrue(message instanceof TopicMessageImpl);
+ messageSet ++;
+ consumer.acknowledge(message);
+ log.debug("Consumer acknowledged : " + new
String(message.getData()));
+ message = consumer.receive(500, TimeUnit.MILLISECONDS);
+ } while (message != null);
+ assertEquals(messageSet, totalMessages * 2 / 3);
+
+ // 7. use getter to verify internal topics number after un-subscribe
topic3
+ List<String> topics = ((TopicsConsumerImpl)
consumer).getPartitionedTopics();
+ List<ConsumerImpl> consumers = ((TopicsConsumerImpl)
consumer).getConsumers();
+
+ assertEquals(topics.size(), 3);
+ assertEquals(consumers.size(), 3);
+ assertTrue(((TopicsConsumerImpl) consumer).getTopics().size() == 2);
+
+ // 8. re-subscribe topic3
+ CompletableFuture<Void> subFuture =
((TopicsConsumerImpl)consumer).subscribeAsync(topicName3);
+ subFuture.get();
+
+ // 9. producer publish messages
+ for (int i = 0; i < totalMessages / 3; i++) {
+ producer1.send((messagePredicate + "producer1-round3" +
i).getBytes());
+ producer2.send((messagePredicate + "producer2-round3" +
i).getBytes());
+ producer3.send((messagePredicate + "producer3-round3" +
i).getBytes());
+ }
+
+ // 10. should receive messages from all 3 topics
+ messageSet = 0;
+ message = consumer.receive();
+ do {
+ assertTrue(message instanceof TopicMessageImpl);
+ messageSet ++;
+ consumer.acknowledge(message);
+ log.debug("Consumer acknowledged : " + new
String(message.getData()));
+ message = consumer.receive(500, TimeUnit.MILLISECONDS);
+ } while (message != null);
+ assertEquals(messageSet, totalMessages);
+
+ // 11. use getter to verify internal topics number after subscribe
topic3
+ topics = ((TopicsConsumerImpl) consumer).getPartitionedTopics();
+ consumers = ((TopicsConsumerImpl) consumer).getConsumers();
+
+ assertEquals(topics.size(), 6);
+ assertEquals(consumers.size(), 6);
+ assertTrue(((TopicsConsumerImpl) consumer).getTopics().size() == 3);
+
+ consumer.unsubscribe();
+ consumer.close();
+ producer1.close();
+ producer2.close();
+ producer3.close();
+ }
+
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index 6ba2518..8de76f4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;
import java.io.Closeable;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
@@ -320,4 +321,60 @@ public interface PulsarClient extends Closeable {
* if the forceful shutdown fails
*/
void shutdown() throws PulsarClientException;
+
+
+ /**
+ * Subscribe to the given topic and subscription combination with default
{@code ConsumerConfiguration}
+ *
+ * @param topics
+ * The collection of topic names, they should be under same
namespace
+ * @param subscription
+ * The name of the subscription
+ * @return The {@code Consumer} object
+ * @throws PulsarClientException
+ */
+ Consumer subscribe(Collection<String> topics, String subscription) throws
PulsarClientException;
+
+ /**
+ * Asynchronously subscribe to the given topics and subscription
combination with
+ * default {@code ConsumerConfiguration}
+ *
+ * @param topics
+ * The collection of topic names, they should be under same
namespace
+ * @param subscription
+ * The name of the subscription
+ * @return Future of the {@code Consumer} object
+ */
+ CompletableFuture<Consumer> subscribeAsync(Collection<String> topics,
String subscription);
+
+ /**
+ * Subscribe to the given topics and subscription combination using given
{@code ConsumerConfiguration}
+ *
+ * @param topics
+ * The collection of topic names, they should be under same
namespace
+ * @param subscription
+ * The name of the subscription
+ * @param conf
+ * The {@code ConsumerConfiguration} object
+ * @return Future of the {@code Consumer} object
+ */
+ Consumer subscribe(Collection<String> topics, String subscription,
ConsumerConfiguration conf)
+ throws PulsarClientException;
+
+ /**
+ * Asynchronously subscribe to the given topics and subscription
combination using given
+ * {@code ConsumerConfiguration}
+ *
+ * @param topics
+ * The collection of topic names, they should be under same
namespace
+ * @param subscription
+ * The name of the subscription
+ * @param conf
+ * The {@code ConsumerConfiguration} object
+ * @return Future of the {@code Consumer} object
+ */
+ CompletableFuture<Consumer> subscribeAsync(Collection<String> topics,
+ String subscription,
+ ConsumerConfiguration conf);
+
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 1886c76..ef7ab8b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import com.google.common.collect.Queues;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
@@ -27,7 +28,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.ConsumerEventListener;
@@ -42,8 +42,6 @@ import
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
-import com.google.common.collect.Queues;
-
public abstract class ConsumerBase extends HandlerBase implements Consumer {
enum ConsumerType {
@@ -59,7 +57,7 @@ public abstract class ConsumerBase extends HandlerBase
implements Consumer {
protected final ExecutorService listenerExecutor;
final BlockingQueue<Message> incomingMessages;
protected final ConcurrentLinkedQueue<CompletableFuture<Message>>
pendingReceives;
- protected final int maxReceiverQueueSize;
+ protected int maxReceiverQueueSize;
protected ConsumerBase(PulsarClientImpl client, String topic, String
subscription, ConsumerConfiguration conf,
int receiverQueueSize, ExecutorService listenerExecutor,
CompletableFuture<Consumer> subscribeFuture) {
@@ -333,7 +331,7 @@ public abstract class ConsumerBase extends HandlerBase
implements Consumer {
* the connected consumers. This is a non blocking call and doesn't throw
an exception. In case the connection
* breaks, the messages are redelivered after reconnect.
*/
- protected abstract void redeliverUnacknowledgedMessages(Set<MessageIdImpl>
messageIds);
+ protected abstract void redeliverUnacknowledgedMessages(Set<MessageId>
messageIds);
@Override
public String toString() {
@@ -343,4 +341,9 @@ public abstract class ConsumerBase extends HandlerBase
implements Consumer {
", topic='" + topic + '\'' +
'}';
}
+
+ protected void setMaxReceiverQueueSize(int newSize) {
+ this.maxReceiverQueueSize = newSize;
+ }
+
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 219479f..93d0d87 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1188,7 +1188,9 @@ public class ConsumerImpl extends ConsumerBase {
}
@Override
- public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds)
{
+ public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
+ checkArgument(messageIds.stream().findFirst().get() instanceof
MessageIdImpl);
+
if (conf.getSubscriptionType() != SubscriptionType.Shared) {
// We cannot redeliver single messages if subscription type is not
Shared
redeliverUnacknowledgedMessages();
@@ -1197,7 +1199,10 @@ public class ConsumerImpl extends ConsumerBase {
ClientCnx cnx = cnx();
if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >=
ProtocolVersion.v2.getNumber()) {
int messagesFromQueue = removeExpiredMessagesFromQueue(messageIds);
- Iterable<List<MessageIdImpl>> batches =
Iterables.partition(messageIds, MAX_REDELIVER_UNACKNOWLEDGED);
+ Iterable<List<MessageIdImpl>> batches = Iterables.partition(
+ messageIds.stream()
+ .map(messageId -> (MessageIdImpl)messageId)
+ .collect(Collectors.toSet()),
MAX_REDELIVER_UNACKNOWLEDGED);
MessageIdData.Builder builder = MessageIdData.newBuilder();
batches.forEach(ids -> {
List<MessageIdData> messageIdDatas =
ids.stream().map(messageId -> {
@@ -1374,7 +1379,7 @@ public class ConsumerImpl extends ConsumerBase {
return messageId;
}
- private int removeExpiredMessagesFromQueue(Set<MessageIdImpl> messageIds) {
+ private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
int messagesFromQueue = 0;
Message peek = incomingMessages.peek();
if (peek != null) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index 92cbea2..332dd56 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -470,7 +470,8 @@ public class PartitionedConsumerImpl extends ConsumerBase {
}
@Override
- public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds)
{
+ public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
+ checkArgument(messageIds.stream().findFirst().get() instanceof
MessageIdImpl);
if (conf.getSubscriptionType() != SubscriptionType.Shared) {
// We cannot redeliver single messages if subscription type is not
Shared
redeliverUnacknowledgedMessages();
@@ -478,9 +479,11 @@ public class PartitionedConsumerImpl extends ConsumerBase {
}
removeExpiredMessagesFromQueue(messageIds);
messageIds.stream()
-
.collect(Collectors.groupingBy(MessageIdImpl::getPartitionIndex,
Collectors.toSet()))
- .forEach((partitionIndex, messageIds1) ->
-
consumers.get(partitionIndex).redeliverUnacknowledgedMessages(messageIds1));
+ .map(messageId -> (MessageIdImpl)messageId)
+ .collect(Collectors.groupingBy(MessageIdImpl::getPartitionIndex,
Collectors.toSet()))
+ .forEach((partitionIndex, messageIds1) ->
+ consumers.get(partitionIndex).redeliverUnacknowledgedMessages(
+ messageIds1.stream().map(mid ->
(MessageId)mid).collect(Collectors.toSet())));
resumeReceivingFromPausedConsumersIfNeeded();
}
@@ -548,10 +551,10 @@ public class PartitionedConsumerImpl extends ConsumerBase
{
return unAckedMessageTracker;
}
- private void removeExpiredMessagesFromQueue(Set<MessageIdImpl> messageIds)
{
+ private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
Message peek = incomingMessages.peek();
if (peek != null) {
- if (!messageIds.contains((MessageIdImpl) peek.getMessageId())) {
+ if (!messageIds.contains(peek.getMessageId())) {
// first message is not expired, then no message is expired in
queue.
return;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 6e1c5af..5d7aab8 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
import static org.apache.commons.lang3.StringUtils.isBlank;
+import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -307,6 +308,70 @@ public class PulsarClientImpl implements PulsarClient {
return consumerSubscribedFuture;
}
+
+ @Override
+ public Consumer subscribe(Collection<String> topics, final String
subscription) throws PulsarClientException {
+ return subscribe(topics, subscription, new ConsumerConfiguration());
+ }
+
+ @Override
+ public Consumer subscribe(Collection<String> topics,
+ String subscription,
+ ConsumerConfiguration conf)
+ throws PulsarClientException {
+ try {
+ return subscribeAsync(topics, subscription, conf).get();
+ } catch (ExecutionException e) {
+ Throwable t = e.getCause();
+ if (t instanceof PulsarClientException) {
+ throw (PulsarClientException) t;
+ } else {
+ throw new PulsarClientException(t);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Consumer> subscribeAsync(Collection<String>
topics, String subscription) {
+ return subscribeAsync(topics, subscription, new
ConsumerConfiguration());
+ }
+
+ @Override
+ public CompletableFuture<Consumer> subscribeAsync(Collection<String>
topics,
+ String subscription,
+ ConsumerConfiguration
conf) {
+ if (topics == null || topics.isEmpty()) {
+ return FutureUtil.failedFuture(new
PulsarClientException.InvalidTopicNameException("Empty topics name"));
+ }
+
+ if (state.get() != State.Open) {
+ return FutureUtil.failedFuture(new
PulsarClientException.AlreadyClosedException("Client already closed"));
+ }
+
+ if (isBlank(subscription)) {
+ return FutureUtil
+ .failedFuture(new
PulsarClientException.InvalidConfigurationException("Empty subscription name"));
+ }
+ if (conf == null) {
+ return FutureUtil.failedFuture(
+ new
PulsarClientException.InvalidConfigurationException("Consumer configuration
undefined"));
+ }
+
+ CompletableFuture<Consumer> consumerSubscribedFuture = new
CompletableFuture<>();
+
+ ConsumerBase consumer = new TopicsConsumerImpl(PulsarClientImpl.this,
topics, subscription,
+ conf, externalExecutorProvider.getExecutor(),
+ consumerSubscribedFuture);
+ synchronized (consumers) {
+ consumers.put(consumer, Boolean.TRUE);
+ }
+
+ return consumerSubscribedFuture;
+ }
+
@Override
public Reader createReader(String topic, MessageId startMessageId,
ReaderConfiguration conf)
throws PulsarClientException {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
new file mode 100644
index 0000000..c7cc453
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.MessageId;
+
+public class TopicMessageIdImpl implements MessageId {
+ private final String topicName;
+ private final MessageId messageId;
+
+ TopicMessageIdImpl(String topicName, MessageId messageId) {
+ this.topicName = topicName;
+ this.messageId = messageId;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+
+ public MessageId getInnerMessageId() {
+ return messageId;
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ return messageId.toByteArray();
+ }
+
+ @Override
+ public int compareTo(MessageId o) {
+ return messageId.compareTo(o);
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
new file mode 100644
index 0000000..619d15c
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+
+public class TopicMessageImpl implements Message {
+
+ private final String topicName;
+ private final Message msg;
+ private final MessageId msgId;
+
+ TopicMessageImpl(String topicName,
+ Message msg) {
+ this.topicName = topicName;
+ this.msg = msg;
+ this.msgId = new TopicMessageIdImpl(topicName, msg.getMessageId());
+ }
+
+ /**
+ * Get the topic name of this message.
+ * @return the name of the topic on which this message was published
+ */
+ public String getTopicName() {
+ return topicName;
+ }
+
+ @Override
+ public MessageId getMessageId() {
+ return msgId;
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ return msg.getProperties();
+ }
+
+ @Override
+ public boolean hasProperty(String name) {
+ return msg.hasProperty(name);
+ }
+
+ @Override
+ public String getProperty(String name) {
+ return msg.getProperty(name);
+ }
+
+ @Override
+ public byte[] getData() {
+ return msg.getData();
+ }
+
+ @Override
+ public long getPublishTime() {
+ return msg.getPublishTime();
+ }
+
+ @Override
+ public long getEventTime() {
+ return msg.getEventTime();
+ }
+
+ @Override
+ public long getSequenceId() {
+ return msg.getSequenceId();
+ }
+
+ @Override
+ public String getProducerName() {
+ return msg.getProducerName();
+ }
+
+ @Override
+ public boolean hasKey() {
+ return msg.hasKey();
+ }
+
+ @Override
+ public String getKey() {
+ return msg.getKey();
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
new file mode 100644
index 0000000..852c5d2
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
@@ -0,0 +1,823 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+ // All topics should be in same namespace
+ protected NamespaceName namespaceName;
+
+ // Map <topic+partition, consumer>, when get do ACK, consumer will by find
by topic name
+ private final ConcurrentHashMap<String, ConsumerImpl> consumers;
+
+ // Map <topic, partitionNumber>, store partition number for each topic
+ private final ConcurrentHashMap<String, Integer> topics;
+
+ // Queue of partition consumers on which we have stopped calling
receiveAsync() because the
+ // shared incoming queue was full
+ private final ConcurrentLinkedQueue<ConsumerImpl> pausedConsumers;
+
+ // Threshold for the shared queue. When the size of the shared queue goes
below the threshold, we are going to
+ // resume receiving from the paused consumer partitions
+ private final int sharedQueueResumeThreshold;
+
+ // sum of topicPartitions, simple topic has 1, partitioned topic equals to
partition number.
+ AtomicInteger numberTopicPartitions;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ConsumerStats stats;
+ private final UnAckedMessageTracker unAckedMessageTracker;
+ private final ConsumerConfiguration internalConfig;
+
+ TopicsConsumerImpl(PulsarClientImpl client, Collection<String> topics,
String subscription,
+ ConsumerConfiguration conf, ExecutorService
listenerExecutor,
+ CompletableFuture<Consumer> subscribeFuture) {
+ super(client, "TopicsConsumerFakeTopicName" +
ConsumerName.generateRandomName(), subscription,
+ conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+ subscribeFuture);
+
+ checkArgument(conf.getReceiverQueueSize() > 0,
+ "Receiver queue size needs to be greater than 0 for Topics
Consumer");
+
+ this.topics = new ConcurrentHashMap<>();
+ this.consumers = new ConcurrentHashMap<>();
+ this.pausedConsumers = new ConcurrentLinkedQueue<>();
+ this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+ this.numberTopicPartitions = new AtomicInteger(0);
+
+ if (conf.getAckTimeoutMillis() != 0) {
+ this.unAckedMessageTracker = new
UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis());
+ } else {
+ this.unAckedMessageTracker =
UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
+ }
+
+ this.internalConfig = getInternalConsumerConfig();
+ this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ?
new ConsumerStats() : null;
+
+ if (topics.isEmpty()) {
+ this.namespaceName = null;
+ setState(State.Ready);
+ subscribeFuture().complete(TopicsConsumerImpl.this);
+ return;
+ }
+
+ checkArgument(topics.isEmpty() || topicNamesValid(topics), "Topics
should have same namespace.");
+ this.namespaceName = topics.stream().findFirst().flatMap(
+ new Function<String, Optional<NamespaceName>>() {
+ @Override
+ public Optional<NamespaceName> apply(String s) {
+ return
Optional.of(DestinationName.get(s).getNamespaceObject());
+ }
+ }).get();
+
+ List<CompletableFuture<Void>> futures = topics.stream().map(t ->
subscribeAsync(t)).collect(Collectors.toList());
+ FutureUtil.waitForAll(futures)
+ .thenAccept(finalFuture -> {
+ try {
+ if (numberTopicPartitions.get() > maxReceiverQueueSize) {
+ setMaxReceiverQueueSize(numberTopicPartitions.get());
+ }
+ setState(State.Ready);
+ // We have successfully created N consumers, so we can
start receiving messages now
+
startReceivingMessages(consumers.values().stream().collect(Collectors.toList()));
+ subscribeFuture().complete(TopicsConsumerImpl.this);
+ log.info("[{}] [{}] Created topics consumer with {}
sub-consumers",
+ topic, subscription, numberTopicPartitions.get());
+ } catch (PulsarClientException e) {
+ log.warn("[{}] Failed startReceivingMessages while
subscribe topics: {}", topic, e.getMessage());
+ subscribeFuture.completeExceptionally(e);
+ }})
+ .exceptionally(ex -> {
+ log.warn("[{}] Failed to subscribe topics: {}", topic,
ex.getMessage());
+ subscribeFuture.completeExceptionally(ex);
+ return null;
+ });
+ }
+
+ // Check topics are valid.
+ // - each topic is valid,
+ // - every topic has same namespace,
+ // - topic names are unique.
+ private static boolean topicNamesValid(Collection<String> topics) {
+ checkState(topics != null && topics.size() > 1,
+ "topics should should contain more than 1 topics");
+
+ final String namespace =
DestinationName.get(topics.stream().findFirst().get()).getNamespace();
+
+ Optional<String> result = topics.stream()
+ .filter(topic -> {
+ boolean topicInvalid = !DestinationName.isValid(topic);
+ if (topicInvalid) {
+ return true;
+ }
+
+ String newNamespace =
DestinationName.get(topic).getNamespace();
+ if (!namespace.equals(newNamespace)) {
+ return true;
+ } else {
+ return false;
+ }
+ }).findFirst();
+
+ if (result.isPresent()) {
+ log.warn("[{}] Received invalid topic name. {}/{}", result.get());
+ return false;
+ }
+
+ // check topic names are unique
+ HashSet<String> set = new HashSet<>(topics);
+ if (set.size() == topics.size()) {
+ return true;
+ } else {
+ log.warn("Topic names not unique. unique/all : {}/{}", set.size(),
topics.size());
+ return false;
+ }
+ }
+
+ private void startReceivingMessages(List<ConsumerImpl> newConsumers)
throws PulsarClientException {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] startReceivingMessages for {} new consumers in
topics consumer, state: {}",
+ topic, newConsumers.size(), getState());
+ }
+ if (getState() == State.Ready) {
+ newConsumers.forEach(consumer -> {
+ consumer.sendFlowPermitsToBroker(consumer.cnx(),
conf.getReceiverQueueSize());
+ receiveMessageFromConsumer(consumer);
+ });
+ }
+ }
+
+ private void receiveMessageFromConsumer(ConsumerImpl consumer) {
+ consumer.receiveAsync().thenAccept(message -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Receive message from sub consumer:{}",
+ topic, subscription, consumer.getTopic());
+ }
+ // Process the message, add to the queue and trigger listener or
async callback
+ messageReceived(consumer, message);
+
+ // we're modifying pausedConsumers
+ lock.writeLock().lock();
+ try {
+ int size = incomingMessages.size();
+ if (size >= maxReceiverQueueSize
+ || (size > sharedQueueResumeThreshold &&
!pausedConsumers.isEmpty())) {
+ // mark this consumer to be resumed later: if No more
space left in shared queue,
+ // or if any consumer is already paused (to create fair
chance for already paused consumers)
+ pausedConsumers.add(consumer);
+ } else {
+ // Schedule next receiveAsync() if the incoming queue is
not full. Use a different thread to avoid
+ // recursion and stack overflow
+ client.eventLoopGroup().execute(() -> {
+ receiveMessageFromConsumer(consumer);
+ });
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ });
+ }
+
+ private void messageReceived(ConsumerImpl consumer, Message message) {
+ checkArgument(message instanceof MessageImpl);
+ lock.writeLock().lock();
+ try {
+ TopicMessageImpl topicMessage = new
TopicMessageImpl(consumer.getTopic(), message);
+ unAckedMessageTracker.add(topicMessage.getMessageId());
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] Received message from topics-consumer {}",
+ topic, subscription, message.getMessageId());
+ }
+
+ // if asyncReceive is waiting : return message to callback without
adding to incomingMessages queue
+ if (!pendingReceives.isEmpty()) {
+ CompletableFuture<Message> receivedFuture =
pendingReceives.poll();
+ listenerExecutor.execute(() ->
receivedFuture.complete(topicMessage));
+ } else {
+ // Enqueue the message so that it can be retrieved when
application calls receive()
+ // Waits for the queue to have space for the message
+ // This should never block cause TopicsConsumerImpl should
always use GrowableArrayBlockingQueue
+ incomingMessages.put(topicMessage);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ if (listener != null) {
+ // Trigger the notification on the message listener in a separate
thread to avoid blocking the networking
+ // thread while the message processing happens
+ listenerExecutor.execute(() -> {
+ Message msg;
+ try {
+ msg = internalReceive();
+ } catch (PulsarClientException e) {
+ log.warn("[{}] [{}] Failed to dequeue the message for
listener", topic, subscription, e);
+ return;
+ }
+
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] Calling message listener for
message {}",
+ topic, subscription, message.getMessageId());
+ }
+ listener.received(TopicsConsumerImpl.this, msg);
+ } catch (Throwable t) {
+ log.error("[{}][{}] Message listener error in processing
message: {}",
+ topic, subscription, message, t);
+ }
+ });
+ }
+ }
+
+ private void resumeReceivingFromPausedConsumersIfNeeded() {
+ lock.readLock().lock();
+ try {
+ if (incomingMessages.size() <= sharedQueueResumeThreshold &&
!pausedConsumers.isEmpty()) {
+ while (true) {
+ ConsumerImpl consumer = pausedConsumers.poll();
+ if (consumer == null) {
+ break;
+ }
+
+ // if messages are readily available on consumer we will
attempt to writeLock on the same thread
+ client.eventLoopGroup().execute(() -> {
+ receiveMessageFromConsumer(consumer);
+ });
+ }
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ protected Message internalReceive() throws PulsarClientException {
+ Message message;
+ try {
+ message = incomingMessages.take();
+ checkState(message instanceof TopicMessageImpl);
+ unAckedMessageTracker.add(message.getMessageId());
+ resumeReceivingFromPausedConsumersIfNeeded();
+ return message;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException(e);
+ }
+ }
+
+ @Override
+ protected Message internalReceive(int timeout, TimeUnit unit) throws
PulsarClientException {
+ Message message;
+ try {
+ message = incomingMessages.poll(timeout, unit);
+ if (message != null) {
+ checkArgument(message instanceof TopicMessageImpl);
+ unAckedMessageTracker.add(message.getMessageId());
+ }
+ resumeReceivingFromPausedConsumersIfNeeded();
+ return message;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException(e);
+ }
+ }
+
+ @Override
+ protected CompletableFuture<Message> internalReceiveAsync() {
+ CompletableFuture<Message> result = new CompletableFuture<>();
+ Message message;
+ try {
+ lock.writeLock().lock();
+ message = incomingMessages.poll(0, TimeUnit.SECONDS);
+ if (message == null) {
+ pendingReceives.add(result);
+ } else {
+ checkState(message instanceof TopicMessageImpl);
+ unAckedMessageTracker.add(message.getMessageId());
+ resumeReceivingFromPausedConsumersIfNeeded();
+ result.complete(message);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ result.completeExceptionally(new PulsarClientException(e));
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ return result;
+ }
+
+ @Override
+ protected CompletableFuture<Void> doAcknowledge(MessageId messageId,
AckType ackType,
+ Map<String,Long>
properties) {
+ checkArgument(messageId instanceof TopicMessageIdImpl);
+ TopicMessageIdImpl messageId1 = (TopicMessageIdImpl) messageId;
+
+ if (getState() != State.Ready) {
+ return FutureUtil.failedFuture(new PulsarClientException("Consumer
already closed"));
+ }
+
+ if (ackType == AckType.Cumulative) {
+ return FutureUtil.failedFuture(new
PulsarClientException.NotSupportedException(
+ "Cumulative acknowledge not supported for topics
consumer"));
+ } else {
+ ConsumerImpl consumer = consumers.get(messageId1.getTopicName());
+
+ MessageId innerId = messageId1.getInnerMessageId();
+ return consumer.doAcknowledge(innerId, ackType, properties)
+ .thenRun(() ->
+ unAckedMessageTracker.remove(messageId1));
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> unsubscribeAsync() {
+ if (getState() == State.Closing || getState() == State.Closed) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.AlreadyClosedException("Topics
Consumer was already closed"));
+ }
+ setState(State.Closing);
+
+ CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
+ List<CompletableFuture<Void>> futureList = consumers.values().stream()
+ .map(c -> c.unsubscribeAsync()).collect(Collectors.toList());
+
+ FutureUtil.waitForAll(futureList)
+ .whenComplete((r, ex) -> {
+ if (ex == null) {
+ setState(State.Closed);
+ unAckedMessageTracker.close();
+ unsubscribeFuture.complete(null);
+ log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer",
+ topic, subscription, consumerName);
+ } else {
+ setState(State.Failed);
+ unsubscribeFuture.completeExceptionally(ex);
+ log.error("[{}] [{}] [{}] Could not unsubscribe Topics
Consumer",
+ topic, subscription, consumerName, ex.getCause());
+ }
+ });
+
+ return unsubscribeFuture;
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ if (getState() == State.Closing || getState() == State.Closed) {
+ unAckedMessageTracker.close();
+ return CompletableFuture.completedFuture(null);
+ }
+ setState(State.Closing);
+
+ CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ List<CompletableFuture<Void>> futureList = consumers.values().stream()
+ .map(c -> c.closeAsync()).collect(Collectors.toList());
+
+ FutureUtil.waitForAll(futureList)
+ .whenComplete((r, ex) -> {
+ if (ex == null) {
+ setState(State.Closed);
+ unAckedMessageTracker.close();
+ closeFuture.complete(null);
+ log.info("[{}] [{}] Closed Topics Consumer", topic,
subscription);
+ client.cleanupConsumer(this);
+ // fail all pending-receive futures to notify application
+ failPendingReceive();
+ } else {
+ setState(State.Failed);
+ closeFuture.completeExceptionally(ex);
+ log.error("[{}] [{}] Could not close Topics Consumer",
topic, subscription,
+ ex.getCause());
+ }
+ });
+
+ return closeFuture;
+ }
+
+ private void failPendingReceive() {
+ lock.readLock().lock();
+ try {
+ if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
+ while (!pendingReceives.isEmpty()) {
+ CompletableFuture<Message> receiveFuture =
pendingReceives.poll();
+ if (receiveFuture != null) {
+ receiveFuture.completeExceptionally(
+ new
PulsarClientException.AlreadyClosedException("Consumer is already closed"));
+ } else {
+ break;
+ }
+ }
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean isConnected() {
+ return consumers.values().stream().allMatch(consumer ->
consumer.isConnected());
+ }
+
+ @Override
+ void connectionFailed(PulsarClientException exception) {
+ // noop
+
+ }
+
+ @Override
+ void connectionOpened(ClientCnx cnx) {
+ // noop
+
+ }
+
+ @Override
+ String getHandlerName() {
+ return subscription;
+ }
+
+ private ConsumerConfiguration getInternalConsumerConfig() {
+ ConsumerConfiguration internalConsumerConfig = new
ConsumerConfiguration();
+
internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
+ internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
+ internalConsumerConfig.setConsumerName(consumerName);
+ if (conf.getCryptoKeyReader() != null) {
+
internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader());
+
internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
+ }
+ if (conf.getAckTimeoutMillis() != 0) {
+ internalConsumerConfig.setAckTimeout(conf.getAckTimeoutMillis(),
TimeUnit.MILLISECONDS);
+ }
+
+ return internalConsumerConfig;
+ }
+
+ @Override
+ public void redeliverUnacknowledgedMessages() {
+ synchronized (this) {
+ consumers.values().stream().forEach(consumer ->
consumer.redeliverUnacknowledgedMessages());
+ incomingMessages.clear();
+ unAckedMessageTracker.clear();
+ resumeReceivingFromPausedConsumersIfNeeded();
+ }
+ }
+
+ @Override
+ public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
+ checkArgument(messageIds.stream().findFirst().get() instanceof
TopicMessageIdImpl);
+
+ if (conf.getSubscriptionType() != SubscriptionType.Shared) {
+ // We cannot redeliver single messages if subscription type is not
Shared
+ redeliverUnacknowledgedMessages();
+ return;
+ }
+ removeExpiredMessagesFromQueue(messageIds);
+ messageIds.stream().map(messageId -> (TopicMessageIdImpl)messageId)
+ .collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicName,
Collectors.toSet()))
+ .forEach((topicName, messageIds1) ->
+ consumers.get(topicName)
+ .redeliverUnacknowledgedMessages(messageIds1.stream()
+ .map(mid ->
mid.getInnerMessageId()).collect(Collectors.toSet())));
+ resumeReceivingFromPausedConsumersIfNeeded();
+ }
+
+ @Override
+ public void seek(MessageId messageId) throws PulsarClientException {
+ try {
+ seekAsync(messageId).get();
+ } catch (ExecutionException e) {
+ throw new PulsarClientException(e.getCause());
+ } catch (InterruptedException e) {
+ throw new PulsarClientException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> seekAsync(MessageId messageId) {
+ return FutureUtil.failedFuture(new PulsarClientException("Seek
operation not supported on topics consumer"));
+ }
+
+ /**
+ * helper method that returns current state of data structure used to
track acks for batch messages
+ *
+ * @return true if all batch messages have been acknowledged
+ */
+ public boolean isBatchingAckTrackerEmpty() {
+ return consumers.values().stream().allMatch(consumer ->
consumer.isBatchingAckTrackerEmpty());
+ }
+
+
+ @Override
+ public int getAvailablePermits() {
+ return
consumers.values().stream().mapToInt(ConsumerImpl::getAvailablePermits).sum();
+ }
+
+ @Override
+ public boolean hasReachedEndOfTopic() {
+ return
consumers.values().stream().allMatch(Consumer::hasReachedEndOfTopic);
+ }
+
+ @Override
+ public int numMessagesInQueue() {
+ return incomingMessages.size() +
consumers.values().stream().mapToInt(ConsumerImpl::numMessagesInQueue).sum();
+ }
+
+ @Override
+ public synchronized ConsumerStats getStats() {
+ if (stats == null) {
+ return null;
+ }
+ stats.reset();
+
+ consumers.values().stream().forEach(consumer ->
stats.updateCumulativeStats(consumer.getStats()));
+ return stats;
+ }
+
+ public UnAckedMessageTracker getUnAckedMessageTracker() {
+ return unAckedMessageTracker;
+ }
+
+ private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
+ Message peek = incomingMessages.peek();
+ if (peek != null) {
+ if (!messageIds.contains(peek.getMessageId())) {
+ // first message is not expired, then no message is expired in
queue.
+ return;
+ }
+
+ // try not to remove elements that are added while we remove
+ Message message = incomingMessages.poll();
+ checkState(message instanceof TopicMessageImpl);
+ while (message != null) {
+ MessageId messageId = message.getMessageId();
+ if (!messageIds.contains(messageId)) {
+ messageIds.add(messageId);
+ break;
+ }
+ message = incomingMessages.poll();
+ }
+ }
+ }
+
+ private boolean topicNameValid(String topicName) {
+ checkArgument(DestinationName.isValid(topicName), "Invalid topic
name:" + topicName);
+ checkArgument(!topics.containsKey(topicName), "Topics already contains
topic:" + topicName);
+
+ if (this.namespaceName != null) {
+
checkArgument(DestinationName.get(topicName).getNamespace().toString().equals(this.namespaceName.toString()),
+ "Topic " + topicName + " not in same namespace with Topics");
+ }
+
+ return true;
+ }
+
+ // subscribe one more given topic
+ public CompletableFuture<Void> subscribeAsync(String topicName) {
+ if (!topicNameValid(topicName)) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.AlreadyClosedException("Topic name
not valid"));
+ }
+
+ if (getState() == State.Closing || getState() == State.Closed) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.AlreadyClosedException("Topics
Consumer was already closed"));
+ }
+
+ CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
+ final AtomicInteger partitionNumber = new AtomicInteger(0);
+
+ client.getPartitionedTopicMetadata(topicName).thenAccept(metadata -> {
+ if (log.isDebugEnabled()) {
+ log.debug("Received topic {} metadata.partitions: {}",
topicName, metadata.partitions);
+ }
+
+ List<CompletableFuture<Consumer>> futureList;
+
+ if (metadata.partitions > 1) {
+ this.topics.putIfAbsent(topicName, metadata.partitions);
+ numberTopicPartitions.addAndGet(metadata.partitions);
+ partitionNumber.addAndGet(metadata.partitions);
+
+ futureList = IntStream
+ .range(0, partitionNumber.get())
+ .mapToObj(
+ partitionIndex -> {
+ String partitionName =
DestinationName.get(topicName).getPartition(partitionIndex).toString();
+ CompletableFuture<Consumer> subFuture = new
CompletableFuture<Consumer>();
+ ConsumerImpl newConsumer = new
ConsumerImpl(client, partitionName, subscription, internalConfig,
+
client.externalExecutorProvider().getExecutor(), partitionIndex,
+ subFuture);
+ consumers.putIfAbsent(newConsumer.getTopic(),
newConsumer);
+ return subFuture;
+ })
+ .collect(Collectors.toList());
+ } else {
+ this.topics.putIfAbsent(topicName, 1);
+ numberTopicPartitions.incrementAndGet();
+ partitionNumber.incrementAndGet();
+
+ CompletableFuture<Consumer> subFuture = new
CompletableFuture<Consumer>();
+ ConsumerImpl newConsumer = new ConsumerImpl(client, topicName,
subscription, internalConfig,
+ client.externalExecutorProvider().getExecutor(), 0,
+ subFuture);
+ consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
+
+ futureList = Lists.newArrayList(subFuture);
+ }
+
+ FutureUtil.waitForAll(futureList)
+ .thenAccept(finalFuture -> {
+ try {
+ if (numberTopicPartitions.get() >
maxReceiverQueueSize) {
+
setMaxReceiverQueueSize(numberTopicPartitions.get());
+ }
+ int numTopics =
this.topics.values().stream().mapToInt(Integer::intValue).sum();
+ checkState(numberTopicPartitions.get() == numTopics,
+ "numberTopicPartitions " +
numberTopicPartitions.get()
+ + " not equals expected: " + numTopics);
+
+ // We have successfully created new consumers, so we
can start receiving messages for them
+ startReceivingMessages(
+ consumers.values().stream()
+ .filter(consumer1 -> {
+ String consumerTopicName =
consumer1.getTopic();
+ if (DestinationName.get(consumerTopicName)
+
.getPartitionedTopicName().equals(topicName)) {
+ return true;
+ } else {
+ return false;
+ }
+ })
+ .collect(Collectors.toList()));
+
+ subscribeResult.complete(null);
+ log.info("[{}] [{}] Success subscribe new topic {} in
topics consumer, numberTopicPartitions {}",
+ topic, subscription, topicName,
numberTopicPartitions.get());
+ if (this.namespaceName == null) {
+ this.namespaceName =
DestinationName.get(topicName).getNamespaceObject();
+ }
+ return;
+ } catch (PulsarClientException e) {
+ handleSubscribeOneTopicError(topicName, e);
+ subscribeResult.completeExceptionally(e);
+ }
+ })
+ .exceptionally(ex -> {
+ handleSubscribeOneTopicError(topicName, ex);
+ subscribeResult.completeExceptionally(ex);
+ return null;
+ });
+ }).exceptionally(ex1 -> {
+ log.warn("[{}] Failed to get partitioned topic metadata: {}",
topicName, ex1.getMessage());
+ subscribeResult.completeExceptionally(ex1);
+ return null;
+ });
+
+ return subscribeResult;
+ }
+
+ // handling failure during subscribe new topic, unsubscribe success
created partitions
+ private void handleSubscribeOneTopicError(String topicName, Throwable
error) {
+ log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer
", topic, topicName, error.getMessage());
+
+ consumers.values().stream().filter(consumer1 -> {
+ String consumerTopicName = consumer1.getTopic();
+ if
(DestinationName.get(consumerTopicName).getPartitionedTopicName().equals(topicName))
{
+ return true;
+ } else {
+ return false;
+ }
+ }).forEach(consumer2 -> {
+ consumer2.closeAsync().handle((ok, closeException) -> {
+ consumer2.subscribeFuture().completeExceptionally(error);
+ return null;
+ });
+ consumers.remove(consumer2.getTopic());
+ });
+
+ topics.remove(topicName);
+ checkState(numberTopicPartitions.get() == consumers.values().size());
+ }
+
+ // un-subscribe a given topic
+ public CompletableFuture<Void> unsubscribeAsync(String topicName) {
+ checkArgument(DestinationName.isValid(topicName), "Invalid topic
name:" + topicName);
+
+ if (getState() == State.Closing || getState() == State.Closed) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.AlreadyClosedException("Topics
Consumer was already closed"));
+ }
+
+ CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
+ String topicPartName =
DestinationName.get(topicName).getPartitionedTopicName();
+
+ List<ConsumerImpl> consumersToUnsub = consumers.values().stream()
+ .filter(consumer -> {
+ String consumerTopicName = consumer.getTopic();
+ if
(DestinationName.get(consumerTopicName).getPartitionedTopicName().equals(topicPartName))
{
+ return true;
+ } else {
+ return false;
+ }
+ }).collect(Collectors.toList());
+
+ List<CompletableFuture<Void>> futureList = consumersToUnsub.stream()
+ .map(c -> c.unsubscribeAsync()).collect(Collectors.toList());
+
+ FutureUtil.waitForAll(futureList)
+ .whenComplete((r, ex) -> {
+ if (ex == null) {
+ consumersToUnsub.forEach(consumer1 -> {
+ consumers.remove(consumer1.getTopic());
+ pausedConsumers.remove(consumer1);
+ numberTopicPartitions.decrementAndGet();
+ });
+
+ topics.remove(topicName);
+ ((UnAckedTopicMessageTracker)
unAckedMessageTracker).removeTopicMessages(topicName);
+
+ unsubscribeFuture.complete(null);
+ log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer,
numberTopicPartitions: {}",
+ topicName, subscription, consumerName,
numberTopicPartitions);
+ } else {
+ unsubscribeFuture.completeExceptionally(ex);
+ setState(State.Failed);
+ log.error("[{}] [{}] [{}] Could not unsubscribe Topics
Consumer",
+ topicName, subscription, consumerName, ex.getCause());
+ }
+ });
+
+ return unsubscribeFuture;
+ }
+
+ // get topics name
+ public List<String> getTopics() {
+ return topics.keySet().stream().collect(Collectors.toList());
+ }
+
+ // get partitioned topics name
+ public List<String> getPartitionedTopics() {
+ return consumers.keySet().stream().collect(Collectors.toList());
+ }
+
+ // get partitioned consumers
+ public List<ConsumerImpl> getConsumers() {
+ return consumers.values().stream().collect(Collectors.toList());
+ }
+
+ private static final Logger log =
LoggerFactory.getLogger(TopicsConsumerImpl.class);
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 796abf6..0066e72 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -18,28 +18,25 @@
*/
package org.apache.pulsar.client.impl;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
import java.io.Closeable;
-import java.util.ArrayList;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-
public class UnAckedMessageTracker implements Closeable {
private static final Logger log =
LoggerFactory.getLogger(UnAckedMessageTracker.class);
- private ConcurrentOpenHashSet<MessageIdImpl> currentSet;
- private ConcurrentOpenHashSet<MessageIdImpl> oldOpenSet;
+ protected ConcurrentOpenHashSet<MessageId> currentSet;
+ protected ConcurrentOpenHashSet<MessageId> oldOpenSet;
private final ReentrantReadWriteLock readWriteLock;
- private final Lock readLock;
+ protected final Lock readLock;
private final Lock writeLock;
private Timeout timeout;
@@ -51,17 +48,17 @@ public class UnAckedMessageTracker implements Closeable {
}
@Override
- public boolean add(MessageIdImpl m) {
+ public boolean add(MessageId m) {
return true;
}
@Override
- public boolean remove(MessageIdImpl m) {
+ public boolean remove(MessageId m) {
return true;
}
@Override
- public int removeMessagesTill(MessageIdImpl msgId) {
+ public int removeMessagesTill(MessageId msgId) {
return 0;
}
@@ -77,8 +74,8 @@ public class UnAckedMessageTracker implements Closeable {
}
public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase
consumerBase, long ackTimeoutMillis) {
- currentSet = new ConcurrentOpenHashSet<MessageIdImpl>();
- oldOpenSet = new ConcurrentOpenHashSet<MessageIdImpl>();
+ currentSet = new ConcurrentOpenHashSet<MessageId>();
+ oldOpenSet = new ConcurrentOpenHashSet<MessageId>();
readWriteLock = new ReentrantReadWriteLock();
readLock = readWriteLock.readLock();
writeLock = readWriteLock.writeLock();
@@ -92,7 +89,7 @@ public class UnAckedMessageTracker implements Closeable {
public void run(Timeout t) throws Exception {
if (isAckTimeout()) {
log.warn("[{}] {} messages have timed-out", consumerBase,
oldOpenSet.size());
- Set<MessageIdImpl> messageIds = new HashSet<>();
+ Set<MessageId> messageIds = new HashSet<>();
oldOpenSet.forEach(messageIds::add);
oldOpenSet.clear();
consumerBase.redeliverUnacknowledgedMessages(messageIds);
@@ -106,7 +103,7 @@ public class UnAckedMessageTracker implements Closeable {
void toggle() {
writeLock.lock();
try {
- ConcurrentOpenHashSet<MessageIdImpl> temp = currentSet;
+ ConcurrentOpenHashSet<MessageId> temp = currentSet;
currentSet = oldOpenSet;
oldOpenSet = temp;
} finally {
@@ -124,7 +121,7 @@ public class UnAckedMessageTracker implements Closeable {
}
}
- public boolean add(MessageIdImpl m) {
+ public boolean add(MessageId m) {
readLock.lock();
try {
oldOpenSet.remove(m);
@@ -144,7 +141,7 @@ public class UnAckedMessageTracker implements Closeable {
}
}
- public boolean remove(MessageIdImpl m) {
+ public boolean remove(MessageId m) {
readLock.lock();
try {
return currentSet.remove(m) || oldOpenSet.remove(m);
@@ -171,15 +168,12 @@ public class UnAckedMessageTracker implements Closeable {
}
}
- public int removeMessagesTill(MessageIdImpl msgId) {
+ public int removeMessagesTill(MessageId msgId) {
readLock.lock();
try {
- int currentSetRemovedMsgCount = currentSet.removeIf(m ->
((m.getLedgerId() < msgId.getLedgerId()
- || (m.getLedgerId() == msgId.getLedgerId() &&
m.getEntryId() <= msgId.getEntryId()))
- && m.getPartitionIndex() == msgId.getPartitionIndex()));
- int oldSetRemovedMsgCount = oldOpenSet.removeIf(m ->
((m.getLedgerId() < msgId.getLedgerId()
- || (m.getLedgerId() == msgId.getLedgerId() &&
m.getEntryId() <= msgId.getEntryId()))
- && m.getPartitionIndex() == msgId.getPartitionIndex()));
+ int currentSetRemovedMsgCount = currentSet.removeIf(m ->
(m.compareTo(msgId) <= 0));
+ int oldSetRemovedMsgCount = oldOpenSet.removeIf(m ->
(m.compareTo(msgId) <= 0));
+
return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
} finally {
readLock.unlock();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
new file mode 100644
index 0000000..eceedf6
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+public class UnAckedTopicMessageTracker extends UnAckedMessageTracker {
+
+ public UnAckedTopicMessageTracker(PulsarClientImpl client, ConsumerBase
consumerBase, long ackTimeoutMillis) {
+ super(client, consumerBase, ackTimeoutMillis);
+ }
+
+ public int removeTopicMessages(String topicName) {
+ readLock.lock();
+ try {
+ int currentSetRemovedMsgCount = currentSet.removeIf(m -> {
+ checkState(m instanceof TopicMessageIdImpl,
+ "message should be of type TopicMessageIdImpl");
+ return
((TopicMessageIdImpl)m).getTopicName().contains(topicName);
+ });
+ int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> {
+ checkState(m instanceof TopicMessageIdImpl,
+ "message should be of type TopicMessageIdImpl");
+ return
((TopicMessageIdImpl)m).getTopicName().contains(topicName);
+ });
+
+ return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
[email protected].