codelipenghui commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1437035119


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -996,6 +1055,23 @@ public void addUnAckedMessages(int numberOfMessages) {
         topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
     }
 
+    @Override
+    public void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) {
+        /**
+         * - Mark an acknowledgement were executed.
+         * - If there was no previous pause due to cursor data is too large to persist, we don't need to manually
+         *   trigger a new read. This can avoid too many CPU circles.
+         * - Clear the marker that represent delivery was paused at least once in the earlier time.
+         */
+        blockSignatureOnCursorDataCanNotFullyPersist.markNewAcknowledged();
+        if (blockSignatureOnCursorDataCanNotFullyPersist.hasPausedAtLeastOnce()
+                && cursor.isCursorDataFullyPersistable()) {
+            // clear paused count, and trigger a new reading.
+            blockSignatureOnCursorDataCanNotFullyPersist.clearMarkerPaused();
+            readMoreEntriesAsync();
+        }
+    }

Review Comment:
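   I tried another approach to simplify the implementation: replace `blockSignatureOnCursorDataCanNotFullyPersist` with a single `AtomicBoolean pausedByCursorDataPersistence` field that is toggled with compare-and-set.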
   
   ```java
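   public void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) {
       if (pausedByCursorDataPersistence.get()) {
           // Paused earlier: resume reading once the cursor data is fully persistable again.
           if (cursor.isCursorDataFullyPersistable()) {
               if (pausedByCursorDataPersistence.compareAndSet(true, false)) {
                   readMoreEntriesAsync();
               } else {
                   // Lost a race with a concurrent update; re-evaluate.
                   afterAckMessages(exOfDeletion, ctxOfDeletion);
               }
           }
       } else {
           // Not paused: set the flag if the cursor data can no longer be fully persisted.
           if (!cursor.isCursorDataFullyPersistable()) {
               if (!pausedByCursorDataPersistence.compareAndSet(false, true)) {
                   // Lost a race with a concurrent update; re-evaluate.
                   afterAckMessages(exOfDeletion, ctxOfDeletion);
               }
           }
       }
   }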
   ```
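
To make the state transitions in the suggestion above easier to check in isolation, here is a minimal standalone sketch of the same flag logic. It is not Pulsar code: `PauseFlagSketch`, `afterAck`, and `isPaused` are hypothetical names, the `BooleanSupplier` stands in for `cursor.isCursorDataFullyPersistable()`, and the `Runnable` stands in for `readMoreEntriesAsync()`. It also swaps the recursive retry for a loop, which should behave the same but avoids growing the stack under contention.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for the dispatcher; illustrates the flag transitions only. */
class PauseFlagSketch {
    private final AtomicBoolean paused = new AtomicBoolean(false);
    // Assumption: stands in for cursor.isCursorDataFullyPersistable().
    private final BooleanSupplier persistable;
    // Assumption: stands in for readMoreEntriesAsync().
    private final Runnable readMore;

    PauseFlagSketch(BooleanSupplier persistable, Runnable readMore) {
        this.persistable = persistable;
        this.readMore = readMore;
    }

    /** Called after each acknowledgement; mirrors the review suggestion with a loop. */
    void afterAck() {
        while (true) {
            boolean wasPaused = paused.get();
            boolean shouldPause = !persistable.getAsBoolean();
            if (wasPaused == shouldPause) {
                return; // flag already matches the cursor state, nothing to do
            }
            if (paused.compareAndSet(wasPaused, shouldPause)) {
                if (wasPaused) {
                    readMore.run(); // transition paused -> running: trigger one read
                }
                return;
            }
            // Another thread changed the flag first; loop and re-evaluate.
        }
    }

    boolean isPaused() {
        return paused.get();
    }
}
```

With this shape, a read is triggered only on the transition from paused to not paused, so acknowledgements that arrive while the cursor data is already persistable do no extra work.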



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
