This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 8fa9ab160d7 Optimize conusmer pause (#14566)
8fa9ab160d7 is described below

commit 8fa9ab160d7faff94e7c9591474f491f6e9ceb26
Author: JiangHaiting <[email protected]>
AuthorDate: Wed Mar 23 10:23:31 2022 +0800

    Optimize conusmer pause (#14566)
    
    (cherry picked from commit a32edc7aac67804195c69736277c826bd16c5268)
---
 .../org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java   | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 2c3d8cb03a6..057eda176bd 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -978,6 +978,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     partitionIndex -> {
                         String partitionName = 
TopicName.get(topicName).getPartition(partitionIndex).toString();
                         CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
+                        configurationData.setStartPaused(paused);
                         ConsumerImpl<T> newConsumer = 
ConsumerImpl.newConsumerImpl(client, partitionName,
                                     configurationData, 
client.externalExecutorProvider(),
                                     partitionIndex, true, listener != null, 
subFuture,
@@ -986,6 +987,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                         synchronized (pauseMutex) {
                             if (paused) {
                                 newConsumer.pause();
+                            } else {
+                                newConsumer.resume();
                             }
                             consumers.putIfAbsent(newConsumer.getTopic(), 
newConsumer);
                         }
@@ -1005,6 +1008,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     subscribeResult.completeExceptionally(new 
PulsarClientException(errorMessage));
                     return existingValue;
                 } else {
+                    internalConfig.setStartPaused(paused);
                     ConsumerImpl<T> newConsumer = 
ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
                             client.externalExecutorProvider(), -1,
                             true, listener != null, subFuture, startMessageId, 
schema, interceptors,
@@ -1013,6 +1017,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     synchronized (pauseMutex) {
                         if (paused) {
                             newConsumer.pause();
+                        } else {
+                            newConsumer.resume();
                         }
                     }
                     return newConsumer;
@@ -1304,6 +1310,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                         int partitionIndex = 
TopicName.getPartitionIndex(partitionName);
                         CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
                         ConsumerConfigurationData<T> configurationData = 
getInternalConsumerConfig();
+                        configurationData.setStartPaused(paused);
                         ConsumerImpl<T> newConsumer = 
ConsumerImpl.newConsumerImpl(
                                 client, partitionName, configurationData,
                                 client.externalExecutorProvider(),
@@ -1312,6 +1319,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                         synchronized (pauseMutex) {
                             if (paused) {
                                 newConsumer.pause();
+                            } else {
+                                newConsumer.resume();
                             }
                             consumers.putIfAbsent(newConsumer.getTopic(), 
newConsumer);
                         }

Reply via email to