poorbarcode commented on code in PR #21423:
URL: https://github.com/apache/pulsar/pull/21423#discussion_r1432914681
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -123,6 +123,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected final ExecutorService dispatchMessagesThread;
private final SharedConsumerAssignor assignor;
+ /**
+ * Delivery was paused at least once in the earlier time, due to the cursor data can not fully persist.
+ * Note: do not use this field to confirm whether the delivery should be paused,
+ * please call {@link #shouldPauseOnAckStatePersist}.
+ */
+ private volatile boolean pausedDueToCursorDataCanNotFullyPersist = false;
Review Comment:
@Technoboy-
> And also I think the pausedDueToCursorDataCanNotFullyPersist should be named something like BLOCKED_DISPATCHER_ON_CURSOR_DATA_NOT_FULLY_PERSIST because we have defined BLOCKED_DISPATCHER_ON_UNACKMSG

Sure, renamed `pausedDueToCursorDataCanNotFullyPersist` to `blockedDispatcherOnCursorDataCanNotFullyPersist`.

@codelipenghui
> But they are different threads? Even if we have synchronized for the readMoreEntries() method. But will the afterAckMessages() method not acquire the same lock?

This variable is only used to decide whether a new read should be triggered after an acknowledgment, so the only thing we care about is whether Pulsar will skip a new read by mistake.

First, we only expect a new read to be triggered after dispatching has been paused at least once. There are two scenarios with race conditions:

**Scenario-1**

| No. | `ack` | reading messages |
| --- | --- | --- |
| 0. | `blockedDispatcherOnCursorDataCanNotFullyPersist` is `false` now. | |
| 1. | | There are too many ack holes, so dispatching starts to pause. |
| 2. | Acked all messages. | |
| 3. | Since `blockedDispatcherOnCursorDataCanNotFullyPersist == false`, skip triggering a new read. | |
| 4. | | Set `blockedDispatcherOnCursorDataCanNotFullyPersist` to `true` and discard the reading task. |

**Scenario-2**

| No. | `ack` | reading messages |
| --- | --- | --- |
| 0. | `blockedDispatcherOnCursorDataCanNotFullyPersist` is `true` now. | |
| 1. | | There are too many ack holes, so dispatching starts to pause. |
| 2. | Acked half of all messages. | |
| 3. | Since `blockedDispatcherOnCursorDataCanNotFullyPersist == true`, trigger a new read. | |
| 4. | | Set `blockedDispatcherOnCursorDataCanNotFullyPersist` to `true` and discard the reading task. |
| 5. | Set `blockedDispatcherOnCursorDataCanNotFullyPersist` to `false`. | |
| 6. | Acked the other half of all messages. | |
| 7. | Since `blockedDispatcherOnCursorDataCanNotFullyPersist == false`, skip triggering a new read. | |
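
To make the ordering in the two tables easier to follow, here is a minimal single-threaded sketch that replays each interleaving step by step. It is only an illustration: the class `AckReadInterleaving` and all of its method names are hypothetical and are not the dispatcher code in this PR; only the flag name is taken from the discussion above.

```java
// Hypothetical single-threaded model of the two tables; NOT Pulsar's dispatcher code.
class AckReadInterleaving {
    // Plays the role of the flag discussed in this thread.
    private volatile boolean blockedDispatcherOnCursorDataCanNotFullyPersist;
    // Counts how often the ack path decided to trigger a new read.
    private int readsTriggeredByAck = 0;

    AckReadInterleaving(boolean initialFlag) {
        this.blockedDispatcherOnCursorDataCanNotFullyPersist = initialFlag;
    }

    // Reading side, part 1: sees too many ack holes and decides to pause.
    void readerDecidesToPause() {
        // Intentionally empty: the flag is not written yet, and an ack can
        // run in the gap before part 2.
    }

    // Reading side, part 2: records the pause and discards the read task.
    void readerRecordsPauseAndDiscardsRead() {
        blockedDispatcherOnCursorDataCanNotFullyPersist = true;
    }

    // Ack side: trigger a new read only if a pause was recorded earlier.
    boolean ackChecksFlagAndMaybeTriggersRead() {
        if (blockedDispatcherOnCursorDataCanNotFullyPersist) {
            readsTriggeredByAck++;
            return true;
        }
        return false;
    }

    // Ack side: reset the flag (step 5 of Scenario-2).
    void ackClearsFlag() {
        blockedDispatcherOnCursorDataCanNotFullyPersist = false;
    }

    boolean isBlocked() {
        return blockedDispatcherOnCursorDataCanNotFullyPersist;
    }

    int readsTriggeredByAck() {
        return readsTriggeredByAck;
    }

    static AckReadInterleaving replayScenario1() {
        AckReadInterleaving d = new AckReadInterleaving(false); // step 0
        d.readerDecidesToPause();                                // step 1
        d.ackChecksFlagAndMaybeTriggersRead();                   // steps 2-3: flag is false, read skipped
        d.readerRecordsPauseAndDiscardsRead();                   // step 4
        return d;
    }

    static AckReadInterleaving replayScenario2() {
        AckReadInterleaving d = new AckReadInterleaving(true);  // step 0
        d.readerDecidesToPause();                                // step 1
        d.ackChecksFlagAndMaybeTriggersRead();                   // steps 2-3: flag is true, read triggered
        d.readerRecordsPauseAndDiscardsRead();                   // step 4
        d.ackClearsFlag();                                       // step 5
        d.ackChecksFlagAndMaybeTriggersRead();                   // steps 6-7: flag is false, read skipped
        return d;
    }

    public static void main(String[] args) {
        AckReadInterleaving s1 = replayScenario1();
        System.out.println("Scenario-1: blocked=" + s1.isBlocked()
                + ", readsTriggeredByAck=" + s1.readsTriggeredByAck());
        AckReadInterleaving s2 = replayScenario2();
        System.out.println("Scenario-2: blocked=" + s2.isBlocked()
                + ", readsTriggeredByAck=" + s2.readsTriggeredByAck());
    }
}
```

Run as a single file, this prints `blocked=true, readsTriggeredByAck=0` for Scenario-1 and `blocked=false, readsTriggeredByAck=1` for Scenario-2. In both replays the last acknowledgment does not trigger a read even though the reading side has already discarded its task.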
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.