This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dcb8070ea81 [cleanup][client] Remove unnecessary pause/resume logic
from MultiTopicsConsumerImpl and cleanup putIfAbsent logic (#25009)
dcb8070ea81 is described below
commit dcb8070ea8191f5ccd578115637cfb5d3da407e2
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Dec 4 13:55:37 2025 +0200
[cleanup][client] Remove unnecessary pause/resume logic from
MultiTopicsConsumerImpl and cleanup putIfAbsent logic (#25009)
---
.../client/impl/MultiTopicsConsumerImpl.java | 110 +++++++++------------
1 file changed, 49 insertions(+), 61 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 9e8e12818c6..8061da4e91c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -1130,26 +1131,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
List<CompletableFuture<Consumer<T>>> subscribeList = new
ArrayList<>();
for (int partitionIndex : partitions) {
String partitionName =
TopicName.get(topicName).getPartition(partitionIndex).toString();
- CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
- configurationData.setStartPaused(paused);
- ConsumerImpl<T> newConsumer =
createInternalConsumer(configurationData, partitionName,
- partitionIndex, subFuture, createIfDoesNotExist,
schema);
- synchronized (pauseMutex) {
- if (paused) {
- newConsumer.pause();
- } else {
- newConsumer.resume();
- }
- Consumer originalValue =
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
- if (originalValue != null) {
- newConsumer.closeAsync().exceptionally(ex -> {
- log.error("[{}] [{}] Failed to close the
orphan consumer",
- partitionName, subscription, ex);
- return null;
- });
- }
- }
- subscribeList.add(subFuture);
+ subscribeList.add(addNewConsumerIfNotExists(partitionName,
+ () -> createInternalConsumer(configurationData,
partitionName, partitionIndex,
+ new CompletableFuture<>(),
createIfDoesNotExist, schema)));
}
return FutureUtil.waitForAll(subscribeList);
});
@@ -1159,29 +1143,20 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
CompletableFuture<Consumer<T>> subscribeFuture = new
CompletableFuture<>();
subscribeAllPartitionsFuture = subscribeFuture.thenAccept(__ ->
{});
- synchronized (pauseMutex) {
- consumers.compute(topicName, (key, existingValue) -> {
- if (existingValue != null) {
- String errorMessage =
- String.format("[%s] Failed to subscribe for
topic [%s] in topics consumer. "
- + "Topic is already being subscribed for in
other thread.", topic, topicName);
- log.warn(errorMessage);
- subscribeResult.completeExceptionally(new
PulsarClientException(errorMessage));
- return existingValue;
- } else {
- internalConfig.setStartPaused(paused);
- ConsumerImpl<T> newConsumer =
createInternalConsumer(internalConfig, topicName,
- -1, subscribeFuture, createIfDoesNotExist,
schema);
- if (paused) {
- newConsumer.pause();
- } else {
- newConsumer.resume();
- }
- return newConsumer;
- }
- });
- }
-
+ consumers.compute(topicName, (key, existingValue) -> {
+ if (existingValue != null) {
+ String errorMessage =
+ String.format("[%s] Failed to subscribe for topic
[%s] in topics consumer. "
+ + "Topic is already being subscribed for in other
thread.", topic, topicName);
+ log.warn(errorMessage);
+ subscribeResult.completeExceptionally(new
PulsarClientException(errorMessage));
+ return existingValue;
+ } else {
+ ConsumerImpl<T> newConsumer =
createInternalConsumer(internalConfig, topicName,
+ -1, subscribeFuture, createIfDoesNotExist, schema);
+ return newConsumer;
+ }
+ });
}
subscribeAllPartitionsFuture.thenAccept(finalFuture -> {
@@ -1221,6 +1196,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
.timeout(1, TimeUnit.MILLISECONDS)
.build();
configurationData.setBatchReceivePolicy(internalBatchReceivePolicy);
+ configurationData.setStartPaused(paused);
configurationData = configurationData.clone();
return ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData, client.externalExecutorProvider(),
@@ -1444,24 +1420,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
.stream()
.map(partitionName -> {
int partitionIndex =
TopicName.getPartitionIndex(partitionName);
- CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
- ConsumerConfigurationData<T> configurationData =
getInternalConsumerConfig();
- configurationData.setStartPaused(paused);
- ConsumerImpl<T> newConsumer =
createInternalConsumer(configurationData, partitionName,
- partitionIndex, subFuture, true, schema);
- synchronized (pauseMutex) {
- if (paused) {
- newConsumer.pause();
- } else {
- newConsumer.resume();
- }
- consumers.putIfAbsent(newConsumer.getTopic(),
newConsumer);
- }
- if (log.isDebugEnabled()) {
- log.debug("[{}] create consumer {} for
partitionName: {}",
- topicName, newConsumer.getTopic(),
partitionName);
- }
- return subFuture;
+ return addNewConsumerIfNotExists(partitionName,
+ () ->
createInternalConsumer(getInternalConsumerConfig(), partitionName,
+ partitionIndex, new
CompletableFuture<>(), true, schema));
})
.collect(Collectors.toList());
// call interceptor
@@ -1485,6 +1446,33 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
});
}
+ private CompletableFuture<Consumer<T>> addNewConsumerIfNotExists(String
internalTopicName,
+
Supplier<ConsumerImpl<T>> newConsumerSupplier) {
+ ConsumerImpl<T> consumer = consumers.compute(internalTopicName, (__,
existingConsumer) -> {
+ if (existingConsumer != null) {
+ if
(existingConsumer.subscribeFuture().isCompletedExceptionally()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] Closing and replacing existing
consumer that wasn't completed successfully "
+ + "for {}", topic, subscription,
internalTopicName);
+ }
+ existingConsumer.closeAsync();
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] Reusing existing consumer for {}",
topic, subscription, internalTopicName);
+ }
+ return existingConsumer;
+ }
+ }
+ // create the new consumer
+ if (log.isDebugEnabled()) {
+ log.debug("[{}][{}] Creating consumer for {}", topic,
subscription, internalTopicName);
+ }
+ return newConsumerSupplier.get();
+ });
+ // return the subscribe future
+ return consumer.subscribeFuture();
+ }
+
private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {