codelipenghui commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1437035119
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -996,6 +1055,23 @@ public void addUnAckedMessages(int numberOfMessages) {
topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
}
+ @Override
+ public void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) {
+ /**
+ * - Mark an acknowledgement were executed.
+ * - If there was no previous pause due to cursor data is too large to persist, we don't need to manually
+ * trigger a new read. This can avoid too many CPU circles.
+ * - Clear the marker that represent delivery was paused at least once in the earlier time.
+ */
+ blockSignatureOnCursorDataCanNotFullyPersist.markNewAcknowledged();
+ if (blockSignatureOnCursorDataCanNotFullyPersist.hasPausedAtLeastOnce()
+ && cursor.isCursorDataFullyPersistable()) {
+ // clear paused count, and trigger a new reading.
+ blockSignatureOnCursorDataCanNotFullyPersist.clearMarkerPaused();
+ readMoreEntriesAsync();
+ }
+ }
Review Comment:
I tried another approach that simplifies the implementation by tracking the pause state in a single `AtomicBoolean pausedByCursorDataPersistence` field:
```java
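public void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) {
    if (pausedByCursorDataPersistence.get()) {
        // Delivery is currently paused: resume once the cursor data can be persisted again.
        if (cursor.isCursorDataFullyPersistable()) {
            if (pausedByCursorDataPersistence.compareAndSet(true, false)) {
                readMoreEntriesAsync();
            } else {
                // Another thread changed the flag concurrently; re-evaluate.
                afterAckMessages(exOfDeletion, ctxOfDeletion);
            }
        }
    } else {
        // Delivery is not paused: record a pause if the cursor data can no longer be persisted.
        if (!cursor.isCursorDataFullyPersistable()) {
            if (!pausedByCursorDataPersistence.compareAndSet(false, true)) {
                // Another thread changed the flag concurrently; re-evaluate.
                afterAckMessages(exOfDeletion, ctxOfDeletion);
            }
        }
    }
}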
```
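
To make the intended state transitions easier to check in isolation, here is a standalone sketch of the same compare-and-set idea, written as a retry loop instead of recursion. It is not Pulsar code: `PauseGateSketch`, `persistable`, `readMore`, `afterAck`, and `isPaused` are hypothetical names, where `persistable` stands in for `cursor.isCursorDataFullyPersistable()` and `readMore` stands in for `readMoreEntriesAsync()`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the dispatcher, used only to illustrate the flag handling.
class PauseGateSketch {
    // true while delivery is considered paused because cursor data cannot be fully persisted
    private final AtomicBoolean paused = new AtomicBoolean(false);
    private final BooleanSupplier persistable; // assumption: mirrors cursor.isCursorDataFullyPersistable()
    private final Runnable readMore;           // assumption: mirrors readMoreEntriesAsync()

    PauseGateSketch(BooleanSupplier persistable, Runnable readMore) {
        this.persistable = persistable;
        this.readMore = readMore;
    }

    void afterAck() {
        while (true) {
            boolean wasPaused = paused.get();
            boolean shouldPause = !persistable.getAsBoolean();
            if (wasPaused == shouldPause) {
                return; // the flag already matches the cursor state, nothing to do
            }
            if (paused.compareAndSet(wasPaused, shouldPause)) {
                if (wasPaused) {
                    readMore.run(); // paused -> not paused: trigger exactly one new read
                }
                return;
            }
            // Lost the race against another thread; loop and re-evaluate.
        }
    }

    boolean isPaused() {
        return paused.get();
    }
}
```

Because only the thread that wins the `true -> false` compare-and-set calls `readMore`, concurrent acknowledgements should trigger at most one read per pause, and the loop avoids growing the stack under contention.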