Denovo1998 commented on code in PR #25070:
URL: https://github.com/apache/pulsar/pull/25070#discussion_r2679211085


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -287,9 +422,70 @@ public void deleteTopicListWatcher(Long watcherId) {
      * @param newTopics topics names added(contains the partition suffix).
      */
     public void sendTopicListUpdate(long watcherId, String topicsHash, List<String> deletedTopics,
-                                    List<String> newTopics) {
-        connection.getCommandSender().sendWatchTopicListUpdate(watcherId, newTopics, deletedTopics, topicsHash);
+                                    List<String> newTopics, Runnable completionCallback) {
+        performOperationWithPermitAcquiringRetries(watcherId, "topic list update", permitAcquireErrorHandler ->
+                () -> connection.getCommandSender()
+                        .sendWatchTopicListUpdate(watcherId, newTopics, deletedTopics, topicsHash,
+                                permitAcquireErrorHandler)
+                        .whenComplete((__, t) -> {
+                            if (t != null) {
+                                // this is an unexpected case
+                                log.warn("[{}] Failed to send topic list update for watcherId={}. Watcher will be in "
+                                        + "inconsistent state.", connection, watcherId, t);
+                            }
+                            completionCallback.run();

Review Comment:
   Even if update A fails with "acquire timeout / queue full" and a retry has been scheduled, the completionCallback still triggers TopicListWatcher.sendingCompleted(), so the queue goes on to execute the subsequent update B.
   
   Assume there are two updates on the same watcher, in the order the events occurred: A, then B.
   
     - t0: Update A (create topic2) is triggered, enters the send chain, and starts trying to acquire direct memory permits.
     - t1: There are not enough direct memory permits, so acquiring them for A fails:
         - acquireDirectMemoryPermitsAndWriteAndFlush calls permitAcquireErrorHandler.accept(failure) → a "retry A later" is scheduled.
         - At the same time, the future returned by sendWatchTopicListUpdate also completes exceptionally → the whenComplete callback in sendTopicListUpdate(...) runs.
         - whenComplete still calls completionCallback.run() → sendingCompleted() is triggered → the queue releases the next item.
     - t2: Update B (delete topic2) starts executing. If enough permits are available this time, B is written to the socket first.
     - t3: The "retry A" scheduled at t1 fires and tries to acquire permits again. This time it succeeds, so A is written to the socket after B.
   
   **Result: the client receives B (delete) before A (create). As a result, it may incorrectly "retain topic2".**
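The following is a minimal single-threaded model of this interleaving. It is a sketch under stated assumptions and is not Pulsar code. `FlawedWatcherQueue`, `Update`, `sendHead`, `fireRetries` and `clientView` are hypothetical names. An integer counter stands in for direct memory permits, and a deque stands in for the retry that permitAcquireErrorHandler schedules. As in the reviewed code, `sendHead` lets the queue advance even when the write failed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical single-threaded model of the reordering (not Pulsar code).
 * "permits" stands in for direct memory permits; "scheduledRetries" stands in
 * for the retry scheduled via permitAcquireErrorHandler.
 */
class FlawedWatcherQueue {
    record Update(String name, boolean create, String topic) {}

    private final Deque<Update> pending = new ArrayDeque<>();
    private final Deque<Update> scheduledRetries = new ArrayDeque<>();
    final List<Update> socket = new ArrayList<>();
    int permits;

    FlawedWatcherQueue(int permits) {
        this.permits = permits;
    }

    void enqueue(Update u) {
        pending.add(u);
    }

    /** Sends the head update; the queue advances even if the write failed. */
    void sendHead() {
        Update u = pending.poll();
        if (u == null) {
            return;
        }
        if (!tryWrite(u)) {
            scheduledRetries.add(u); // "retry A later"
        }
        // Like completionCallback.run() -> sendingCompleted(): nothing keeps u
        // at the head, so the next update can overtake it.
    }

    /** Runs the retries scheduled earlier; ones that fail again stay scheduled. */
    void fireRetries() {
        int n = scheduledRetries.size();
        for (int i = 0; i < n; i++) {
            Update u = scheduledRetries.poll();
            if (!tryWrite(u)) {
                scheduledRetries.add(u);
            }
        }
    }

    private boolean tryWrite(Update u) {
        if (permits == 0) {
            return false;
        }
        permits--;
        socket.add(u);
        return true;
    }

    /** Applies updates in the order the client receives them. */
    static Set<String> clientView(List<Update> received) {
        Set<String> topics = new HashSet<>();
        for (Update u : received) {
            if (u.create()) {
                topics.add(u.topic());
            } else {
                topics.remove(u.topic());
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        FlawedWatcherQueue q = new FlawedWatcherQueue(0); // no permits at t1
        q.enqueue(new Update("A", true, "topic2"));
        q.enqueue(new Update("B", false, "topic2"));
        q.sendHead();    // t1: A fails, retry scheduled, queue advances anyway
        q.permits = 2;   // permits are released elsewhere
        q.sendHead();    // t2: B is written first
        q.fireRetries(); // t3: the retry of A succeeds
        System.out.println(q.socket.stream().map(Update::name).toList()); // [B, A]
        System.out.println(clientView(q.socket));                       // [topic2]
    }
}
```

Running `main` prints `[B, A]` and then `[topic2]`. The client still holds topic2 even though the last event for it was a delete.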
   
   ---
   
   For each watcher, perhaps we need a strict "send chain":
   
     - Should the head task (whether it is the success response or an update) only be allowed to advance to the next one via sendingCompleted() after it has been written successfully (or at least after the write has been committed)?
     - If the head of the queue enters a retry because permit acquisition failed, shouldn't it remain the head, so that subsequent updates cannot skip over it?
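These two rules can be sketched as another hypothetical single-threaded model. `StrictWatcherQueue` and its methods are illustrative names, not Pulsar APIs, and an integer again stands in for direct memory permits. The model works as follows:

- An update is removed from the head only after its write succeeds.
- A failed permit acquisition leaves the update at the head with a retry pending.
- Updates that arrive in the meantime only join the tail.

In the real code, the "advance" step would presumably run in the completion of the actual write rather than inline.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical single-threaded model of a strict per-watcher send chain
 * (not Pulsar code). "permits" stands in for direct memory permits.
 */
class StrictWatcherQueue {
    record Update(String name, boolean create, String topic) {}

    private final Deque<Update> pending = new ArrayDeque<>();
    final List<Update> socket = new ArrayList<>();
    int permits;
    boolean retryScheduled;

    StrictWatcherQueue(int permits) {
        this.permits = permits;
    }

    /** New updates only join the tail; nothing starts while the head waits for a retry. */
    void enqueue(Update u) {
        pending.add(u);
        if (!retryScheduled) {
            drain();
        }
    }

    /** Invoked when the retry scheduled after a failed permit acquisition fires. */
    void onRetry() {
        retryScheduled = false;
        drain();
    }

    /** Writes updates in order; the head is removed only after its write succeeds. */
    private void drain() {
        while (!pending.isEmpty()) {
            Update head = pending.peek();
            if (permits == 0) {
                retryScheduled = true; // head stays at the head of the queue
                return;
            }
            permits--;
            socket.add(head);
            pending.poll(); // the sendingCompleted() step: advance only after the write
        }
    }

    public static void main(String[] args) {
        StrictWatcherQueue q = new StrictWatcherQueue(0); // no permits at first
        q.enqueue(new Update("A", true, "topic2"));  // t1: acquire fails, retry pending
        q.enqueue(new Update("B", false, "topic2")); // t2: B waits behind A
        q.permits = 2;                                // permits are released elsewhere
        q.onRetry();                                  // t3: A is written, then B
        System.out.println(q.socket.stream().map(Update::name).toList()); // [A, B]
    }
}
```

With the same timeline, the client receives A before B. B does not start at t2 because A is still waiting at the head.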



-- 
This is an automated message from the Apache Git Service.