merlimat closed pull request #1365: Delete PartitionedConsumerImpl, use
TopicsConsumerImpl instead
URL: https://github.com/apache/incubator-pulsar/pull/1365
This pull request was merged from a forked (foreign) repository.
Because GitHub no longer displays the original diff of a fork-based
pull request once it has been merged, the full diff is reproduced
below for the sake of provenance:
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index 617682169..08cd7f654 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -44,6 +44,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
@@ -323,13 +324,11 @@ public void testSimpleConsumerEventsWithPartition()
throws Exception {
final String subName = "sub1";
final int numMsgs = 100;
Set<String> uniqueMessages = new HashSet<>();
-
admin.persistentTopics().createPartitionedTopic(topicName,
numPartitions);
ConsumerBuilder<byte[]> consumerBuilder =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover);
-
// 1. two consumers on the same subscription
ActiveInactiveListenerEvent listener1 = new
ActiveInactiveListenerEvent();
ActiveInactiveListenerEvent listener2 = new
ActiveInactiveListenerEvent();
@@ -374,7 +373,7 @@ public void testSimpleConsumerEventsWithPartition() throws
Exception {
}
totalMessages++;
consumer1.acknowledge(msg);
- MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+ MessageIdImpl msgId = (MessageIdImpl)
(((TopicMessageImpl)msg).getInnerMessageId());
receivedPtns.add(msgId.getPartitionIndex());
}
@@ -391,7 +390,7 @@ public void testSimpleConsumerEventsWithPartition() throws
Exception {
}
totalMessages++;
consumer2.acknowledge(msg);
- MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+ MessageIdImpl msgId = (MessageIdImpl)
(((TopicMessageImpl)msg).getInnerMessageId());
receivedPtns.add(msgId.getPartitionIndex());
}
assertTrue(Sets.difference(listener1.inactivePtns,
receivedPtns).isEmpty());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
index 4a9912dce..6d6fc924c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
@@ -208,7 +208,8 @@ public void partitionedProducerSendAsync() throws
PulsarClientException, PulsarA
Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all
messages published successfully");
for (int i = 0; i < numberOfMessages; i++) {
- MessageId messageId = consumer.receive().getMessageId();
+ MessageId topicMessageId = consumer.receive().getMessageId();
+ MessageId messageId =
((TopicMessageIdImpl)topicMessageId).getInnerMessageId();
log.info("Message ID Received = " + messageId);
Assert.assertTrue(messageIds.remove(messageId), "Failed to receive
Message");
}
@@ -247,7 +248,9 @@ public void partitionedProducerSend() throws
PulsarClientException, PulsarAdminE
Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all
messages published successfully");
for (int i = 0; i < numberOfMessages; i++) {
-
Assert.assertTrue(messageIds.remove(consumer.receive().getMessageId()), "Failed
to receive Message");
+ MessageId topicMessageId = consumer.receive().getMessageId();
+ MessageId messageId =
((TopicMessageIdImpl)topicMessageId).getInnerMessageId();
+ Assert.assertTrue(messageIds.remove(messageId), "Failed to receive
Message");
}
log.info("Message IDs = " + messageIds);
Assert.assertEquals(messageIds.size(), 0, "Not all messages received
successfully");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 4f9cc7002..b3e74f7b9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -161,13 +161,13 @@ public void testBinaryProtoToGetTopicsOfNamespace()
throws Exception {
.subscribe();
// 4. verify consumer get methods, to get right number of partitions
and topics.
- assertSame(pattern, ((PatternTopicsConsumerImpl<?>)
consumer).getPattern());
- List<String> topics = ((PatternTopicsConsumerImpl<?>)
consumer).getPartitionedTopics();
- List<ConsumerImpl<byte[]>> consumers =
((PatternTopicsConsumerImpl<byte[]>) consumer).getConsumers();
+ assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
+ List<String> topics = ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics();
+ List<ConsumerImpl<byte[]>> consumers =
((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
assertEquals(topics.size(), 6);
assertEquals(consumers.size(), 6);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
topics.forEach(topic -> log.debug("topic: {}", topic));
consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));
@@ -175,7 +175,7 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws
Exception {
IntStream.range(0, topics.size()).forEach(index ->
assertTrue(topics.get(index).equals(consumers.get(index).getTopic())));
- ((PatternTopicsConsumerImpl<?>) consumer).getTopics().forEach(topic ->
log.debug("getTopics topic: {}", topic));
+ ((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
// 5. produce data
for (int i = 0; i < totalMessages / 3; i++) {
@@ -235,8 +235,8 @@ public void testTopicsListMinus() throws Exception {
List<String> oldNames = Lists.newArrayList(topicName1, topicName2,
topicName3, topicName4);
List<String> newNames = Lists.newArrayList(topicName3, topicName4,
topicName5, topicName6);
- List<String> addedNames =
PatternTopicsConsumerImpl.topicsListsMinus(newNames, oldNames);
- List<String> removedNames =
PatternTopicsConsumerImpl.topicsListsMinus(oldNames, newNames);
+ List<String> addedNames =
PatternMultiTopicsConsumerImpl.topicsListsMinus(newNames, oldNames);
+ List<String> removedNames =
PatternMultiTopicsConsumerImpl.topicsListsMinus(oldNames, newNames);
assertTrue(addedNames.size() == 2 &&
addedNames.contains(topicName5) &&
@@ -246,21 +246,21 @@ public void testTopicsListMinus() throws Exception {
removedNames.contains(topicName2));
// totally 2 different list, should return content of first lists.
- List<String> addedNames2 =
PatternTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames);
+ List<String> addedNames2 =
PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames);
assertTrue(addedNames2.size() == 2 &&
addedNames2.contains(topicName5) &&
addedNames2.contains(topicName6));
// 2 same list, should return empty list.
- List<String> addedNames3 =
PatternTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames);
+ List<String> addedNames3 =
PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames);
assertEquals(addedNames3.size(), 0);
// empty list minus: addedNames2.size = 2, addedNames3.size = 0
- List<String> addedNames4 =
PatternTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3);
+ List<String> addedNames4 =
PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3);
assertTrue(addedNames4.size() == addedNames2.size());
addedNames4.forEach(name -> assertTrue(addedNames2.contains(name)));
- List<String> addedNames5 =
PatternTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2);
+ List<String> addedNames5 =
PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2);
assertEquals(addedNames5.size(), 0);
}
@@ -290,10 +290,10 @@ public void testStartEmptyPatternConsumer() throws
Exception {
.subscribe();
// 3. verify consumer get methods, to get 0 number of partitions and
topics.
- assertSame(pattern, ((PatternTopicsConsumerImpl<?>)
consumer).getPattern());
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 0);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 0);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getTopics().size(), 0);
+ assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 0);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 0);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 0);
// 4. create producer
String messagePredicate = "my-message-" + key + "-";
@@ -310,15 +310,15 @@ public void testStartEmptyPatternConsumer() throws
Exception {
// 5. call recheckTopics to subscribe each added topics above
log.debug("recheck topics change");
- PatternTopicsConsumerImpl<byte[]> consumer1 =
((PatternTopicsConsumerImpl<byte[]>) consumer);
+ PatternMultiTopicsConsumerImpl<byte[]> consumer1 =
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
consumer1.run(consumer1.getRecheckPatternTimeout());
Thread.sleep(100);
// 6. verify consumer get methods, to get number of partitions and
topics, value 6=1+2+3.
- assertSame(pattern, ((PatternTopicsConsumerImpl<?>)
consumer).getPattern());
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 6);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 6);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
+ assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 6);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 6);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
// 7. produce data
@@ -384,13 +384,13 @@ public void testAutoSubscribePatternConsumer() throws
Exception {
.receiverQueueSize(4)
.subscribe();
- assertTrue(consumer instanceof PatternTopicsConsumerImpl);
+ assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
// 4. verify consumer get methods, to get 6 number of partitions and
topics: 6=1+2+3
- assertSame(pattern, ((PatternTopicsConsumerImpl<?>)
consumer).getPattern());
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 6);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 6);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
+ assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 6);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 6);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
// 5. produce data to topic 1,2,3; verify should receive all the
message
for (int i = 0; i < totalMessages / 3; i++) {
@@ -419,12 +419,12 @@ public void testAutoSubscribePatternConsumer() throws
Exception {
// 7. call recheckTopics to subscribe each added topics above, verify
topics number: 10=1+2+3+4
log.debug("recheck topics change");
- PatternTopicsConsumerImpl<byte[]> consumer1 =
((PatternTopicsConsumerImpl<byte[]>) consumer);
+ PatternMultiTopicsConsumerImpl<byte[]> consumer1 =
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
consumer1.run(consumer1.getRecheckPatternTimeout());
Thread.sleep(100);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 10);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 10);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getTopics().size(), 4);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 10);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 10);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 4);
// 8. produce data to topic3 and topic4, verify should receive all the
message
for (int i = 0; i < totalMessages / 2; i++) {
@@ -487,13 +487,13 @@ public void testAutoUnbubscribePatternConsumer() throws
Exception {
.receiverQueueSize(4)
.subscribe();
- assertTrue(consumer instanceof PatternTopicsConsumerImpl);
+ assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
// 4. verify consumer get methods, to get 0 number of partitions and
topics: 6=1+2+3
- assertSame(pattern, ((PatternTopicsConsumerImpl<?>)
consumer).getPattern());
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 6);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 6);
- assertEquals(((PatternTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
+ assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 6);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 6);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
// 5. produce data to topic 1,2,3; verify should receive all the
message
for (int i = 0; i < totalMessages / 3; i++) {
@@ -521,12 +521,12 @@ public void testAutoUnbubscribePatternConsumer() throws
Exception {
// 7. call recheckTopics to unsubscribe topic 1,3 , verify topics
number: 2=6-1-3
log.debug("recheck topics change");
- PatternTopicsConsumerImpl<byte[]> consumer1 =
((PatternTopicsConsumerImpl<byte[]>) consumer);
+ PatternMultiTopicsConsumerImpl<byte[]> consumer1 =
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
consumer1.run(consumer1.getRecheckPatternTimeout());
Thread.sleep(100);
- assertEquals(((PatternTopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics().size(), 2);
- assertEquals(((PatternTopicsConsumerImpl<byte[]>)
consumer).getConsumers().size(), 2);
- assertEquals(((PatternTopicsConsumerImpl<byte[]>)
consumer).getTopics().size(), 1);
+ assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics().size(), 2);
+ assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>)
consumer).getConsumers().size(), 2);
+ assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>)
consumer).getTopics().size(), 1);
// 8. produce data to topic2, verify should receive all the message
for (int i = 0; i < totalMessages; i++) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
index 7a94cc26e..053cb5ea1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
@@ -338,7 +338,7 @@ public void testFailoverAckedNormalTopic() throws Exception
{
}
private static long
getUnackedMessagesCountInPartitionedConsumer(Consumer<byte[]> c) {
- PartitionedConsumerImpl<byte[]> pc = (PartitionedConsumerImpl<byte[]>)
c;
+ MultiTopicsConsumerImpl<byte[]> pc = (MultiTopicsConsumerImpl<byte[]>)
c;
return pc.getUnAckedMessageTracker().size()
+ pc.getConsumers().stream().mapToLong(consumer ->
consumer.getUnAckedMessageTracker().size()).sum();
}
@@ -405,8 +405,8 @@ public void testSharedAckedPartitionedTopic() throws
Exception {
assertEquals(received, 5);
// 7. Simulate ackTimeout
- ((PartitionedConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().toggle();
- ((PartitionedConsumerImpl<byte[]>) consumer).getConsumers().forEach(c
-> c.getUnAckedMessageTracker().toggle());
+ ((MultiTopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().toggle();
+ ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c
-> c.getUnAckedMessageTracker().toggle());
// 8. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 952dfac22..6aa574ddf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -116,10 +116,10 @@ public void testGetConsumersAndGetTopics() throws
Exception {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();
- assertTrue(consumer instanceof TopicsConsumerImpl);
+ assertTrue(consumer instanceof MultiTopicsConsumerImpl);
- List<String> topics = ((TopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics();
- List<ConsumerImpl<byte[]>> consumers = ((TopicsConsumerImpl)
consumer).getConsumers();
+ List<String> topics = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics();
+ List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl)
consumer).getConsumers();
topics.forEach(topic -> log.info("topic: {}", topic));
consumers.forEach(c -> log.info("consumer: {}", c.getTopic()));
@@ -127,7 +127,7 @@ public void testGetConsumersAndGetTopics() throws Exception
{
IntStream.range(0, 6).forEach(index ->
assertTrue(topics.get(index).equals(consumers.get(index).getTopic())));
- assertTrue(((TopicsConsumerImpl<byte[]>) consumer).getTopics().size()
== 3);
+ assertTrue(((MultiTopicsConsumerImpl<byte[]>)
consumer).getTopics().size() == 3);
consumer.unsubscribe();
consumer.close();
@@ -167,7 +167,7 @@ public void testSyncProducerAndConsumer() throws Exception {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();
- assertTrue(consumer instanceof TopicsConsumerImpl);
+ assertTrue(consumer instanceof MultiTopicsConsumerImpl);
// 3. producer publish messages
for (int i = 0; i < totalMessages / 3; i++) {
@@ -228,7 +228,7 @@ public void testAsyncConsumer() throws Exception {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();
- assertTrue(consumer instanceof TopicsConsumerImpl);
+ assertTrue(consumer instanceof MultiTopicsConsumerImpl);
// Asynchronously produce messages
List<Future<MessageId>> futures = Lists.newArrayList();
@@ -307,7 +307,7 @@ public void testConsumerUnackedRedelivery() throws
Exception {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();
- assertTrue(consumer instanceof TopicsConsumerImpl);
+ assertTrue(consumer instanceof MultiTopicsConsumerImpl);
// 3. producer publish messages
for (int i = 0; i < totalMessages / 3; i++) {
@@ -323,7 +323,7 @@ public void testConsumerUnackedRedelivery() throws
Exception {
log.debug("Consumer received : " + new String(message.getData()));
message = consumer.receive(500, TimeUnit.MILLISECONDS);
}
- long size = ((TopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
+ long size = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
log.debug(key + " Unacked Message Tracker size is " + size);
assertEquals(size, totalMessages);
@@ -338,7 +338,7 @@ public void testConsumerUnackedRedelivery() throws
Exception {
message = consumer.receive(500, TimeUnit.MILLISECONDS);
} while (message != null);
- size = ((TopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
+ size = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
log.debug(key + " Unacked Message Tracker size is " + size);
assertEquals(size, 0);
assertEquals(hSet.size(), totalMessages);
@@ -361,14 +361,14 @@ public void testConsumerUnackedRedelivery() throws
Exception {
consumer.acknowledge(message);
message = consumer.receive(100, TimeUnit.MILLISECONDS);
}
- size = ((TopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
+ size = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
log.debug(key + " Unacked Message Tracker size is " + size);
assertEquals(size, 0);
assertEquals(received, totalMessages);
// 8. Simulate ackTimeout
- ((TopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().toggle();
- ((TopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c ->
c.getUnAckedMessageTracker().toggle());
+ ((MultiTopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().toggle();
+ ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c
-> c.getUnAckedMessageTracker().toggle());
// 9. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) {
@@ -384,7 +384,7 @@ public void testConsumerUnackedRedelivery() throws
Exception {
log.debug("Consumer received : " + data);
message = consumer.receive(100, TimeUnit.MILLISECONDS);
}
- size = ((TopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
+ size = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
log.debug(key + " Unacked Message Tracker size is " + size);
assertEquals(size, 30);
@@ -402,7 +402,7 @@ public void testConsumerUnackedRedelivery() throws
Exception {
message = consumer.receive(100, TimeUnit.MILLISECONDS);
}
assertEquals(redelivered, 30);
- size = ((TopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
+ size = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getUnAckedMessageTracker().size();
log.info(key + " Unacked Message Tracker size is " + size);
assertEquals(size, 0);
@@ -447,7 +447,7 @@ public void testSubscribeUnsubscribeSingleTopic() throws
Exception {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();
- assertTrue(consumer instanceof TopicsConsumerImpl);
+ assertTrue(consumer instanceof MultiTopicsConsumerImpl);
// 3. producer publish messages
for (int i = 0; i < totalMessages / 3; i++) {
@@ -468,7 +468,7 @@ public void testSubscribeUnsubscribeSingleTopic() throws
Exception {
assertEquals(messageSet, totalMessages);
// 4, unsubscribe topic3
- CompletableFuture<Void> unsubFuture = ((TopicsConsumerImpl<byte[]>)
consumer).unsubscribeAsync(topicName3);
+ CompletableFuture<Void> unsubFuture =
((MultiTopicsConsumerImpl<byte[]>) consumer).unsubscribeAsync(topicName3);
unsubFuture.get();
// 5. producer publish messages
@@ -491,15 +491,15 @@ public void testSubscribeUnsubscribeSingleTopic() throws
Exception {
assertEquals(messageSet, totalMessages * 2 / 3);
// 7. use getter to verify internal topics number after un-subscribe
topic3
- List<String> topics = ((TopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics();
- List<ConsumerImpl<byte[]>> consumers = ((TopicsConsumerImpl)
consumer).getConsumers();
+ List<String> topics = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics();
+ List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl)
consumer).getConsumers();
assertEquals(topics.size(), 3);
assertEquals(consumers.size(), 3);
- assertTrue(((TopicsConsumerImpl<byte[]>) consumer).getTopics().size()
== 2);
+ assertTrue(((MultiTopicsConsumerImpl<byte[]>)
consumer).getTopics().size() == 2);
// 8. re-subscribe topic3
- CompletableFuture<Void> subFuture =
((TopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3);
+ CompletableFuture<Void> subFuture =
((MultiTopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3);
subFuture.get();
// 9. producer publish messages
@@ -522,12 +522,12 @@ public void testSubscribeUnsubscribeSingleTopic() throws
Exception {
assertEquals(messageSet, totalMessages);
// 11. use getter to verify internal topics number after subscribe
topic3
- topics = ((TopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics();
- consumers = ((TopicsConsumerImpl) consumer).getConsumers();
+ topics = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics();
+ consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers();
assertEquals(topics.size(), 6);
assertEquals(consumers.size(), 6);
- assertTrue(((TopicsConsumerImpl<byte[]>) consumer).getTopics().size()
== 3);
+ assertTrue(((MultiTopicsConsumerImpl<byte[]>)
consumer).getTopics().size() == 3);
consumer.unsubscribe();
consumer.close();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 2cbd64224..2a88bf051 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -914,7 +914,7 @@ protected synchronized void messageProcessed(Message<?>
msg) {
id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(),
getPartitionIndex());
}
if (partitionIndex != -1) {
- // we should no longer track this message,
PartitionedConsumerImpl will take care from now onwards
+ // we should no longer track this message, TopicsConsumer will
take care from now onwards
unAckedMessageTracker.remove(id);
} else {
unAckedMessageTracker.add(id);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
similarity index 74%
rename from
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
rename to
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 1089cef53..6da72ec08 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -56,7 +56,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
+public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
// All topics should be in same namespace
protected NamespaceName namespaceName;
@@ -76,15 +76,15 @@
private final int sharedQueueResumeThreshold;
// sum of topicPartitions, simple topic has 1, partitioned topic equals to
partition number.
- AtomicInteger numberTopicPartitions;
+ AtomicInteger allTopicPartitionsNumber;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final ConsumerStatsRecorder stats;
private final UnAckedMessageTracker unAckedMessageTracker;
private final ConsumerConfigurationData<T> internalConfig;
- TopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T>
conf, ExecutorService listenerExecutor,
- CompletableFuture<Consumer<T>> subscribeFuture,
Schema<T> schema) {
+ MultiTopicsConsumerImpl(PulsarClientImpl client,
ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor,
+ CompletableFuture<Consumer<T>> subscribeFuture,
Schema<T> schema) {
super(client, "TopicsConsumerFakeTopicName" +
ConsumerName.generateRandomName(), conf,
Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
subscribeFuture, schema);
@@ -95,7 +95,7 @@
this.consumers = new ConcurrentHashMap<>();
this.pausedConsumers = new ConcurrentLinkedQueue<>();
this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
- this.numberTopicPartitions = new AtomicInteger(0);
+ this.allTopicPartitionsNumber = new AtomicInteger(0);
if (conf.getAckTimeoutMillis() != 0) {
this.unAckedMessageTracker = new
UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis());
@@ -109,7 +109,7 @@
if (conf.getTopicNames().isEmpty()) {
this.namespaceName = null;
setState(State.Ready);
- subscribeFuture().complete(TopicsConsumerImpl.this);
+ subscribeFuture().complete(MultiTopicsConsumerImpl.this);
return;
}
@@ -122,15 +122,15 @@
FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> {
try {
- if (numberTopicPartitions.get() > maxReceiverQueueSize) {
- setMaxReceiverQueueSize(numberTopicPartitions.get());
+ if (allTopicPartitionsNumber.get() > maxReceiverQueueSize)
{
+
setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
}
setState(State.Ready);
// We have successfully created N consumers, so we can
start receiving messages now
startReceivingMessages(consumers.values().stream().collect(Collectors.toList()));
- subscribeFuture().complete(TopicsConsumerImpl.this);
+ subscribeFuture().complete(MultiTopicsConsumerImpl.this);
log.info("[{}] [{}] Created topics consumer with {}
sub-consumers",
- topic, subscription, numberTopicPartitions.get());
+ topic, subscription, allTopicPartitionsNumber.get());
} catch (PulsarClientException e) {
log.warn("[{}] Failed startReceivingMessages while
subscribe topics: {}", topic, e.getMessage());
subscribeFuture.completeExceptionally(e);
@@ -245,7 +245,7 @@ private void messageReceived(ConsumerImpl<T> consumer,
Message<T> message) {
} else {
// Enqueue the message so that it can be retrieved when
application calls receive()
// Waits for the queue to have space for the message
- // This should never block cause TopicsConsumerImpl should
always use GrowableArrayBlockingQueue
+ // This should never block cause MultiTopicsConsumerImpl
should always use GrowableArrayBlockingQueue
incomingMessages.put(topicMessage);
}
} catch (InterruptedException e) {
@@ -271,7 +271,7 @@ private void messageReceived(ConsumerImpl<T> consumer,
Message<T> message) {
log.debug("[{}][{}] Calling message listener for
message {}",
topic, subscription, message.getMessageId());
}
- listener.received(TopicsConsumerImpl.this, msg);
+ listener.received(MultiTopicsConsumerImpl.this, msg);
} catch (Throwable t) {
log.error("[{}][{}] Message listener error in processing
message: {}",
topic, subscription, message, t);
@@ -613,117 +613,172 @@ private boolean topicNameValid(String topicName) {
}
CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
- final AtomicInteger partitionNumber = new AtomicInteger(0);
- client.getPartitionedTopicMetadata(topicName).thenAccept(metadata -> {
- if (log.isDebugEnabled()) {
- log.debug("Received topic {} metadata.partitions: {}",
topicName, metadata.partitions);
- }
+ client.getPartitionedTopicMetadata(topicName)
+ .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult,
topicName, metadata.partitions))
+ .exceptionally(ex1 -> {
+ log.warn("[{}] Failed to get partitioned topic metadata: {}",
topicName, ex1.getMessage());
+ subscribeResult.completeExceptionally(ex1);
+ return null;
+ });
- List<CompletableFuture<Consumer<T>>> futureList;
-
- if (metadata.partitions > 1) {
- this.topics.putIfAbsent(topicName, metadata.partitions);
- numberTopicPartitions.addAndGet(metadata.partitions);
- partitionNumber.addAndGet(metadata.partitions);
-
- futureList = IntStream
- .range(0, partitionNumber.get())
- .mapToObj(
- partitionIndex -> {
- String partitionName =
TopicName.get(topicName).getPartition(partitionIndex).toString();
- CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
- ConsumerImpl<T> newConsumer = new
ConsumerImpl<>(client, partitionName, internalConfig,
-
client.externalExecutorProvider().getExecutor(), partitionIndex, subFuture,
schema);
- consumers.putIfAbsent(newConsumer.getTopic(),
newConsumer);
- return subFuture;
- })
- .collect(Collectors.toList());
- } else {
- this.topics.putIfAbsent(topicName, 1);
- numberTopicPartitions.incrementAndGet();
- partitionNumber.incrementAndGet();
+ return subscribeResult;
+ }
- CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
- ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client,
topicName, internalConfig,
- client.externalExecutorProvider().getExecutor(), 0,
subFuture, schema);
- consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
+    /**
+     * Creates a consumer for a single topic whose number of partitions is already known.
+     *
+     * <p>The topic name is taken from a clone of {@code conf} and then removed from that clone, so the
+     * constructor builds a consumer with no topic. Once that empty consumer reports ready, the topic is
+     * subscribed with the given partition count, and only then is {@code subscribeFuture} completed.
+     *
+     * @param client           client that owns the new consumer
+     * @param conf             consumer configuration; must name exactly one topic
+     * @param listenerExecutor executor handed to the consumer constructor
+     * @param subscribeFuture  completed with the consumer after the topic subscription succeeds, or
+     *                         completed exceptionally with the underlying failure
+     * @param numPartitions    already-known number of partitions of the topic
+     * @param schema           schema handed to the consumer constructor
+     * @return the consumer instance; it is usable only after {@code subscribeFuture} completes
+     */
+    public static <T> MultiTopicsConsumerImpl<T> createPartitionedConsumer(PulsarClientImpl client,
+            ConsumerConfigurationData<T> conf,
+            ExecutorService listenerExecutor,
+            CompletableFuture<Consumer<T>> subscribeFuture,
+            int numPartitions,
+            Schema<T> schema) {
+        checkArgument(conf.getTopicNames().size() == 1, "Should have only 1 topic for partitioned consumer");
+
+        // get topic name, then remove it from conf, so constructor will create a consumer with no topic.
+        // NOTE(review): relies on clone() giving the copy its own topic-name collection - confirm.
+        ConsumerConfigurationData<T> cloneConf = conf.clone();
+        String topicName = cloneConf.getSingleTopic();
+        cloneConf.getTopicNames().remove(topicName);
+
+        CompletableFuture<Consumer<T>> future = new CompletableFuture<>();
+        MultiTopicsConsumerImpl<T> consumer =
+                new MultiTopicsConsumerImpl<>(client, cloneConf, listenerExecutor, future, schema);
+
+        future.thenCompose(c -> ((MultiTopicsConsumerImpl<T>) c).subscribeAsync(topicName, numPartitions))
+            .thenRun(() -> subscribeFuture.complete(consumer))
+            .exceptionally(e -> {
+                log.warn("Failed subscription for createPartitionedConsumer: {} {}, e:{}",
+                    topicName, numPartitions, e);
+                // Dependent stages normally see a wrapping exception; fall back to the throwable itself
+                // when there is no cause, because completeExceptionally(null) would throw and leave
+                // subscribeFuture uncompleted.
+                Throwable cause = e.getCause() != null ? e.getCause() : e;
+                subscribeFuture.completeExceptionally(cause);
+                return null;
+            });
+        return consumer;
+    }
- futureList = Collections.singletonList(subFuture);
- }
+ // subscribe to one more topic whose number of partitions is already known
+ private CompletableFuture<Void> subscribeAsync(String topicName, int
numberPartitions) {
+ if (!topicNameValid(topicName)) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.AlreadyClosedException("Topic name
not valid"));
+ }
- FutureUtil.waitForAll(futureList)
- .thenAccept(finalFuture -> {
- try {
- if (numberTopicPartitions.get() >
maxReceiverQueueSize) {
-
setMaxReceiverQueueSize(numberTopicPartitions.get());
- }
- int numTopics =
this.topics.values().stream().mapToInt(Integer::intValue).sum();
- checkState(numberTopicPartitions.get() == numTopics,
- "numberTopicPartitions " +
numberTopicPartitions.get()
- + " not equals expected: " + numTopics);
-
- // We have successfully created new consumers, so we
can start receiving messages for them
- startReceivingMessages(
- consumers.values().stream()
- .filter(consumer1 -> {
- String consumerTopicName =
consumer1.getTopic();
- if
(TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
-
TopicName.get(topicName).getPartitionedTopicName().toString())) {
- return true;
- } else {
- return false;
- }
- })
- .collect(Collectors.toList()));
-
- subscribeResult.complete(null);
- log.info("[{}] [{}] Success subscribe new topic {} in
topics consumer, numberTopicPartitions {}",
- topic, subscription, topicName,
numberTopicPartitions.get());
- if (this.namespaceName == null) {
- this.namespaceName =
TopicName.get(topicName).getNamespaceObject();
- }
- return;
- } catch (PulsarClientException e) {
- handleSubscribeOneTopicError(topicName, e);
- subscribeResult.completeExceptionally(e);
- }
- })
- .exceptionally(ex -> {
- handleSubscribeOneTopicError(topicName, ex);
- subscribeResult.completeExceptionally(ex);
- return null;
- });
- }).exceptionally(ex1 -> {
- log.warn("[{}] Failed to get partitioned topic metadata: {}",
topicName, ex1.getMessage());
- subscribeResult.completeExceptionally(ex1);
- return null;
- });
+ if (getState() == State.Closing || getState() == State.Closed) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.AlreadyClosedException("Topics
Consumer was already closed"));
+ }
+
+ CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
+ subscribeTopicPartitions(subscribeResult, topicName, numberPartitions);
return subscribeResult;
}
- // handling failure during subscribe new topic, unsubscribe success
created partitions
- private void handleSubscribeOneTopicError(String topicName, Throwable
error) {
- log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer
", topic, topicName, error.getMessage());
+ private void subscribeTopicPartitions(CompletableFuture<Void>
subscribeResult, String topicName, int partitionNumber) {
+ if (log.isDebugEnabled()) {
+ log.debug("Subscribe to topic {} metadata.partitions: {}",
topicName, partitionNumber);
+ }
- consumers.values().stream().filter(consumer1 -> {
- String consumerTopicName = consumer1.getTopic();
- if
(TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicName)) {
- return true;
- } else {
- return false;
- }
- }).forEach(consumer2 -> {
- consumer2.closeAsync().handle((ok, closeException) -> {
- consumer2.subscribeFuture().completeExceptionally(error);
+ List<CompletableFuture<Consumer<T>>> futureList;
+
+ if (partitionNumber > 1) {
+ this.topics.putIfAbsent(topicName, partitionNumber);
+ allTopicPartitionsNumber.addAndGet(partitionNumber);
+
+ futureList = IntStream
+ .range(0, partitionNumber)
+ .mapToObj(
+ partitionIndex -> {
+ String partitionName =
TopicName.get(topicName).getPartition(partitionIndex).toString();
+ CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
+ ConsumerImpl<T> newConsumer = new
ConsumerImpl<>(client, partitionName, internalConfig,
+ client.externalExecutorProvider().getExecutor(),
partitionIndex, subFuture, schema);
+ consumers.putIfAbsent(newConsumer.getTopic(),
newConsumer);
+ return subFuture;
+ })
+ .collect(Collectors.toList());
+ } else {
+ this.topics.putIfAbsent(topicName, 1);
+ allTopicPartitionsNumber.incrementAndGet();
+
+ CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
+ ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client,
topicName, internalConfig,
+ client.externalExecutorProvider().getExecutor(), 0, subFuture,
schema);
+ consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
+
+ futureList = Collections.singletonList(subFuture);
+ }
+
+ FutureUtil.waitForAll(futureList)
+ .thenAccept(finalFuture -> {
+ try {
+ if (allTopicPartitionsNumber.get() > maxReceiverQueueSize)
{
+
setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
+ }
+ int numTopics =
this.topics.values().stream().mapToInt(Integer::intValue).sum();
+ checkState(allTopicPartitionsNumber.get() == numTopics,
+ "allTopicPartitionsNumber " +
allTopicPartitionsNumber.get()
+ + " not equals expected: " + numTopics);
+
+ // We have successfully created new consumers, so we can
start receiving messages for them
+ startReceivingMessages(
+ consumers.values().stream()
+ .filter(consumer1 -> {
+ String consumerTopicName =
consumer1.getTopic();
+ if
(TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
+
TopicName.get(topicName).getPartitionedTopicName().toString())) {
+ return true;
+ } else {
+ return false;
+ }
+ })
+ .collect(Collectors.toList()));
+
+ subscribeResult.complete(null);
+ log.info("[{}] [{}] Success subscribe new topic {} in
topics consumer, partitions: {}, allTopicPartitionsNumber: {}",
+ topic, subscription, topicName, partitionNumber,
allTopicPartitionsNumber.get());
+ if (this.namespaceName == null) {
+ this.namespaceName =
TopicName.get(topicName).getNamespaceObject();
+ }
+ return;
+ } catch (PulsarClientException e) {
+ handleSubscribeOneTopicError(topicName, e,
subscribeResult);
+ }
+ })
+ .exceptionally(ex -> {
+ handleSubscribeOneTopicError(topicName, ex, subscribeResult);
return null;
});
- consumers.remove(consumer2.getTopic());
- });
+ }
- topics.remove(topicName);
- checkState(numberTopicPartitions.get() == consumers.values().size());
+    /**
+     * Handles a failure while subscribing to a new topic: closes the per-partition consumers that were
+     * already created for that topic, removes them from the bookkeeping, and finally fails
+     * {@code subscribeFuture} with the original error.
+     *
+     * <p>The cleanup is submitted to the client's external executor rather than run on the calling thread.
+     *
+     * @param topicName       topic whose subscription failed
+     * @param error           the failure to propagate to the sub-consumers and to {@code subscribeFuture}
+     * @param subscribeFuture future of the failed subscription; always completed exceptionally here
+     */
+    private void handleSubscribeOneTopicError(String topicName, Throwable error,
+            CompletableFuture<Void> subscribeFuture) {
+        log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer {}", topic, topicName, error.getMessage());
+
+        client.externalExecutorProvider().getExecutor().submit(() -> {
+            // Match consumers the same way subscribeTopicPartitions does: on the normalized
+            // partitioned-topic name, not on the raw string passed by the caller.
+            String partitionedTopicName = TopicName.get(topicName).getPartitionedTopicName().toString();
+
+            // Snapshot the consumers to close BEFORE starting any close, so the countdown below starts
+            // at the full count and cannot reach zero early when a close completes synchronously.
+            List<ConsumerImpl<T>> toClose = consumers.values().stream()
+                .filter(c -> TopicName.get(c.getTopic()).getPartitionedTopicName().equals(partitionedTopicName))
+                .collect(Collectors.toList());
+
+            if (toClose.isEmpty()) {
+                // Nothing to roll back; still fail the future so the caller does not wait forever.
+                // NOTE(review): topics / allTopicPartitionsNumber are left untouched on this path - confirm.
+                subscribeFuture.completeExceptionally(error);
+                return;
+            }
+
+            AtomicInteger toCloseNum = new AtomicInteger(toClose.size());
+            toClose.forEach(partitionConsumer -> partitionConsumer.closeAsync().whenComplete((r, ex) -> {
+                partitionConsumer.subscribeFuture().completeExceptionally(error);
+                allTopicPartitionsNumber.decrementAndGet();
+                consumers.remove(partitionConsumer.getTopic());
+                if (toCloseNum.decrementAndGet() == 0) {
+                    log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer, subscribe error: {}",
+                        topic, topicName, error.getMessage());
+                    try {
+                        topics.remove(topicName);
+                        checkState(allTopicPartitionsNumber.get() == consumers.values().size());
+                    } finally {
+                        // Complete even if the consistency check throws; otherwise the exception is
+                        // swallowed by this callback and the caller never learns about the failure.
+                        subscribeFuture.completeExceptionally(error);
+                    }
+                }
+            }));
+        });
}
// un-subscribe a given topic
@@ -757,15 +812,15 @@ private void handleSubscribeOneTopicError(String
topicName, Throwable error) {
consumersToUnsub.forEach(consumer1 -> {
consumers.remove(consumer1.getTopic());
pausedConsumers.remove(consumer1);
- numberTopicPartitions.decrementAndGet();
+ allTopicPartitionsNumber.decrementAndGet();
});
topics.remove(topicName);
((UnAckedTopicMessageTracker)
unAckedMessageTracker).removeTopicMessages(topicName);
unsubscribeFuture.complete(null);
- log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer,
numberTopicPartitions: {}",
- topicName, subscription, consumerName,
numberTopicPartitions);
+ log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer,
allTopicPartitionsNumber: {}",
+ topicName, subscription, consumerName,
allTopicPartitionsNumber);
} else {
unsubscribeFuture.completeExceptionally(ex);
setState(State.Failed);
@@ -792,5 +847,5 @@ private void handleSubscribeOneTopicError(String topicName,
Throwable error) {
return consumers.values().stream().collect(Collectors.toList());
}
- private static final Logger log =
LoggerFactory.getLogger(TopicsConsumerImpl.class);
+ private static final Logger log =
LoggerFactory.getLogger(MultiTopicsConsumerImpl.class);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
deleted file mode 100644
index 9c952dd22..000000000
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ /dev/null
@@ -1,553 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.util.FutureUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
-
- private final List<ConsumerImpl<T>> consumers;
-
- // Queue of partition consumers on which we have stopped calling
receiveAsync() because the
- // shared incoming queue was full
- private final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers;
-
- // Threshold for the shared queue. When the size of the shared queue goes
below the threshold, we are going to
- // resume receiving from the paused consumer partitions
- private final int sharedQueueResumeThreshold;
-
- private final int numPartitions;
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
- private final ConsumerStatsRecorderImpl stats;
- private final UnAckedMessageTracker unAckedMessageTracker;
-
- PartitionedConsumerImpl(PulsarClientImpl client,
ConsumerConfigurationData<T> conf, int numPartitions,
- ExecutorService listenerExecutor,
CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
- super(client, conf.getSingleTopic(), conf, Math.max(Math.max(2,
numPartitions), conf.getReceiverQueueSize()),
- listenerExecutor, subscribeFuture, schema);
- this.consumers = Lists.newArrayListWithCapacity(numPartitions);
- this.pausedConsumers = new ConcurrentLinkedQueue<>();
- this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
- this.numPartitions = numPartitions;
-
- if (conf.getAckTimeoutMillis() != 0) {
- this.unAckedMessageTracker = new UnAckedMessageTracker(client,
this, conf.getAckTimeoutMillis());
- } else {
- this.unAckedMessageTracker =
UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
- }
-
- stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new
ConsumerStatsRecorderImpl() : null;
- checkArgument(conf.getReceiverQueueSize() > 0,
- "Receiver queue size needs to be greater than 0 for
Partitioned Topics");
- start();
- }
-
- private void start() {
- AtomicReference<Throwable> subscribeFail = new
AtomicReference<Throwable>();
- AtomicInteger completed = new AtomicInteger();
- ConsumerConfigurationData<T> internalConfig =
getInternalConsumerConfig();
- for (int partitionIndex = 0; partitionIndex < numPartitions;
partitionIndex++) {
- String partitionName =
TopicName.get(topic).getPartition(partitionIndex).toString();
- ConsumerImpl<T> consumer = new ConsumerImpl<>(client,
partitionName, internalConfig,
- client.externalExecutorProvider().getExecutor(),
partitionIndex, new CompletableFuture<>(), schema);
- consumers.add(consumer);
- consumer.subscribeFuture().handle((cons, subscribeException) -> {
- if (subscribeException != null) {
- setState(State.Failed);
- subscribeFail.compareAndSet(null, subscribeException);
- client.cleanupConsumer(this);
- }
- if (completed.incrementAndGet() == numPartitions) {
- if (subscribeFail.get() == null) {
- try {
- // We have successfully created N consumers, so we
can start receiving messages now
- starReceivingMessages();
- setState(State.Ready);
-
subscribeFuture().complete(PartitionedConsumerImpl.this);
- log.info("[{}] [{}] Created partitioned consumer",
topic, subscription);
- return null;
- } catch (PulsarClientException e) {
- subscribeFail.set(e);
- }
- }
- closeAsync().handle((ok, closeException) -> {
-
subscribeFuture().completeExceptionally(subscribeFail.get());
- client.cleanupConsumer(this);
- return null;
- });
- log.error("[{}] [{}] Could not create partitioned
consumer.", topic, subscription,
- subscribeFail.get().getCause());
- }
- return null;
- });
- }
- }
-
- private void starReceivingMessages() throws PulsarClientException {
- for (ConsumerImpl<T> consumer : consumers) {
-
consumer.sendFlowPermitsToBroker(consumer.getConnectionHandler().cnx(),
conf.getReceiverQueueSize());
- receiveMessageFromConsumer(consumer);
- }
- }
-
- private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
- consumer.receiveAsync().thenAccept(message -> {
- // Process the message, add to the queue and trigger listener or
async callback
- messageReceived(message);
-
- // we're modifying pausedConsumers
- lock.writeLock().lock();
- try {
- int size = incomingMessages.size();
- if (size >= maxReceiverQueueSize
- || (size > sharedQueueResumeThreshold &&
!pausedConsumers.isEmpty())) {
- // mark this consumer to be resumed later: if No more
space left in shared queue,
- // or if any consumer is already paused (to create fair
chance for already paused consumers)
- pausedConsumers.add(consumer);
- } else {
- // Schedule next receiveAsync() if the incoming queue is
not full. Use a different thread to avoid
- // recursion and stack overflow
- client.eventLoopGroup().execute(() -> {
- receiveMessageFromConsumer(consumer);
- });
- }
- } finally {
- lock.writeLock().unlock();
- }
- });
- }
-
- private void resumeReceivingFromPausedConsumersIfNeeded() {
- lock.readLock().lock();
- try {
- if (incomingMessages.size() <= sharedQueueResumeThreshold &&
!pausedConsumers.isEmpty()) {
- while (true) {
- ConsumerImpl<T> consumer = pausedConsumers.poll();
- if (consumer == null) {
- break;
- }
-
- // if messages are readily available on consumer we will
attempt to writeLock on the same thread
- client.eventLoopGroup().execute(() -> {
- receiveMessageFromConsumer(consumer);
- });
- }
- }
- } finally {
- lock.readLock().unlock();
- }
- }
-
- @Override
- protected Message<T> internalReceive() throws PulsarClientException {
- Message<T> message;
- try {
- message = incomingMessages.take();
- unAckedMessageTracker.add((MessageIdImpl) message.getMessageId());
- resumeReceivingFromPausedConsumersIfNeeded();
- return message;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
- }
- }
-
- @Override
- protected Message<T> internalReceive(int timeout, TimeUnit unit) throws
PulsarClientException {
- Message<T> message;
- try {
- message = incomingMessages.poll(timeout, unit);
- if (message != null) {
- unAckedMessageTracker.add(message.getMessageId());
- }
- resumeReceivingFromPausedConsumersIfNeeded();
- return message;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException(e);
- }
- }
-
- @Override
- protected CompletableFuture<Message<T>> internalReceiveAsync() {
- CompletableFuture<Message<T>> result = new CompletableFuture<>();
- Message<T> message;
- try {
- lock.writeLock().lock();
- message = incomingMessages.poll(0, TimeUnit.SECONDS);
- if (message == null) {
- pendingReceives.add(result);
- } else {
- unAckedMessageTracker.add(message.getMessageId());
- resumeReceivingFromPausedConsumersIfNeeded();
- result.complete(message);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- result.completeExceptionally(new PulsarClientException(e));
- } finally {
- lock.writeLock().unlock();
- }
-
- return result;
- }
-
- @Override
- protected CompletableFuture<Void> doAcknowledge(MessageId messageId,
AckType ackType,
- Map<String,Long>
properties) {
- checkArgument(messageId instanceof MessageIdImpl);
-
- if (getState() != State.Ready) {
- return FutureUtil.failedFuture(new PulsarClientException("Consumer
already closed"));
- }
-
- if (ackType == AckType.Cumulative) {
- return FutureUtil.failedFuture(new
PulsarClientException.NotSupportedException(
- "Cumulative acknowledge not supported for partitioned
topics"));
- } else {
-
- ConsumerImpl<T> consumer = consumers.get(((MessageIdImpl)
messageId).getPartitionIndex());
- return consumer.doAcknowledge(messageId, ackType,
properties).thenRun(() ->
- unAckedMessageTracker.remove(messageId));
- }
-
- }
-
- @Override
- public CompletableFuture<Void> unsubscribeAsync() {
- if (getState() == State.Closing || getState() == State.Closed) {
- return FutureUtil.failedFuture(
- new
PulsarClientException.AlreadyClosedException("Partitioned Consumer was already
closed"));
- }
- setState(State.Closing);
-
- AtomicReference<Throwable> unsubscribeFail = new
AtomicReference<Throwable>();
- AtomicInteger completed = new AtomicInteger(numPartitions);
- CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
- for (Consumer<T> consumer : consumers) {
- if (consumer != null) {
- consumer.unsubscribeAsync().handle((unsubscribed, ex) -> {
- if (ex != null) {
- unsubscribeFail.compareAndSet(null, ex);
- }
- if (completed.decrementAndGet() == 0) {
- if (unsubscribeFail.get() == null) {
- setState(State.Closed);
- unAckedMessageTracker.close();
- unsubscribeFuture.complete(null);
- log.info("[{}] [{}] Unsubscribed Partitioned
Consumer", topic, subscription);
- } else {
- setState(State.Failed);
-
unsubscribeFuture.completeExceptionally(unsubscribeFail.get());
- log.error("[{}] [{}] Could not unsubscribe
Partitioned Consumer", topic, subscription,
- unsubscribeFail.get().getCause());
- }
- }
-
- return null;
- });
- }
-
- }
-
- return unsubscribeFuture;
- }
-
- @Override
- public CompletableFuture<Void> closeAsync() {
- if (getState() == State.Closing || getState() == State.Closed) {
- unAckedMessageTracker.close();
- return CompletableFuture.completedFuture(null);
- }
- setState(State.Closing);
-
- AtomicReference<Throwable> closeFail = new
AtomicReference<Throwable>();
- AtomicInteger completed = new AtomicInteger(numPartitions);
- CompletableFuture<Void> closeFuture = new CompletableFuture<>();
- for (Consumer<T> consumer : consumers) {
- if (consumer != null) {
- consumer.closeAsync().handle((closed, ex) -> {
- if (ex != null) {
- closeFail.compareAndSet(null, ex);
- }
- if (completed.decrementAndGet() == 0) {
- if (closeFail.get() == null) {
- setState(State.Closed);
- unAckedMessageTracker.close();
- closeFuture.complete(null);
- log.info("[{}] [{}] Closed Partitioned Consumer",
topic, subscription);
- client.cleanupConsumer(this);
- // fail all pending-receive futures to notify
application
- failPendingReceive();
- } else {
- setState(State.Failed);
- closeFuture.completeExceptionally(closeFail.get());
- log.error("[{}] [{}] Could not close Partitioned
Consumer", topic, subscription,
- closeFail.get().getCause());
- }
- }
-
- return null;
- });
- }
-
- }
-
- return closeFuture;
- }
-
- private void failPendingReceive() {
- lock.readLock().lock();
- try {
- if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
- while (!pendingReceives.isEmpty()) {
- CompletableFuture<Message<T>> receiveFuture =
pendingReceives.poll();
- if (receiveFuture != null) {
- receiveFuture.completeExceptionally(
- new
PulsarClientException.AlreadyClosedException("Consumer is already closed"));
- } else {
- break;
- }
- }
- }
- } finally {
- lock.readLock().unlock();
- }
- }
-
- @Override
- public boolean isConnected() {
- return consumers.stream().allMatch(ConsumerImpl::isConnected);
- }
-
- void messageReceived(Message<T> message) {
- lock.writeLock().lock();
- try {
- unAckedMessageTracker.add(message.getMessageId());
- if (log.isDebugEnabled()) {
- log.debug("[{}][{}] Received message from partitioned-consumer
{}", topic, subscription, message.getMessageId());
- }
- // if asyncReceive is waiting : return message to callback without
adding to incomingMessages queue
- if (!pendingReceives.isEmpty()) {
- CompletableFuture<Message<T>> receivedFuture =
pendingReceives.poll();
- listenerExecutor.execute(() ->
receivedFuture.complete(message));
- } else {
- // Enqueue the message so that it can be retrieved when
application calls receive()
- // Waits for the queue to have space for the message
- // This should never block cause PartitonedConsumerImpl should
always use GrowableArrayBlockingQueue
- incomingMessages.put(message);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } finally {
- lock.writeLock().unlock();
- }
-
- if (listener != null) {
- // Trigger the notification on the message listener in a separate
thread to avoid blocking the networking
- // thread while the message processing happens
- listenerExecutor.execute(() -> {
- Message<T> msg;
- try {
- msg = internalReceive();
- } catch (PulsarClientException e) {
- log.warn("[{}] [{}] Failed to dequeue the message for
listener", topic, subscription, e);
- return;
- }
-
- try {
- if (log.isDebugEnabled()) {
- log.debug("[{}][{}] Calling message listener for
message {}", topic, subscription, message.getMessageId());
- }
- listener.received(PartitionedConsumerImpl.this, msg);
- } catch (Throwable t) {
- log.error("[{}][{}] Message listener error in processing
message: {}", topic, subscription, message,
- t);
- }
- });
- }
- }
-
- @Override
- String getHandlerName() {
- return subscription;
- }
-
- private ConsumerConfigurationData<T> getInternalConsumerConfig() {
- ConsumerConfigurationData<T> internalConsumerConfig = new
ConsumerConfigurationData<>();
-
internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
- internalConsumerConfig.setSubscriptionName(conf.getSubscriptionName());
- internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
- internalConsumerConfig.setConsumerName(consumerName);
-
internalConsumerConfig.setAcknowledgementsGroupTimeMicros(conf.getAcknowledgementsGroupTimeMicros());
- internalConsumerConfig.setPriorityLevel(conf.getPriorityLevel());
- internalConsumerConfig.setProperties(conf.getProperties());
- internalConsumerConfig.setReadCompacted(conf.isReadCompacted());
- if (null != conf.getConsumerEventListener()) {
-
internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener());
- }
-
- int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
- conf.getMaxTotalReceiverQueueSizeAcrossPartitions() /
numPartitions);
- internalConsumerConfig.setReceiverQueueSize(receiverQueueSize);
-
- if (conf.getCryptoKeyReader() != null) {
-
internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader());
-
internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
- }
- if (conf.getAckTimeoutMillis() != 0) {
-
internalConsumerConfig.setAckTimeoutMillis(conf.getAckTimeoutMillis());
- }
-
- return internalConsumerConfig;
- }
-
- @Override
- public void redeliverUnacknowledgedMessages() {
- synchronized (this) {
- for (ConsumerImpl<T> c : consumers) {
- c.redeliverUnacknowledgedMessages();
- }
- incomingMessages.clear();
- unAckedMessageTracker.clear();
- resumeReceivingFromPausedConsumersIfNeeded();
- }
- }
-
- @Override
- public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
- checkArgument(messageIds.stream().findFirst().get() instanceof
MessageIdImpl);
- if (conf.getSubscriptionType() != SubscriptionType.Shared) {
- // We cannot redeliver single messages if subscription type is not
Shared
- redeliverUnacknowledgedMessages();
- return;
- }
- removeExpiredMessagesFromQueue(messageIds);
- messageIds.stream()
- .map(messageId -> (MessageIdImpl)messageId)
- .collect(Collectors.groupingBy(MessageIdImpl::getPartitionIndex,
Collectors.toSet()))
- .forEach((partitionIndex, messageIds1) ->
- consumers.get(partitionIndex).redeliverUnacknowledgedMessages(
- messageIds1.stream().map(mid ->
(MessageId)mid).collect(Collectors.toSet())));
- resumeReceivingFromPausedConsumersIfNeeded();
- }
-
- @Override
- public void seek(MessageId messageId) throws PulsarClientException {
- try {
- seekAsync(messageId).get();
- } catch (ExecutionException e) {
- throw new PulsarClientException(e.getCause());
- } catch (InterruptedException e) {
- throw new PulsarClientException(e);
- }
- }
-
- @Override
- public CompletableFuture<Void> seekAsync(MessageId messageId) {
- return FutureUtil.failedFuture(new PulsarClientException("Seek
operation not supported on partitioned topics"));
- }
-
- List<ConsumerImpl<T>> getConsumers() {
- return consumers;
- }
-
- @Override
- public int getAvailablePermits() {
- return
consumers.stream().mapToInt(ConsumerImpl::getAvailablePermits).sum();
- }
-
- @Override
- public boolean hasReachedEndOfTopic() {
- return consumers.stream().allMatch(Consumer::hasReachedEndOfTopic);
- }
-
- @Override
- public int numMessagesInQueue() {
- return incomingMessages.size() +
consumers.stream().mapToInt(ConsumerImpl::numMessagesInQueue).sum();
- }
-
- @Override
- public synchronized ConsumerStatsRecorderImpl getStats() {
- if (stats == null) {
- return null;
- }
- stats.reset();
- for (int i = 0; i < numPartitions; i++) {
- stats.updateCumulativeStats(consumers.get(i).getStats());
- }
- return stats;
- }
-
- public UnAckedMessageTracker getUnAckedMessageTracker() {
- return unAckedMessageTracker;
- }
-
- private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
- Message<T> peek = incomingMessages.peek();
- if (peek != null) {
- if (!messageIds.contains(peek.getMessageId())) {
- // first message is not expired, then no message is expired in
queue.
- return;
- }
-
- // try not to remove elements that are added while we remove
- Message<T> message = incomingMessages.poll();
- while (message != null) {
- MessageIdImpl messageId = (MessageIdImpl)
message.getMessageId();
- if (!messageIds.contains(messageId)) {
- messageIds.add(messageId);
- break;
- }
- message = incomingMessages.poll();
- }
- }
- }
-
- private static final Logger log =
LoggerFactory.getLogger(PartitionedConsumerImpl.class);
-}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
similarity index 89%
rename from
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
rename to
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index f9cf550e5..d0b0c606c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -41,17 +41,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PatternTopicsConsumerImpl<T> extends TopicsConsumerImpl<T>
implements TimerTask {
+public class PatternMultiTopicsConsumerImpl<T> extends
MultiTopicsConsumerImpl<T> implements TimerTask {
private final Pattern topicsPattern;
private final TopicsChangedListener topicsChangeListener;
private volatile Timeout recheckPatternTimeout = null;
- public PatternTopicsConsumerImpl(Pattern topicsPattern,
- PulsarClientImpl client,
- ConsumerConfigurationData<T> conf,
- ExecutorService listenerExecutor,
- CompletableFuture<Consumer<T>>
subscribeFuture,
- Schema<T> schema) {
+ public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
+ PulsarClientImpl client,
+ ConsumerConfigurationData<T> conf,
+ ExecutorService listenerExecutor,
+ CompletableFuture<Consumer<T>>
subscribeFuture,
+ Schema<T> schema) {
super(client, conf, listenerExecutor, subscribeFuture, schema);
this.topicsPattern = topicsPattern;
@@ -86,7 +86,7 @@ public void run(Timeout timeout) throws Exception {
}
List<String> newTopics =
PulsarClientImpl.topicsPatternFilter(topics, topicsPattern);
- List<String> oldTopics =
PatternTopicsConsumerImpl.this.getTopics();
+ List<String> oldTopics =
PatternMultiTopicsConsumerImpl.this.getTopics();
futures.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics,
oldTopics)));
futures.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics,
newTopics)));
@@ -100,7 +100,7 @@ public void run(Timeout timeout) throws Exception {
});
// schedule the next re-check task
- client.timer().newTimeout(PatternTopicsConsumerImpl.this,
+ client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this,
Math.min(1, conf.getPatternAutoDiscoveryPeriod()),
TimeUnit.MINUTES);
}
@@ -109,9 +109,9 @@ public Pattern getPattern() {
}
interface TopicsChangedListener {
- // unsubscribe and delete ConsumerImpl in the `consumers` map in
`TopicsConsumerImpl` based on added topics.
+ // unsubscribe and delete ConsumerImpl in the `consumers` map in
`MultiTopicsConsumerImpl` based on added topics.
CompletableFuture<Void> onTopicsRemoved(Collection<String>
removedTopics);
- // subscribe and create a list of new ConsumerImpl, added them to the
`consumers` map in `TopicsConsumerImpl`.
+ // subscribe and create a list of new ConsumerImpl, added them to the
`consumers` map in `MultiTopicsConsumerImpl`.
CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics);
}
@@ -181,5 +181,5 @@ Timeout getRecheckPatternTimeout() {
return recheckPatternTimeout;
}
- private static final Logger log =
LoggerFactory.getLogger(PatternTopicsConsumerImpl.class);
+ private static final Logger log =
LoggerFactory.getLogger(PatternMultiTopicsConsumerImpl.class);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 47e953b5f..dd07cf9fc 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -384,8 +384,8 @@ public ClientConfigurationData getConfiguration() {
// gets the next single threaded executor from the list of
executors
ExecutorService listenerThread =
externalExecutorProvider.getExecutor();
if (metadata.partitions > 1) {
- consumer = new
PartitionedConsumerImpl<>(PulsarClientImpl.this, conf, metadata.partitions,
listenerThread,
- consumerSubscribedFuture, schema);
+ consumer =
MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
+ listenerThread, consumerSubscribedFuture,
metadata.partitions, schema);
} else {
consumer = new ConsumerImpl<>(PulsarClientImpl.this, topic,
conf, listenerThread, -1,
consumerSubscribedFuture, schema);
@@ -406,7 +406,7 @@ public ClientConfigurationData getConfiguration() {
private <T> CompletableFuture<Consumer<T>>
multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
CompletableFuture<Consumer<T>> consumerSubscribedFuture = new
CompletableFuture<>();
- ConsumerBase<T> consumer = new
TopicsConsumerImpl<>(PulsarClientImpl.this, conf,
+ ConsumerBase<T> consumer = new
MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
externalExecutorProvider.getExecutor(),
consumerSubscribedFuture, schema);
synchronized (consumers) {
@@ -436,7 +436,7 @@ public ClientConfigurationData getConfiguration() {
List<String> topicsList = topicsPatternFilter(topics,
conf.getTopicsPattern());
conf.getTopicNames().addAll(topicsList);
- ConsumerBase<T> consumer = new
PatternTopicsConsumerImpl<>(conf.getTopicsPattern(),
+ ConsumerBase<T> consumer = new
PatternMultiTopicsConsumerImpl<>(conf.getTopicsPattern(),
PulsarClientImpl.this,
conf,
externalExecutorProvider.getExecutor(),
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index 6f3218821..5ae452ee5 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -27,7 +27,7 @@
private final String topicName;
private final Message<T> msg;
- private final MessageId msgId;
+ private final TopicMessageIdImpl msgId;
TopicMessageImpl(String topicName,
Message<T> msg) {
@@ -49,6 +49,10 @@ public MessageId getMessageId() {
return msgId;
}
+ public MessageId getInnerMessageId() {
+ return msgId.getInnerMessageId();
+ }
+
@Override
public Map<String, String> getProperties() {
return msg.getProperties();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services