codelipenghui commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432073401


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -862,4 +862,8 @@ default void skipNonRecoverableLedger(long ledgerId){}
      * @return whether this cursor is closed.
      */
     boolean isClosed();
+
+    default boolean isMetadataTooLargeToPersist() {

Review Comment:
   I would suggest renaming this to `isCursorDataFullyPersistable()`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -366,6 +377,30 @@ protected void readMoreEntries(Consumer consumer) {
         }
     }
 
+    private boolean shouldPauseOnAckStatePersist() {
+        if (!((PersistentTopic) subscription.getTopic()).isDispatcherPauseOnAckStatePersistentEnabled()) {
+            return false;
+        }
+        if (cursor == null) {
+            return true;
+        }
+        if (!cursor.isMetadataTooLargeToPersist()) {
+            return false;
+        }
+        // The cursor state is too large to persist, let us check whether the read is a replay read.
+        Range<PositionImpl> lastIndividualDeletedRange = cursor.getLastIndividualDeletedRange();
+        if (lastIndividualDeletedRange == null) {
+            // lastIndividualDeletedRange is null means the read is not replay read.
+            return true;
+        }
+        // If read position is less than the last acked position, it means the read is a replay read.
+        PositionImpl lastAckedPosition = lastIndividualDeletedRange.upperEndpoint();
+        Position readPosition = cursor.getReadPosition();
+        boolean readPositionIsSmall =
+                lastAckedPosition.compareTo(readPosition.getLedgerId(), readPosition.getEntryId()) > 0;
+        return !readPositionIsSmall;
+    }
+

Review Comment:
   Hmm. For a single-active-consumer subscription, I think in practice we 
don't need to consider ack state persistence, because that case should only 
arise from misuse. Maybe we can skip the single-active-consumer subscription 
for now and add it later if there is an actual need.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java:
##########
@@ -129,6 +129,13 @@ default boolean checkAndUnblockIfStuck() {
         return false;
     }
 
+    /**
+     * A callback hook after acknowledge messages.
+     * If acknowledge successfully, {@param position} will not be null, and {@param position} and {@param ctx} will be
+     * null.
+     * If acknowledge failed. {@param position} will be null, and {@param position} and {@param ctx} will not be null.
+     */
+    default void afterAckMessages(Object position, Throwable error, Object ctx){}

Review Comment:
   If it's inevitable, then IMO converting on the caller's side is better. 
From the API's perspective, we should have a clearer definition.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -354,6 +369,19 @@ public synchronized void readMoreEntries() {
         }
     }
 
+    private boolean shouldPauseOnAckStatePersist(ReadType readType) {
+        if (readType != ReadType.Normal) {
+            return false;
+        }

Review Comment:
   I think the reason is that consumers can disconnect from the topic, and if 
we then stop dispatching messages to the active consumers, dispatching could 
stop forever.
   
   @poorbarcode could you please add some comments here? We should also add a 
test for consumer disconnection to make sure the remaining active consumers 
can ultimately consume all messages.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java:
##########
@@ -129,6 +129,13 @@ default boolean checkAndUnblockIfStuck() {
         return false;
     }
 
+    /**
+     * A callback hook after acknowledge messages.
+     * If acknowledge successfully, {@param position} will not be null, and {@param position} and {@param ctx} will be
+     * null.
+     * If acknowledge failed. {@param position} will be null, and {@param position} and {@param ctx} will not be null.
+     */
+    default void afterAckMessages(Object position, Throwable error, Object ctx){}

Review Comment:
   Why should the type of `position` be defined as `Object`?
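   
   To illustrate what a more strongly typed hook could look like, here is a 
minimal sketch. All names in it (`AckPosition`, `AckListener`, 
`AckCallbackAdapter`) are hypothetical and are not part of Pulsar's API; the 
point is only that the conversion from an untyped callback result can happen 
at the call site, so the hook itself takes a concrete position type and keeps 
success and failure apart.

```java
/** Hypothetical minimal position type; a stand-in for the real managed-ledger position. */
final class AckPosition {
    final long ledgerId;
    final long entryId;

    AckPosition(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }
}

/** Hypothetical typed hook: success and failure are separate methods, no Object parameters for the position. */
interface AckListener {
    default void afterAckSucceeded(AckPosition position) {}

    default void afterAckFailed(Throwable error, Object ctx) {}
}

/** Hypothetical caller-side adapter: narrows the untyped callback result before invoking the hook. */
final class AckCallbackAdapter {
    private AckCallbackAdapter() {}

    static void dispatch(AckListener listener, Object rawPosition, Throwable error, Object ctx) {
        if (error != null) {
            // Failure path: the position is not meaningful, only the error and context are passed on.
            listener.afterAckFailed(error, ctx);
        } else if (rawPosition instanceof AckPosition) {
            // Success path: the cast happens here, outside the hook.
            listener.afterAckSucceeded((AckPosition) rawPosition);
        } else {
            throw new IllegalArgumentException("unexpected position type: " + rawPosition);
        }
    }
}
```

   With a shape like this, implementations of the hook never need to cast, 
and the null/non-null contract in the Javadoc is replaced by two method 
signatures.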



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -995,6 +1023,13 @@ public void addUnAckedMessages(int numberOfMessages) {
         topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
     }
 
+    @Override
+    public void afterAckMessages(Object position, Throwable error, Object ctx) {
+        if (!cursor.isMetadataTooLargeToPersist()) {

Review Comment:
   Will this waste too many CPU cycles if `cursor.isMetadataTooLargeToPersist()` 
is always false? `readMoreEntriesAsync` will be executed by another thread, and 
it does a lot of checks on each call.
   
   I think what we want is this: if the dispatcher was previously paused 
because of ack state persistence, and the ack state can now be persisted 
again, we should resume message dispatching.
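   
   A minimal sketch of the "resume only if previously paused" idea follows. 
It assumes a hypothetical `pausedByAckState` flag and a hypothetical 
`CursorState` stand-in for the cursor; neither exists in the PR, and the 
counter merely stands in for a call to `readMoreEntriesAsync`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the cursor; only the method under review is modelled. */
interface CursorState {
    boolean isMetadataTooLargeToPersist();
}

/** Hypothetical dispatcher sketch; names are illustrative, not Pulsar's API. */
class PauseAwareDispatcher {
    private final CursorState cursor;
    // Set when a read was skipped because the ack state could not be persisted.
    private final AtomicBoolean pausedByAckState = new AtomicBoolean(false);
    // Counts scheduled reads so the behaviour is observable in this sketch.
    private final AtomicInteger scheduledReads = new AtomicInteger();

    PauseAwareDispatcher(CursorState cursor) {
        this.cursor = cursor;
    }

    /** Read path: remember the pause instead of reading. */
    boolean tryRead() {
        if (cursor.isMetadataTooLargeToPersist()) {
            pausedByAckState.set(true);
            return false;
        }
        scheduledReads.incrementAndGet();
        return true;
    }

    /** Ack path: cheap flag check first, then resume at most once per pause. */
    void afterAckMessages() {
        if (!pausedByAckState.get()) {
            return; // common case: nothing was paused, so no extra read is scheduled
        }
        if (cursor.isMetadataTooLargeToPersist()) {
            return; // still too large, stay paused
        }
        if (pausedByAckState.compareAndSet(true, false)) {
            scheduledReads.incrementAndGet(); // stands in for readMoreEntriesAsync()
        }
    }

    int scheduledReads() {
        return scheduledReads.get();
    }
}
```

   In the common case where the ack state is always persistable, the ack 
path here costs one volatile read and schedules nothing.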



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
