This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 04ff2c9ea58 [improve][broker] PIP-442: Add memory limits for topic
list watcher (part 2) (#25070)
04ff2c9ea58 is described below
commit 04ff2c9ea58b6247da932735b4a066e10ac5050f
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Jan 16 17:45:10 2026 +0200
[improve][broker] PIP-442: Add memory limits for topic list watcher (part
2) (#25070)
(cherry picked from commit 805c71de3d07a89569c3a7763b31c9b042f6776f)
---
.../pulsar/broker/service/PulsarCommandSender.java | 17 +-
.../broker/service/PulsarCommandSenderImpl.java | 32 +-
.../apache/pulsar/broker/service/ServerCnx.java | 1 +
.../pulsar/broker/service/TopicListService.java | 441 +++++++++++++++++----
.../broker/service/TopicListServiceTest.java | 294 +++++++++++++-
.../broker/service/TopicListWatcherTest.java | 30 +-
...nConsumerBackPressureMultipleConsumersTest.java | 2 +-
...icWatcherBackPressureMultipleConsumersTest.java | 213 ++++++++++
.../apache/pulsar/common/protocol/Commands.java | 3 +-
.../semaphore/AsyncDualMemoryLimiterImpl.java | 6 +
.../semaphore/AsyncDualMemoryLimiterUtil.java | 11 +-
.../common/semaphore/AsyncSemaphoreImpl.java | 5 +
.../org/apache/pulsar/common/topics/TopicList.java | 23 +-
.../semaphore/AsyncDualMemoryLimiterUtilTest.java | 27 +-
.../pulsar/proxy/server/LookupProxyHandler.java | 1 +
15 files changed, 978 insertions(+), 128 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
index 8e2deff31d0..ca80ca49d76 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
@@ -20,10 +20,11 @@ package org.apache.pulsar.broker.service;
import io.netty.util.concurrent.Future;
+import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
+import java.util.function.Function;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
@@ -55,7 +56,8 @@ public interface PulsarCommandSender {
CompletableFuture<Void> sendGetTopicsOfNamespaceResponse(List<String>
topics, String topicsHash, boolean filtered,
boolean changed,
long requestId,
-
Consumer<Throwable> permitAcquireErrorHandler);
+
Function<Throwable, CompletableFuture<Void>>
+
permitAcquireErrorHandler);
void sendGetSchemaResponse(long requestId, SchemaInfo schema,
SchemaVersion version);
@@ -96,8 +98,13 @@ public interface PulsarCommandSender {
void sendEndTxnErrorResponse(long requestId, TxnID txnID, ServerError
error, String message);
- void sendWatchTopicListSuccess(long requestId, long watcherId, String
topicsHash, List<String> topics);
+ CompletableFuture<Void> sendWatchTopicListSuccess(long requestId, long
watcherId, String topicsHash,
+ Collection<String>
topics,
+ Function<Throwable,
CompletableFuture<Void>>
+
permitAcquireErrorHandler);
- void sendWatchTopicListUpdate(long watcherId,
- List<String> newTopics, List<String>
deletedTopics, String topicsHash);
+ CompletableFuture<Void> sendWatchTopicListUpdate(long watcherId,
+ List<String> newTopics, List<String>
deletedTopics, String topicsHash,
+ Function<Throwable, CompletableFuture<Void>>
+
permitAcquireErrorHandler);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 275e049255c..2570cb4431f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -24,10 +24,11 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
+import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
@@ -129,7 +130,9 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
@Override
public CompletableFuture<Void>
sendGetTopicsOfNamespaceResponse(List<String> topics, String topicsHash,
boolean
filtered, boolean changed, long requestId,
-
Consumer<Throwable> permitAcquireErrorHandler) {
+
Function<Throwable,
+
CompletableFuture<Void>>
+
permitAcquireErrorHandler) {
BaseCommand command =
Commands.newGetTopicsOfNamespaceResponseCommand(topics, topicsHash,
filtered, changed, requestId);
safeIntercept(command, cnx);
@@ -366,27 +369,32 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
/***
* @param topics topic names which are matching, the topic name contains
the partition suffix.
+ * @return a CompletableFuture<Void> that completes when the
operation finishes
*/
@Override
- public void sendWatchTopicListSuccess(long requestId, long watcherId,
String topicsHash, List<String> topics) {
+ public CompletableFuture<Void> sendWatchTopicListSuccess(long requestId,
long watcherId, String topicsHash,
+
Collection<String> topics,
+
Function<Throwable, CompletableFuture<Void>>
+
permitAcquireErrorHandler) {
BaseCommand command = Commands.newWatchTopicListSuccess(requestId,
watcherId, topicsHash, topics);
- interceptAndWriteCommand(command);
+ safeIntercept(command, cnx);
+ return acquireDirectMemoryPermitsAndWriteAndFlush(cnx.ctx(),
maxTopicListInFlightLimiter, () -> !cnx.isActive(),
+ command, permitAcquireErrorHandler);
}
/***
* {@inheritDoc}
+ * @return a CompletableFuture that completes when the watch topic list
update operation finishes
*/
@Override
- public void sendWatchTopicListUpdate(long watcherId,
- List<String> newTopics, List<String>
deletedTopics, String topicsHash) {
+ public CompletableFuture<Void> sendWatchTopicListUpdate(long watcherId,
List<String> newTopics,
+ List<String>
deletedTopics, String topicsHash,
+
Function<Throwable, CompletableFuture<Void>>
+
permitAcquireErrorHandler) {
BaseCommand command = Commands.newWatchTopicUpdate(watcherId,
newTopics, deletedTopics, topicsHash);
- interceptAndWriteCommand(command);
- }
-
- private void interceptAndWriteCommand(BaseCommand command) {
safeIntercept(command, cnx);
- ByteBuf outBuf = Commands.serializeWithSize(command);
- writeAndFlush(outBuf);
+ return acquireDirectMemoryPermitsAndWriteAndFlush(cnx.ctx(),
maxTopicListInFlightLimiter, () -> !cnx.isActive(),
+ command, permitAcquireErrorHandler);
}
private void writeAndFlush(ByteBuf outBuf) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 991fc8d536c..cada292dbca 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2695,6 +2695,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
commandSender.sendErrorResponse(requestId,
ServerError.TooManyRequests,
"Cannot
acquire permits for direct memory");
+ return
CompletableFuture.completedFuture(null);
});
}, t -> {
log.warn("[{}] Failed to
acquire heap memory permits for "
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
index ef2ea284cf7..ec0f9b73e7e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
@@ -18,78 +18,166 @@
*/
package org.apache.pulsar.broker.service;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.TopicResources;
+import org.apache.pulsar.broker.topiclistlimit.TopicListMemoryLimiter;
+import org.apache.pulsar.broker.topiclistlimit.TopicListSizeResultCache;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter;
+import org.apache.pulsar.common.semaphore.AsyncSemaphore;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.topics.TopicsPattern;
import org.apache.pulsar.common.topics.TopicsPatternFactory;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.NotificationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TopicListService {
-
-
public static class TopicListWatcher implements BiConsumer<String,
NotificationType> {
-
+ // upper bound for buffered topic list updates
+ private static final int DEFAULT_TOPIC_LIST_UPDATE_MAX_QUEUE_SIZE =
10000;
/** Topic names which are matching, the topic name contains the
partition suffix. **/
- private final List<String> matchingTopics;
+ private final Set<String> matchingTopics;
private final TopicListService topicListService;
private final long id;
+ private final NamespaceName namespace;
/** The regexp for the topic name(not contains partition suffix). **/
private final TopicsPattern topicsPattern;
+ private final Executor executor;
+ private volatile boolean closed = false;
+ private boolean sendingInProgress;
+ private final BlockingDeque<Runnable> sendTopicListUpdateTasks;
+ private boolean updatingTopics;
- /***
- * @param topicsPattern The regexp for the topic name(not contains
partition suffix).
- */
public TopicListWatcher(TopicListService topicListService, long id,
- TopicsPattern topicsPattern, List<String>
topics) {
+ NamespaceName namespace, TopicsPattern
topicsPattern, List<String> topics,
+ Executor executor, int
topicListUpdateMaxQueueSize) {
this.topicListService = topicListService;
this.id = id;
+ this.namespace = namespace;
this.topicsPattern = topicsPattern;
- this.matchingTopics = TopicList.filterTopics(topics,
topicsPattern);
+ this.executor = executor;
+ this.matchingTopics =
+ TopicList.filterTopics(topics, topicsPattern,
Collectors.toCollection(LinkedHashSet::new));
+ // start with in progress state since topic list update will be
sent first
+ this.sendingInProgress = true;
+ this.sendTopicListUpdateTasks =
+ new LinkedBlockingDeque<>(topicListUpdateMaxQueueSize);
}
- public List<String> getMatchingTopics() {
- return matchingTopics;
+ public synchronized Collection<String> getMatchingTopics() {
+ return new ArrayList<>(matchingTopics);
}
/***
* @param topicName topic name which contains partition suffix.
*/
@Override
- public void accept(String topicName, NotificationType
notificationType) {
+ public synchronized void accept(String topicName, NotificationType
notificationType) {
+ if (closed || updatingTopics) {
+ return;
+ }
String partitionedTopicName =
TopicName.get(topicName).getPartitionedTopicName();
String domainLessTopicName =
TopicList.removeTopicDomainScheme(partitionedTopicName);
if (topicsPattern.matches(domainLessTopicName)) {
- List<String> newTopics;
- List<String> deletedTopics;
+ List<String> newTopics = Collections.emptyList();
+ List<String> deletedTopics = Collections.emptyList();
if (notificationType == NotificationType.Deleted) {
- newTopics = Collections.emptyList();
- deletedTopics = Collections.singletonList(topicName);
- matchingTopics.remove(topicName);
- } else {
- deletedTopics = Collections.emptyList();
+ if (matchingTopics.remove(topicName)) {
+ deletedTopics = Collections.singletonList(topicName);
+ }
+ } else if (matchingTopics.add(topicName)) {
newTopics = Collections.singletonList(topicName);
- matchingTopics.add(topicName);
}
- String hash = TopicList.calculateHash(matchingTopics);
- topicListService.sendTopicListUpdate(id, hash, deletedTopics,
newTopics);
+ if (!newTopics.isEmpty() || !deletedTopics.isEmpty()) {
+ String hash = TopicList.calculateHash(matchingTopics);
+ sendTopicListUpdate(hash, deletedTopics, newTopics);
+ }
+ }
+ }
+
+ // sends updates one-by-one so that ordering is retained
+ private synchronized void sendTopicListUpdate(String hash,
List<String> deletedTopics, List<String> newTopics) {
+ if (closed || updatingTopics) {
+ return;
+ }
+ Runnable task = () -> topicListService.sendTopicListUpdate(id,
hash, deletedTopics, newTopics,
+ this::sendingCompleted);
+ if (!sendingInProgress) {
+ sendingInProgress = true;
+ executor.execute(task);
+ } else {
+ // if sendTopicListSuccess hasn't completed, add to a queue to
be executed after it completes
+ if (!sendTopicListUpdateTasks.offer(task)) {
+ log.warn("Update queue was full for watcher id {} matching
{}. Performing full refresh.", id,
+ topicsPattern.inputPattern());
+ if (!updatingTopics) {
+ updatingTopics = true;
+ sendTopicListUpdateTasks.clear();
+ matchingTopics.clear();
+ executor.execute(() ->
topicListService.updateTopicListWatcher(this));
+ }
+ }
+ }
+ }
+
+ // callback that triggers sending the next possibly buffered update
+ @VisibleForTesting
+ synchronized void sendingCompleted() {
+ if (closed) {
+ sendTopicListUpdateTasks.clear();
+ return;
+ }
+ // Execute the next task
+ Runnable task = sendTopicListUpdateTasks.poll();
+ if (task != null) {
+ executor.execute(task);
+ } else {
+ sendingInProgress = false;
}
}
+
+ public synchronized void close() {
+ closed = true;
+ sendTopicListUpdateTasks.clear();
+ }
+
+ synchronized void updateTopics(List<String> topics) {
+ matchingTopics.clear();
+ TopicList.filterTopicsToStream(topics,
topicsPattern).forEach(matchingTopics::add);
+ updatingTopics = false;
+ }
}
@@ -97,23 +185,39 @@ public class TopicListService {
private final NamespaceService namespaceService;
private final TopicResources topicResources;
+ private final PulsarService pulsar;
private final ServerCnx connection;
private final boolean enableSubscriptionPatternEvaluation;
private final int maxSubscriptionPatternLength;
+ private final int topicListUpdateMaxQueueSize;
private final ConcurrentLongHashMap<CompletableFuture<TopicListWatcher>>
watchers;
-
+ private final Backoff retryBackoff;
public TopicListService(PulsarService pulsar, ServerCnx connection,
boolean enableSubscriptionPatternEvaluation, int
maxSubscriptionPatternLength) {
+ this(pulsar, connection, enableSubscriptionPatternEvaluation,
maxSubscriptionPatternLength,
+ TopicListWatcher.DEFAULT_TOPIC_LIST_UPDATE_MAX_QUEUE_SIZE);
+ }
+
+ @VisibleForTesting
+ public TopicListService(PulsarService pulsar, ServerCnx connection,
+ boolean enableSubscriptionPatternEvaluation, int
maxSubscriptionPatternLength,
+ int topicListUpdateMaxQueueSize) {
this.namespaceService = pulsar.getNamespaceService();
+ this.pulsar = pulsar;
this.connection = connection;
this.enableSubscriptionPatternEvaluation =
enableSubscriptionPatternEvaluation;
this.maxSubscriptionPatternLength = maxSubscriptionPatternLength;
+ this.topicListUpdateMaxQueueSize = topicListUpdateMaxQueueSize;
this.watchers =
ConcurrentLongHashMap.<CompletableFuture<TopicListWatcher>>newBuilder()
.expectedItems(8)
.concurrencyLevel(1)
.build();
this.topicResources = pulsar.getPulsarResources().getTopicResources();
+ this.retryBackoff = new Backoff(
+ 100, TimeUnit.MILLISECONDS,
+ 25, TimeUnit.SECONDS,
+ 0, TimeUnit.MILLISECONDS);
}
public void inactivate() {
@@ -162,38 +266,16 @@ public class TopicListService {
CompletableFuture<TopicListWatcher> existingWatcherFuture =
watchers.putIfAbsent(watcherId, watcherFuture);
if (existingWatcherFuture != null) {
- if (existingWatcherFuture.isDone() &&
!existingWatcherFuture.isCompletedExceptionally()) {
- TopicListWatcher watcher = existingWatcherFuture.getNow(null);
- log.info("[{}] Watcher with the same id is already created:"
- + " watcherId={}, watcher={}",
- connection.toString(), watcherId, watcher);
- watcherFuture = existingWatcherFuture;
- } else {
- // There was an early request to create a watcher with the
same watcherId. This can happen when
- // client timeout is lower the broker timeouts. We need to
wait until the previous watcher
- // creation request either completes or fails.
- log.warn("[{}] Watcher with id is already present on the
connection,"
- + " consumerId={}", connection.toString(), watcherId);
- ServerError error;
- if (!existingWatcherFuture.isDone()) {
- error = ServerError.ServiceNotReady;
- } else {
- error = ServerError.UnknownError;
- watchers.remove(watcherId, existingWatcherFuture);
- }
- connection.getCommandSender().sendErrorResponse(requestId,
error,
- "Topic list watcher is already present on the
connection");
- lookupSemaphore.release();
- return;
- }
+ log.info("[{}] Watcher with the same watcherId={} is already
created.", connection, watcherId);
+ // use the existing watcher if it's already created
+ watcherFuture = existingWatcherFuture;
} else {
initializeTopicsListWatcher(watcherFuture, namespaceName,
watcherId, topicsPattern);
}
-
CompletableFuture<TopicListWatcher> finalWatcherFuture = watcherFuture;
finalWatcherFuture.thenAccept(watcher -> {
- List<String> topicList = watcher.getMatchingTopics();
+ Collection<String> topicList = watcher.getMatchingTopics();
String hash = TopicList.calculateHash(topicList);
if (hash.equals(topicsHash)) {
topicList = Collections.emptyList();
@@ -203,7 +285,8 @@ public class TopicListService {
"[{}] Received WatchTopicList for namespace
[//{}] by {}",
connection.toString(), namespaceName,
requestId);
}
-
connection.getCommandSender().sendWatchTopicListSuccess(requestId, watcherId,
hash, topicList);
+ sendTopicListSuccessWithPermitAcquiringRetries(watcherId,
requestId, topicList, hash,
+ watcher::sendingCompleted, watcher::close);
lookupSemaphore.release();
})
.exceptionally(ex -> {
@@ -218,30 +301,158 @@ public class TopicListService {
});
}
+ private void sendTopicListSuccessWithPermitAcquiringRetries(long
watcherId, long requestId,
+
Collection<String> topicList,
+ String hash,
+ Runnable
successfulCompletionCallback,
+ Runnable
failedCompletionCallback) {
+ performOperationWithPermitAcquiringRetries(watcherId, "topic list
success", permitAcquireErrorHandler ->
+ () -> connection.getCommandSender()
+ .sendWatchTopicListSuccess(requestId, watcherId, hash,
topicList, permitAcquireErrorHandler)
+ .whenComplete((__, t) -> {
+ if (t != null) {
+ // this is an unexpected case
+ log.warn("[{}] Failed to send topic list
success for watcherId={}. "
+ + "Watcher is not active.",
connection, watcherId, t);
+ failedCompletionCallback.run();
+ } else {
+ // completed successfully, run the callback
+ successfulCompletionCallback.run();
+ }
+ }));
+ }
+
/***
* @param topicsPattern The regexp for the topic name(not contains
partition suffix).
*/
public void
initializeTopicsListWatcher(CompletableFuture<TopicListWatcher> watcherFuture,
NamespaceName namespace, long watcherId, TopicsPattern
topicsPattern) {
- namespaceService.getListOfPersistentTopics(namespace).
- thenApply(topics -> {
- TopicListWatcher watcher = new TopicListWatcher(this,
watcherId, topicsPattern, topics);
- topicResources.registerPersistentTopicListener(namespace,
watcher);
- return watcher;
- }).
- whenComplete((watcher, exception) -> {
- if (exception != null) {
- watcherFuture.completeExceptionally(exception);
- } else {
- if (!watcherFuture.complete(watcher)) {
- log.warn("[{}] Watcher future was already
completed. Deregistering watcherId={}.",
- connection.toString(), watcherId);
-
topicResources.deregisterPersistentTopicListener(watcher);
- }
- }
- });
+ BooleanSupplier isPermitRequestCancelled = () ->
!connection.isActive() || !watchers.containsKey(watcherId);
+ if (isPermitRequestCancelled.getAsBoolean()) {
+ return;
+ }
+ TopicListSizeResultCache.ResultHolder listSizeHolder =
pulsar.getBrokerService().getTopicListSizeResultCache()
+ .getTopicListSize(namespace.toString(),
CommandGetTopicsOfNamespace.Mode.PERSISTENT);
+ AsyncDualMemoryLimiter maxTopicListInFlightLimiter =
pulsar.getBrokerService().getMaxTopicListInFlightLimiter();
+
+ listSizeHolder.getSizeAsync().thenCompose(initialSize -> {
+ // use heap size limiter to avoid broker getting overwhelmed by a
lot of concurrent topic list requests
+ return maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+ AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY,
isPermitRequestCancelled, initialPermits -> {
+ AtomicReference<TopicListWatcher> watcherRef = new
AtomicReference<>();
+ return
namespaceService.getListOfPersistentTopics(namespace).thenCompose(topics -> {
+ long actualSize =
TopicListMemoryLimiter.estimateTopicListSize(topics);
+ listSizeHolder.updateSize(actualSize);
+ // register watcher immediately so that we don't
lose events
+ TopicListWatcher watcher =
+ new TopicListWatcher(this, watcherId,
namespace, topicsPattern, topics,
+ connection.ctx().executor(),
topicListUpdateMaxQueueSize);
+ watcherRef.set(watcher);
+
topicResources.registerPersistentTopicListener(namespace, watcher);
+ // use updated permits to slow down responses so
that backpressure gets applied
+ return
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+ isPermitRequestCancelled, updatedPermits
-> {
+ // reset retry backoff
+ retryBackoff.reset();
+ // just return the watcher which was
already created before
+ return
CompletableFuture.completedFuture(watcher);
+ }, CompletableFuture::failedFuture);
+ }).whenComplete((watcher, exception) -> {
+ if (exception != null) {
+ TopicListWatcher w = watcherRef.get();
+ if (w != null) {
+ w.close();
+
topicResources.deregisterPersistentTopicListener(w);
+ }
+ // triggers a retry
+ throw
FutureUtil.wrapToCompletionException(exception);
+ } else {
+ if (!watcherFuture.complete(watcher)) {
+ log.warn("[{}] Watcher future was already
completed. Deregistering "
+ + "watcherId={}.", connection,
watcherId);
+ watcher.close();
+
topicResources.deregisterPersistentTopicListener(watcher);
+ watchers.remove(watcherId, watcherFuture);
+ }
+ }
+ });
+ }, CompletableFuture::failedFuture);
+ }).exceptionally(t -> {
+ Throwable unwrappedException =
FutureUtil.unwrapCompletionException(t);
+ if (!isPermitRequestCancelled.getAsBoolean() && (
+ unwrappedException instanceof
AsyncSemaphore.PermitAcquireTimeoutException
+ || unwrappedException instanceof
AsyncSemaphore.PermitAcquireQueueFullException)) {
+ // retry with backoff if permit acquisition fails due to
timeout or queue full
+ long retryAfterMillis = this.retryBackoff.next();
+ log.info("[{}] {} when initializing topic list watcher
watcherId={} for namespace {}. Retrying in {} "
+ + "ms.", connection,
unwrappedException.getMessage(), watcherId, namespace,
+ retryAfterMillis);
+ connection.ctx().executor()
+ .schedule(() ->
initializeTopicsListWatcher(watcherFuture, namespace, watcherId, topicsPattern),
+ retryAfterMillis, TimeUnit.MILLISECONDS);
+ } else {
+ log.warn("[{}] Failed to initialize topic list watcher
watcherId={} for namespace {}.", connection,
+ watcherId, namespace, unwrappedException);
+ watcherFuture.completeExceptionally(unwrappedException);
+ }
+ return null;
+ });
}
+ void updateTopicListWatcher(TopicListWatcher watcher) {
+ long watcherId = watcher.id;
+ BooleanSupplier isPermitRequestCancelled = () ->
!connection.isActive() || !watchers.containsKey(watcherId);
+ if (isPermitRequestCancelled.getAsBoolean()) {
+ return;
+ }
+ NamespaceName namespace = watcher.namespace;
+ TopicListSizeResultCache.ResultHolder listSizeHolder =
pulsar.getBrokerService().getTopicListSizeResultCache()
+ .getTopicListSize(namespace.toString(),
CommandGetTopicsOfNamespace.Mode.PERSISTENT);
+ AsyncDualMemoryLimiter maxTopicListInFlightLimiter =
pulsar.getBrokerService().getMaxTopicListInFlightLimiter();
+
+ listSizeHolder.getSizeAsync().thenCompose(initialSize -> {
+ // use heap size limiter to avoid broker getting overwhelmed by a
lot of concurrent topic list requests
+ return maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+ AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY,
isPermitRequestCancelled, initialPermits -> {
+ return
namespaceService.getListOfPersistentTopics(namespace).thenCompose(topics -> {
+ long actualSize =
TopicListMemoryLimiter.estimateTopicListSize(topics);
+ listSizeHolder.updateSize(actualSize);
+ // use updated permits to slow down responses so
that backpressure gets applied
+ return
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+ isPermitRequestCancelled, updatedPermits
-> {
+ // reset retry backoff
+ retryBackoff.reset();
+ // just return topics here
+ return
CompletableFuture.completedFuture(topics);
+ }, CompletableFuture::failedFuture);
+ }).whenComplete((topics, exception) -> {
+ if (exception != null) {
+ // triggers a retry
+ throw
FutureUtil.wrapToCompletionException(exception);
+ } else {
+ watcher.updateTopics(topics);
+ }
+ });
+ }, CompletableFuture::failedFuture);
+ }).exceptionally(t -> {
+ Throwable unwrappedException =
FutureUtil.unwrapCompletionException(t);
+ if (!isPermitRequestCancelled.getAsBoolean() && (
+ unwrappedException instanceof
AsyncSemaphore.PermitAcquireTimeoutException
+ || unwrappedException instanceof
AsyncSemaphore.PermitAcquireQueueFullException)) {
+ // retry with backoff if permit acquisition fails due to
timeout or queue full
+ long retryAfterMillis = this.retryBackoff.next();
+ log.info("[{}] {} when updating topic list watcher
watcherId={} for namespace {}. Retrying in {} "
+ + "ms.", connection,
unwrappedException.getMessage(), watcherId, namespace,
+ retryAfterMillis);
+ connection.ctx().executor()
+ .schedule(() -> updateTopicListWatcher(watcher),
retryAfterMillis, TimeUnit.MILLISECONDS);
+ } else {
+ log.warn("[{}] Failed to update topic list watcher
watcherId={} for namespace {}.", connection,
+ watcherId, namespace, unwrappedException);
+ }
+ return null;
+ });
+ }
public void handleWatchTopicListClose(CommandWatchTopicListClose
commandWatchTopicListClose) {
long requestId = commandWatchTopicListClose.getRequestId();
@@ -251,7 +462,7 @@ public class TopicListService {
}
public void deleteTopicListWatcher(Long watcherId) {
- CompletableFuture<TopicListWatcher> watcherFuture =
watchers.get(watcherId);
+ CompletableFuture<TopicListWatcher> watcherFuture =
watchers.remove(watcherId);
if (watcherFuture == null) {
log.info("[{}] TopicListWatcher was not registered on the
connection: {}",
watcherId, connection.toString());
@@ -265,21 +476,20 @@ public class TopicListService {
// create operation will complete, the new watcher will be
discarded.
log.info("[{}] Closed watcher before its creation was completed.
watcherId={}",
connection.toString(), watcherId);
- watchers.remove(watcherId);
- return;
- }
-
- if (watcherFuture.isCompletedExceptionally()) {
- log.info("[{}] Closed watcher that already failed to be created.
watcherId={}",
- connection.toString(), watcherId);
- watchers.remove(watcherId);
return;
}
- // Proceed with normal watcher close
-
topicResources.deregisterPersistentTopicListener(watcherFuture.getNow(null));
- watchers.remove(watcherId);
- log.info("[{}] Closed watcher, watcherId={}", connection.toString(),
watcherId);
+ // deregister topic listener while avoiding race conditions
+ watcherFuture.whenComplete((watcher, t) -> {
+ if (watcher != null) {
+ topicResources.deregisterPersistentTopicListener(watcher);
+ watcher.close();
+ log.info("[{}] Closed watcher, watcherId={}",
connection.toString(), watcherId);
+ } else if (t != null) {
+ log.info("[{}] Closed watcher that failed to be created.
watcherId={}",
+ connection.toString(), watcherId);
+ }
+ });
}
/**
@@ -287,9 +497,76 @@ public class TopicListService {
* @param newTopics topics names added(contains the partition suffix).
*/
public void sendTopicListUpdate(long watcherId, String topicsHash,
List<String> deletedTopics,
- List<String> newTopics) {
- connection.getCommandSender().sendWatchTopicListUpdate(watcherId,
newTopics, deletedTopics, topicsHash);
+ List<String> newTopics, Runnable
completionCallback) {
+ performOperationWithPermitAcquiringRetries(watcherId, "topic list
update", permitAcquireErrorHandler ->
+ () -> connection.getCommandSender()
+ .sendWatchTopicListUpdate(watcherId, newTopics,
deletedTopics, topicsHash,
+ permitAcquireErrorHandler)
+ .whenComplete((__, t) -> {
+ if (t != null) {
+ // this is an unexpected case
+ log.warn("[{}] Failed to send topic list
update for watcherId={}. Watcher will be in "
+ + "inconsistent state.", connection,
watcherId, t);
+ }
+ completionCallback.run();
+ }));
}
+ // performs an operation with infinite permit acquiring retries.
+ // If acquiring permits fails, it will retry after a backoff period
+ private void performOperationWithPermitAcquiringRetries(long watcherId,
String operationName,
+
Function<Function<Throwable, CompletableFuture<Void>>,
+
Supplier<CompletableFuture<Void>>>
+
asyncOperationFactory) {
+ // holds a reference to the operation, this is to resolve a circular
dependency between the error handler and
+ // the actual operation
+ AtomicReference<Supplier<CompletableFuture<Void>>> operationRef = new
AtomicReference<>();
+ // create the error handler for the operation
+ Function<Throwable, CompletableFuture<Void>> permitAcquireErrorHandler
=
+ createPermitAcquireErrorHandler(watcherId, operationName, ()
-> operationRef.get().get());
+ // create the async operation using the factory function. Pass the
error handler to the factory function.
+ Supplier<CompletableFuture<Void>> asyncOperation =
asyncOperationFactory.apply(permitAcquireErrorHandler);
+ // set the operation to run into the operation reference
+ operationRef.set(() -> {
+ if (!connection.isActive() || !watchers.containsKey(watcherId)) {
+ // do nothing if the connection has already been closed or the
watcher has been removed
+ return CompletableFuture.completedFuture(null);
+ }
+ return asyncOperation.get().thenRun(() -> retryBackoff.reset());
+ });
+ // run the operation
+ operationRef.get().get();
+ }
+ // retries acquiring permits until the connection is closed or the watcher
is removed
+ private Function<Throwable, CompletableFuture<Void>>
createPermitAcquireErrorHandler(long watcherId,
+
String operationName,
+
Supplier<CompletableFuture
+
<Void>> operationRef) {
+ ScheduledExecutorService scheduledExecutor =
connection.ctx().channel().eventLoop();
+ AtomicInteger retryCount = new AtomicInteger(0);
+ return t -> {
+ Throwable unwrappedException =
FutureUtil.unwrapCompletionException(t);
+ if (unwrappedException instanceof
AsyncSemaphore.PermitAcquireCancelledException
+ || unwrappedException instanceof
AsyncSemaphore.PermitAcquireAlreadyClosedException
+ || !connection.isActive()
+ || !watchers.containsKey(watcherId)) {
+ // stop retrying and complete successfully
+ return CompletableFuture.completedFuture(null);
+ }
+ long retryDelay = retryBackoff.next();
+ retryCount.incrementAndGet();
+ log.info("[{}] Cannot acquire direct memory tokens for sending {}.
Retry {} in {} ms. {}", connection,
+ operationName, retryCount.get(), retryDelay,
t.getMessage());
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ scheduledExecutor.schedule(() -> FutureUtil.completeAfter(future,
operationRef.get()), retryDelay,
+ TimeUnit.MILLISECONDS);
+ return future;
+ };
+ }
+
+ @VisibleForTesting
+ CompletableFuture<TopicListWatcher> getWatcherFuture(long watcherId) {
+ return watchers.get(watcherId);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java
index 9109828c025..d85f244b9a3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java
@@ -18,31 +18,70 @@
*/
package org.apache.pulsar.broker.service;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoop;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.ImmediateEventExecutor;
+import io.netty.util.concurrent.ScheduledFuture;
import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.ListUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TopicResources;
+import org.apache.pulsar.broker.topiclistlimit.TopicListSizeResultCache;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl;
+import org.apache.pulsar.common.semaphore.AsyncSemaphore;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.topics.TopicsPattern;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.awaitility.Awaitility;
+import org.jspecify.annotations.NonNull;
+import org.mockito.InOrder;
import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+@Slf4j
public class TopicListServiceTest {
private TopicListService topicListService;
@@ -52,27 +91,99 @@ public class TopicListServiceTest {
private TopicResources topicResources;
private final TopicsPattern.RegexImplementation
topicsPatternImplementation =
TopicsPattern.RegexImplementation.RE2J_WITH_JDK_FALLBACK;
+ private EventLoop eventLoop;
+ private PulsarCommandSender pulsarCommandSender;
+ private Consumer<Notification> notificationConsumer;
+ private AsyncDualMemoryLimiterImpl memoryLimiter;
+ private ScheduledExecutorService scheduledExecutorService;
+ private PulsarService pulsar;
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
lookupSemaphore = new Semaphore(1);
lookupSemaphore.acquire();
topicListFuture = new CompletableFuture<>();
- topicResources = mock(TopicResources.class);
- PulsarService pulsar = mock(PulsarService.class);
-
when(pulsar.getNamespaceService()).thenReturn(mock(NamespaceService.class));
+ AtomicReference<Consumer<Notification>> listenerRef = new
AtomicReference<>();
+ MetadataStore metadataStore = mock(MetadataStore.class);
+ doAnswer(invocationOnMock -> {
+ listenerRef.set(invocationOnMock.getArgument(0));
+ return null;
+ }).when(metadataStore).registerListener(any());
+ topicResources = spy(new TopicResources(metadataStore));
+ notificationConsumer = listenerRef.get();
+
+ pulsar = mock(PulsarService.class);
+ NamespaceService namespaceService = mock(NamespaceService.class);
+ when(pulsar.getNamespaceService()).thenReturn(namespaceService);
+ doAnswer(invocationOnMock -> topicListFuture)
+ .when(namespaceService).getListOfPersistentTopics(any());
when(pulsar.getPulsarResources()).thenReturn(mock(PulsarResources.class));
when(pulsar.getPulsarResources().getTopicResources()).thenReturn(topicResources);
-
when(pulsar.getNamespaceService().getListOfPersistentTopics(any())).thenReturn(topicListFuture);
+ BrokerService brokerService = mock(BrokerService.class);
+ when(pulsar.getBrokerService()).thenReturn(brokerService);
+ TopicListSizeResultCache topicListSizeResultCache =
mock(TopicListSizeResultCache.class);
+
when(brokerService.getTopicListSizeResultCache()).thenReturn(topicListSizeResultCache);
+ TopicListSizeResultCache.ResultHolder resultHolder =
mock(TopicListSizeResultCache.ResultHolder.class);
+
doReturn(resultHolder).when(topicListSizeResultCache).getTopicListSize(anyString(),
any());
+
doReturn(CompletableFuture.completedFuture(1L)).when(resultHolder).getSizeAsync();
+
+ memoryLimiter = new AsyncDualMemoryLimiterImpl(1_000_000, 10000, 500,
1_000_000, 10000, 500);
+
doReturn(memoryLimiter).when(brokerService).getMaxTopicListInFlightLimiter();
connection = mock(ServerCnx.class);
when(connection.getRemoteAddress()).thenReturn(new
InetSocketAddress(10000));
-
when(connection.getCommandSender()).thenReturn(mock(PulsarCommandSender.class));
+ pulsarCommandSender = mock(PulsarCommandSender.class);
+ when(connection.getCommandSender()).thenReturn(pulsarCommandSender);
+ when(connection.isActive()).thenReturn(true);
+ when(pulsarCommandSender.sendWatchTopicListUpdate(anyLong(), any(),
any(), anyString(), any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ when(pulsarCommandSender.sendWatchTopicListSuccess(anyLong(),
anyLong(), anyString(), any(), any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+
+ scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();
- topicListService = new TopicListService(pulsar, connection, true, 30);
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(connection.ctx()).thenReturn(ctx);
+ EventExecutor executor = spy(ImmediateEventExecutor.INSTANCE);
+ doReturn(executor).when(ctx).executor();
+ doAnswer(invocationOnMock -> {
+
scheduledExecutorService.schedule(invocationOnMock.<Runnable>getArgument(0),
+ invocationOnMock.getArgument(1),
invocationOnMock.getArgument(2));
+ return mock(ScheduledFuture.class);
+ }).when(executor).schedule(any(Runnable.class), anyLong(), any());
+ Channel channel = mock(Channel.class);
+ when(ctx.channel()).thenReturn(channel);
+ eventLoop = mock(EventLoop.class);
+ when(channel.eventLoop()).thenReturn(eventLoop);
+ doAnswer(invocationOnMock -> {
+
scheduledExecutorService.schedule(invocationOnMock.<Runnable>getArgument(0),
+ invocationOnMock.getArgument(1),
invocationOnMock.getArgument(2));
+ return mock(ScheduledFuture.class);
+ }).when(eventLoop).schedule(any(Runnable.class), anyLong(), any());
+
+ topicListService = newTopicListService();
+
+ }
+ private @NonNull TopicListService newTopicListService() { // service under test, wired to the mocks from setup()
+ return new TopicListService(pulsar, connection, true, 30);
+ }
+
+ private @NonNull TopicListService newTopicListService(int
topicListUpdateMaxQueueSize) {
+ return new TopicListService(pulsar, connection, true, 30,
+ topicListUpdateMaxQueueSize); // same wiring as the no-arg variant, plus an explicit update queue limit
+ }
+
+ @AfterMethod(alwaysRun = true)
+ void cleanup() { // releases the per-test limiter and scheduler
+ if (memoryLimiter != null) { // alwaysRun: presumably guards against a setup() that failed before assignment
+ memoryLimiter.close();
+ }
+ if (scheduledExecutorService != null) {
+ scheduledExecutorService.shutdownNow(); // also drops retry tasks that are still scheduled
+ }
}
@Test
@@ -88,10 +199,41 @@ public class TopicListServiceTest {
List<String> topics =
Collections.singletonList("persistent://tenant/ns/topic1");
String hash = TopicList.calculateHash(topics);
topicListFuture.complete(topics);
- Assert.assertEquals(1, lookupSemaphore.availablePermits());
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(1,
lookupSemaphore.availablePermits()));
verify(topicResources).registerPersistentTopicListener(
eq(NamespaceName.get("tenant/ns")),
any(TopicListService.TopicListWatcher.class));
- verify(connection.getCommandSender()).sendWatchTopicListSuccess(7, 13,
hash, topics);
+ Collection<String> expectedTopics = new ArrayList<>(topics);
+
verify(connection.getCommandSender()).sendWatchTopicListSuccess(eq(7L),
eq(13L), eq(hash), eq(expectedTopics),
+ any());
+ }
+
+ @Test // the success response must still be sent once exhausted heap permits are released
+ public void testCommandWatchSuccessResponseWhenOutOfPermits() throws
ExecutionException, InterruptedException {
+ // acquire 1_000_000 heap permits up front (presumably the whole heap limit configured in setup())
+ AsyncDualMemoryLimiter.AsyncDualMemoryLimiterPermit permit =
+ memoryLimiter.acquire(1_000_000,
AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY,
+ Boolean.FALSE::booleanValue) // cancellation check that always answers "not cancelled"
+ .get();
+ topicListService.handleWatchTopicList(
+ NamespaceName.get("tenant/ns"),
+ 13, // watcher id
+ 7, // request id
+ "persistent://tenant/ns/topic\\d",
+ topicsPatternImplementation, null,
+ lookupSemaphore);
+ List<String> topics =
Collections.singletonList("persistent://tenant/ns/topic1");
+ String hash = TopicList.calculateHash(topics);
+ topicListFuture.complete(topics);
+ // wait for the acquisition to time out a few times
+ Thread.sleep(2000);
+ // release the permits
+ memoryLimiter.release(permit);
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(1,
lookupSemaphore.availablePermits()));
+ verify(topicResources).registerPersistentTopicListener(
+ eq(NamespaceName.get("tenant/ns")),
any(TopicListService.TopicListWatcher.class));
+ Collection<String> expectedTopics = new ArrayList<>(topics);
+
verify(connection.getCommandSender()).sendWatchTopicListSuccess(eq(7L),
eq(13L), eq(hash), eq(expectedTopics),
+ any());
}
@Test
@@ -104,7 +246,7 @@ public class TopicListServiceTest {
topicsPatternImplementation, null,
lookupSemaphore);
topicListFuture.completeExceptionally(new
PulsarServerException("Error"));
- Assert.assertEquals(1, lookupSemaphore.availablePermits());
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(1,
lookupSemaphore.availablePermits()));
verifyNoInteractions(topicResources);
verify(connection.getCommandSender()).sendErrorResponse(eq(7L),
any(ServerError.class),
eq(PulsarServerException.class.getCanonicalName() + ":
Error"));
@@ -121,12 +263,146 @@ public class TopicListServiceTest {
lookupSemaphore);
List<String> topics =
Collections.singletonList("persistent://tenant/ns/topic1");
topicListFuture.complete(topics);
+
assertThat(topicListService.getWatcherFuture(13)).succeedsWithin(Duration.ofSeconds(2));
CommandWatchTopicListClose watchTopicListClose = new
CommandWatchTopicListClose()
.setRequestId(8)
.setWatcherId(13);
topicListService.handleWatchTopicListClose(watchTopicListClose);
+
verify(topicResources).deregisterPersistentTopicListener(any(TopicListService.TopicListWatcher.class));
}
+ @Test
+ public void testCommandWatchSuccessDirectMemoryAcquirePermitsRetries() { // sender fails twice, then succeeds
+ topicListService.handleWatchTopicList(
+ NamespaceName.get("tenant/ns"),
+ 13, // watcher id
+ 7, // request id
+ "persistent://tenant/ns/topic\\d",
+ topicsPatternImplementation, null,
+ lookupSemaphore);
+ List<String> topics =
Collections.singletonList("persistent://tenant/ns/topic1");
+ String hash = TopicList.calculateHash(topics);
+ AtomicInteger failureCount = new AtomicInteger(0);
+ doAnswer(invocationOnMock -> {
+ if (failureCount.incrementAndGet() < 3) { // invocations 1 and 2 simulate a permit acquire timeout
+ Throwable failure = new
AsyncSemaphore.PermitAcquireTimeoutException("Acquire timed out");
+ Function<Throwable, CompletableFuture<Void>>
permitAcquireErrorHandler =
+ invocationOnMock.getArgument(4); // fifth argument: the handler supplied by the service
+ return permitAcquireErrorHandler.apply(failure); // presumably schedules the retry
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }).when(pulsarCommandSender).sendWatchTopicListSuccess(anyLong(),
anyLong(), anyString(), any(), any());
+ topicListFuture.complete(topics);
+
assertThat(topicListService.getWatcherFuture(13)).succeedsWithin(Duration.ofSeconds(2));
+ Collection<String> expectedTopics = new ArrayList<>(topics);
+ verify(connection.getCommandSender(), timeout(2000L).times(3)) // 2 failed attempts + 1 success
+ .sendWatchTopicListSuccess(eq(7L), eq(13L), eq(hash),
eq(expectedTopics), any());
+ }
+
+ @Test
+ public void testCommandWatchUpdate() { // create/delete notifications must be forwarded as watch updates
+ topicListService.handleWatchTopicList(
+ NamespaceName.get("tenant/ns"),
+ 13, // watcher id
+ 7, // request id
+ "persistent://tenant/ns/topic\\d",
+ topicsPatternImplementation, null,
+ lookupSemaphore);
+ List<String> topics =
Collections.singletonList("persistent://tenant/ns/topic1");
+ topicListFuture.complete(topics);
+
assertThat(topicListService.getWatcherFuture(13)).succeedsWithin(Duration.ofSeconds(2));
+
+ List<String> newTopics =
Collections.singletonList("persistent://tenant/ns/topic2");
+ String hash = TopicList.calculateHash(ListUtils.union(topics,
newTopics));
+ notificationConsumer.accept( // feed the listener captured from the mocked MetadataStore
+ new Notification(NotificationType.Created,
"/managed-ledgers/tenant/ns/persistent/topic2"));
+ verify(connection.getCommandSender(), timeout(2000L))
+ .sendWatchTopicListUpdate(eq(13L), eq(newTopics), any(),
eq(hash), any());
+
+ hash = TopicList.calculateHash(newTopics); // only topic2 is left once topic1 is deleted
+ notificationConsumer.accept(
+ new Notification(NotificationType.Deleted,
"/managed-ledgers/tenant/ns/persistent/topic1"));
+ verify(connection.getCommandSender(), timeout(2000L))
+ .sendWatchTopicListUpdate(eq(13L), eq(List.of()), eq(topics),
eq(hash), any());
+ }
+
+ @Test
+ public void testCommandWatchUpdateRetries() { // a failing update is retried; the next update is sent after it
+ topicListService.handleWatchTopicList(
+ NamespaceName.get("tenant/ns"),
+ 13, // watcher id
+ 7, // request id
+ "persistent://tenant/ns/topic\\d",
+ topicsPatternImplementation, null,
+ lookupSemaphore);
+ List<String> topics =
Collections.singletonList("persistent://tenant/ns/topic1");
+ topicListFuture.complete(topics);
+
assertThat(topicListService.getWatcherFuture(13)).succeedsWithin(Duration.ofSeconds(2));
+
+ List<String> newTopics =
Collections.singletonList("persistent://tenant/ns/topic2");
+ String hash = TopicList.calculateHash(ListUtils.union(topics,
newTopics));
+ AtomicInteger failureCount = new AtomicInteger(0);
+ doAnswer(invocationOnMock -> {
+ List<String> newTopicsArg = invocationOnMock.getArgument(1); // second argument: the added topics
+ if (!newTopicsArg.isEmpty() && failureCount.incrementAndGet() < 3)
{
+ Throwable failure = new
AsyncSemaphore.PermitAcquireTimeoutException("Acquire timed out");
+ Function<Throwable, CompletableFuture<Void>>
permitAcquireErrorHandler =
+ invocationOnMock.getArgument(4); // fifth argument: the handler supplied by the service
+ return permitAcquireErrorHandler.apply(failure);
+ } else {
+ return CompletableFuture.completedFuture(null); // deletion-only updates and the third attempt succeed
+ }
+ }).when(pulsarCommandSender).sendWatchTopicListUpdate(anyLong(),
any(), any(), anyString(), any());
+ notificationConsumer.accept(
+ new Notification(NotificationType.Created,
"/managed-ledgers/tenant/ns/persistent/topic2"));
+ notificationConsumer.accept(
+ new Notification(NotificationType.Deleted,
"/managed-ledgers/tenant/ns/persistent/topic2"));
+ InOrder inOrder = inOrder(connection.getCommandSender()); // creation (3 attempts) must precede the deletion
+ inOrder.verify(connection.getCommandSender(), timeout(2000L).times(3))
+ .sendWatchTopicListUpdate(eq(13L), eq(newTopics),
eq(List.of()), eq(hash), any());
+ inOrder.verify(connection.getCommandSender(), timeout(2000L).times(1))
+ .sendWatchTopicListUpdate(eq(13L), eq(List.of()),
eq(newTopics), any(), any());
+ }
+
+ @Test
+ public void testCommandWatchUpdateQueueOverflows() { // an overflowing update queue must trigger a fresh listing
+ int topicListUpdateMaxQueueSize = 10;
+ topicListService = newTopicListService(topicListUpdateMaxQueueSize);
+ topicListService.handleWatchTopicList(
+ NamespaceName.get("tenant/ns"),
+ 13, // watcher id
+ 7, // request id
+ "persistent://tenant/ns/topic\\d+",
+ topicsPatternImplementation, null,
+ lookupSemaphore);
+ List<String> topics =
Collections.singletonList("persistent://tenant/ns/topic1");
+ topicListFuture.complete(topics);
+
assertThat(topicListService.getWatcherFuture(13)).succeedsWithin(Duration.ofSeconds(2));
+
+ CompletableFuture<Void> completePending = new CompletableFuture<>(); // never completed in this test
+ doReturn(completePending).when(pulsarCommandSender)
+ .sendWatchTopicListUpdate(anyLong(), any(), any(),
anyString(), any());
+ topicListFuture = new CompletableFuture<>(); // setup()'s doAnswer reads this field on each listing call
+
+ // when the queue overflows
+ for (int i = 10; i <= 10 + topicListUpdateMaxQueueSize + 1; i++) { // 12 notifications: topic10..topic21
+ notificationConsumer.accept(
+ new Notification(NotificationType.Created,
"/managed-ledgers/tenant/ns/persistent/topic" + i));
+ }
+
+ // a new listing should be performed. Return 100 topics in the
response, simulating that events have been lost
+ List<String> updatedTopics = IntStream.range(1, 101).mapToObj(i ->
"persistent://tenant/ns/topic" + i).toList();
+ topicListFuture.complete(updatedTopics);
+ // validate that the watcher's matching topics have been updated
+ Awaitility.await().untilAsserted(() -> {
+ CompletableFuture<TopicListService.TopicListWatcher> watcherFuture
= topicListService.getWatcherFuture(13);
+ assertThat(watcherFuture).isNotNull();
+ assertThat(watcherFuture.join().getMatchingTopics())
+ .containsExactlyInAnyOrderElementsOf(updatedTopics);
+ });
+ }
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java
index 884cdc0ef92..46262e84d38 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java
@@ -18,12 +18,19 @@
*/
package org.apache.pulsar.broker.service;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.topics.TopicsPattern;
import org.apache.pulsar.common.topics.TopicsPatternFactory;
@@ -53,7 +60,9 @@ public class TopicListWatcherTest {
@BeforeMethod(alwaysRun = true)
public void setup() {
topicListService = mock(TopicListService.class);
- watcher = new TopicListService.TopicListWatcher(topicListService, ID,
PATTERN, INITIAL_TOPIC_LIST);
+ watcher = new TopicListService.TopicListWatcher(topicListService, ID,
NamespaceName.get("tenant", "ns"),
+ PATTERN, INITIAL_TOPIC_LIST, MoreExecutors.directExecutor(),
9);
+ watcher.sendingCompleted();
}
@Test
@@ -71,8 +80,8 @@ public class TopicListWatcherTest {
List<String> allMatchingTopics = Arrays.asList(
"persistent://tenant/ns/topic1",
"persistent://tenant/ns/topic2", newTopic);
String hash = TopicList.calculateHash(allMatchingTopics);
- verify(topicListService).sendTopicListUpdate(ID, hash,
Collections.emptyList(),
- Collections.singletonList(newTopic));
+ verify(topicListService).sendTopicListUpdate(eq(ID), eq(hash),
eq(Collections.emptyList()),
+ eq(Collections.singletonList(newTopic)), any());
Assert.assertEquals(
allMatchingTopics,
watcher.getMatchingTopics());
@@ -85,8 +94,8 @@ public class TopicListWatcherTest {
List<String> allMatchingTopics =
Collections.singletonList("persistent://tenant/ns/topic2");
String hash = TopicList.calculateHash(allMatchingTopics);
- verify(topicListService).sendTopicListUpdate(ID, hash,
- Collections.singletonList(deletedTopic),
Collections.emptyList());
+ verify(topicListService).sendTopicListUpdate(eq(ID), eq(hash),
+ eq(Collections.singletonList(deletedTopic)),
eq(Collections.emptyList()), any());
Assert.assertEquals(
allMatchingTopics,
watcher.getMatchingTopics());
@@ -100,4 +109,15 @@ public class TopicListWatcherTest {
Arrays.asList("persistent://tenant/ns/topic1",
"persistent://tenant/ns/topic2"),
watcher.getMatchingTopics());
}
+
+ @Test
+ public void testUpdateQueueOverFlowPerformsFullUpdate() {
+ for (int i = 10; i <= 20; i++) { // 11 creations; setup() passes 9 as the watcher's last ctor arg (presumably the queue limit)
+ String newTopic = "persistent://tenant/ns/topic" + i;
+ watcher.accept(newTopic, NotificationType.Created);
+ }
+ verify(topicListService).sendTopicListUpdate(anyLong(), anyString(),
any(), any(), any());
+ verify(topicListService).updateTopicListWatcher(any()); // exactly one full refresh is requested
+ verifyNoMoreInteractions(topicListService); // no other calls reach the service mock
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java
index b7f17b8ba3d..8e7f54fde54 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java
@@ -256,7 +256,7 @@ public class
PatternConsumerBackPressureMultipleConsumersTest extends MockedPuls
}
// Use this implementation when PIP-234 isn't available
- private static class SharedClientResources implements AutoCloseable {
+ public static class SharedClientResources implements AutoCloseable {
private final EventLoopGroup ioEventLoopGroup;
private final ExecutorProvider internalExecutorProvider;
private final ExecutorProvider externalExecutorProvider;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerTopicWatcherBackPressureMultipleConsumersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerTopicWatcherBackPressureMultipleConsumersTest.java
new file mode 100644
index 00000000000..e47c85d16d9
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerTopicWatcherBackPressureMultipleConsumersTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.IntStream;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiter;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class PatternConsumerTopicWatcherBackPressureMultipleConsumersTest
extends MockedPulsarServiceBaseTest {
+
+ @Override
+ @BeforeMethod
+ protected void setup() throws Exception {
+ isTcpLookup = useTcpLookup(); // lookup mode comes from the overridable useTcpLookup() hook
+ super.internalSetup();
+ setupDefaultTenantAndNamespace();
+ }
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ conf.setSubscriptionPatternMaxLength(100); // NOTE(review): presumably sized for the patterns built in the test - confirm
+ }
+
+ protected boolean useTcpLookup() {
+ return true; // value assigned to isTcpLookup in setup(); protected, so a subclass can override it
+ }
+
+ @Override
+ @AfterMethod(alwaysRun = true)
+ protected void cleanup() throws Exception {
+ super.internalCleanup(); // teardown is delegated to the base class
+ }
+
+ @Test(timeOut = 60 * 1000) // 300 pattern consumers over 100 clients; limiter permits must not leak
+ public void
testPatternConsumerWithLargeAmountOfConcurrentClientConnections()
+ throws PulsarAdminException, InterruptedException, IOException,
ExecutionException, TimeoutException {
+ // create a new namespace for this test
+ String namespace = BrokerTestUtil.newUniqueName("public/ns");
+ admin.namespaces().createNamespace(namespace);
+
+ // use multiple clients so that each client has a separate connection
to the broker
+ final int numberOfClients = 100;
+
+ // create a long topic name to consume more memory per topic
+ final String topicNamePrefix = "persistent://" + namespace + "/" +
StringUtils.repeat('a', 512) + "-";
+ // number of topics to create
+ final int topicCount = 300;
+
+ // create topics
+ createTopics(topicCount, topicNamePrefix, "_0"); // initial topics are named <prefix><i>_0
+
+ { // scope for the @Cleanup resources: they are closed before the permit check after this block
+ @Cleanup
+
PatternConsumerBackPressureMultipleConsumersTest.SharedClientResources
sharedClientResources =
+ new
PatternConsumerBackPressureMultipleConsumersTest.SharedClientResources();
+ List<PulsarClientImpl> clients = new ArrayList<>(numberOfClients);
+ @Cleanup
+ Closeable closeClients = () -> {
+ for (PulsarClient client : clients) {
+ try {
+ client.close();
+ } catch (PulsarClientException e) {
+ log.error("Failed to close client {}", client, e); // keep closing the remaining clients
+ }
+ }
+ };
+ for (int i = 0; i < numberOfClients; i++) {
+ ClientBuilderImpl clientBuilderImpl =
+ (ClientBuilderImpl) PulsarClient.builder()
+ .serviceUrl(getClientServiceUrl())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .operationTimeout(1, TimeUnit.MINUTES);
+ PulsarClientImpl client =
sharedClientResources.useSharedResources(
+
PulsarClientImpl.builder().conf(clientBuilderImpl.getClientConfigurationData())).build();
+ clients.add(client);
+ }
+
+ List<CompletableFuture<Consumer<String>>> consumerFutures = new
ArrayList<>(numberOfClients);
+ for (int i = 0; i < topicCount; i++) { // one consumer per topic index, spread round-robin over the clients
+ String topicsPattern = namespace + "/a+-" + i + "_[01]$"; // topic index i with suffix _0 or _1
+ CompletableFuture<Consumer<String>> consumerFuture =
+ clients.get(i %
numberOfClients).newConsumer(Schema.STRING)
+
.topicsPattern(topicsPattern).subscriptionName("sub" + i)
+ .subscribeAsync();
+ consumerFutures.add(consumerFuture);
+ consumerFuture.exceptionally(throwable -> {
+ log.error("Failed to subscribe to pattern {}",
topicsPattern, throwable);
+ return null; // logging only; the original future added above still carries the failure
+ });
+ }
+
+ FutureUtil.waitForAll(consumerFutures).get(60, TimeUnit.SECONDS);
+
+ List<Consumer<String>> consumers =
consumerFutures.stream().map(CompletableFuture::join).toList();
+
+ List<? extends CompletableFuture<?>> watcherFutures =
consumers.stream().map(consumer -> {
+ try {
+ CompletableFuture<?> watcherFuture = consumer instanceof
PatternMultiTopicsConsumerImpl
+ ? (CompletableFuture<?>)
FieldUtils.readField(consumer, "watcherFuture", true) : null;
+ return watcherFuture; // null for other consumer types; filtered out below
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }).filter(Objects::nonNull).toList();
+
+ // wait for all watcher futures to complete
+ FutureUtil.waitForAll(watcherFutures).get(60, TimeUnit.SECONDS);
+
+ PulsarClientImpl client = clients.get(0); // producers for validation all use the first client
+ sendAndValidate(topicCount, client, consumers, topicNamePrefix,
"_0");
+
+ // create additional topics
+ createTopics(topicCount, topicNamePrefix, "_1"); // also matched by the consumers' "_[01]$" patterns
+
+ // send to additional topic
+ sendAndValidate(topicCount, client, consumers, topicNamePrefix,
"_1");
+ }
+
+ validateThatTokensHaventLeakedOrIncreased();
+ }
+
+ protected void validateThatTokensHaventLeakedOrIncreased() { // both limiters: full configured capacity, nothing held
+ AsyncDualMemoryLimiterImpl limiter =
+ pulsar.getBrokerService().getMaxTopicListInFlightLimiter();
+
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY).getAvailablePermits())
+
.isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightHeapMemSizeMB() *
1024L * 1024);
+
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY).getAcquiredPermits())
+ .isEqualTo(0); // no heap permits still acquired
+
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY).getAvailablePermits())
+
.isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightDirectMemSizeMB() *
1024L * 1024);
+
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY).getAcquiredPermits())
+ .isEqualTo(0); // no direct memory permits still acquired
+ }
+
+ private void createTopics(int topicCount, String topicNamePrefix, String
topicNameSuffix)
+ throws InterruptedException, ExecutionException, TimeoutException { // topics: <prefix><i><suffix>, i in [0, topicCount)
+ List<CompletableFuture<Void>> createTopicFutures = IntStream.range(0,
topicCount)
+ .mapToObj(i ->
admin.topics().createNonPartitionedTopicAsync(topicNamePrefix + i +
topicNameSuffix))
+ .toList(); // all creations are started before any of them is awaited
+ // wait for all topics to be created
+ FutureUtil.waitForAll(createTopicFutures).get(30, TimeUnit.SECONDS); // waits at most 30 s
+ }
+
+ private static void sendAndValidate(int topicCount, PulsarClientImpl
client, List<Consumer<String>> consumers,
+ String topicNamePrefix,
+ String topicNameSuffix) throws
PulsarClientException {
+ for (int i = 0; i < topicCount; i++) {
+ // send message to every topic
+ Producer<String> producer =
+ client.newProducer(Schema.STRING).topic(topicNamePrefix +
i + topicNameSuffix).create();
+ producer.send("test" + i); // payload carries the topic index that consumer i is checked against below
+ producer.close(); // NOTE(review): skipped if send() throws; try-with-resources would be safer
+ }
+
+ // validate that every consumer receives a single message
+ for (int i = 0; i < consumers.size(); i++) {
+ Consumer<String> consumer = consumers.get(i);
+ int finalI = i; // effectively final copy for use inside the lambda
+ assertThat(consumer.receive(10, TimeUnit.SECONDS)).isNotNull()
+ .satisfies(message -> assertThat(message.getValue())
+ .isEqualTo("test" + finalI));
+ // validate that no more messages are received
+ assertThat(consumer.receive(1, TimeUnit.MICROSECONDS)).isNull(); // 1 microsecond: effectively no waiting
+ }
+ }
+
+ protected String getClientServiceUrl() {
+ return lookupUrl.toString(); // service URL given to every client built in the test; protected, so overridable
+ }
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index e0aac72c1a3..c6912ae4fe1 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -31,6 +31,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -1613,7 +1614,7 @@ public class Commands {
* @param topics topic names which are matching, the topic name contains
the partition suffix.
*/
public static BaseCommand newWatchTopicListSuccess(long requestId, long
watcherId, String topicsHash,
- List<String> topics) {
+ Collection<String>
topics) {
BaseCommand cmd = new
BaseCommand().setType(Type.WATCH_TOPIC_LIST_SUCCESS);
cmd.setWatchTopicListSuccess()
.setRequestId(requestId)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
index 7a707f8b5e5..58603851662 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
@@ -175,5 +175,11 @@ public class AsyncDualMemoryLimiterImpl implements AsyncDualMemoryLimiter, AutoC
public AsyncSemaphore.AsyncSemaphorePermit getUnderlyingPermit() {
return underlyingPermit;
}
+
+ @Override
+ public String toString() {
+ return "DualMemoryLimiterPermit@" + System.identityHashCode(this)
+ "{" + "limitType=" + limitType
+ + ", permits=" + underlyingPermit.getPermits() + '}';
+ }
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterUtil.java
index 616681dc155..9f3880a2c09 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterUtil.java
@@ -93,17 +93,17 @@ public class AsyncDualMemoryLimiterUtil {
dualMemoryLimiter,
BooleanSupplier isCancelled,
BaseCommand command,
-
Consumer<Throwable>
+
Function<Throwable,
+
CompletableFuture<Void>>
permitAcquireErrorHandler
) {
// Calculate serialized size before acquiring permits
int serializedSize = command.getSerializedSize();
// Acquire permits
return dualMemoryLimiter.acquire(serializedSize,
AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY, isCancelled)
- .whenComplete((permits, t) -> {
+ .handle((permits, t) -> {
if (t != null) {
- permitAcquireErrorHandler.accept(t);
- return;
+ return permitAcquireErrorHandler.apply(t);
}
try {
// Serialize the response
@@ -118,6 +118,7 @@ public class AsyncDualMemoryLimiterUtil {
dualMemoryLimiter.release(permits);
throw e;
}
- }).thenApply(__ -> null);
+ return CompletableFuture.<Void>completedFuture(null);
+ }).thenCompose(Function.identity());
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
index 6df47256ee3..6ee9028711f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
@@ -359,5 +359,10 @@ public class AsyncSemaphoreImpl implements AsyncSemaphore, AutoCloseable {
public long releasePermits() {
return PERMITS_UPDATER.getAndSet(this, 0);
}
+
+ @Override
+ public String toString() {
+ return "SemaphorePermit@" + System.identityHashCode(this) +
"[permits=" + permits + "]";
+ }
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
index 629aedb3e72..0e38f022472 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java
@@ -27,7 +27,9 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collector;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import lombok.experimental.UtilityClass;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
@@ -56,6 +58,22 @@ public class TopicList {
* Filter topics using a TopicListPattern instance.
*/
public static List<String> filterTopics(List<String> original,
TopicsPattern topicsPattern) {
+ return filterTopics(original, topicsPattern, Collectors.toList());
+ }
+
+ /**
+ * Filter topics using a TopicListPattern instance and collect the results using a specified collector.
+ */
+ public static <R> R filterTopics(List<String> original, TopicsPattern
topicsPattern,
+ Collector<String, ?, R>
collector) {
+ return filterTopicsToStream(original, topicsPattern)
+ .collect(collector);
+ }
+
+ /**
+ * Filter topics using a TopicListPattern instance and return a stream of filtered topic names.
+ */
+ public static Stream<String> filterTopicsToStream(List<String> original,
TopicsPattern topicsPattern) {
return original.stream()
.map(TopicName::get)
.filter(topicName -> {
@@ -63,8 +81,7 @@ public class TopicList {
String removedScheme =
SCHEME_SEPARATOR_PATTERN.split(partitionedTopicName)[1];
return topicsPattern.matches(removedScheme);
})
- .map(TopicName::toString)
- .collect(Collectors.toList());
+ .map(TopicName::toString);
}
public static List<String> filterSystemTopic(List<String> original) {
@@ -73,7 +90,7 @@ public class TopicList {
.collect(Collectors.toList());
}
- public static String calculateHash(List<String> topics) {
+ public static String calculateHash(Collection<String> topics) {
Hasher hasher = Hashing.crc32c().newHasher();
String[] sortedTopics = topics.toArray(new String[topics.size()]);
Arrays.sort(sortedTopics);
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterUtilTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterUtilTest.java
index 86d155353df..e07d9410b47 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterUtilTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterUtilTest.java
@@ -262,7 +262,10 @@ public class AsyncDualMemoryLimiterUtilTest {
limiter,
() -> false,
command,
- throwable -> errorHandlerCalled.set(true)
+ throwable -> {
+ errorHandlerCalled.set(true);
+ return FutureUtil.failedFuture(throwable);
+ }
);
result.get(1, TimeUnit.SECONDS);
@@ -297,6 +300,7 @@ public class AsyncDualMemoryLimiterUtilTest {
throwable -> {
errorHandlerCalled.set(true);
capturedError.set(throwable);
+ return FutureUtil.failedFuture(throwable);
}
);
@@ -323,7 +327,10 @@ public class AsyncDualMemoryLimiterUtilTest {
limiter,
() -> false,
command,
- throwable -> errorHandlerCalled.set(true)
+ throwable -> {
+ errorHandlerCalled.set(true);
+ return FutureUtil.failedFuture(throwable);
+ }
);
try {
@@ -351,7 +358,10 @@ public class AsyncDualMemoryLimiterUtilTest {
limiter,
() -> false,
command,
- throwable -> errorHandlerCalled.set(true)
+ throwable -> {
+ errorHandlerCalled.set(true);
+ return FutureUtil.failedFuture(throwable);
+ }
);
try {
@@ -382,7 +392,10 @@ public class AsyncDualMemoryLimiterUtilTest {
limiter,
cancelled::get,
command,
- throwable -> errorHandlerCalled.set(true)
+ throwable -> {
+ errorHandlerCalled.set(true);
+ return FutureUtil.failedFuture(throwable);
+ }
);
// Cancel the request
@@ -420,6 +433,7 @@ public class AsyncDualMemoryLimiterUtilTest {
() -> false,
command,
throwable -> {
+ return FutureUtil.failedFuture(throwable);
}
);
}
@@ -453,7 +467,10 @@ public class AsyncDualMemoryLimiterUtilTest {
limiter,
() -> false,
command,
- throwable -> errorHandlerCalled.set(true)
+ throwable -> {
+ errorHandlerCalled.set(true);
+ return FutureUtil.failedFuture(throwable);
+ }
);
result.get(1, TimeUnit.SECONDS);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index d276c7996a6..b0bd26bbcdf 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -421,6 +421,7 @@ public class LookupProxyHandler {
clientAddress, t.getMessage());
writeAndFlush(Commands.newError(clientRequestId,
ServerError.TooManyRequests,
"Failed due to direct memory limit exceeded"));
+ return CompletableFuture.completedFuture(null);
});
}