Denovo1998 commented on code in PR #25070:
URL: https://github.com/apache/pulsar/pull/25070#discussion_r2686150566
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -71,24 +102,67 @@ public List<String> getMatchingTopics() {
          */
         @Override
         public void accept(String topicName, NotificationType notificationType) {
+            if (closed) {
+                return;
+            }
             String partitionedTopicName = TopicName.get(topicName).getPartitionedTopicName();
             String domainLessTopicName = TopicList.removeTopicDomainScheme(partitionedTopicName);
             if (topicsPattern.matches(domainLessTopicName)) {
-                List<String> newTopics;
-                List<String> deletedTopics;
+                List<String> newTopics = Collections.emptyList();
+                List<String> deletedTopics = Collections.emptyList();
                 if (notificationType == NotificationType.Deleted) {
-                    newTopics = Collections.emptyList();
-                    deletedTopics = Collections.singletonList(topicName);
-                    matchingTopics.remove(topicName);
-                } else {
-                    deletedTopics = Collections.emptyList();
+                    if (matchingTopics.remove(topicName)) {
+                        deletedTopics = Collections.singletonList(topicName);
+                    }
+                } else if (matchingTopics.add(topicName)) {
                     newTopics = Collections.singletonList(topicName);
-                    matchingTopics.add(topicName);
                 }
-                String hash = TopicList.calculateHash(matchingTopics);
-                topicListService.sendTopicListUpdate(id, hash, deletedTopics, newTopics);
+                if (!newTopics.isEmpty() || !deletedTopics.isEmpty()) {
+                    String hash = TopicList.calculateHash(matchingTopics);
+                    sendTopicListUpdate(hash, deletedTopics, newTopics);
+                }
             }
         }
+
+        // sends updates one-by-one so that ordering is retained
+        private synchronized void sendTopicListUpdate(String hash, List<String> deletedTopics, List<String> newTopics) {
+            if (closed) {
+                return;
+            }
+            Runnable task = () -> topicListService.sendTopicListUpdate(id, hash, deletedTopics, newTopics,
+                    this::sendingCompleted);
+            if (!sendingInProgress) {
+                sendingInProgress = true;
+                executor.execute(task);
+            } else {
+                // if sendTopicListSuccess hasn't completed, add to a queue to be executed after it completes
+                if (!sendTopicListUpdateTasks.offer(task)) {
+                    log.warn("Dropping update for watcher id {} matching {} since queue is full.", id,
+                            topicsPattern.inputPattern());
+                }
Review Comment:
Is there a consistency risk here?
When an update is dropped because the queue is full, there is no hash check or automatic full resynchronization on the client side (`pulsar-client/TopicListWatcher` simply applies the add/remove deltas), so the client's subscription set may permanently diverge from the actual topic list.
Perhaps this should be changed to one of the following (see the sketch after this list):
- Close the watcher when the queue is full, forcing the client to reconnect and resynchronize the full list.
- Or change the queue from storing per-delta runnables to a "merged" form that only guarantees the final state, e.g. keep at most one "recalculate and send diff" task, so deltas cannot accumulate without bound.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -218,31 +278,103 @@ public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, lo
         });
     }
 
+    private void sendTopicListSuccessWithPermitAcquiringRetries(long watcherId, long requestId,
+                                                                Collection<String> topicList,
+                                                                String hash,
+                                                                Runnable successfulCompletionCallback,
+                                                                Runnable failedCompletionCallback) {
+        performOperationWithPermitAcquiringRetries(watcherId, "topic list success", permitAcquireErrorHandler ->
+                () -> connection.getCommandSender()
+                        .sendWatchTopicListSuccess(requestId, watcherId, hash, topicList, permitAcquireErrorHandler)
+                        .whenComplete((__, t) -> {
+                            if (t != null) {
+                                // this is an unexpected case
+                                log.warn("[{}] Failed to send topic list success for watcherId={}. "
+                                        + "Watcher is not active.", connection, watcherId, t);
+                                failedCompletionCallback.run();
+                            } else {
+                                // completed successfully, run the callback
+                                successfulCompletionCallback.run();
+                            }
+                        }));
+    }
+
     /***
      * @param topicsPattern The regexp for the topic name(not contains partition suffix).
      */
     public void initializeTopicsListWatcher(CompletableFuture<TopicListWatcher> watcherFuture,
             NamespaceName namespace, long watcherId, TopicsPattern topicsPattern) {
-        namespaceService.getListOfPersistentTopics(namespace).
-                thenApply(topics -> {
-                    TopicListWatcher watcher = new TopicListWatcher(this, watcherId, topicsPattern, topics);
-                    topicResources.registerPersistentTopicListener(namespace, watcher);
-                    return watcher;
-                }).
-                whenComplete((watcher, exception) -> {
-                    if (exception != null) {
-                        watcherFuture.completeExceptionally(exception);
-                    } else {
-                        if (!watcherFuture.complete(watcher)) {
-                            log.warn("[{}] Watcher future was already completed. Deregistering watcherId={}.",
-                                    connection.toString(), watcherId);
-                            topicResources.deregisterPersistentTopicListener(watcher);
-                        }
-                    }
-                });
+        BooleanSupplier isPermitRequestCancelled = () -> !connection.isActive() || !watchers.containsKey(watcherId);
+        if (isPermitRequestCancelled.getAsBoolean()) {
+            return;
+        }
+        TopicListSizeResultCache.ResultHolder listSizeHolder = pulsar.getBrokerService().getTopicListSizeResultCache()
+                .getTopicListSize(namespace.toString(), CommandGetTopicsOfNamespace.Mode.PERSISTENT);
+        AsyncDualMemoryLimiter maxTopicListInFlightLimiter = pulsar.getBrokerService().getMaxTopicListInFlightLimiter();
+
+        listSizeHolder.getSizeAsync().thenCompose(initialSize -> {
+            // use heap size limiter to avoid broker getting overwhelmed by a lot of concurrent topic list requests
+            return maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+                    AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, isPermitRequestCancelled, initialPermits -> {
+                        AtomicReference<TopicListWatcher> watcherRef = new AtomicReference<>();
+                        return namespaceService.getListOfPersistentTopics(namespace).thenCompose(topics -> {
+                            long actualSize = TopicListMemoryLimiter.estimateTopicListSize(topics);
+                            listSizeHolder.updateSize(actualSize);
+                            // register watcher immediately so that we don't lose events
+                            TopicListWatcher watcher =
+                                    new TopicListWatcher(this, watcherId, topicsPattern, topics,
+                                            connection.ctx().executor());
+                            watcherRef.set(watcher);
+                            topicResources.registerPersistentTopicListener(namespace, watcher);
+                            // use updated permits to slow down responses so that backpressure gets applied
+                            return maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+                                    isPermitRequestCancelled, updatedPermits -> {
+                                        // reset retry backoff
+                                        retryBackoff.reset();
+                                        // just return the watcher which was already created before
+                                        return CompletableFuture.completedFuture(watcher);
+                                    }, CompletableFuture::failedFuture);
+                        }).whenComplete((watcher, exception) -> {
+                            if (exception != null) {
+                                if (watcherRef.get() != null) {
+                                    watcher.close();
Review Comment:
Is there a potential NPE here? The null check is on `watcherRef.get()`, but `watcher.close()` is called on the `whenComplete` argument, which is null whenever `exception != null`, so it looks like this should close `watcherRef.get()` instead.
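For illustration only, a minimal sketch of one way to avoid that NPE by closing the instance captured in `watcherRef` rather than the `whenComplete` argument (the class and interface names here are hypothetical):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch only: on the failure path the whenComplete argument is null,
// so cleanup must use the reference stored when the watcher was registered.
class WatcherCleanupSketch {
    interface RegisteredWatcher {
        void close();
    }

    static <T extends RegisteredWatcher> void closeOnFailure(CompletableFuture<T> future,
                                                             AtomicReference<T> watcherRef) {
        future.whenComplete((watcher, exception) -> {
            if (exception != null) {
                T registered = watcherRef.get();
                if (registered != null) {
                    // 'watcher' is null here; close the registered instance instead
                    registered.close();
                }
            }
        });
    }
}
```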