lhotari commented on code in PR #25070:
URL: https://github.com/apache/pulsar/pull/25070#discussion_r2686797205


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -218,31 +278,103 @@ public void handleWatchTopicList(NamespaceName 
namespaceName, long watcherId, lo
                 });
     }
 
+    private void sendTopicListSuccessWithPermitAcquiringRetries(long 
watcherId, long requestId,
+                                                                
Collection<String> topicList,
+                                                                String hash,
+                                                                Runnable 
successfulCompletionCallback,
+                                                                Runnable 
failedCompletionCallback) {
+        performOperationWithPermitAcquiringRetries(watcherId, "topic list 
success", permitAcquireErrorHandler ->
+                () -> connection.getCommandSender()
+                        .sendWatchTopicListSuccess(requestId, watcherId, hash, 
topicList, permitAcquireErrorHandler)
+                        .whenComplete((__, t) -> {
+                            if (t != null) {
+                                // this is an unexpected case
+                                log.warn("[{}] Failed to send topic list 
success for watcherId={}. "
+                                        + "Watcher is not active.", 
connection, watcherId, t);
+                                failedCompletionCallback.run();
+                            } else {
+                                // completed successfully, run the callback
+                                successfulCompletionCallback.run();
+                            }
+                        }));
+    }
+
     /***
      * @param topicsPattern The regexp for the topic name(not contains 
partition suffix).
      */
     public void 
initializeTopicsListWatcher(CompletableFuture<TopicListWatcher> watcherFuture,
             NamespaceName namespace, long watcherId, TopicsPattern 
topicsPattern) {
-        namespaceService.getListOfPersistentTopics(namespace).
-                thenApply(topics -> {
-                    TopicListWatcher watcher = new TopicListWatcher(this, 
watcherId, topicsPattern, topics);
-                    topicResources.registerPersistentTopicListener(namespace, 
watcher);
-                    return watcher;
-                }).
-                whenComplete((watcher, exception) -> {
-                    if (exception != null) {
-                        watcherFuture.completeExceptionally(exception);
-                    } else {
-                        if (!watcherFuture.complete(watcher)) {
-                            log.warn("[{}] Watcher future was already 
completed. Deregistering watcherId={}.",
-                                    connection.toString(), watcherId);
-                            
topicResources.deregisterPersistentTopicListener(watcher);
-                        }
-                    }
-                });
+        BooleanSupplier isPermitRequestCancelled = () -> 
!connection.isActive() || !watchers.containsKey(watcherId);
+        if (isPermitRequestCancelled.getAsBoolean()) {
+            return;
+        }
+        TopicListSizeResultCache.ResultHolder listSizeHolder = 
pulsar.getBrokerService().getTopicListSizeResultCache()
+                .getTopicListSize(namespace.toString(), 
CommandGetTopicsOfNamespace.Mode.PERSISTENT);
+        AsyncDualMemoryLimiter maxTopicListInFlightLimiter = 
pulsar.getBrokerService().getMaxTopicListInFlightLimiter();
+
+        listSizeHolder.getSizeAsync().thenCompose(initialSize -> {
+            // use heap size limiter to avoid broker getting overwhelmed by a 
lot of concurrent topic list requests
+            return maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+                    AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, 
isPermitRequestCancelled, initialPermits -> {
+                        AtomicReference<TopicListWatcher> watcherRef = new 
AtomicReference<>();
+                        return 
namespaceService.getListOfPersistentTopics(namespace).thenCompose(topics -> {
+                            long actualSize = 
TopicListMemoryLimiter.estimateTopicListSize(topics);
+                            listSizeHolder.updateSize(actualSize);
+                            // register watcher immediately so that we don't 
lose events
+                            TopicListWatcher watcher =
+                                    new TopicListWatcher(this, watcherId, 
topicsPattern, topics,
+                                            connection.ctx().executor());
+                            watcherRef.set(watcher);
+                            
topicResources.registerPersistentTopicListener(namespace, watcher);
+                            // use updated permits to slow down responses so 
that backpressure gets applied
+                            return 
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+                                    isPermitRequestCancelled, updatedPermits 
-> {
+                                        // reset retry backoff
+                                        retryBackoff.reset();
+                                        // just return the watcher which was 
already created before
+                                        return 
CompletableFuture.completedFuture(watcher);
+                                    }, CompletableFuture::failedFuture);
+                        }).whenComplete((watcher, exception) -> {
+                            if (exception != null) {
+                                if (watcherRef.get() != null) {
+                                    watcher.close();

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to