lhotari commented on code in PR #25188:
URL: https://github.com/apache/pulsar/pull/25188#discussion_r2737381061


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueue.java:
##########
@@ -173,12 +231,31 @@ synchronized void triggerNextTask() {
                 });
                 break;
             }
-            case TOPICS_ADDED: {
-                newTaskFuture = 
topicsChangeListener.onTopicsAdded(task.getRight());
-                break;
-            }
-            case TOPICS_REMOVED: {
-                newTaskFuture = 
topicsChangeListener.onTopicsRemoved(task.getRight());
+            case TOPICS_CHANGED: {
+                TopicsAddedOrRemovedTask topicsAddedOrRemovedTask = 
(TopicsAddedOrRemovedTask) task;
+                newTaskFuture = 
topicsChangeListener.onTopicsRemoved(topicsAddedOrRemovedTask.removedTopics)
+                        .thenCompose(__ ->
+                                
topicsChangeListener.onTopicsAdded(topicsAddedOrRemovedTask.addedTopics))
+                        .thenRun(() -> {
+                            if 
(!patternConsumer.supportsTopicListWatcherReconcile()) {
+                                // ignore topics hash unless topic list 
watcher reconcile is supported since
+                                // the broker side state might be out of sync 
and could cause unnecessary
+                                // reconciliation.
+                                // reconciliation will happen later when the 
client requests the topic listing
+                                // after the next patternAutoDiscoveryPeriod 
interval
+                                // Broker versions that support topic list 
watcher reconcile will also update the
+                                // broker side state when reconciliation is 
requested.
+                                return;
+                            }
+                            String localHash = 
patternConsumer.getLocalStateTopicsHash();
+                            String brokerHash = 
topicsAddedOrRemovedTask.topicsHash;
+                            if (brokerHash != null && brokerHash.length() > 0 
&& !brokerHash.equals(localHash)) {
+                                log.info("[{}][{}] Hash mismatch detected 
(local: {}, broker: {}). Triggering "
+                                                + "reconciliation.", 
patternConsumer.getPattern().inputPattern(),
+                                        patternConsumer.getSubscription(), 
localHash, brokerHash);
+                                appendRecheckOp();

Review Comment:
   this is already covered in doAppend implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to