congbobo184 commented on code in PR #14716:
URL: https://github.com/apache/pulsar/pull/14716#discussion_r913966984
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -352,15 +357,17 @@ protected void readMoreEntries(Consumer consumer) {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
}
- havePendingRead = true;
- if (consumer.readCompacted()) {
- topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
- this, consumer);
- } else {
- ReadEntriesCtx readEntriesCtx =
- ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
- cursor.asyncReadEntriesOrWait(messagesToRead,
- bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());
+ synchronized (this) {
+ havePendingRead = true;
+ if (consumer.readCompacted()) {
+ topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
+ this, consumer);
+ } else {
+ ReadEntriesCtx readEntriesCtx =
+ ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
+ cursor.asyncReadEntriesOrWait(messagesToRead,
+ bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());
+ }
Review Comment:
First, this PR only fixes the case where `OpReadEntry` carries the new
consumer epoch while `OpReadEntry` still uses the old readPosition to read from `ManagedLedger`.
Steps to reproduce:
1. One thread invokes this method
https://github.com/apache/pulsar/blob/48090f36e3c37630d39f67504cf77656bd386b9b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L296
and changes the consumerEpoch from 0 -> 1,
but the method has not yet rewound the cursor:
https://github.com/apache/pulsar/blob/48090f36e3c37630d39f67504cf77656bd386b9b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L319
2. Another thread invokes `readMoreEntries`:
https://github.com/apache/pulsar/blob/48090f36e3c37630d39f67504cf77656bd386b9b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L361-L364
At this point the cursor readPosition has not changed but the consumerEpoch
has, so the client will not filter out the messages this read op returns. If you
want messages 1-10 redelivered but are sent messages 10-15 instead and cumulatively
ack them, and the client then breaks down, messages 1-10 will be lost.
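The interleaving in the steps above can be replayed deterministically. The following is only a minimal sketch: `Dispatcher`, `ReadOp`, and every method name in it are hypothetical stand-ins rather than the real Pulsar classes, and it assumes that the epoch change and the rewind run under the same monitor as the creation of the read op.

```java
public class EpochRaceSketch {
    /** What a read operation captures at creation time (hypothetical type). */
    static final class ReadOp {
        final long readPosition;
        final long consumerEpoch;

        ReadOp(long readPosition, long consumerEpoch) {
            this.readPosition = readPosition;
            this.consumerEpoch = consumerEpoch;
        }
    }

    /** Hypothetical stand-in for the dispatcher and cursor state. */
    static final class Dispatcher {
        private long readPosition = 10; // messages before 10 were already dispatched
        private long consumerEpoch = 0;

        void bumpEpoch() { consumerEpoch++; }
        void rewind() { readPosition = 1; }
        ReadOp createReadOp() { return new ReadOp(readPosition, consumerEpoch); }

        // Guarded variants: epoch bump and rewind form one step under the monitor.
        synchronized void redeliverAtomically() { bumpEpoch(); rewind(); }
        synchronized ReadOp createReadOpAtomically() { return createReadOp(); }
    }

    /** Replays the problematic order: the read op is created between bump and rewind. */
    static ReadOp racyInterleaving() {
        Dispatcher d = new Dispatcher();
        d.bumpEpoch();                // thread A: epoch 0 -> 1
        ReadOp op = d.createReadOp(); // thread B: reads before A has rewound
        d.rewind();                   // thread A: the rewind arrives too late
        return op;
    }

    /** With the monitor, a read op cannot be created between bump and rewind. */
    static ReadOp guardedInterleaving() {
        Dispatcher d = new Dispatcher();
        d.redeliverAtomically();
        return d.createReadOpAtomically();
    }

    public static void main(String[] args) {
        ReadOp racy = racyInterleaving();
        ReadOp guarded = guardedInterleaving();
        System.out.println("racy:    position=" + racy.readPosition
                + " epoch=" + racy.consumerEpoch);
        System.out.println("guarded: position=" + guarded.readPosition
                + " epoch=" + guarded.consumerEpoch);
    }
}
```

In the racy replay the read op pairs the new epoch with the old position, which is the combination the client cannot filter; in the guarded replay the new epoch is always paired with the rewound position.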
Second, https://github.com/apache/pulsar/pull/15046 changed
the code from
```
if (!havePendingRead) {
    cursor.rewind();
    if (log.isDebugEnabled()) {
        log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged messages. ", name, consumer);
    }
    readMoreEntries(consumer);
} else {
    log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: cancelPendingRequest on cursor failed", name,
            consumer);
}
```
to
> https://github.com/apache/pulsar/blob/48090f36e3c37630d39f67504cf77656bd386b9b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L317-L323

so `havePendingRead` does not need to be in the synchronized block either, and
no other code needs to move into the synchronized block.
```
ManagedCursorImpl#asyncReadEntriesOrWait is not a synchronized method but
it's thread safe, otherwise there is no need to use an atomic reference
(WAITING_READ_OP_UPDATER) in it.
```
`asyncReadEntriesOrWait` is thread-safe, but that does not solve the case where
https://github.com/apache/pulsar/blob/48090f36e3c37630d39f67504cf77656bd386b9b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java#L48
uses the old readPosition
and
https://github.com/apache/pulsar/blob/48090f36e3c37630d39f67504cf77656bd386b9b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L617
uses the new consumer epoch.
Third, because this is the critical read path, we try to keep as little
code inside the lock as possible; other code does not need to be added to the
synchronized block.
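As a rough illustration of keeping the locked region small, the sketch below takes only the paired snapshot of position and epoch under the monitor and does everything else on local copies. All names here are hypothetical and are not the dispatcher's real fields or methods.

```java
public class NarrowLockSketch {
    private long readPosition = 1;
    private long consumerEpoch = 0;

    /** Epoch bump and rewind as one step under the monitor. */
    synchronized void redeliver() {
        consumerEpoch++;
        readPosition = 1;
    }

    /** Moves the (hypothetical) read position forward by n. */
    synchronized void advance(long n) {
        readPosition += n;
    }

    String scheduleRead(int messagesToRead) {
        // Outside the lock: uses only the local argument.
        String prefix = "Schedule read of " + messagesToRead + " messages";
        long position;
        long epoch;
        synchronized (this) {
            // Inside the lock: only the snapshot that must stay consistent.
            position = readPosition;
            epoch = consumerEpoch;
        }
        // Outside the lock again: formatting works on the local copies.
        return prefix + " from position " + position + " at epoch " + epoch;
    }

    public static void main(String[] args) {
        NarrowLockSketch d = new NarrowLockSketch();
        d.advance(9);
        System.out.println(d.scheduleRead(5));
        d.redeliver();
        System.out.println(d.scheduleRead(5));
    }
}
```

The design choice is that only state which must be observed together sits inside `synchronized`, so the critical read path holds the monitor for as short a time as possible.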