codelipenghui commented on code in PR #20951:
URL: https://github.com/apache/pulsar/pull/20951#discussion_r1287050668
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java:
##########
@@ -103,6 +105,7 @@ public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQ
break;
case producer_exception:
case producer_request_hold:
+ advanceSlowestMessageDeduplicationCursor(persistentTopic, backlogQuotaType);
Review Comment:
```suggestion
advanceSlowestSystemCursor(persistentTopic, backlogQuotaType);
```
A more general name fits better, because we also need to handle other system cursors, such as the topic compaction cursor.
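A minimal sketch of what a generalized check might look like. `SystemCursorSketch` and `isSystemCursor` are hypothetical names, not part of the PR, and the two cursor names are assumed for illustration; verify them against the constants in the broker source.

```java
import java.util.Set;

public class SystemCursorSketch {
    // Assumed cursor names, for illustration only; check the broker source for the real constants.
    static final String DEDUP_CURSOR = "pulsar.dedup";
    static final String COMPACTION_CURSOR = "__compaction";

    static final Set<String> SYSTEM_CURSORS = Set.of(DEDUP_CURSOR, COMPACTION_CURSOR);

    /** Returns true when the given cursor is broker-owned rather than a user subscription. */
    static boolean isSystemCursor(String cursorName) {
        // Guard against null first: Set.of(...).contains(null) throws NullPointerException.
        return cursorName != null && SYSTEM_CURSORS.contains(cursorName);
    }
}
```

With a check like this, the quota handler could treat every broker-owned cursor the same way instead of special-casing deduplication.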
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java:
##########
@@ -268,4 +271,29 @@ private void disconnectProducers(PersistentTopic persistentTopic) {
});
}
+
+ private void advanceSlowestMessageDeduplicationCursor(PersistentTopic persistentTopic,
Review Comment:
It would be better to return a boolean that tells the caller whether the
broker should go on to disconnect producers. If the slowest cursor is the
deduplication cursor, we should skip the producer disconnection.
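The idea could be sketched roughly as follows. This is only an illustration with stand-in types: `QuotaHandlingSketch`, `Dedup`, and `handleExceededQuota` are hypothetical, the cursor name is an assumption, and the real method would take a `PersistentTopic` and a `BacklogQuotaType` as in the diff.

```java
public class QuotaHandlingSketch {
    // Assumed dedup cursor name, for illustration only.
    static final String DEDUP_CURSOR = "pulsar.dedup";

    /** Hypothetical stand-in for MessageDeduplication; it only counts snapshot requests. */
    static class Dedup {
        int snapshots;

        void takeSnapshot() {
            snapshots++;
        }
    }

    /** Returns true when the slowest cursor was a system cursor and a snapshot was requested. */
    static boolean advanceSlowestSystemCursor(String slowestCursorName, Dedup dedup) {
        if (dedup == null || slowestCursorName == null) {
            return false;
        }
        if (!DEDUP_CURSOR.equals(slowestCursorName)) {
            return false;
        }
        dedup.takeSnapshot();
        return true;
    }

    /** Returns true when producers would be disconnected. */
    static boolean handleExceededQuota(String slowestCursorName, Dedup dedup) {
        if (advanceSlowestSystemCursor(slowestCursorName, dedup)) {
            // The laggard is a system cursor, so leave producers connected.
            return false;
        }
        // In the broker, disconnectProducers(persistentTopic) would run here.
        return true;
    }
}
```

The caller then decides on disconnection from the return value, rather than always disconnecting after the cursor has been advanced.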
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java:
##########
@@ -268,4 +271,29 @@ private void disconnectProducers(PersistentTopic persistentTopic) {
});
}
+
+ private void advanceSlowestMessageDeduplicationCursor(PersistentTopic persistentTopic,
+ BacklogQuotaType backlogQuotaType) {
+
+ if (backlogQuotaType != destination_storage) {
+ return;
+ }
+
+ MessageDeduplication dedup = persistentTopic.getMessageDeduplication();
+ if (dedup == null) {
+ return;
+ }
+
+ ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+ ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
+ if (slowestConsumer == null) {
+ return;
+ }
+
+ if (!PersistentTopic.isDedupCursorName(slowestConsumer.getName())) {
+ return;
+ }
+
+ dedup.takeSnapshot();
Review Comment:
It would also be good to add a test that covers this case.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java:
##########
@@ -268,4 +271,29 @@ private void disconnectProducers(PersistentTopic persistentTopic) {
});
}
+
+ private void advanceSlowestMessageDeduplicationCursor(PersistentTopic persistentTopic,
+ BacklogQuotaType backlogQuotaType) {
+
+ if (backlogQuotaType != destination_storage) {
+ return;
+ }
+
+ MessageDeduplication dedup = persistentTopic.getMessageDeduplication();
+ if (dedup == null) {
+ return;
+ }
+
+ ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+ ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
+ if (slowestConsumer == null) {
+ return;
+ }
+
+ if (!PersistentTopic.isDedupCursorName(slowestConsumer.getName())) {
+ return;
+ }
+
+ dedup.takeSnapshot();
Review Comment:
There is also an interval check
[here](https://github.com/apache/pulsar/pull/20951/files#diff-32e651dda4c98296475fd4e3cd322c76d6da3618c1013eda21c6b9033dec1bc1R502-R505).
As I understand it, you want to take the snapshot forcefully, regardless of
the interval value. Otherwise, the dedup cursor still can't move forward.
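One way to express a forced snapshot next to an interval check is sketched below. It is a self-contained illustration, not the code in `MessageDeduplication`: the class `SnapshotIntervalSketch`, its fields, and the `force` parameter are all hypothetical.

```java
public class SnapshotIntervalSketch {
    private final long intervalMillis;
    private long lastSnapshotMillis;
    private int snapshotsTaken;

    SnapshotIntervalSketch(long intervalMillis, long nowMillis) {
        this.intervalMillis = intervalMillis;
        this.lastSnapshotMillis = nowMillis;
    }

    /**
     * Takes a snapshot unless the interval has not elapsed yet.
     * Passing force = true bypasses the interval check.
     * Returns true when a snapshot was actually taken.
     */
    boolean takeSnapshot(long nowMillis, boolean force) {
        if (!force && nowMillis - lastSnapshotMillis < intervalMillis) {
            return false; // too soon since the last snapshot
        }
        lastSnapshotMillis = nowMillis;
        snapshotsTaken++;
        return true;
    }

    int snapshotsTaken() {
        return snapshotsTaken;
    }
}
```

A backlog quota handler would call the forced variant so that the dedup cursor can advance even when the regular interval has not elapsed.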