lhotari commented on code in PR #25070:
URL: https://github.com/apache/pulsar/pull/25070#discussion_r2686440560


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -71,24 +102,67 @@ public List<String> getMatchingTopics() {
          */
         @Override
         public void accept(String topicName, NotificationType 
notificationType) {
+            if (closed) {
+                return;
+            }
             String partitionedTopicName = 
TopicName.get(topicName).getPartitionedTopicName();
             String domainLessTopicName = 
TopicList.removeTopicDomainScheme(partitionedTopicName);
 
             if (topicsPattern.matches(domainLessTopicName)) {
-                List<String> newTopics;
-                List<String> deletedTopics;
+                List<String> newTopics = Collections.emptyList();
+                List<String> deletedTopics = Collections.emptyList();
                 if (notificationType == NotificationType.Deleted) {
-                    newTopics = Collections.emptyList();
-                    deletedTopics = Collections.singletonList(topicName);
-                    matchingTopics.remove(topicName);
-                } else {
-                    deletedTopics = Collections.emptyList();
+                    if (matchingTopics.remove(topicName)) {
+                        deletedTopics = Collections.singletonList(topicName);
+                    }
+                } else if (matchingTopics.add(topicName)) {
                     newTopics = Collections.singletonList(topicName);
-                    matchingTopics.add(topicName);
                 }
-                String hash = TopicList.calculateHash(matchingTopics);
-                topicListService.sendTopicListUpdate(id, hash, deletedTopics, 
newTopics);
+                if (!newTopics.isEmpty() || !deletedTopics.isEmpty()) {
+                    String hash = TopicList.calculateHash(matchingTopics);
+                    sendTopicListUpdate(hash, deletedTopics, newTopics);
+                }
+            }
+        }
+
+        // sends updates one-by-one so that ordering is retained
+        private synchronized void sendTopicListUpdate(String hash, 
List<String> deletedTopics, List<String> newTopics) {
+            if (closed) {
+                return;
+            }
+            Runnable task = () -> topicListService.sendTopicListUpdate(id, 
hash, deletedTopics, newTopics,
+                    this::sendingCompleted);
+            if (!sendingInProgress) {
+                sendingInProgress = true;
+                executor.execute(task);
+            } else {
+                // if sendTopicListSuccess hasn't completed, add to a queue to 
be executed after it completes
+                if (!sendTopicListUpdateTasks.offer(task)) {
+                    log.warn("Dropping update for watcher id {} matching {} 
since queue is full.", id,
+                            topicsPattern.inputPattern());
+                }

Review Comment:
   Yes, this is a valid concern. The issue #25020 is also related.
   
   In this case, it's most likely useful to close the topic list watcher. One 
of the challenges is that the original protocol for Topic list watchers has 
some gaps. The `CommandWatchTopicListClose` command isn't designed to be sent 
from the broker to the client at the moment. The only way that a broker can 
close a watcher is by closing the connection. However, this is problematic 
since closing the connection will also close all producers and consumers 
sharing the same connection.
   
   Another solution that could be useful would be to mark the state as dirty 
and perform the complete listing so that the hash of the topic listing would be 
in consistent state. 
   
   Besides #25020, another related detail is that Metadata Store events could 
be lost. Because of that, topic list watcher should perform a full refresh with 
a configurable interval, for example every 5 minutes. 
   
   Another gap in the topic list watcher protocol is that the broker cannot 
send the updated hash to the client. A better protocol would be to send the 
plain hash to the client each time the broker performs a full refresh. If 
there's a difference to the state on the client side, the client could request 
the full listing.
   Currently, the client should perform a scheduled polling to ensure that the 
state doesn't get out of sync from the broker side state. This isn't properly 
implemented in the Java client at the moment. Similar issues might be present 
also in other Pulsar client implementations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to