poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432914681


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -123,6 +123,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     protected final ExecutorService dispatchMessagesThread;
     private final SharedConsumerAssignor assignor;
 
+    /**
+     * Delivery was paused at least once in the earlier time, due to the cursor data can not fully persist.
+     * Note: do not use this field to confirm whether the delivery should be paused,
+     *       please call {@link #shouldPauseOnAckStatePersist}.
+     */
+    private volatile boolean pausedDueToCursorDataCanNotFullyPersist = false;

Review Comment:
   @Technoboy- 
   
   > And also I think the pausedDueToCursorDataCanNotFullyPersist should be 
named something like BLOCKED_DISPATCHER_ON_CURSOR_DATA_NOT_FULLY_PERSIST
   because we have defined BLOCKED_DISPATCHER_ON_UNACKMSG
   
   Sure, renamed `pausedDueToCursorDataCanNotFullyPersist` to `blockedDispatcherOnCursorDataCanNotFullyPersist`.
   
   @codelipenghui 
   
   > But they are different threads? Even if we have synchronized for the 
readMoreEntries() method. But will the afterAckMessages() method not acquire 
the same lock?
   
   This variable is only used to decide whether an acknowledgment should trigger a new read, so the only thing we care about is whether Pulsar could skip a new read by mistake.
   
   First, a new read is only expected to be triggered after dispatching has been paused at least once. There are two scenarios with race conditions:
   
   **Scenario-1**
   | No. | `ack` | reading messages |
   | --- | --- | --- |
   | 0. | `blockedDispatcherOnCursorDataCanNotFullyPersist` is `false` now. | |
   | 1. | | There are too many ack holes, so dispatching starts to pause. |
   | 2. | All messages are acked. | |
   | 3. | Since `blockedDispatcherOnCursorDataCanNotFullyPersist == false`, triggering a new read is skipped. | |
   | 4. | | Set `blockedDispatcherOnCursorDataCanNotFullyPersist` to `true` and discard the read task. |
   | 5. | No new read is triggered anymore. | |
   
   For this scenario, declaring the method `afterAckMessages` as `synchronized` solves the issue.
   
   **Scenario-2**
   | No. | `ack` | reading messages |
   | --- | --- | --- |
   | 0. | `blockedDispatcherOnCursorDataCanNotFullyPersist` is `true` now. | |
   | 1. | | There are too many ack holes, so dispatching starts to pause. |
   | 2. | Half of all messages are acked. | |
   | 3. | Since `blockedDispatcherOnCursorDataCanNotFullyPersist == true`, a new read is triggered. | |
   | 4. | | Set `blockedDispatcherOnCursorDataCanNotFullyPersist` to `true` and discard the read task. |
   | 5. | Set `blockedDispatcherOnCursorDataCanNotFullyPersist` to `false`. | |
   | 6. | Trigger a new read. | |
   | 7. | The other half of all messages are acked. | |
   | 8. | Since `blockedDispatcherOnCursorDataCanNotFullyPersist == false`, triggering a new read is skipped. | |
   
   For this scenario, step 6 leads to one of two outcomes:
   - An actual read happens, which then leads to a new `ack`.
   - No actual read happens (the read task is discarded again, as in step 4), so the variable `blockedDispatcherOnCursorDataCanNotFullyPersist` is set back to `true`.
   
   So, this race condition can be ignored.
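
The flag handshake in the two scenarios can be sketched roughly as below. This is a minimal, single-lock illustration and not the actual Pulsar code: the class `PauseFlagSketch`, the fields `blocked`, `ackHoles`, `maxAckHoles` and `readsPerformed`, and the hole-counting logic are all hypothetical stand-ins. Only the method names `readMoreEntries` and `afterAckMessages` are taken from the discussion, and their bodies here are heavily simplified. Both methods are `synchronized`, which is the fix proposed for Scenario-1.

```java
/** Hypothetical, simplified stand-in for the dispatcher; NOT the Pulsar class. */
class PauseFlagSketch {
    // Stands in for blockedDispatcherOnCursorDataCanNotFullyPersist.
    private boolean blocked = false;
    // Hypothetical model of "too many ack holes": a counter and a threshold.
    private int ackHoles;
    private final int maxAckHoles;
    // Number of reads that were actually performed (not discarded).
    private int readsPerformed = 0;

    PauseFlagSketch(int ackHoles, int maxAckHoles) {
        this.ackHoles = ackHoles;
        this.maxAckHoles = maxAckHoles;
    }

    /** Read path: either pauses (sets the flag, discards the read) or reads. */
    synchronized void readMoreEntries() {
        if (ackHoles > maxAckHoles) {
            blocked = true; // pause dispatching and discard this read task
            return;
        }
        blocked = false;
        readsPerformed++;   // an actual read happens
    }

    /** Ack path: triggers a new read only if dispatching was paused before. */
    synchronized void afterAckMessages(int ackedHoles) {
        ackHoles = Math.max(0, ackHoles - ackedHoles);
        if (blocked) {
            blocked = false;
            // Re-entrant call on the same monitor; may set the flag again.
            readMoreEntries();
        }
    }

    synchronized int readsPerformed() {
        return readsPerformed;
    }

    synchronized boolean isBlocked() {
        return blocked;
    }

    public static void main(String[] args) {
        // Scenario-1 shape: pause first, then ack everything.
        PauseFlagSketch s1 = new PauseFlagSketch(10, 5);
        s1.readMoreEntries();    // paused, read discarded
        s1.afterAckMessages(10); // flag was set, so a read is triggered
        System.out.println("scenario 1 reads: " + s1.readsPerformed()); // 1

        // Scenario-2 shape: ack in two halves.
        PauseFlagSketch s2 = new PauseFlagSketch(10, 5);
        s2.readMoreEntries();    // paused
        s2.afterAckMessages(3);  // still too many holes: paused again
        s2.afterAckMessages(7);  // flag was set again, so a read is triggered
        System.out.println("scenario 2 reads: " + s2.readsPerformed()); // 1
    }
}
```

Because both methods take the same monitor in this sketch, the pause decision and the flag update cannot interleave with an ack, so the last ack always sees the flag and triggers the read.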


