This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 8fa9ab160d7 Optimize conusmer pause (#14566)
8fa9ab160d7 is described below
commit 8fa9ab160d7faff94e7c9591474f491f6e9ceb26
Author: JiangHaiting <[email protected]>
AuthorDate: Wed Mar 23 10:23:31 2022 +0800
Optimize conusmer pause (#14566)
(cherry picked from commit a32edc7aac67804195c69736277c826bd16c5268)
---
.../org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 2c3d8cb03a6..057eda176bd 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -978,6 +978,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
partitionIndex -> {
String partitionName =
TopicName.get(topicName).getPartition(partitionIndex).toString();
CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
+ configurationData.setStartPaused(paused);
ConsumerImpl<T> newConsumer =
ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData,
client.externalExecutorProvider(),
partitionIndex, true, listener != null,
subFuture,
@@ -986,6 +987,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
synchronized (pauseMutex) {
if (paused) {
newConsumer.pause();
+ } else {
+ newConsumer.resume();
}
consumers.putIfAbsent(newConsumer.getTopic(),
newConsumer);
}
@@ -1005,6 +1008,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
subscribeResult.completeExceptionally(new
PulsarClientException(errorMessage));
return existingValue;
} else {
+ internalConfig.setStartPaused(paused);
ConsumerImpl<T> newConsumer =
ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
client.externalExecutorProvider(), -1,
true, listener != null, subFuture, startMessageId,
schema, interceptors,
@@ -1013,6 +1017,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
synchronized (pauseMutex) {
if (paused) {
newConsumer.pause();
+ } else {
+ newConsumer.resume();
}
}
return newConsumer;
@@ -1304,6 +1310,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
int partitionIndex =
TopicName.getPartitionIndex(partitionName);
CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
ConsumerConfigurationData<T> configurationData =
getInternalConsumerConfig();
+ configurationData.setStartPaused(paused);
ConsumerImpl<T> newConsumer =
ConsumerImpl.newConsumerImpl(
client, partitionName, configurationData,
client.externalExecutorProvider(),
@@ -1312,6 +1319,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
synchronized (pauseMutex) {
if (paused) {
newConsumer.pause();
+ } else {
+ newConsumer.resume();
}
consumers.putIfAbsent(newConsumer.getTopic(),
newConsumer);
}