This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9eb173a On multi-topic consumer, we shouldn't keep checking the
partitioned metadata (#10708)
9eb173a is described below
commit 9eb173af32888e4d1361fee2f0dd034a9def2636
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 28 22:15:52 2021 -0700
On multi-topic consumer, we shouldn't keep checking the partitioned
metadata (#10708)
* On multi-topic consumer, we shouldn't keep checking the partitioned
metadata
* Added NON_PARTITIONED constant
* Removed assertion that is now invalid
* Fixed handling of deleted partitioned topic
* Fixed re-subscribing same topic
---
.../client/impl/PatternTopicsConsumerImplTest.java | 44 ++++----
.../pulsar/client/impl/TopicsConsumerImplTest.java | 18 +--
.../client/impl/MultiTopicsConsumerImpl.java | 123 +++++++++++----------
.../impl/PatternMultiTopicsConsumerImpl.java | 13 ++-
.../client/impl/MultiTopicsConsumerImplTest.java | 43 +++++++
.../common/partition/PartitionedTopicMetadata.java | 5 +
6 files changed, 153 insertions(+), 93 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 55649ab..bf6edf4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -183,12 +183,12 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
// 4. verify consumer get methods, to get right number of partitions
and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
- List<String> topics = ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics();
+ List<String> topics = ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitions();
List<ConsumerImpl<byte[]>> consumers =
((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
assertEquals(topics.size(), 6);
assertEquals(consumers.size(), 6);
- assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 2);
topics.forEach(topic -> log.debug("topic: {}", topic));
consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));
@@ -196,7 +196,7 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
IntStream.range(0, topics.size()).forEach(index ->
assertEquals(consumers.get(index).getTopic(), topics.get(index)));
- ((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
+ ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().forEach(topic -> log.debug("getTopics topic:
{}", topic));
// 5. produce data
for (int i = 0; i < totalMessages / 3; i++) {
@@ -286,12 +286,12 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
// 4. verify consumer get methods, to get right number of partitions
and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
- List<String> topics = ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics();
+ List<String> topics = ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitions();
List<ConsumerImpl<byte[]>> consumers =
((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
assertEquals(topics.size(), 1);
assertEquals(consumers.size(), 1);
- assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 1);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 0);
topics.forEach(topic -> log.debug("topic: {}", topic));
consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));
@@ -299,7 +299,7 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
IntStream.range(0, topics.size()).forEach(index ->
assertEquals(consumers.get(index).getTopic(), topics.get(index)));
- ((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
+ ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().forEach(topic -> log.debug("getTopics topic:
{}", topic));
// 5. produce data
for (int i = 0; i < totalMessages / 4; i++) {
@@ -377,12 +377,12 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
// 4. verify consumer get methods, to get right number of partitions
and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
- List<String> topics = ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics();
+ List<String> topics = ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitions();
List<ConsumerImpl<byte[]>> consumers =
((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
assertEquals(topics.size(), 7);
assertEquals(consumers.size(), 7);
- assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 4);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 2);
topics.forEach(topic -> log.debug("topic: {}", topic));
consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));
@@ -390,7 +390,7 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
IntStream.range(0, topics.size()).forEach(index ->
assertEquals(consumers.get(index).getTopic(), topics.get(index)));
- ((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
+ ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().forEach(topic -> log.debug("getTopics topic:
{}", topic));
// 5. produce data
for (int i = 0; i < totalMessages / 4; i++) {
@@ -508,9 +508,9 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
// 3. verify consumer get methods, to get 5 number of partitions and
topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
- assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 5);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitions().size(), 5);
assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 5);
- assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 2);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 2);
// 4. create producer
String messagePredicate = "my-message-" + key + "-";
@@ -537,9 +537,9 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
// 6. verify consumer get methods, to get number of partitions and
topics, value 6=1+2+3.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
- assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 6);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitions().size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 6);
- assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 2);
// 7. produce data
@@ -614,9 +614,9 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
// 4. verify consumer get methods, to get 6 number of partitions and
topics: 6=1+2+3
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
- assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 6);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitions().size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 6);
- assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 2);
// 5. produce data to topic 1,2,3; verify should receive all the
message
for (int i = 0; i < totalMessages / 3; i++) {
@@ -649,9 +649,9 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
PatternMultiTopicsConsumerImpl<byte[]> consumer1 =
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
consumer1.run(consumer1.getRecheckPatternTimeout());
Thread.sleep(100);
- assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 10);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitions().size(), 10);
assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 10);
- assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 4);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 3);
// 8. produce data to topic3 and topic4, verify should receive all the
message
for (int i = 0; i < totalMessages / 2; i++) {
@@ -723,9 +723,9 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
// 4. verify consumer get methods, to get 0 number of partitions and
topics: 6=1+2+3
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>)
consumer).getPattern());
- assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 6);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitions().size(), 6);
assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getConsumers().size(), 6);
- assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getTopics().size(), 3);
+ assertEquals(((PatternMultiTopicsConsumerImpl<?>)
consumer).getPartitionedTopics().size(), 2);
// 5. produce data to topic 1,2,3; verify should receive all the
message
for (int i = 0; i < totalMessages / 3; i++) {
@@ -757,9 +757,9 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
PatternMultiTopicsConsumerImpl<byte[]> consumer1 =
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
consumer1.run(consumer1.getRecheckPatternTimeout());
Thread.sleep(100);
- assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics().size(), 2);
+ assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>)
consumer).getPartitions().size(), 2);
assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>)
consumer).getConsumers().size(), 2);
- assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>)
consumer).getTopics().size(), 1);
+ assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics().size(), 1);
// 8. produce data to topic2, verify should receive all the message
for (int i = 0; i < totalMessages; i++) {
@@ -808,7 +808,7 @@ public class PatternTopicsConsumerImplTest extends
ProducerConsumerBase {
// 4. verify consumer get methods
assertSame(consumerImpl.getPattern(), pattern);
- assertEquals(consumerImpl.getTopics().size(), 2);
+ assertEquals(consumerImpl.getPartitionedTopics().size(), 0);
producer1.send("msg-1");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index cb89d33..898e534 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -148,7 +148,7 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
assertTrue(consumer instanceof MultiTopicsConsumerImpl);
assertTrue(consumer.getTopic().startsWith(MultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));
- List<String> topics = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics();
+ List<String> topics = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getPartitions();
List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl)
consumer).getConsumers();
topics.forEach(topic -> log.info("topic: {}", topic));
@@ -157,7 +157,7 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
IntStream.range(0, 6).forEach(index ->
assertEquals(consumers.get(index).getTopic(), topics.get(index)));
- assertEquals(((MultiTopicsConsumerImpl<byte[]>)
consumer).getTopics().size(), 3);
+ assertEquals(((MultiTopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics().size(), 2);
consumer.unsubscribe();
consumer.close();
@@ -563,12 +563,12 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
assertEquals(messageSet, totalMessages * 2 / 3);
// 7. use getter to verify internal topics number after un-subscribe
topic3
- List<String> topics = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics();
+ List<String> topics = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getPartitions();
List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl)
consumer).getConsumers();
assertEquals(topics.size(), 3);
assertEquals(consumers.size(), 3);
- assertEquals(((MultiTopicsConsumerImpl<byte[]>)
consumer).getTopics().size(), 2);
+ assertEquals(((MultiTopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics().size(), 1);
// 8. re-subscribe topic3
CompletableFuture<Void> subFuture =
((MultiTopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3, true);
@@ -594,12 +594,12 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
assertEquals(messageSet, totalMessages);
// 11. use getter to verify internal topics number after subscribe
topic3
- topics = ((MultiTopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics();
+ topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitions();
consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers();
assertEquals(topics.size(), 6);
assertEquals(consumers.size(), 6);
- assertEquals(((MultiTopicsConsumerImpl<byte[]>)
consumer).getTopics().size(), 3);
+ assertEquals(((MultiTopicsConsumerImpl<byte[]>)
consumer).getPartitionedTopics().size(), 2);
consumer.unsubscribe();
consumer.close();
@@ -1181,20 +1181,20 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
.subscribe();
Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 3);
- Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 3);
+ Assert.assertEquals(consumer.getConsumers().size(), 3);
admin.topics().deletePartitionedTopic(topicName, true);
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 0);
- Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(),
0);
+ Assert.assertEquals(consumer.getConsumers().size(), 0);
});
admin.topics().createPartitionedTopic(topicName, 7);
consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 7);
- Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(),
7);
+ Assert.assertEquals(consumer.getConsumers().size(), 7);
});
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index df6a830..63027a5 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
@@ -52,6 +53,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -80,7 +82,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
private final ConcurrentHashMap<String, ConsumerImpl<T>> consumers;
// Map <topic, numPartitions>, store partition number for each topic
- protected final ConcurrentHashMap<String, Integer> topics;
+ protected final ConcurrentHashMap<String, Integer> partitionedTopics;
// Queue of partition consumers on which we have stopped calling
receiveAsync() because the
// shared incoming queue was full
@@ -141,7 +143,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
checkArgument(conf.getReceiverQueueSize() > 0,
"Receiver queue size needs to be greater than 0 for Topics
Consumer");
- this.topics = new ConcurrentHashMap<>();
+ this.partitionedTopics = new ConcurrentHashMap<>();
this.consumers = new ConcurrentHashMap<>();
this.pausedConsumers = new ConcurrentLinkedQueue<>();
this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
@@ -805,7 +807,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
private void removeTopic(String topic) {
String fullTopicName = getFullTopicName(topic);
if (fullTopicName != null) {
- topics.remove(topic);
+ partitionedTopics.remove(topic);
}
}
@@ -817,7 +819,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
new PulsarClientException.AlreadyClosedException("Topic
name not valid"));
}
String fullTopicName = topicNameInstance.toString();
- if (topics.containsKey(fullTopicName) ||
topics.containsKey(topicNameInstance.getPartitionedTopicName())) {
+ if (consumers.containsKey(fullTopicName)
+ ||
partitionedTopics.containsKey(topicNameInstance.getPartitionedTopicName())) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Already
subscribed to " + topicName));
}
@@ -881,7 +884,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
new PulsarClientException.AlreadyClosedException("Topic
name not valid"));
}
String fullTopicName = topicNameInstance.toString();
- if (topics.containsKey(fullTopicName) ||
topics.containsKey(topicNameInstance.getPartitionedTopicName())) {
+ if (consumers.containsKey(fullTopicName)
+ ||
partitionedTopics.containsKey(topicNameInstance.getPartitionedTopicName())) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Already
subscribed to " + topicName));
}
@@ -916,10 +920,11 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
}
List<CompletableFuture<Consumer<T>>> futureList;
- if (numPartitions > 0) {
+ if (numPartitions != PartitionedTopicMetadata.NON_PARTITIONED) {
// Below condition is true if subscribeAsync() has been invoked
second time with same
// topicName before the first invocation had reached this point.
- boolean isTopicBeingSubscribedForInOtherThread =
this.topics.putIfAbsent(topicName, numPartitions) != null;
+ boolean isTopicBeingSubscribedForInOtherThread =
+ partitionedTopics.putIfAbsent(topicName, numPartitions) !=
null;
if (isTopicBeingSubscribedForInOtherThread) {
String errorMessage = String.format("[%s] Failed to subscribe
for topic [%s] in topics consumer. "
+ "Topic is already being subscribed for in other
thread.", topic, topicName);
@@ -955,27 +960,31 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
})
.collect(Collectors.toList());
} else {
- boolean isTopicBeingSubscribedForInOtherThread =
this.topics.putIfAbsent(topicName, 1) != null;
- if (isTopicBeingSubscribedForInOtherThread) {
- String errorMessage = String.format("[%s] Failed to subscribe
for topic [%s] in topics consumer. "
- + "Topic is already being subscribed for in other
thread.", topic, topicName);
- log.warn(errorMessage);
- subscribeResult.completeExceptionally(new
PulsarClientException(errorMessage));
- return;
- }
allTopicPartitionsNumber.incrementAndGet();
CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
- ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client,
topicName, internalConfig,
- client.externalExecutorProvider(), -1,
- true, subFuture, null, schema, interceptors,
- createIfDoesNotExist);
- synchronized (pauseMutex) {
- if (paused) {
- newConsumer.pause();
+
+ consumers.compute(topicName, (key, existingValue) -> {
+ if (existingValue != null) {
+ String errorMessage = String.format("[%s] Failed to
subscribe for topic [%s] in topics consumer. "
+ + "Topic is already being subscribed for in other
thread.", topic, topicName);
+ log.warn(errorMessage);
+ subscribeResult.completeExceptionally(new
PulsarClientException(errorMessage));
+ return existingValue;
+ } else {
+ ConsumerImpl<T> newConsumer =
ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
+ client.externalExecutorProvider(), -1,
+ true, subFuture, null, schema, interceptors,
+ createIfDoesNotExist);
+
+ synchronized (pauseMutex) {
+ if (paused) {
+ newConsumer.pause();
+ }
+ }
+ return newConsumer;
}
- consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
- }
+ });
futureList = Collections.singletonList(subFuture);
}
@@ -985,11 +994,6 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) {
setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
}
- int numTopics =
this.topics.values().stream().mapToInt(Integer::intValue).sum();
- int currentAllTopicsPartitionsNumber =
allTopicPartitionsNumber.get();
- checkState(currentAllTopicsPartitionsNumber == numTopics,
- "allTopicPartitionsNumber " +
currentAllTopicsPartitionsNumber
- + " not equals expected: " + numTopics);
// We have successfully created new consumers, so we can start
receiving messages for them
startReceivingMessages(
@@ -1144,12 +1148,12 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
// get topics name
- public List<String> getTopics() {
- return topics.keySet().stream().collect(Collectors.toList());
+ public List<String> getPartitionedTopics() {
+ return
partitionedTopics.keySet().stream().collect(Collectors.toList());
}
// get partitioned topics name
- public List<String> getPartitionedTopics() {
+ public List<String> getPartitions() {
return consumers.keySet().stream().collect(Collectors.toList());
}
@@ -1160,7 +1164,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
// get all partitions that in the topics map
int getPartitionsOfTheTopicMap() {
- return topics.values().stream().mapToInt(Integer::intValue).sum();
+ return
partitionedTopics.values().stream().mapToInt(Integer::intValue).sum();
}
@Override
@@ -1221,17 +1225,11 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
// subscribe increased partitions for a given topic
private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String
topicName) {
- CompletableFuture<Void> future = new CompletableFuture<>();
+ int oldPartitionNumber = partitionedTopics.get(topicName);
- //Drop the disconnected consumers to allow the auto discovery
- consumers.entrySet().removeIf(e ->
TopicName.get(e.getKey()).getPartitionedTopicName().equals(topicName) &&
!e.getValue().isConnected());
- final int connectedPartitions =
Long.valueOf(consumers.entrySet().stream().filter(e ->
e.getKey().contains(topicName)).count()).intValue();
- allTopicPartitionsNumber.set(allTopicPartitionsNumber.get() -
(topics.get(topicName) - connectedPartitions));
- topics.put(topicName, connectedPartitions);
-
- client.getPartitionsForTopic(topicName).thenCompose(list -> {
- int oldPartitionNumber = topics.get(topicName);
- int currentPartitionNumber = Long.valueOf(list.stream().filter(t
-> TopicName.get(t).isPartitioned()).count()).intValue();
+ return client.getPartitionsForTopic(topicName).thenCompose(list -> {
+ int currentPartitionNumber = Long.valueOf(list.stream()
+ .filter(t ->
TopicName.get(t).isPartitioned()).count()).intValue();
if (log.isDebugEnabled()) {
log.debug("[{}] partitions number. old: {}, new: {}",
@@ -1240,11 +1238,28 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
if (oldPartitionNumber == currentPartitionNumber) {
// topic partition number not changed
- future.complete(null);
- return future;
+ return CompletableFuture.completedFuture(null);
+ } else if (currentPartitionNumber ==
PartitionedTopicMetadata.NON_PARTITIONED) {
+ // The topic was initially partitioned but then it was
deleted. We keep it in the topics
+ partitionedTopics.put(topicName, 0);
+
+ allTopicPartitionsNumber.addAndGet(-oldPartitionNumber);
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ for (Iterator<Map.Entry<String, ConsumerImpl<T>>> it =
consumers.entrySet().iterator(); it.hasNext();) {
+ Map.Entry<String, ConsumerImpl<T>> e = it.next();
+ String partitionedTopicName =
TopicName.get(e.getKey()).getPartitionedTopicName();
+
+ // Remove the consumers that belong to the deleted
partitioned topic
+ if (partitionedTopicName.equals(topicName)) {
+ futures.add(e.getValue().closeAsync());
+ consumers.remove(e.getKey());
+ }
+ }
+
+ return FutureUtil.waitForAll(futures);
} else if (oldPartitionNumber < currentPartitionNumber) {
allTopicPartitionsNumber.addAndGet(currentPartitionNumber -
oldPartitionNumber);
- topics.put(topicName, currentPartitionNumber);
+ partitionedTopics.put(topicName, currentPartitionNumber);
List<String> newPartitions = list.subList(oldPartitionNumber,
currentPartitionNumber);
// subscribe new added partitions
List<CompletableFuture<Consumer<T>>> futureList = newPartitions
@@ -1273,29 +1288,19 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
.collect(Collectors.toList());
// wait for all partitions subscribe future complete, then
startReceivingMessages
- FutureUtil.waitForAll(futureList)
+ return FutureUtil.waitForAll(futureList)
.thenAccept(finalFuture -> {
List<ConsumerImpl<T>> newConsumerList =
newPartitions.stream()
.map(partitionTopic ->
consumers.get(partitionTopic))
.collect(Collectors.toList());
startReceivingMessages(newConsumerList);
- future.complete(null);
- })
- .exceptionally(ex -> {
- log.warn("[{}] Failed to subscribe {} partition: {} -
{} : {}",
- topic, topicName, oldPartitionNumber,
currentPartitionNumber, ex);
- future.completeExceptionally(ex);
- return null;
});
} else {
log.error("[{}] not support shrink topic partitions. old: {},
new: {}",
topicName, oldPartitionNumber, currentPartitionNumber);
- future.completeExceptionally(new NotSupportedException("not
support shrink topic partitions"));
+ return FutureUtil.failedFuture(new NotSupportedException("not
support shrink topic partitions"));
}
- return future;
});
-
- return future;
}
private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {
@@ -1311,7 +1316,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
// if last auto update not completed yet, do nothing.
if (partitionsAutoUpdateFuture == null ||
partitionsAutoUpdateFuture.isDone()) {
- partitionsAutoUpdateFuture =
topicsPartitionChangedListener.onTopicsExtended(topics.keySet());
+ partitionsAutoUpdateFuture =
topicsPartitionChangedListener.onTopicsExtended(partitionedTopics.keySet());
}
// schedule the next re-check task
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index 7769d70..2f946af 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -92,7 +92,14 @@ public class PatternMultiTopicsConsumerImpl<T> extends
MultiTopicsConsumerImpl<T
}
List<String> newTopics =
PulsarClientImpl.topicsPatternFilter(topics, topicsPattern);
- List<String> oldTopics =
PatternMultiTopicsConsumerImpl.this.getTopics();
+ List<String> oldTopics = Lists.newArrayList();
+ oldTopics.addAll(getPartitionedTopics());
+ getPartitions().forEach(p -> {
+ TopicName t = TopicName.get(p);
+ if (!t.isPartitioned() ||
!oldTopics.contains(t.getPartitionedTopicName())) {
+ oldTopics.add(p);
+ }
+ });
futures.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics,
oldTopics)));
futures.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics,
newTopics)));
@@ -131,7 +138,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends
MultiTopicsConsumerImpl<T
return removeFuture;
}
- List<CompletableFuture<Void>> futures =
Lists.newArrayListWithExpectedSize(topics.size());
+ List<CompletableFuture<Void>> futures =
Lists.newArrayListWithExpectedSize(partitionedTopics.size());
removedTopics.stream().forEach(topic ->
futures.add(removeConsumerAsync(topic)));
FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> removeFuture.complete(null))
@@ -152,7 +159,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends
MultiTopicsConsumerImpl<T
return addFuture;
}
- List<CompletableFuture<Void>> futures =
Lists.newArrayListWithExpectedSize(topics.size());
+ List<CompletableFuture<Void>> futures =
Lists.newArrayListWithExpectedSize(partitionedTopics.size());
addedTopics.stream().forEach(topic ->
futures.add(subscribeAsync(topic, false /* createTopicIfDoesNotExist */)));
FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> addFuture.complete(null))
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index dc70d15..745aa1c 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -20,7 +20,11 @@ package org.apache.pulsar.client.impl;
import com.google.common.collect.Sets;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
+import lombok.Cleanup;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
@@ -31,6 +35,7 @@ import
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.mockito.internal.verification.VerificationModeFactory;
import org.testng.annotations.Test;
import java.util.Arrays;
@@ -63,9 +68,14 @@ public class MultiTopicsConsumerImplTest {
conf.setStatsIntervalSeconds(100);
ThreadFactory threadFactory = new
DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
+
+ @Cleanup("shutdown")
EventLoopGroup eventLoopGroup =
EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory);
+
+ @Cleanup("shutdownNow")
ExecutorProvider executorProvider = new ExecutorProvider(1,
"client-test-stats");
+ @Cleanup
PulsarClientImpl clientImpl = new PulsarClientImpl(conf,
eventLoopGroup);
ConsumerConfigurationData consumerConfData = new
ConsumerConfigurationData();
@@ -169,4 +179,37 @@ public class MultiTopicsConsumerImplTest {
verify(clientMock, times(1)).cleanupConsumer(any());
}
+ @Test
+ public void testDontCheckForPartitionsUpdatesOnNonPartitionedTopics()
throws Exception {
+ ExecutorProvider executorProvider = mock(ExecutorProvider.class);
+ ConsumerConfigurationData<byte[]> consumerConfData = new
ConsumerConfigurationData<>();
+ consumerConfData.setSubscriptionName("subscriptionName");
+ consumerConfData.setTopicNames(new HashSet<>(Arrays.asList("a", "b",
"c")));
+ consumerConfData.setAutoUpdatePartitionsIntervalSeconds(1);
+ consumerConfData.setAutoUpdatePartitions(true);
+
+ @Cleanup("stop")
+ Timer timer = new HashedWheelTimer();
+
+ PulsarClientImpl clientMock =
createPulsarClientMockWithMockedClientCnx();
+ when(clientMock.timer()).thenReturn(timer);
+ when(clientMock.preProcessSchemaBeforeSubscribe(any(), any(), any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+
+ // Simulate non partitioned topics
+ PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(0);
+
when(clientMock.getPartitionedTopicMetadata(any())).thenReturn(CompletableFuture.completedFuture(metadata));
+ CompletableFuture<Consumer<byte[]>> completeFuture = new
CompletableFuture<>();
+
+ MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<>(
+ clientMock, consumerConfData, executorProvider,
+ completeFuture, Schema.BYTES, null, true);
+ impl.setState(HandlerState.State.Ready);
+ Thread.sleep(5000);
+
+ // getPartitionedTopicMetadata should have been called only the first
time, for each of the 3 topics,
+ // but not anymore since the topics are not partitioned.
+ verify(clientMock, times(3)).getPartitionedTopicMetadata(any());
+ }
+
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java
index 1f023de..024ca0a 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/partition/PartitionedTopicMetadata.java
@@ -34,4 +34,9 @@ public class PartitionedTopicMetadata {
this.partitions = partitions;
}
+ /**
+ * A topic with '0' partitions is treated like non-partitioned topic.
+ */
+ public static final int NON_PARTITIONED = 0;
+
}