codelipenghui commented on code in PR #20951:
URL: https://github.com/apache/pulsar/pull/20951#discussion_r1287050668


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java:
##########
@@ -103,6 +105,7 @@ public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQ
             break;
         case producer_exception:
         case producer_request_hold:
+            advanceSlowestMessageDeduplicationCursor(persistentTopic, backlogQuotaType);

Review Comment:
   ```suggestion
               advanceSlowestSystemCursor(persistentTopic, backlogQuotaType);
   ```
   The fix should also cover the other system cursors, such as the topic compaction cursor, so a more general name fits better.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java:
##########
@@ -268,4 +271,29 @@ private void disconnectProducers(PersistentTopic persistentTopic) {
 
         });
     }
+
+    private void advanceSlowestMessageDeduplicationCursor(PersistentTopic persistentTopic,

Review Comment:
   It would be better to return a boolean that tells the caller whether the broker should go on to disconnect producers. If the slowest cursor is the one used by deduplication, we should skip the producer disconnection.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java:
##########
@@ -268,4 +271,29 @@ private void disconnectProducers(PersistentTopic persistentTopic) {
 
         });
     }
+
+    private void advanceSlowestMessageDeduplicationCursor(PersistentTopic persistentTopic,
+                                                          BacklogQuotaType backlogQuotaType) {
+
+        if (backlogQuotaType != destination_storage) {
+            return;
+        }
+
+        MessageDeduplication dedup = persistentTopic.getMessageDeduplication();
+        if (dedup == null) {
+            return;
+        }
+
+        ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
+        if (slowestConsumer == null) {
+            return;
+        }
+
+        if (!PersistentTopic.isDedupCursorName(slowestConsumer.getName())) {
+            return;
+        }
+
+        dedup.takeSnapshot();

Review Comment:
   It would also be good to add a test that covers this case.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java:
##########
@@ -268,4 +271,29 @@ private void disconnectProducers(PersistentTopic persistentTopic) {
 
         });
     }
+
+    private void advanceSlowestMessageDeduplicationCursor(PersistentTopic persistentTopic,
+                                                          BacklogQuotaType backlogQuotaType) {
+
+        if (backlogQuotaType != destination_storage) {
+            return;
+        }
+
+        MessageDeduplication dedup = persistentTopic.getMessageDeduplication();
+        if (dedup == null) {
+            return;
+        }
+
+        ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
+        if (slowestConsumer == null) {
+            return;
+        }
+
+        if (!PersistentTopic.isDedupCursorName(slowestConsumer.getName())) {
+            return;
+        }
+
+        dedup.takeSnapshot();

Review Comment:
   We also have an interval check [here](https://github.com/apache/pulsar/pull/20951/files#diff-32e651dda4c98296475fd4e3cd322c76d6da3618c1013eda21c6b9033dec1bc1R502-R505).
   
   As I understand it, you want to force a snapshot regardless of the interval value. Otherwise, the dedup cursor still can't move forward.
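
One way to picture the concern is a snapshot gate with an explicit force flag. The sketch below is an assumption-laden illustration rather than the `MessageDeduplication` code: the `SnapshotGate` class, its `force` parameter, and the millisecond clock passed in by the caller are all hypothetical.

```java
import java.time.Duration;

class SnapshotGate {
    private final long minIntervalMillis;
    private long lastSnapshotMillis;
    private int snapshotsTaken;

    SnapshotGate(Duration minInterval, long nowMillis) {
        this.minIntervalMillis = minInterval.toMillis();
        this.lastSnapshotMillis = nowMillis;
    }

    /**
     * Takes a snapshot when the interval has elapsed, or unconditionally
     * when force is true. Returns true if a snapshot was taken.
     */
    boolean takeSnapshot(long nowMillis, boolean force) {
        boolean intervalElapsed = nowMillis - lastSnapshotMillis >= minIntervalMillis;
        if (!force && !intervalElapsed) {
            return false; // regular path: skipped by the interval check
        }
        lastSnapshotMillis = nowMillis;
        snapshotsTaken++;
        return true;
    }

    int snapshotsTaken() {
        return snapshotsTaken;
    }
}
```

With a gate like this, the backlog quota path would pass `force = true` so that the cursor can move forward even when the last snapshot was recent, while the periodic path keeps passing `false`.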


