This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dcb8070ea81 [cleanup][client] Remove unnecessary pause/resume logic 
from MultiTopicsConsumerImpl and cleanup putIfAbsent logic (#25009)
dcb8070ea81 is described below

commit dcb8070ea8191f5ccd578115637cfb5d3da407e2
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Dec 4 13:55:37 2025 +0200

    [cleanup][client] Remove unnecessary pause/resume logic from 
MultiTopicsConsumerImpl and cleanup putIfAbsent logic (#25009)
---
 .../client/impl/MultiTopicsConsumerImpl.java       | 110 +++++++++------------
 1 file changed, 49 insertions(+), 61 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 9e8e12818c6..8061da4e91c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -1130,26 +1131,9 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 List<CompletableFuture<Consumer<T>>> subscribeList = new 
ArrayList<>();
                 for (int partitionIndex : partitions) {
                     String partitionName = 
TopicName.get(topicName).getPartition(partitionIndex).toString();
-                    CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
-                    configurationData.setStartPaused(paused);
-                    ConsumerImpl<T> newConsumer = 
createInternalConsumer(configurationData, partitionName,
-                            partitionIndex, subFuture, createIfDoesNotExist, 
schema);
-                    synchronized (pauseMutex) {
-                        if (paused) {
-                            newConsumer.pause();
-                        } else {
-                            newConsumer.resume();
-                        }
-                        Consumer originalValue = 
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
-                        if (originalValue != null) {
-                            newConsumer.closeAsync().exceptionally(ex -> {
-                                log.error("[{}] [{}] Failed to close the 
orphan consumer",
-                                        partitionName, subscription, ex);
-                                return null;
-                            });
-                        }
-                    }
-                    subscribeList.add(subFuture);
+                    subscribeList.add(addNewConsumerIfNotExists(partitionName,
+                            () -> createInternalConsumer(configurationData, 
partitionName, partitionIndex,
+                                    new CompletableFuture<>(), 
createIfDoesNotExist, schema)));
                 }
                 return FutureUtil.waitForAll(subscribeList);
             });
@@ -1159,29 +1143,20 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             CompletableFuture<Consumer<T>> subscribeFuture = new 
CompletableFuture<>();
             subscribeAllPartitionsFuture = subscribeFuture.thenAccept(__ -> 
{});
 
-            synchronized (pauseMutex) {
-                consumers.compute(topicName, (key, existingValue) -> {
-                    if (existingValue != null) {
-                        String errorMessage =
-                                String.format("[%s] Failed to subscribe for 
topic [%s] in topics consumer. "
-                                + "Topic is already being subscribed for in 
other thread.", topic, topicName);
-                        log.warn(errorMessage);
-                        subscribeResult.completeExceptionally(new 
PulsarClientException(errorMessage));
-                        return existingValue;
-                    } else {
-                        internalConfig.setStartPaused(paused);
-                        ConsumerImpl<T> newConsumer = 
createInternalConsumer(internalConfig, topicName,
-                                -1, subscribeFuture, createIfDoesNotExist, 
schema);
-                        if (paused) {
-                            newConsumer.pause();
-                        } else {
-                            newConsumer.resume();
-                        }
-                        return newConsumer;
-                    }
-                });
-            }
-
+            consumers.compute(topicName, (key, existingValue) -> {
+                if (existingValue != null) {
+                    String errorMessage =
+                            String.format("[%s] Failed to subscribe for topic 
[%s] in topics consumer. "
+                            + "Topic is already being subscribed for in other 
thread.", topic, topicName);
+                    log.warn(errorMessage);
+                    subscribeResult.completeExceptionally(new 
PulsarClientException(errorMessage));
+                    return existingValue;
+                } else {
+                    ConsumerImpl<T> newConsumer = 
createInternalConsumer(internalConfig, topicName,
+                            -1, subscribeFuture, createIfDoesNotExist, schema);
+                    return newConsumer;
+                }
+            });
         }
 
         subscribeAllPartitionsFuture.thenAccept(finalFuture -> {
@@ -1221,6 +1196,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 .timeout(1, TimeUnit.MILLISECONDS)
                 .build();
         configurationData.setBatchReceivePolicy(internalBatchReceivePolicy);
+        configurationData.setStartPaused(paused);
         configurationData = configurationData.clone();
         return ConsumerImpl.newConsumerImpl(client, partitionName,
                 configurationData, client.externalExecutorProvider(),
@@ -1444,24 +1420,9 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     .stream()
                     .map(partitionName -> {
                         int partitionIndex = 
TopicName.getPartitionIndex(partitionName);
-                        CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
-                        ConsumerConfigurationData<T> configurationData = 
getInternalConsumerConfig();
-                        configurationData.setStartPaused(paused);
-                        ConsumerImpl<T> newConsumer = 
createInternalConsumer(configurationData, partitionName,
-                                partitionIndex, subFuture, true, schema);
-                        synchronized (pauseMutex) {
-                            if (paused) {
-                                newConsumer.pause();
-                            } else {
-                                newConsumer.resume();
-                            }
-                            consumers.putIfAbsent(newConsumer.getTopic(), 
newConsumer);
-                        }
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] create consumer {} for 
partitionName: {}",
-                                    topicName, newConsumer.getTopic(), 
partitionName);
-                        }
-                        return subFuture;
+                        return addNewConsumerIfNotExists(partitionName,
+                                () -> 
createInternalConsumer(getInternalConsumerConfig(), partitionName,
+                                        partitionIndex, new 
CompletableFuture<>(), true, schema));
                     })
                     .collect(Collectors.toList());
                 // call interceptor
@@ -1485,6 +1446,33 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         });
     }
 
+    private CompletableFuture<Consumer<T>> addNewConsumerIfNotExists(String 
internalTopicName,
+                                                                     
Supplier<ConsumerImpl<T>> newConsumerSupplier) {
+        ConsumerImpl<T> consumer = consumers.compute(internalTopicName, (__, 
existingConsumer) -> {
+            if (existingConsumer != null) {
+                if 
(existingConsumer.subscribeFuture().isCompletedExceptionally()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}][{}] Closing and replacing existing 
consumer that wasn't completed successfully "
+                                + "for {}", topic, subscription, 
internalTopicName);
+                    }
+                    existingConsumer.closeAsync();
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}][{}] Reusing existing consumer for {}", 
topic, subscription, internalTopicName);
+                    }
+                    return existingConsumer;
+                }
+            }
+            // create the new consumer
+            if (log.isDebugEnabled()) {
+                log.debug("[{}][{}] Creating consumer for {}", topic, 
subscription, internalTopicName);
+            }
+            return newConsumerSupplier.get();
+        });
+        // return the subscribe future
+        return consumer.subscribeFuture();
+    }
+
     private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {
         @Override
         public void run(Timeout timeout) throws Exception {

Reply via email to