lhotari commented on code in PR #25070:
URL: https://github.com/apache/pulsar/pull/25070#discussion_r2686440560
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -71,24 +102,67 @@ public List<String> getMatchingTopics() {
*/
@Override
public void accept(String topicName, NotificationType notificationType) {
+ if (closed) {
+ return;
+ }
String partitionedTopicName = TopicName.get(topicName).getPartitionedTopicName();
String domainLessTopicName = TopicList.removeTopicDomainScheme(partitionedTopicName);
if (topicsPattern.matches(domainLessTopicName)) {
- List<String> newTopics;
- List<String> deletedTopics;
+ List<String> newTopics = Collections.emptyList();
+ List<String> deletedTopics = Collections.emptyList();
if (notificationType == NotificationType.Deleted) {
- newTopics = Collections.emptyList();
- deletedTopics = Collections.singletonList(topicName);
- matchingTopics.remove(topicName);
- } else {
- deletedTopics = Collections.emptyList();
+ if (matchingTopics.remove(topicName)) {
+ deletedTopics = Collections.singletonList(topicName);
+ }
+ } else if (matchingTopics.add(topicName)) {
newTopics = Collections.singletonList(topicName);
- matchingTopics.add(topicName);
}
- String hash = TopicList.calculateHash(matchingTopics);
- topicListService.sendTopicListUpdate(id, hash, deletedTopics, newTopics);
+ if (!newTopics.isEmpty() || !deletedTopics.isEmpty()) {
+ String hash = TopicList.calculateHash(matchingTopics);
+ sendTopicListUpdate(hash, deletedTopics, newTopics);
+ }
+ }
+ }
+
+ // sends updates one-by-one so that ordering is retained
+ private synchronized void sendTopicListUpdate(String hash, List<String> deletedTopics, List<String> newTopics) {
+ if (closed) {
+ return;
+ }
+ Runnable task = () -> topicListService.sendTopicListUpdate(id, hash, deletedTopics, newTopics,
+ this::sendingCompleted);
+ if (!sendingInProgress) {
+ sendingInProgress = true;
+ executor.execute(task);
+ } else {
+ // if sendTopicListSuccess hasn't completed, add to a queue to be executed after it completes
+ if (!sendTopicListUpdateTasks.offer(task)) {
+ log.warn("Dropping update for watcher id {} matching {} since queue is full.", id,
+ topicsPattern.inputPattern());
+ }
Review Comment:
Yes, this is a valid concern. Issue #25020 is also related.
When an update is dropped because the queue is full, the most useful
reaction is probably to close the topic list watcher. One of the challenges
is that the original protocol for topic list watchers has some gaps. The
`CommandWatchTopicListClose` command isn't currently designed to be sent from
the broker to the client, so the only way a broker can close a watcher is by
closing the connection. That is problematic, since closing the connection
also closes all producers and consumers sharing it.
Another possible solution would be to mark the watcher state as dirty and
perform a complete listing, so that the hash of the topic listing is back in
a consistent state.
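
A rough sketch of the dirty-flag idea, to make it concrete. This is not code
from the PR: `DirtyTrackingWatcher`, `offerUpdate`, `nextBatch` and the
`listAllMatchingTopics` supplier are hypothetical names, and updates are
modelled as plain strings. When the bounded queue is full, the watcher drops
the queued incremental updates, marks itself dirty, and the next batch is a
full listing from which the caller could recompute the hash.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

// Hypothetical sketch; none of these names are Pulsar APIs.
class DirtyTrackingWatcher {
    private final int capacity;
    private final Supplier<List<String>> listAllMatchingTopics;
    private final Queue<String> pendingUpdates = new ArrayDeque<>();
    private boolean dirty;

    DirtyTrackingWatcher(int capacity, Supplier<List<String>> listAllMatchingTopics) {
        this.capacity = capacity;
        this.listAllMatchingTopics = listAllMatchingTopics;
    }

    // Queues an incremental update; returns false if it was dropped.
    synchronized boolean offerUpdate(String update) {
        if (dirty) {
            return false; // a full re-listing is already pending
        }
        if (pendingUpdates.size() >= capacity) {
            pendingUpdates.clear(); // superseded by the full re-listing
            dirty = true;
            return false;
        }
        pendingUpdates.add(update);
        return true;
    }

    // Called once the previous send has completed. Returns a full listing
    // if updates were lost, otherwise the queued incremental updates.
    synchronized List<String> nextBatch() {
        if (dirty) {
            dirty = false;
            return new ArrayList<>(listAllMatchingTopics.get());
        }
        List<String> batch = new ArrayList<>(pendingUpdates);
        pendingUpdates.clear();
        return batch;
    }

    synchronized boolean isDirty() {
        return dirty;
    }
}
```

The point of the design is that an overflow costs one extra full listing
instead of leaving the broker-side hash permanently out of step with what was
actually sent.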
Besides #25020, another related detail is that Metadata Store events can be
lost. Because of that, the topic list watcher should perform a full refresh
at a configurable interval, for example every 5 minutes.
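
A minimal sketch of such a periodic full refresh, assuming a hypothetical
`PeriodicTopicListRefresher` class and a `fullListing` supplier that stands in
for listing the matching topics from the metadata store. Each refresh compares
the fresh listing against the known set and yields the additions and removals
that lost events would have reported. The interval is passed in by the caller;
where it would be configured is left open.

```java
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch; class and method names are illustrative only.
class PeriodicTopicListRefresher {
    // Topics added and removed relative to the previously known state.
    static final class Diff {
        final Set<String> added;
        final Set<String> removed;

        Diff(Set<String> added, Set<String> removed) {
            this.added = added;
            this.removed = removed;
        }
    }

    private final Set<String> knownTopics = new HashSet<>();
    private final Supplier<List<String>> fullListing;

    PeriodicTopicListRefresher(Supplier<List<String>> fullListing) {
        this.fullListing = fullListing;
    }

    // Reconciles the known set with a fresh full listing.
    synchronized Diff refreshOnce() {
        Set<String> fresh = new HashSet<>(fullListing.get());
        Set<String> added = new TreeSet<>(fresh);
        added.removeAll(knownTopics);
        Set<String> removed = new TreeSet<>(knownTopics);
        removed.removeAll(fresh);
        knownTopics.clear();
        knownTopics.addAll(fresh);
        return new Diff(added, removed);
    }

    // Runs refreshOnce repeatedly, waiting `interval` between runs.
    ScheduledFuture<?> start(ScheduledExecutorService scheduler, Duration interval) {
        long millis = interval.toMillis();
        return scheduler.scheduleWithFixedDelay(this::refreshOnce, millis, millis,
                TimeUnit.MILLISECONDS);
    }
}
```

With a 5 minute interval this would be started as
`refresher.start(scheduler, Duration.ofMinutes(5))`. A real implementation
would also forward each non-empty `Diff` to the watcher rather than discard it.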
Another gap in the topic list watcher protocol is that the broker cannot
send the updated hash to the client. A better protocol would send just the
hash to the client each time the broker performs a full refresh. If it
differs from the hash of the client-side state, the client could then request
the full listing.
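
The client side of such a hash check could look roughly like the sketch
below. Everything here is an assumption for illustration: `HashCheckingClient`
and `onBrokerHash` are hypothetical names, `requestFullListing` stands in for
a request to the broker, and the CRC32-based `hashOf` is only a stand-in for
whatever hash the broker and client actually agree on.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Supplier;
import java.util.zip.CRC32;

// Hypothetical sketch; not the Pulsar client API.
class HashCheckingClient {
    private final Set<String> localTopics = new TreeSet<>();
    private final Supplier<List<String>> requestFullListing;
    private int fullListingRequests;

    HashCheckingClient(Supplier<List<String>> requestFullListing) {
        this.requestFullListing = requestFullListing;
    }

    // Stand-in hash: CRC32 over the sorted topic names (an assumption).
    static String hashOf(Collection<String> topics) {
        CRC32 crc = new CRC32();
        for (String topic : new TreeSet<>(topics)) {
            crc.update(topic.getBytes(StandardCharsets.UTF_8));
            crc.update(','); // separator so that ["ab"] and ["a", "b"] differ
        }
        return Long.toHexString(crc.getValue());
    }

    // Handles a hash pushed by the broker after its full refresh.
    // Returns true if the local state already matched.
    synchronized boolean onBrokerHash(String brokerHash) {
        if (brokerHash.equals(hashOf(localTopics))) {
            return true;
        }
        // Out of sync: replace the local state with a full listing.
        localTopics.clear();
        localTopics.addAll(requestFullListing.get());
        fullListingRequests++;
        return false;
    }

    synchronized int fullListingRequests() {
        return fullListingRequests;
    }
}
```

The full listing is only transferred on a mismatch, so in the common in-sync
case each broker refresh costs the client a single short message.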
Currently, the client has to poll on a schedule to ensure that its state
doesn't drift out of sync with the broker-side state. This isn't properly
implemented in the Java client at the moment. Similar issues might also be
present in other Pulsar client implementations.