Denovo1998 commented on code in PR #25070:
URL: https://github.com/apache/pulsar/pull/25070#discussion_r2686150566


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -71,24 +102,67 @@ public List<String> getMatchingTopics() {
          */
         @Override
         public void accept(String topicName, NotificationType notificationType) {
+            if (closed) {
+                return;
+            }
             String partitionedTopicName = TopicName.get(topicName).getPartitionedTopicName();
             String domainLessTopicName = TopicList.removeTopicDomainScheme(partitionedTopicName);
 
             if (topicsPattern.matches(domainLessTopicName)) {
-                List<String> newTopics;
-                List<String> deletedTopics;
+                List<String> newTopics = Collections.emptyList();
+                List<String> deletedTopics = Collections.emptyList();
                 if (notificationType == NotificationType.Deleted) {
-                    newTopics = Collections.emptyList();
-                    deletedTopics = Collections.singletonList(topicName);
-                    matchingTopics.remove(topicName);
-                } else {
-                    deletedTopics = Collections.emptyList();
+                    if (matchingTopics.remove(topicName)) {
+                        deletedTopics = Collections.singletonList(topicName);
+                    }
+                } else if (matchingTopics.add(topicName)) {
                     newTopics = Collections.singletonList(topicName);
-                    matchingTopics.add(topicName);
                 }
-                String hash = TopicList.calculateHash(matchingTopics);
-                topicListService.sendTopicListUpdate(id, hash, deletedTopics, newTopics);
+                if (!newTopics.isEmpty() || !deletedTopics.isEmpty()) {
+                    String hash = TopicList.calculateHash(matchingTopics);
+                    sendTopicListUpdate(hash, deletedTopics, newTopics);
+                }
+            }
+        }
+
+        // sends updates one-by-one so that ordering is retained
+        private synchronized void sendTopicListUpdate(String hash, List<String> deletedTopics, List<String> newTopics) {
+            if (closed) {
+                return;
+            }
+            Runnable task = () -> topicListService.sendTopicListUpdate(id, hash, deletedTopics, newTopics,
+                    this::sendingCompleted);
+            if (!sendingInProgress) {
+                sendingInProgress = true;
+                executor.execute(task);
+            } else {
+                // if sendTopicListSuccess hasn't completed, add to a queue to be executed after it completes
+                if (!sendTopicListUpdateTasks.offer(task)) {
+                    log.warn("Dropping update for watcher id {} matching {} since queue is full.", id,
+                            topicsPattern.inputPattern());
+                }

Review Comment:
   Is there a consistency risk here?
   
   If an update is dropped because the queue is full, the client never receives that delta. The client side has no hash check or automatic full-resync mechanism (`pulsar-client/TopicListWatcher` just applies the added/removed topics), so the final subscription set may permanently diverge from the real state.
   
   Perhaps this needs to change to one of the following:
   
   - Close the watcher when the queue is full, forcing the client to re-create it and resynchronize fully.
   - Or change the queue from storing delta runnables to a merged form that only guarantees the final state (e.g., keep a single pending task that recalculates and sends the diff), so that deltas cannot accumulate without bound.
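To make the second option concrete, here is a minimal sketch of the coalescing idea, not the PR's code. `CoalescingUpdateSketch` and its `Sender` interface are hypothetical names standing in for the watcher and for `topicListService.sendTopicListUpdate`; the assumption is that the watcher can remember the last state it sent and diff against it once the in-flight send completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch: replace the delta queue with one "recompute the diff" step. */
public class CoalescingUpdateSketch {
    /** Hypothetical stand-in for the broker's asynchronous send. */
    public interface Sender {
        void send(List<String> deleted, List<String> added, Runnable onComplete);
    }

    private final Set<String> current = new TreeSet<>(); // topics matching right now
    private Set<String> lastSent = new TreeSet<>();      // state the client has been told about
    private boolean sendingInProgress;
    private boolean dirty;                               // a change arrived while sending
    private final Sender sender;

    public CoalescingUpdateSketch(Sender sender) {
        this.sender = sender;
    }

    public synchronized void topicAdded(String topic) {
        if (current.add(topic)) {
            flush();
        }
    }

    public synchronized void topicDeleted(String topic) {
        if (current.remove(topic)) {
            flush();
        }
    }

    // Always called while holding the monitor.
    private void flush() {
        if (sendingInProgress) {
            dirty = true; // remember that something changed; no per-event task is stored
            return;
        }
        List<String> added = new ArrayList<>(current);
        added.removeAll(lastSent);
        List<String> deleted = new ArrayList<>(lastSent);
        deleted.removeAll(current);
        if (added.isEmpty() && deleted.isEmpty()) {
            return; // changes cancelled each other out
        }
        lastSent = new TreeSet<>(current);
        sendingInProgress = true;
        sender.send(deleted, added, this::sendingCompleted);
    }

    private synchronized void sendingCompleted() {
        sendingInProgress = false;
        if (dirty) {
            dirty = false;
            flush();
        }
    }
}
```

With this shape, memory use is bounded by the size of the topic set rather than by the event rate, and no update can be dropped, at the cost of keeping a second copy of the matching set per watcher.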



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -218,31 +278,103 @@ public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, lo
                 });
     }
 
+    private void sendTopicListSuccessWithPermitAcquiringRetries(long watcherId, long requestId,
+                                                                Collection<String> topicList,
+                                                                String hash,
+                                                                Runnable successfulCompletionCallback,
+                                                                Runnable failedCompletionCallback) {
+        performOperationWithPermitAcquiringRetries(watcherId, "topic list success", permitAcquireErrorHandler ->
+                () -> connection.getCommandSender()
+                        .sendWatchTopicListSuccess(requestId, watcherId, hash, topicList, permitAcquireErrorHandler)
+                        .whenComplete((__, t) -> {
+                            if (t != null) {
+                                // this is an unexpected case
+                                log.warn("[{}] Failed to send topic list success for watcherId={}. "
+                                        + "Watcher is not active.", connection, watcherId, t);
+                                failedCompletionCallback.run();
+                            } else {
+                                // completed successfully, run the callback
+                                successfulCompletionCallback.run();
+                            }
+                        }));
+    }
+
     /***
      * @param topicsPattern The regexp for the topic name(not contains partition suffix).
      */
     public void initializeTopicsListWatcher(CompletableFuture<TopicListWatcher> watcherFuture,
             NamespaceName namespace, long watcherId, TopicsPattern topicsPattern) {
-        namespaceService.getListOfPersistentTopics(namespace).
-                thenApply(topics -> {
-                    TopicListWatcher watcher = new TopicListWatcher(this, watcherId, topicsPattern, topics);
-                    topicResources.registerPersistentTopicListener(namespace, watcher);
-                    return watcher;
-                }).
-                whenComplete((watcher, exception) -> {
-                    if (exception != null) {
-                        watcherFuture.completeExceptionally(exception);
-                    } else {
-                        if (!watcherFuture.complete(watcher)) {
-                            log.warn("[{}] Watcher future was already completed. Deregistering watcherId={}.",
-                                    connection.toString(), watcherId);
-                            topicResources.deregisterPersistentTopicListener(watcher);
-                        }
-                    }
-                });
+        BooleanSupplier isPermitRequestCancelled = () -> !connection.isActive() || !watchers.containsKey(watcherId);
+        if (isPermitRequestCancelled.getAsBoolean()) {
+            return;
+        }
+        TopicListSizeResultCache.ResultHolder listSizeHolder = pulsar.getBrokerService().getTopicListSizeResultCache()
+                .getTopicListSize(namespace.toString(), CommandGetTopicsOfNamespace.Mode.PERSISTENT);
+        AsyncDualMemoryLimiter maxTopicListInFlightLimiter = pulsar.getBrokerService().getMaxTopicListInFlightLimiter();
+
+        listSizeHolder.getSizeAsync().thenCompose(initialSize -> {
+            // use heap size limiter to avoid broker getting overwhelmed by a lot of concurrent topic list requests
+            return maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+                    AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, isPermitRequestCancelled, initialPermits -> {
+                        AtomicReference<TopicListWatcher> watcherRef = new AtomicReference<>();
+                        return namespaceService.getListOfPersistentTopics(namespace).thenCompose(topics -> {
+                            long actualSize = TopicListMemoryLimiter.estimateTopicListSize(topics);
+                            listSizeHolder.updateSize(actualSize);
+                            // register watcher immediately so that we don't lose events
+                            TopicListWatcher watcher =
+                                    new TopicListWatcher(this, watcherId, topicsPattern, topics,
+                                            connection.ctx().executor());
+                            watcherRef.set(watcher);
+                            topicResources.registerPersistentTopicListener(namespace, watcher);
+                            // use updated permits to slow down responses so that backpressure gets applied
+                            return maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+                                    isPermitRequestCancelled, updatedPermits -> {
+                                        // reset retry backoff
+                                        retryBackoff.reset();
+                                        // just return the watcher which was already created before
+                                        return CompletableFuture.completedFuture(watcher);
+                                    }, CompletableFuture::failedFuture);
+                        }).whenComplete((watcher, exception) -> {
+                            if (exception != null) {
+                                if (watcherRef.get() != null) {
+                                    watcher.close();

Review Comment:
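   Is there a potential NPE here? The guard checks `watcherRef.get() != null`, but `close()` is called on `watcher`, the result argument of `whenComplete`, which is null whenever `exception != null`. Closing `watcherRef.get()` instead would avoid it.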
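The behaviour can be reproduced in isolation. The sketch below is illustrative only: `WhenCompleteNullSketch`, `Watcher`, and `Outcome` are hypothetical stand-ins, and the failure is simulated with `CompletableFuture.failedFuture`. It shows that the result argument of `whenComplete` is null on the exceptional path, and that reading the instance back from the `AtomicReference` is one way to close it safely.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class WhenCompleteNullSketch {
    /** Hypothetical stand-in for TopicListWatcher. */
    public static class Watcher {
        public boolean closed;

        public void close() {
            closed = true;
        }
    }

    /** What the callback observed, returned so that a caller can inspect it. */
    public static class Outcome {
        public boolean resultArgWasNull;
        public Watcher created;
    }

    public static Outcome run() {
        Outcome outcome = new Outcome();
        AtomicReference<Watcher> watcherRef = new AtomicReference<>();

        CompletableFuture.completedFuture("topics")
                .thenCompose(topics -> {
                    // The watcher is created and published before the later step fails.
                    watcherRef.set(new Watcher());
                    return CompletableFuture.<Watcher>failedFuture(
                            new IllegalStateException("simulated failure after watcher creation"));
                })
                .whenComplete((watcher, exception) -> {
                    if (exception != null) {
                        // On failure the result argument is null, so watcher.close() would throw.
                        outcome.resultArgWasNull = (watcher == null);
                        Watcher created = watcherRef.get();
                        if (created != null) {
                            created.close();
                        }
                        outcome.created = created;
                    }
                });
        return outcome;
    }
}
```

Everything here completes synchronously on the calling thread, so `run()` returns only after the callback has executed.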


