congbobo184 commented on code in PR #14716:
URL: https://github.com/apache/pulsar/pull/14716#discussion_r913966984


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -352,15 +357,17 @@ protected void readMoreEntries(Consumer consumer) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
             }
-            havePendingRead = true;
-            if (consumer.readCompacted()) {
-                topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
-                        this, consumer);
-            } else {
-                ReadEntriesCtx readEntriesCtx =
-                        ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
-                cursor.asyncReadEntriesOrWait(messagesToRead,
-                        bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());
+            synchronized (this) {
+                havePendingRead = true;
+                if (consumer.readCompacted()) {
+                    topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
+                            this, consumer);
+                } else {
+                    ReadEntriesCtx readEntriesCtx =
+                            ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
+                    cursor.asyncReadEntriesOrWait(messagesToRead,
+                            bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());
+                }

Review Comment:
   First, this PR only fixes the case where an `OpReadEntry` carries the new
consumer epoch but still uses the old readPosition to read from the
`ManagedLedger`.
   
   Steps to reproduce:
   1. One thread invokes this method:
   
https://github.com/apache/pulsar/blob/48090f36e3c37630d39f67504cf77656bd386b9b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L296
   and changes the consumerEpoch from 0 to 1, but at this point the method has
not yet rewound the cursor:
   
https://github.com/apache/pulsar/blob/48090f36e3c37630d39f67504cf77656bd386b9b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L319
   2. Another thread invokes `readMoreEntries`:
   
https://github.com/apache/pulsar/blob/48090f36e3c37630d39f67504cf77656bd386b9b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L361-L364
   Now the cursor's readPosition has not changed, but the consumerEpoch has.
The messages read by this read op carry the new epoch, so the client will not
filter them out. If you want messages 1-10 redelivered but the broker sends
10-15 instead, and the client cumulatively acks them and then crashes,
messages 1-10 are lost.
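
   The interleaving above can be replayed in a toy model. This is only a
sketch, not Pulsar code: `EpochRaceSketch`, `ToyDispatcher`, `ReadOp`, and all
of their methods are hypothetical names, and the positions 1 and 10 are taken
from the example in the previous paragraph. It assumes a read op captures the
epoch and the read position at the moment it is created.

```java
final class EpochRaceSketch {
    /** Hypothetical stand-in for what a read op captures when it is created. */
    static final class ReadOp {
        final long epoch;
        final int position;

        ReadOp(long epoch, int position) {
            this.epoch = epoch;
            this.position = position;
        }
    }

    /** Toy dispatcher (assumption, not Pulsar code): two fields that must change together. */
    static final class ToyDispatcher {
        private static final int FIRST_UNACKED_POSITION = 1;
        private long consumerEpoch = 0;
        private int readPosition = 10;

        // Racy variant: the epoch bump and the rewind are two separate steps.
        void bumpEpoch() {
            consumerEpoch++;
        }

        void rewind() {
            readPosition = FIRST_UNACKED_POSITION;
        }

        // Atomic variant: both changes happen under the lock the reader also takes.
        synchronized void redeliverAtomically() {
            consumerEpoch++;
            readPosition = FIRST_UNACKED_POSITION;
        }

        synchronized ReadOp createReadOp() {
            return new ReadOp(consumerEpoch, readPosition);
        }
    }

    /** Replays steps 1 and 2 on a single thread, in the problematic order. */
    static ReadOp racyInterleaving() {
        ToyDispatcher d = new ToyDispatcher();
        d.bumpEpoch();                 // step 1: epoch 0 -> 1, cursor not yet rewound
        ReadOp op = d.createReadOp();  // step 2: a read is scheduled in between
        d.rewind();                    // too late for the op created above
        return op;
    }

    /** Same scenario, but the epoch bump and the rewind cannot be separated. */
    static ReadOp atomicInterleaving() {
        ToyDispatcher d = new ToyDispatcher();
        d.redeliverAtomically();
        return d.createReadOp();
    }

    public static void main(String[] args) {
        ReadOp racy = racyInterleaving();
        ReadOp safe = atomicInterleaving();
        System.out.println("racy: epoch=" + racy.epoch + " position=" + racy.position);
        System.out.println("safe: epoch=" + safe.epoch + " position=" + safe.position);
    }
}
```

   In the racy replay the read op ends up with epoch 1 and position 10, which
is the "new epoch, old readPosition" combination described above; in the
atomic replay it gets epoch 1 and position 1.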
   
   Second, https://github.com/apache/pulsar/pull/15046 changed the following
code:
   
   ```
           if (!havePendingRead) {
               cursor.rewind();
               if (log.isDebugEnabled()) {
                   log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged messages. ", name, consumer);
               }
               readMoreEntries(consumer);
           } else {
               log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: cancelPendingRequest on cursor failed", name,
                       consumer);
           }
   ```
   to
   
   > https://github.com/apache/pulsar/blob/48090f36e3c37630d39f67504cf77656bd386b9b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L317-L323
   
   So `havePendingRead` does not need to be inside the synchronized block
either, and no other code needs to move into it.
   
   ```
   ManagedCursorImpl#asyncReadEntriesOrWait is not a synchronized method but 
it's thread safe, otherwise there is no need to use an atomic reference 
(WAITING_READ_OP_UPDATER) in it.
   ```
   `asyncReadEntriesOrWait` is thread-safe, but that does not prevent the case where
   
https://github.com/apache/pulsar/blob/48090f36e3c37630d39f67504cf77656bd386b9b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java#L48
 uses the old readPosition
   
   and 
   
https://github.com/apache/pulsar/blob/48090f36e3c37630d39f67504cf77656bd386b9b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L617
 uses the new consumer epoch.
   
   Third, because this is the critical read path, we should keep the code
under the lock as small as possible; the other code does not need to be in the
synchronized block.
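
   As a rough illustration of keeping the critical section small, here is a
toy model in which only the epoch/position pair is guarded and the bookkeeping
runs outside the lock. Everything in it is hypothetical (`NarrowLockSketch`,
its fields, and the `position == epoch * 10` invariant are made up for the
sketch); which statements actually need the lock in the dispatcher is for
this PR to decide.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class NarrowLockSketch {
    private long epoch = 0;     // guarded by this
    private long position = 0;  // guarded by this; toy invariant: position == epoch * 10
    private final AtomicInteger scheduled = new AtomicInteger(); // needs no lock

    /** Changes epoch and position together, under the lock. */
    void redeliver() {
        synchronized (this) {
            epoch++;
            position = epoch * 10;
        }
    }

    /** Captures the pair under the lock; returns true if the captured pair was consistent. */
    boolean scheduleRead() {
        long e;
        long p;
        synchronized (this) {   // only the pair capture is inside the lock
            e = epoch;
            p = position;
        }
        scheduled.incrementAndGet(); // stand-in for logging and bookkeeping outside the lock
        return p == e * 10;
    }

    /** Runs one writer and one reader concurrently and counts inconsistent captures. */
    static int countInconsistentReads(int iterations) {
        NarrowLockSketch d = new NarrowLockSketch();
        AtomicInteger bad = new AtomicInteger();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                d.redeliver();
            }
        });
        Thread reader = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                if (!d.scheduleRead()) {
                    bad.incrementAndGet();
                }
            }
        });
        writer.start();
        reader.start();
        try {
            writer.join();
            reader.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for threads", ex);
        }
        return bad.get();
    }

    public static void main(String[] args) {
        System.out.println("inconsistent reads: " + countInconsistentReads(100_000));
    }
}
```

   Because both the update and the capture of the pair take the same lock, the
reader never observes a new epoch with an old position, even though the rest
of `scheduleRead` runs unlocked.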
   
   
   
   
   
   


