This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1dd9c43 Delete PartitionedConsumerImpl, use TopicsConsumerImpl
instead (#1365)
1dd9c43 is described below
commit 1dd9c43e8e266d3957177354f592015c6f89c6d6
Author: Jia Zhai <[email protected]>
AuthorDate: Tue Apr 3 22:03:55 2018 -0700
Delete PartitionedConsumerImpl, use TopicsConsumerImpl instead (#1365)
* delete partitionedConsumer, use topicsConsumer instead
* change following comments
* rebase master, rename TopicsConsumerImpl to MultiTopicsConsumerImpl
* avoid dup calling getPartitionedTopicMetadata
* rebase master, fix test error
---
.../broker/service/PersistentFailoverE2ETest.java | 7 +-
.../apache/pulsar/client/impl/MessageIdTest.java | 7 +-
.../client/impl/PatternTopicsConsumerImplTest.java | 76 +--
.../PerMessageUnAcknowledgedRedeliveryTest.java | 6 +-
.../pulsar/client/impl/TopicsConsumerImplTest.java | 46 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 2 +-
...sumerImpl.java => MultiTopicsConsumerImpl.java} | 283 ++++++-----
.../client/impl/PartitionedConsumerImpl.java | 553 ---------------------
...pl.java => PatternMultiTopicsConsumerImpl.java} | 24 +-
.../pulsar/client/impl/PulsarClientImpl.java | 8 +-
.../pulsar/client/impl/TopicMessageImpl.java | 6 +-
11 files changed, 263 insertions(+), 755 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index 6176821..08cd7f6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
@@ -323,13 +324,11 @@ public class PersistentFailoverE2ETest extends
BrokerTestBase {
final String subName = "sub1";
final int numMsgs = 100;
Set<String> uniqueMessages = new HashSet<>();
-
admin.persistentTopics().createPartitionedTopic(topicName,
numPartitions);
ConsumerBuilder<byte[]> consumerBuilder =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover);
-
// 1. two consumers on the same subscription
ActiveInactiveListenerEvent listener1 = new
ActiveInactiveListenerEvent();
ActiveInactiveListenerEvent listener2 = new
ActiveInactiveListenerEvent();
@@ -374,7 +373,7 @@ public class PersistentFailoverE2ETest extends
BrokerTestBase {
}
totalMessages++;
consumer1.acknowledge(msg);
- MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+ MessageIdImpl msgId = (MessageIdImpl)
(((TopicMessageImpl)msg).getInnerMessageId());
receivedPtns.add(msgId.getPartitionIndex());
}
@@ -391,7 +390,7 @@ public class PersistentFailoverE2ETest extends
BrokerTestBase {
}
totalMessages++;
consumer2.acknowledge(msg);
- MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+ MessageIdImpl msgId = (MessageIdImpl)
(((TopicMessageImpl)msg).getInnerMessageId());
receivedPtns.add(msgId.getPartitionIndex());
}
assertTrue(Sets.difference(listener1.inactivePtns,
receivedPtns).isEmpty());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
index 4a9912d..6d6fc92 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
@@ -208,7 +208,8 @@ public class MessageIdTest extends BrokerTestBase {
Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all
messages published successfully");
for (int i = 0; i < numberOfMessages; i++) {
- MessageId messageId = consumer.receive().getMessageId();
+ MessageId topicMessageId = consumer.receive().getMessageId();
+ MessageId messageId =
((TopicMessageIdImpl)topicMessageId).getInnerMessageId();
log.info("Message ID Received = " + messageId);
Assert.assertTrue(messageIds.remove(messageId), "Failed to receive
Message");
}
@@ -247,7 +248,9 @@ public class MessageIdTest extends BrokerTestBase {
Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all
messages published successfully");
for (int i = 0; i < numberOfMessages; i++) {
-
Assert.assertTrue(messageIds.remove(consumer.receive().getMessageId()), "Failed
to receive Message");
+ MessageId topicMessageId = consumer.receive().getMessageId();
+ MessageId messageId =
((TopicMessageIdImpl)topicMessageId).getInnerMessageId();
+ Assert.assertTrue(messageIds.remove(messageId), "Failed to receive
Message");
}
log.info("Message IDs = " + messageIds);
Assert.assertEquals(messageIds.size(), 0, "Not all messages received
successfully");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 4f9cc70..b3e74f7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -161,13 +161,13 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
.subscribe();
// 4. verify consumer get methods, to get right number of partitions
and topics.
- assertSame(pattern, ((PatternTopicsConsumerImpl<?>)
consumer).getPattern());
- List<String> topics = ((PatternTopicsConsumerImpl<?>)
consumer).getPartitionedTopics();
- List<ConsumerImpl<byte[]>> consumers =
((PatternTopicsConsumerImpl<byte[]>) consumer).getConsumers();
+ assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
+ List<String> topics = ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics();
+ List<ConsumerImpl<byte[]>> consumers =
((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
assertEquals(topics.size(), 6);
assertEquals(consumers.size(), 6);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
topics.forEach(topic -> log.debug("topic: {}", topic));
consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));
@@ -175,7 +175,7 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
IntStream.range(0, topics.size()).forEach(index ->
assertTrue(topics.get(index).equals(consumers.get(index).getTopic())));
- ((PatternTopicsConsumerImpl<?>) consumer).getTopics().forEach(topic ->
log.debug("getTopics topic: {}", topic));
+ ((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
// 5. produce data
for (int i = 0; i < totalMessages / 3; i++) {
@@ -235,8 +235,8 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
List<String> oldNames = Lists.newArrayList(topicName1, topicName2,
topicName3, topicName4);
List<String> newNames = Lists.newArrayList(topicName3, topicName4,
topicName5, topicName6);
- List<String> addedNames =
PatternTopicsConsumerImpl.topicsListsMinus(newNames, oldNames);
- List<String> removedNames =
PatternTopicsConsumerImpl.topicsListsMinus(oldNames, newNames);
+ List<String> addedNames =
PatternMultiTopicsConsumerImpl.topicsListsMinus(newNames, oldNames);
+ List<String> removedNames =
PatternMultiTopicsConsumerImpl.topicsListsMinus(oldNames, newNames);
assertTrue(addedNames.size() == 2 &&
addedNames.contains(topicName5) &&
@@ -246,21 +246,21 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
removedNames.contains(topicName2));
// totally 2 different list, should return content of first lists.
- List<String> addedNames2 =
PatternTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames);
+ List<String> addedNames2 =
PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames);
assertTrue(addedNames2.size() == 2 &&
addedNames2.contains(topicName5) &&
addedNames2.contains(topicName6));
// 2 same list, should return empty list.
- List<String> addedNames3 =
PatternTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames);
+ List<String> addedNames3 =
PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames);
assertEquals(addedNames3.size(), 0);
// empty list minus: addedNames2.size = 2, addedNames3.size = 0
- List<String> addedNames4 =
PatternTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3);
+ List<String> addedNames4 =
PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3);
assertTrue(addedNames4.size() == addedNames2.size());
addedNames4.forEach(name -> assertTrue(addedNames2.contains(name)));
- List<String> addedNames5 =
PatternTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2);
+ List<String> addedNames5 =
PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2);
assertEquals(addedNames5.size(), 0);
}
@@ -290,10 +290,10 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
.subscribe();
// 3. verify consumer get methods, to get 0 number of partitions and
topics.
- assertSame(pattern, ((PatternTopicsConsumerImpl<?>)
consumer).getPattern());
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 0);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 0);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getTopics().size(), 0);
+ assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 0);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 0);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 0);
// 4. create producer
String messagePredicate = "my-message-" + key + "-";
@@ -310,15 +310,15 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
// 5. call recheckTopics to subscribe each added topics above
log.debug("recheck topics change");
- PatternTopicsConsumerImpl<byte[]> consumer1 =
((PatternTopicsConsumerImpl<byte[]>) consumer);
+ PatternMultiTopicsConsumerImpl<byte[]> consumer1 =
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
consumer1.run(consumer1.getRecheckPatternTimeout());
Thread.sleep(100);
// 6. verify consumer get methods, to get number of partitions and
topics, value 6=1+2+3.
- assertSame(pattern, ((PatternTopicsConsumerImpl<?>)
consumer).getPattern());
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 6);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 6);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
+ assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 6);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 6);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
// 7. produce data
@@ -384,13 +384,13 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
.receiverQueueSize(4)
.subscribe();
- assertTrue(consumer instanceof PatternTopicsConsumerImpl);
+ assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
// 4. verify consumer get methods, to get 6 number of partitions and
topics: 6=1+2+3
- assertSame(pattern, ((PatternTopicsConsumerImpl<?>)
consumer).getPattern());
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 6);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 6);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
+ assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 6);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 6);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
// 5. produce data to topic 1,2,3; verify should receive all the
message
for (int i = 0; i < totalMessages / 3; i++) {
@@ -419,12 +419,12 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
// 7. call recheckTopics to subscribe each added topics above, verify
topics number: 10=1+2+3+4
log.debug("recheck topics change");
- PatternTopicsConsumerImpl<byte[]> consumer1 =
((PatternTopicsConsumerImpl<byte[]>) consumer);
+ PatternMultiTopicsConsumerImpl<byte[]> consumer1 =
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
consumer1.run(consumer1.getRecheckPatternTimeout());
Thread.sleep(100);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 10);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 10);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getTopics().size(), 4);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 10);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 10);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 4);
// 8. produce data to topic3 and topic4, verify should receive all the
message
for (int i = 0; i < totalMessages / 2; i++) {
@@ -487,13 +487,13 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
.receiverQueueSize(4)
.subscribe();
- assertTrue(consumer instanceof PatternTopicsConsumerImpl);
+ assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
// 4. verify consumer get methods, to get 0 number of partitions and
topics: 6=1+2+3
- assertSame(pattern, ((PatternTopicsConsumerImpl<?>)
consumer).getPattern());
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 6);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 6);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
+ assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 6);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 6);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
// 5. produce data to topic 1,2,3; verify should receive all the
message
for (int i = 0; i < totalMessages / 3; i++) {
@@ -521,12 +521,12 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
// 7. call recheckTopics to unsubscribe topic 1,3 , verify topics
number: 2=6-1-3
log.debug("recheck topics change");
- PatternTopicsConsumerImpl<byte[]> consumer1 =
((PatternTopicsConsumerImpl<byte[]>) consumer);
+ PatternMultiTopicsConsumerImpl<byte[]> consumer1 =
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
consumer1.run(consumer1.getRecheckPatternTimeout());
Thread.sleep(100);
- assertEquals(((PatternTopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics().size(), 2);
- assertEquals(((PatternTopicsConsumerImpl<byte[]>)
consumer).getConsumers().size(), 2);
- assertEquals(((PatternTopicsConsumerImpl<byte[]>)
consumer).getTopics().size(), 1);
+ assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics().size(), 2);
+ assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>)
consumer).getConsumers().size(), 2);
+ assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>)
consumer).getTopics().size(), 1);
// 8. produce data to topic2, verify should receive all the message
for (int i = 0; i < totalMessages; i++) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
index 7a94cc2..053cb5e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
@@ -338,7 +338,7 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends
BrokerTestBase {
}
private static long
getUnackedMessagesCountInPartitionedConsumer(Consumer<byte[]> c) {
- PartitionedConsumerImpl<byte[]> pc = (PartitionedConsumerImpl<byte[]>)
c;
+ MultiTopicsConsumerImpl<byte[]> pc = (MultiTopicsConsumerImpl<byte[]>)
c;
return pc.getUnAckedMessageTracker().size()
+ pc.getConsumers().stream().mapToLong(consumer ->
consumer.getUnAckedMessageTracker().size()).sum();
}
@@ -405,8 +405,8 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends
BrokerTestBase {
assertEquals(received, 5);
// 7. Simulate ackTimeout
- ((PartitionedConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().toggle();
- ((PartitionedConsumerImpl<byte[]>) consumer).getConsumers().forEach(c
-> c.getUnAckedMessageTracker().toggle());
+ ((MultiTopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().toggle();
+ ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c
-> c.getUnAckedMessageTracker().toggle());
// 8. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 952dfac..6aa574d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -116,10 +116,10 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();
- assertTrue(consumer instanceof TopicsConsumerImpl);
+ assertTrue(consumer instanceof MultiTopicsConsumerImpl);
- List<String> topics = ((TopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics();
- List<ConsumerImpl<byte[]>> consumers = ((TopicsConsumerImpl)
consumer).getConsumers();
+ List<String> topics = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics();
+ List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl)
consumer).getConsumers();
topics.forEach(topic -> log.info("topic: {}", topic));
consumers.forEach(c -> log.info("consumer: {}", c.getTopic()));
@@ -127,7 +127,7 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
IntStream.range(0, 6).forEach(index ->
assertTrue(topics.get(index).equals(consumers.get(index).getTopic())));
- assertTrue(((TopicsConsumerImpl<byte[]>) consumer).getTopics().size()
== 3);
+ assertTrue(((MultiTopicsConsumerImpl<byte[]>)
consumer).getTopics().size() == 3);
consumer.unsubscribe();
consumer.close();
@@ -167,7 +167,7 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();
- assertTrue(consumer instanceof TopicsConsumerImpl);
+ assertTrue(consumer instanceof MultiTopicsConsumerImpl);
// 3. producer publish messages
for (int i = 0; i < totalMessages / 3; i++) {
@@ -228,7 +228,7 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();
- assertTrue(consumer instanceof TopicsConsumerImpl);
+ assertTrue(consumer instanceof MultiTopicsConsumerImpl);
// Asynchronously produce messages
List<Future<MessageId>> futures = Lists.newArrayList();
@@ -307,7 +307,7 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();
- assertTrue(consumer instanceof TopicsConsumerImpl);
+ assertTrue(consumer instanceof MultiTopicsConsumerImpl);
// 3. producer publish messages
for (int i = 0; i < totalMessages / 3; i++) {
@@ -323,7 +323,7 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
log.debug("Consumer received : " + new String(message.getData()));
message = consumer.receive(500, TimeUnit.MILLISECONDS);
}
- long size = ((TopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
+ long size = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
log.debug(key + " Unacked Message Tracker size is " + size);
assertEquals(size, totalMessages);
@@ -338,7 +338,7 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
message = consumer.receive(500, TimeUnit.MILLISECONDS);
} while (message != null);
- size = ((TopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
+ size = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
log.debug(key + " Unacked Message Tracker size is " + size);
assertEquals(size, 0);
assertEquals(hSet.size(), totalMessages);
@@ -361,14 +361,14 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
consumer.acknowledge(message);
message = consumer.receive(100, TimeUnit.MILLISECONDS);
}
- size = ((TopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
+ size = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
log.debug(key + " Unacked Message Tracker size is " + size);
assertEquals(size, 0);
assertEquals(received, totalMessages);
// 8. Simulate ackTimeout
- ((TopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().toggle();
- ((TopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c ->
c.getUnAckedMessageTracker().toggle());
+ ((MultiTopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().toggle();
+ ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c
-> c.getUnAckedMessageTracker().toggle());
// 9. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) {
@@ -384,7 +384,7 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
log.debug("Consumer received : " + data);
message = consumer.receive(100, TimeUnit.MILLISECONDS);
}
- size = ((TopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
+ size = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
log.debug(key + " Unacked Message Tracker size is " + size);
assertEquals(size, 30);
@@ -402,7 +402,7 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
message = consumer.receive(100, TimeUnit.MILLISECONDS);
}
assertEquals(redelivered, 30);
- size = ((TopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
+ size = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
log.info(key + " Unacked Message Tracker size is " + size);
assertEquals(size, 0);
@@ -447,7 +447,7 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();
- assertTrue(consumer instanceof TopicsConsumerImpl);
+ assertTrue(consumer instanceof MultiTopicsConsumerImpl);
// 3. producer publish messages
for (int i = 0; i < totalMessages / 3; i++) {
@@ -468,7 +468,7 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
assertEquals(messageSet, totalMessages);
// 4, unsubscribe topic3
- CompletableFuture<Void> unsubFuture = ((TopicsConsumerImpl<byte[]>)
consumer).unsubscribeAsync(topicName3);
+ CompletableFuture<Void> unsubFuture =
((MultiTopicsConsumerImpl<byte[]>) consumer).unsubscribeAsync(topicName3);
unsubFuture.get();
// 5. producer publish messages
@@ -491,15 +491,15 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
assertEquals(messageSet, totalMessages * 2 / 3);
// 7. use getter to verify internal topics number after un-subscribe
topic3
- List<String> topics = ((TopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics();
- List<ConsumerImpl<byte[]>> consumers = ((TopicsConsumerImpl)
consumer).getConsumers();
+ List<String> topics = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics();
+ List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl)
consumer).getConsumers();
assertEquals(topics.size(), 3);
assertEquals(consumers.size(), 3);
- assertTrue(((TopicsConsumerImpl<byte[]>) consumer).getTopics().size()
== 2);
+ assertTrue(((MultiTopicsConsumerImpl<byte[]>)
consumer).getTopics().size() == 2);
// 8. re-subscribe topic3
- CompletableFuture<Void> subFuture =
((TopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3);
+ CompletableFuture<Void> subFuture =
((MultiTopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3);
subFuture.get();
// 9. producer publish messages
@@ -522,12 +522,12 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
assertEquals(messageSet, totalMessages);
// 11. use getter to verify internal topics number after subscribe
topic3
- topics = ((TopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics();
- consumers = ((TopicsConsumerImpl) consumer).getConsumers();
+ topics = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics();
+ consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers();
assertEquals(topics.size(), 6);
assertEquals(consumers.size(), 6);
- assertTrue(((TopicsConsumerImpl<byte[]>) consumer).getTopics().size()
== 3);
+ assertTrue(((MultiTopicsConsumerImpl<byte[]>)
consumer).getTopics().size() == 3);
consumer.unsubscribe();
consumer.close();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 2cbd642..2a88bf0 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -914,7 +914,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(),
getPartitionIndex());
}
if (partitionIndex != -1) {
- // we should no longer track this message,
PartitionedConsumerImpl will take care from now onwards
+ // we should no longer track this message, TopicsConsumer will
take care from now onwards
unAckedMessageTracker.remove(id);
} else {
unAckedMessageTracker.add(id);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
similarity index 74%
rename from
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
rename to
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 1089cef..6da72ec 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -56,7 +56,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
+public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
// All topics should be in same namespace
protected NamespaceName namespaceName;
@@ -76,15 +76,15 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
private final int sharedQueueResumeThreshold;
// sum of topicPartitions, simple topic has 1, partitioned topic equals to
partition number.
- AtomicInteger numberTopicPartitions;
+ AtomicInteger allTopicPartitionsNumber;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final ConsumerStatsRecorder stats;
private final UnAckedMessageTracker unAckedMessageTracker;
private final ConsumerConfigurationData<T> internalConfig;
- TopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T>
conf, ExecutorService listenerExecutor,
- CompletableFuture<Consumer<T>> subscribeFuture,
Schema<T> schema) {
+ MultiTopicsConsumerImpl(PulsarClientImpl client,
ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor,
+ CompletableFuture<Consumer<T>> subscribeFuture,
Schema<T> schema) {
super(client, "TopicsConsumerFakeTopicName" +
ConsumerName.generateRandomName(), conf,
Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
subscribeFuture, schema);
@@ -95,7 +95,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
this.consumers = new ConcurrentHashMap<>();
this.pausedConsumers = new ConcurrentLinkedQueue<>();
this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
- this.numberTopicPartitions = new AtomicInteger(0);
+ this.allTopicPartitionsNumber = new AtomicInteger(0);
if (conf.getAckTimeoutMillis() != 0) {
this.unAckedMessageTracker = new
UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis());
@@ -109,7 +109,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
if (conf.getTopicNames().isEmpty()) {
this.namespaceName = null;
setState(State.Ready);
- subscribeFuture().complete(TopicsConsumerImpl.this);
+ subscribeFuture().complete(MultiTopicsConsumerImpl.this);
return;
}
@@ -122,15 +122,15 @@ public class TopicsConsumerImpl<T> extends
ConsumerBase<T> {
FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> {
try {
- if (numberTopicPartitions.get() > maxReceiverQueueSize) {
- setMaxReceiverQueueSize(numberTopicPartitions.get());
+ if (allTopicPartitionsNumber.get() > maxReceiverQueueSize)
{
+
setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
}
setState(State.Ready);
// We have successfully created N consumers, so we can
start receiving messages now
startReceivingMessages(consumers.values().stream().collect(Collectors.toList()));
- subscribeFuture().complete(TopicsConsumerImpl.this);
+ subscribeFuture().complete(MultiTopicsConsumerImpl.this);
log.info("[{}] [{}] Created topics consumer with {}
sub-consumers",
- topic, subscription, numberTopicPartitions.get());
+ topic, subscription, allTopicPartitionsNumber.get());
} catch (PulsarClientException e) {
log.warn("[{}] Failed startReceivingMessages while
subscribe topics: {}", topic, e.getMessage());
subscribeFuture.completeExceptionally(e);
@@ -245,7 +245,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
} else {
// Enqueue the message so that it can be retrieved when
application calls receive()
// Waits for the queue to have space for the message
- // This should never block cause TopicsConsumerImpl should
always use GrowableArrayBlockingQueue
+ // This should never block cause MultiTopicsConsumerImpl
should always use GrowableArrayBlockingQueue
incomingMessages.put(topicMessage);
}
} catch (InterruptedException e) {
@@ -271,7 +271,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
log.debug("[{}][{}] Calling message listener for
message {}",
topic, subscription, message.getMessageId());
}
- listener.received(TopicsConsumerImpl.this, msg);
+ listener.received(MultiTopicsConsumerImpl.this, msg);
} catch (Throwable t) {
log.error("[{}][{}] Message listener error in processing
message: {}",
topic, subscription, message, t);
@@ -613,117 +613,172 @@ public class TopicsConsumerImpl<T> extends
ConsumerBase<T> {
}
CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
- final AtomicInteger partitionNumber = new AtomicInteger(0);
- client.getPartitionedTopicMetadata(topicName).thenAccept(metadata -> {
- if (log.isDebugEnabled()) {
- log.debug("Received topic {} metadata.partitions: {}",
topicName, metadata.partitions);
- }
+ client.getPartitionedTopicMetadata(topicName)
+ .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult,
topicName, metadata.partitions))
+ .exceptionally(ex1 -> {
+ log.warn("[{}] Failed to get partitioned topic metadata: {}",
topicName, ex1.getMessage());
+ subscribeResult.completeExceptionally(ex1);
+ return null;
+ });
- List<CompletableFuture<Consumer<T>>> futureList;
-
- if (metadata.partitions > 1) {
- this.topics.putIfAbsent(topicName, metadata.partitions);
- numberTopicPartitions.addAndGet(metadata.partitions);
- partitionNumber.addAndGet(metadata.partitions);
-
- futureList = IntStream
- .range(0, partitionNumber.get())
- .mapToObj(
- partitionIndex -> {
- String partitionName =
TopicName.get(topicName).getPartition(partitionIndex).toString();
- CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
- ConsumerImpl<T> newConsumer = new
ConsumerImpl<>(client, partitionName, internalConfig,
-
client.externalExecutorProvider().getExecutor(), partitionIndex, subFuture,
schema);
- consumers.putIfAbsent(newConsumer.getTopic(),
newConsumer);
- return subFuture;
- })
- .collect(Collectors.toList());
- } else {
- this.topics.putIfAbsent(topicName, 1);
- numberTopicPartitions.incrementAndGet();
- partitionNumber.incrementAndGet();
+ return subscribeResult;
+ }
- CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
- ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client,
topicName, internalConfig,
- client.externalExecutorProvider().getExecutor(), 0,
subFuture, schema);
- consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
+ // create consumer for a single topic with already known partitions.
+ // first create a consumer with no topic, then do subscription for already
know partitionedTopic.
+ public static <T> MultiTopicsConsumerImpl<T>
createPartitionedConsumer(PulsarClientImpl client,
+
ConsumerConfigurationData<T> conf,
+
ExecutorService listenerExecutor,
+
CompletableFuture<Consumer<T>> subscribeFuture,
+ int
numPartitions,
+
Schema<T> schema) {
+ checkArgument(conf.getTopicNames().size() == 1, "Should have only 1
topic for partitioned consumer");
+
+ // get topic name, then remove it from conf, so constructor will
create a consumer with no topic.
+ ConsumerConfigurationData cloneConf = conf.clone();
+ String topicName = cloneConf.getSingleTopic();
+ cloneConf.getTopicNames().remove(topicName);
+
+ CompletableFuture<Consumer> future = new CompletableFuture<>();
+ MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client,
cloneConf, listenerExecutor, future, schema);
+
+ future.thenCompose(c ->
((MultiTopicsConsumerImpl)c).subscribeAsync(topicName, numPartitions))
+ .thenRun(()-> subscribeFuture.complete(consumer))
+ .exceptionally(e -> {
+ log.warn("Failed subscription for createPartitionedConsumer:
{} {}, e:{}",
+ topicName, numPartitions, e);
+
subscribeFuture.completeExceptionally(((Throwable)e).getCause());
+ return null;
+ });;
+ return consumer;
+ }
- futureList = Collections.singletonList(subFuture);
- }
+ // subscribe one more given topic, but already know the numberPartitions
+ private CompletableFuture<Void> subscribeAsync(String topicName, int
numberPartitions) {
+ if (!topicNameValid(topicName)) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.AlreadyClosedException("Topic name
not valid"));
+ }
- FutureUtil.waitForAll(futureList)
- .thenAccept(finalFuture -> {
- try {
- if (numberTopicPartitions.get() >
maxReceiverQueueSize) {
-
setMaxReceiverQueueSize(numberTopicPartitions.get());
- }
- int numTopics =
this.topics.values().stream().mapToInt(Integer::intValue).sum();
- checkState(numberTopicPartitions.get() == numTopics,
- "numberTopicPartitions " +
numberTopicPartitions.get()
- + " not equals expected: " + numTopics);
-
- // We have successfully created new consumers, so we
can start receiving messages for them
- startReceivingMessages(
- consumers.values().stream()
- .filter(consumer1 -> {
- String consumerTopicName =
consumer1.getTopic();
- if
(TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
-
TopicName.get(topicName).getPartitionedTopicName().toString())) {
- return true;
- } else {
- return false;
- }
- })
- .collect(Collectors.toList()));
-
- subscribeResult.complete(null);
- log.info("[{}] [{}] Success subscribe new topic {} in
topics consumer, numberTopicPartitions {}",
- topic, subscription, topicName,
numberTopicPartitions.get());
- if (this.namespaceName == null) {
- this.namespaceName =
TopicName.get(topicName).getNamespaceObject();
- }
- return;
- } catch (PulsarClientException e) {
- handleSubscribeOneTopicError(topicName, e);
- subscribeResult.completeExceptionally(e);
- }
- })
- .exceptionally(ex -> {
- handleSubscribeOneTopicError(topicName, ex);
- subscribeResult.completeExceptionally(ex);
- return null;
- });
- }).exceptionally(ex1 -> {
- log.warn("[{}] Failed to get partitioned topic metadata: {}",
topicName, ex1.getMessage());
- subscribeResult.completeExceptionally(ex1);
- return null;
- });
+ if (getState() == State.Closing || getState() == State.Closed) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.AlreadyClosedException("Topics
Consumer was already closed"));
+ }
+
+ CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
+ subscribeTopicPartitions(subscribeResult, topicName, numberPartitions);
return subscribeResult;
}
- // handling failure during subscribe new topic, unsubscribe success
created partitions
- private void handleSubscribeOneTopicError(String topicName, Throwable
error) {
- log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer
", topic, topicName, error.getMessage());
+ private void subscribeTopicPartitions(CompletableFuture<Void>
subscribeResult, String topicName, int partitionNumber) {
+ if (log.isDebugEnabled()) {
+ log.debug("Subscribe to topic {} metadata.partitions: {}",
topicName, partitionNumber);
+ }
- consumers.values().stream().filter(consumer1 -> {
- String consumerTopicName = consumer1.getTopic();
- if
(TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicName)) {
- return true;
- } else {
- return false;
- }
- }).forEach(consumer2 -> {
- consumer2.closeAsync().handle((ok, closeException) -> {
- consumer2.subscribeFuture().completeExceptionally(error);
+ List<CompletableFuture<Consumer<T>>> futureList;
+
+ if (partitionNumber > 1) {
+ this.topics.putIfAbsent(topicName, partitionNumber);
+ allTopicPartitionsNumber.addAndGet(partitionNumber);
+
+ futureList = IntStream
+ .range(0, partitionNumber)
+ .mapToObj(
+ partitionIndex -> {
+ String partitionName =
TopicName.get(topicName).getPartition(partitionIndex).toString();
+ CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
+ ConsumerImpl<T> newConsumer = new
ConsumerImpl<>(client, partitionName, internalConfig,
+ client.externalExecutorProvider().getExecutor(),
partitionIndex, subFuture, schema);
+ consumers.putIfAbsent(newConsumer.getTopic(),
newConsumer);
+ return subFuture;
+ })
+ .collect(Collectors.toList());
+ } else {
+ this.topics.putIfAbsent(topicName, 1);
+ allTopicPartitionsNumber.incrementAndGet();
+
+ CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
+ ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client,
topicName, internalConfig,
+ client.externalExecutorProvider().getExecutor(), 0, subFuture,
schema);
+ consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
+
+ futureList = Collections.singletonList(subFuture);
+ }
+
+ FutureUtil.waitForAll(futureList)
+ .thenAccept(finalFuture -> {
+ try {
+ if (allTopicPartitionsNumber.get() > maxReceiverQueueSize)
{
+
setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
+ }
+ int numTopics =
this.topics.values().stream().mapToInt(Integer::intValue).sum();
+ checkState(allTopicPartitionsNumber.get() == numTopics,
+ "allTopicPartitionsNumber " +
allTopicPartitionsNumber.get()
+ + " not equals expected: " + numTopics);
+
+ // We have successfully created new consumers, so we can
start receiving messages for them
+ startReceivingMessages(
+ consumers.values().stream()
+ .filter(consumer1 -> {
+ String consumerTopicName =
consumer1.getTopic();
+ if
(TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
+
TopicName.get(topicName).getPartitionedTopicName().toString())) {
+ return true;
+ } else {
+ return false;
+ }
+ })
+ .collect(Collectors.toList()));
+
+ subscribeResult.complete(null);
+ log.info("[{}] [{}] Success subscribe new topic {} in
topics consumer, partitions: {}, allTopicPartitionsNumber: {}",
+ topic, subscription, topicName, partitionNumber,
allTopicPartitionsNumber.get());
+ if (this.namespaceName == null) {
+ this.namespaceName =
TopicName.get(topicName).getNamespaceObject();
+ }
+ return;
+ } catch (PulsarClientException e) {
+ handleSubscribeOneTopicError(topicName, e,
subscribeResult);
+ }
+ })
+ .exceptionally(ex -> {
+ handleSubscribeOneTopicError(topicName, ex, subscribeResult);
return null;
});
- consumers.remove(consumer2.getTopic());
- });
+ }
- topics.remove(topicName);
- checkState(numberTopicPartitions.get() == consumers.values().size());
+ // handling failure during subscribe new topic, unsubscribe success
created partitions
+ private void handleSubscribeOneTopicError(String topicName, Throwable
error, CompletableFuture<Void> subscribeFuture) {
+ log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer
{}", topic, topicName, error.getMessage());
+
+ client.externalExecutorProvider().getExecutor().submit(() -> {
+ AtomicInteger toCloseNum = new AtomicInteger(0);
+ consumers.values().stream().filter(consumer1 -> {
+ String consumerTopicName = consumer1.getTopic();
+ if
(TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicName)) {
+ return true;
+ } else {
+ return false;
+ }
+ }).forEach(consumer2 -> {
+ toCloseNum.incrementAndGet();
+ consumer2.closeAsync().whenComplete((r, ex) -> {
+ consumer2.subscribeFuture().completeExceptionally(error);
+ allTopicPartitionsNumber.decrementAndGet();
+ consumers.remove(consumer2.getTopic());
+ if (toCloseNum.decrementAndGet() == 0) {
+ log.warn("[{}] Failed to subscribe for topic [{}] in
topics consumer, subscribe error: {}",
+ topic, topicName, error.getMessage());
+ topics.remove(topicName);
+ checkState(allTopicPartitionsNumber.get() ==
consumers.values().size());
+ subscribeFuture.completeExceptionally(error);
+ }
+ return;
+ });
+ });
+ });
}
// un-subscribe a given topic
@@ -757,15 +812,15 @@ public class TopicsConsumerImpl<T> extends
ConsumerBase<T> {
consumersToUnsub.forEach(consumer1 -> {
consumers.remove(consumer1.getTopic());
pausedConsumers.remove(consumer1);
- numberTopicPartitions.decrementAndGet();
+ allTopicPartitionsNumber.decrementAndGet();
});
topics.remove(topicName);
((UnAckedTopicMessageTracker)
unAckedMessageTracker).removeTopicMessages(topicName);
unsubscribeFuture.complete(null);
- log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer,
numberTopicPartitions: {}",
- topicName, subscription, consumerName,
numberTopicPartitions);
+ log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer,
allTopicPartitionsNumber: {}",
+ topicName, subscription, consumerName,
allTopicPartitionsNumber);
} else {
unsubscribeFuture.completeExceptionally(ex);
setState(State.Failed);
@@ -792,5 +847,5 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
return consumers.values().stream().collect(Collectors.toList());
}
- private static final Logger log =
LoggerFactory.getLogger(TopicsConsumerImpl.class);
+ private static final Logger log =
LoggerFactory.getLogger(MultiTopicsConsumerImpl.class);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
deleted file mode 100644
index 9c952dd..0000000
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ /dev/null
@@ -1,553 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.util.FutureUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
-
- private final List<ConsumerImpl<T>> consumers;
-
- // Queue of partition consumers on which we have stopped calling
receiveAsync() because the
- // shared incoming queue was full
- private final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers;
-
- // Threshold for the shared queue. When the size of the shared queue goes
below the threshold, we are going to
- // resume receiving from the paused consumer partitions
- private final int sharedQueueResumeThreshold;
-
- private final int numPartitions;
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
- private final ConsumerStatsRecorderImpl stats;
- private final UnAckedMessageTracker unAckedMessageTracker;
-
- PartitionedConsumerImpl(PulsarClientImpl client,
ConsumerConfigurationData<T> conf, int numPartitions,
- ExecutorService listenerExecutor,
CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
- super(client, conf.getSingleTopic(), conf, Math.max(Math.max(2,
numPartitions), conf.getReceiverQueueSize()),
- listenerExecutor, subscribeFuture, schema);
- this.consumers = Lists.newArrayListWithCapacity(numPartitions);
- this.pausedConsumers = new ConcurrentLinkedQueue<>();
- this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
- this.numPartitions = numPartitions;
-
- if (conf.getAckTimeoutMillis() != 0) {
- this.unAckedMessageTracker = new UnAckedMessageTracker(client,
this, conf.getAckTimeoutMillis());
- } else {
- this.unAckedMessageTracker =
UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
- }
-
- stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new
ConsumerStatsRecorderImpl() : null;
- checkArgument(conf.getReceiverQueueSize() > 0,
- "Receiver queue size needs to be greater than 0 for
Partitioned Topics");
- start();
- }
-
- private void start() {
- AtomicReference<Throwable> subscribeFail = new
AtomicReference<Throwable>();
- AtomicInteger completed = new AtomicInteger();
- ConsumerConfigurationData<T> internalConfig =
getInternalConsumerConfig();
- for (int partitionIndex = 0; partitionIndex < numPartitions;
partitionIndex++) {
- String partitionName =
TopicName.get(topic).getPartition(partitionIndex).toString();
- ConsumerImpl<T> consumer = new ConsumerImpl<>(client,
partitionName, internalConfig,
- client.externalExecutorProvider().getExecutor(),
partitionIndex, new CompletableFuture<>(), schema);
- consumers.add(consumer);
- consumer.subscribeFuture().handle((cons, subscribeException) -> {
- if (subscribeException != null) {
- setState(State.Failed);
- subscribeFail.compareAndSet(null, subscribeException);
- client.cleanupConsumer(this);
- }
- if (completed.incrementAndGet() == numPartitions) {
- if (subscribeFail.get() == null) {
- try {
- // We have successfully created N consumers, so we
can start receiving messages now
- starReceivingMessages();
- setState(State.Ready);
-
subscribeFuture().complete(PartitionedConsumerImpl.this);
- log.info("[{}] [{}] Created partitioned consumer",
topic, subscription);
- return null;
- } catch (PulsarClientException e) {
- subscribeFail.set(e);
- }
- }
- closeAsync().handle((ok, closeException) -> {
-
subscribeFuture().completeExceptionally(subscribeFail.get());
- client.cleanupConsumer(this);
- return null;
- });
- log.error("[{}] [{}] Could not create partitioned
consumer.", topic, subscription,
- subscribeFail.get().getCause());
- }
- return null;
- });
- }
- }
-
- private void starReceivingMessages() throws PulsarClientException {
- for (ConsumerImpl<T> consumer : consumers) {
-
consumer.sendFlowPermitsToBroker(consumer.getConnectionHandler().cnx(),
conf.getReceiverQueueSize());
- receiveMessageFromConsumer(consumer);
- }
- }
-
- private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
- consumer.receiveAsync().thenAccept(message -> {
- // Process the message, add to the queue and trigger listener or
async callback
- messageReceived(message);
-
- // we're modifying pausedConsumers
- lock.writeLock().lock();
- try {
- int size = incomingMessages.size();
- if (size >= maxReceiverQueueSize
- || (size > sharedQueueResumeThreshold &&
!pausedConsumers.isEmpty())) {
- // mark this consumer to be resumed later: if No more
space left in shared queue,
- // or if any consumer is already paused (to create fair
chance for already paused consumers)
- pausedConsumers.add(consumer);
- } else {
- // Schedule next receiveAsync() if the incoming queue is
not full. Use a different thread to avoid
- // recursion and stack overflow
- client.eventLoopGroup().execute(() -> {
- receiveMessageFromConsumer(consumer);
- });
- }
- } finally {
- lock.writeLock().unlock();
- }
- });
- }
-
- private void resumeReceivingFromPausedConsumersIfNeeded() {
- lock.readLock().lock();
- try {
- if (incomingMessages.size() <= sharedQueueResumeThreshold &&
!pausedConsumers.isEmpty()) {
- while (true) {
- ConsumerImpl<T> consumer = pausedConsumers.poll();
- if (consumer == null) {
- break;
- }
-
- // if messages are readily available on consumer we will
attempt to writeLock on the same thread
- client.eventLoopGroup().execute(() -> {
- receiveMessageFromConsumer(consumer);
- });
- }
- }
- } finally {
- lock.readLock().unlock();
- }
- }
-
- @Override
- protected Message<T> internalReceive() throws PulsarClientException {
- Message<T> message;
- try {
- message = incomingMessages.take();
- unAckedMessageTracker.add((MessageIdImpl) message.getMessageId());
- resumeReceivingFromPausedConsumersIfNeeded();
- return message;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
- }
- }
-
- @Override
- protected Message<T> internalReceive(int timeout, TimeUnit unit) throws
PulsarClientException {
- Message<T> message;
- try {
- message = incomingMessages.poll(timeout, unit);
- if (message != null) {
- unAckedMessageTracker.add(message.getMessageId());
- }
- resumeReceivingFromPausedConsumersIfNeeded();
- return message;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
- }
- }
-
- @Override
- protected CompletableFuture<Message<T>> internalReceiveAsync() {
- CompletableFuture<Message<T>> result = new CompletableFuture<>();
- Message<T> message;
- try {
- lock.writeLock().lock();
- message = incomingMessages.poll(0, TimeUnit.SECONDS);
- if (message == null) {
- pendingReceives.add(result);
- } else {
- unAckedMessageTracker.add(message.getMessageId());
- resumeReceivingFromPausedConsumersIfNeeded();
- result.complete(message);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- result.completeExceptionally(new PulsarClientException(e));
- } finally {
- lock.writeLock().unlock();
- }
-
- return result;
- }
-
- @Override
- protected CompletableFuture<Void> doAcknowledge(MessageId messageId,
AckType ackType,
- Map<String,Long>
properties) {
- checkArgument(messageId instanceof MessageIdImpl);
-
- if (getState() != State.Ready) {
- return FutureUtil.failedFuture(new PulsarClientException("Consumer
already closed"));
- }
-
- if (ackType == AckType.Cumulative) {
- return FutureUtil.failedFuture(new
PulsarClientException.NotSupportedException(
- "Cumulative acknowledge not supported for partitioned
topics"));
- } else {
-
- ConsumerImpl<T> consumer = consumers.get(((MessageIdImpl)
messageId).getPartitionIndex());
- return consumer.doAcknowledge(messageId, ackType,
properties).thenRun(() ->
- unAckedMessageTracker.remove(messageId));
- }
-
- }
-
- @Override
- public CompletableFuture<Void> unsubscribeAsync() {
- if (getState() == State.Closing || getState() == State.Closed) {
- return FutureUtil.failedFuture(
- new
PulsarClientException.AlreadyClosedException("Partitioned Consumer was already
closed"));
- }
- setState(State.Closing);
-
- AtomicReference<Throwable> unsubscribeFail = new
AtomicReference<Throwable>();
- AtomicInteger completed = new AtomicInteger(numPartitions);
- CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
- for (Consumer<T> consumer : consumers) {
- if (consumer != null) {
- consumer.unsubscribeAsync().handle((unsubscribed, ex) -> {
- if (ex != null) {
- unsubscribeFail.compareAndSet(null, ex);
- }
- if (completed.decrementAndGet() == 0) {
- if (unsubscribeFail.get() == null) {
- setState(State.Closed);
- unAckedMessageTracker.close();
- unsubscribeFuture.complete(null);
- log.info("[{}] [{}] Unsubscribed Partitioned
Consumer", topic, subscription);
- } else {
- setState(State.Failed);
-
unsubscribeFuture.completeExceptionally(unsubscribeFail.get());
- log.error("[{}] [{}] Could not unsubscribe
Partitioned Consumer", topic, subscription,
- unsubscribeFail.get().getCause());
- }
- }
-
- return null;
- });
- }
-
- }
-
- return unsubscribeFuture;
- }
-
- @Override
- public CompletableFuture<Void> closeAsync() {
- if (getState() == State.Closing || getState() == State.Closed) {
- unAckedMessageTracker.close();
- return CompletableFuture.completedFuture(null);
- }
- setState(State.Closing);
-
- AtomicReference<Throwable> closeFail = new
AtomicReference<Throwable>();
- AtomicInteger completed = new AtomicInteger(numPartitions);
- CompletableFuture<Void> closeFuture = new CompletableFuture<>();
- for (Consumer<T> consumer : consumers) {
- if (consumer != null) {
- consumer.closeAsync().handle((closed, ex) -> {
- if (ex != null) {
- closeFail.compareAndSet(null, ex);
- }
- if (completed.decrementAndGet() == 0) {
- if (closeFail.get() == null) {
- setState(State.Closed);
- unAckedMessageTracker.close();
- closeFuture.complete(null);
- log.info("[{}] [{}] Closed Partitioned Consumer",
topic, subscription);
- client.cleanupConsumer(this);
- // fail all pending-receive futures to notify
application
- failPendingReceive();
- } else {
- setState(State.Failed);
- closeFuture.completeExceptionally(closeFail.get());
- log.error("[{}] [{}] Could not close Partitioned
Consumer", topic, subscription,
- closeFail.get().getCause());
- }
- }
-
- return null;
- });
- }
-
- }
-
- return closeFuture;
- }
-
- private void failPendingReceive() {
- lock.readLock().lock();
- try {
- if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
- while (!pendingReceives.isEmpty()) {
- CompletableFuture<Message<T>> receiveFuture =
pendingReceives.poll();
- if (receiveFuture != null) {
- receiveFuture.completeExceptionally(
- new
PulsarClientException.AlreadyClosedException("Consumer is already closed"));
- } else {
- break;
- }
- }
- }
- } finally {
- lock.readLock().unlock();
- }
- }
-
- @Override
- public boolean isConnected() {
- return consumers.stream().allMatch(ConsumerImpl::isConnected);
- }
-
- void messageReceived(Message<T> message) {
- lock.writeLock().lock();
- try {
- unAckedMessageTracker.add(message.getMessageId());
- if (log.isDebugEnabled()) {
- log.debug("[{}][{}] Received message from partitioned-consumer
{}", topic, subscription, message.getMessageId());
- }
- // if asyncReceive is waiting : return message to callback without
adding to incomingMessages queue
- if (!pendingReceives.isEmpty()) {
- CompletableFuture<Message<T>> receivedFuture =
pendingReceives.poll();
- listenerExecutor.execute(() ->
receivedFuture.complete(message));
- } else {
- // Enqueue the message so that it can be retrieved when
application calls receive()
- // Waits for the queue to have space for the message
- // This should never block cause PartitonedConsumerImpl should
always use GrowableArrayBlockingQueue
- incomingMessages.put(message);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } finally {
- lock.writeLock().unlock();
- }
-
- if (listener != null) {
- // Trigger the notification on the message listener in a separate
thread to avoid blocking the networking
- // thread while the message processing happens
- listenerExecutor.execute(() -> {
- Message<T> msg;
- try {
- msg = internalReceive();
- } catch (PulsarClientException e) {
- log.warn("[{}] [{}] Failed to dequeue the message for
listener", topic, subscription, e);
- return;
- }
-
- try {
- if (log.isDebugEnabled()) {
- log.debug("[{}][{}] Calling message listener for
message {}", topic, subscription, message.getMessageId());
- }
- listener.received(PartitionedConsumerImpl.this, msg);
- } catch (Throwable t) {
- log.error("[{}][{}] Message listener error in processing
message: {}", topic, subscription, message,
- t);
- }
- });
- }
- }
-
- @Override
- String getHandlerName() {
- return subscription;
- }
-
- private ConsumerConfigurationData<T> getInternalConsumerConfig() {
- ConsumerConfigurationData<T> internalConsumerConfig = new
ConsumerConfigurationData<>();
-
internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
- internalConsumerConfig.setSubscriptionName(conf.getSubscriptionName());
- internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
- internalConsumerConfig.setConsumerName(consumerName);
-
internalConsumerConfig.setAcknowledgementsGroupTimeMicros(conf.getAcknowledgementsGroupTimeMicros());
- internalConsumerConfig.setPriorityLevel(conf.getPriorityLevel());
- internalConsumerConfig.setProperties(conf.getProperties());
- internalConsumerConfig.setReadCompacted(conf.isReadCompacted());
- if (null != conf.getConsumerEventListener()) {
-
internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener());
- }
-
- int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
- conf.getMaxTotalReceiverQueueSizeAcrossPartitions() /
numPartitions);
- internalConsumerConfig.setReceiverQueueSize(receiverQueueSize);
-
- if (conf.getCryptoKeyReader() != null) {
-
internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader());
-
internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
- }
- if (conf.getAckTimeoutMillis() != 0) {
-
internalConsumerConfig.setAckTimeoutMillis(conf.getAckTimeoutMillis());
- }
-
- return internalConsumerConfig;
- }
-
- @Override
- public void redeliverUnacknowledgedMessages() {
- synchronized (this) {
- for (ConsumerImpl<T> c : consumers) {
- c.redeliverUnacknowledgedMessages();
- }
- incomingMessages.clear();
- unAckedMessageTracker.clear();
- resumeReceivingFromPausedConsumersIfNeeded();
- }
- }
-
- @Override
- public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
- checkArgument(messageIds.stream().findFirst().get() instanceof
MessageIdImpl);
- if (conf.getSubscriptionType() != SubscriptionType.Shared) {
- // We cannot redeliver single messages if subscription type is not
Shared
- redeliverUnacknowledgedMessages();
- return;
- }
- removeExpiredMessagesFromQueue(messageIds);
- messageIds.stream()
- .map(messageId -> (MessageIdImpl)messageId)
- .collect(Collectors.groupingBy(MessageIdImpl::getPartitionIndex,
Collectors.toSet()))
- .forEach((partitionIndex, messageIds1) ->
- consumers.get(partitionIndex).redeliverUnacknowledgedMessages(
- messageIds1.stream().map(mid ->
(MessageId)mid).collect(Collectors.toSet())));
- resumeReceivingFromPausedConsumersIfNeeded();
- }
-
- @Override
- public void seek(MessageId messageId) throws PulsarClientException {
- try {
- seekAsync(messageId).get();
- } catch (ExecutionException e) {
- throw new PulsarClientException(e.getCause());
- } catch (InterruptedException e) {
- throw new PulsarClientException(e);
- }
- }
-
- @Override
- public CompletableFuture<Void> seekAsync(MessageId messageId) {
- return FutureUtil.failedFuture(new PulsarClientException("Seek
operation not supported on partitioned topics"));
- }
-
- List<ConsumerImpl<T>> getConsumers() {
- return consumers;
- }
-
- @Override
- public int getAvailablePermits() {
- return
consumers.stream().mapToInt(ConsumerImpl::getAvailablePermits).sum();
- }
-
- @Override
- public boolean hasReachedEndOfTopic() {
- return consumers.stream().allMatch(Consumer::hasReachedEndOfTopic);
- }
-
- @Override
- public int numMessagesInQueue() {
- return incomingMessages.size() +
consumers.stream().mapToInt(ConsumerImpl::numMessagesInQueue).sum();
- }
-
- @Override
- public synchronized ConsumerStatsRecorderImpl getStats() {
- if (stats == null) {
- return null;
- }
- stats.reset();
- for (int i = 0; i < numPartitions; i++) {
- stats.updateCumulativeStats(consumers.get(i).getStats());
- }
- return stats;
- }
-
- public UnAckedMessageTracker getUnAckedMessageTracker() {
- return unAckedMessageTracker;
- }
-
- private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
- Message<T> peek = incomingMessages.peek();
- if (peek != null) {
- if (!messageIds.contains(peek.getMessageId())) {
- // first message is not expired, then no message is expired in
queue.
- return;
- }
-
- // try not to remove elements that are added while we remove
- Message<T> message = incomingMessages.poll();
- while (message != null) {
- MessageIdImpl messageId = (MessageIdImpl)
message.getMessageId();
- if (!messageIds.contains(messageId)) {
- messageIds.add(messageId);
- break;
- }
- message = incomingMessages.poll();
- }
- }
- }
-
- private static final Logger log =
LoggerFactory.getLogger(PartitionedConsumerImpl.class);
-}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
similarity index 89%
rename from
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
rename to
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index f9cf550..d0b0c60 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -41,17 +41,17 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PatternTopicsConsumerImpl<T> extends TopicsConsumerImpl<T>
implements TimerTask {
+public class PatternMultiTopicsConsumerImpl<T> extends
MultiTopicsConsumerImpl<T> implements TimerTask {
private final Pattern topicsPattern;
private final TopicsChangedListener topicsChangeListener;
private volatile Timeout recheckPatternTimeout = null;
- public PatternTopicsConsumerImpl(Pattern topicsPattern,
- PulsarClientImpl client,
- ConsumerConfigurationData<T> conf,
- ExecutorService listenerExecutor,
- CompletableFuture<Consumer<T>>
subscribeFuture,
- Schema<T> schema) {
+ public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
+ PulsarClientImpl client,
+ ConsumerConfigurationData<T> conf,
+ ExecutorService listenerExecutor,
+ CompletableFuture<Consumer<T>>
subscribeFuture,
+ Schema<T> schema) {
super(client, conf, listenerExecutor, subscribeFuture, schema);
this.topicsPattern = topicsPattern;
@@ -86,7 +86,7 @@ public class PatternTopicsConsumerImpl<T> extends
TopicsConsumerImpl<T> implemen
}
List<String> newTopics =
PulsarClientImpl.topicsPatternFilter(topics, topicsPattern);
- List<String> oldTopics =
PatternTopicsConsumerImpl.this.getTopics();
+ List<String> oldTopics =
PatternMultiTopicsConsumerImpl.this.getTopics();
futures.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics,
oldTopics)));
futures.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics,
newTopics)));
@@ -100,7 +100,7 @@ public class PatternTopicsConsumerImpl<T> extends
TopicsConsumerImpl<T> implemen
});
// schedule the next re-check task
- client.timer().newTimeout(PatternTopicsConsumerImpl.this,
+ client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this,
Math.min(1, conf.getPatternAutoDiscoveryPeriod()),
TimeUnit.MINUTES);
}
@@ -109,9 +109,9 @@ public class PatternTopicsConsumerImpl<T> extends
TopicsConsumerImpl<T> implemen
}
interface TopicsChangedListener {
- // unsubscribe and delete ConsumerImpl in the `consumers` map in
`TopicsConsumerImpl` based on added topics.
+ // unsubscribe and delete ConsumerImpl in the `consumers` map in
`MultiTopicsConsumerImpl` based on added topics.
CompletableFuture<Void> onTopicsRemoved(Collection<String>
removedTopics);
- // subscribe and create a list of new ConsumerImpl, added them to the
`consumers` map in `TopicsConsumerImpl`.
+ // subscribe and create a list of new ConsumerImpl, added them to the
`consumers` map in `MultiTopicsConsumerImpl`.
CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics);
}
@@ -181,5 +181,5 @@ public class PatternTopicsConsumerImpl<T> extends
TopicsConsumerImpl<T> implemen
return recheckPatternTimeout;
}
- private static final Logger log =
LoggerFactory.getLogger(PatternTopicsConsumerImpl.class);
+ private static final Logger log =
LoggerFactory.getLogger(PatternMultiTopicsConsumerImpl.class);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 47e953b..dd07cf9 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -384,8 +384,8 @@ public class PulsarClientImpl implements PulsarClient {
// gets the next single threaded executor from the list of
executors
ExecutorService listenerThread =
externalExecutorProvider.getExecutor();
if (metadata.partitions > 1) {
- consumer = new
PartitionedConsumerImpl<>(PulsarClientImpl.this, conf, metadata.partitions,
listenerThread,
- consumerSubscribedFuture, schema);
+ consumer =
MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
+ listenerThread, consumerSubscribedFuture,
metadata.partitions, schema);
} else {
consumer = new ConsumerImpl<>(PulsarClientImpl.this, topic,
conf, listenerThread, -1,
consumerSubscribedFuture, schema);
@@ -406,7 +406,7 @@ public class PulsarClientImpl implements PulsarClient {
private <T> CompletableFuture<Consumer<T>>
multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
CompletableFuture<Consumer<T>> consumerSubscribedFuture = new
CompletableFuture<>();
- ConsumerBase<T> consumer = new
TopicsConsumerImpl<>(PulsarClientImpl.this, conf,
+ ConsumerBase<T> consumer = new
MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
externalExecutorProvider.getExecutor(),
consumerSubscribedFuture, schema);
synchronized (consumers) {
@@ -436,7 +436,7 @@ public class PulsarClientImpl implements PulsarClient {
List<String> topicsList = topicsPatternFilter(topics,
conf.getTopicsPattern());
conf.getTopicNames().addAll(topicsList);
- ConsumerBase<T> consumer = new
PatternTopicsConsumerImpl<>(conf.getTopicsPattern(),
+ ConsumerBase<T> consumer = new
PatternMultiTopicsConsumerImpl<>(conf.getTopicsPattern(),
PulsarClientImpl.this,
conf,
externalExecutorProvider.getExecutor(),
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index 6f32188..5ae452e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -27,7 +27,7 @@ public class TopicMessageImpl<T> implements Message<T> {
private final String topicName;
private final Message<T> msg;
- private final MessageId msgId;
+ private final TopicMessageIdImpl msgId;
TopicMessageImpl(String topicName,
Message<T> msg) {
@@ -49,6 +49,10 @@ public class TopicMessageImpl<T> implements Message<T> {
return msgId;
}
+ public MessageId getInnerMessageId() {
+ return msgId.getInnerMessageId();
+ }
+
@Override
public Map<String, String> getProperties() {
return msg.getProperties();
--
To stop receiving notification emails like this one, please contact
[email protected].