BewareMyPower commented on code in PR #14716:
URL: https://github.com/apache/pulsar/pull/14716#discussion_r913831659


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -352,15 +357,17 @@ protected void readMoreEntries(Consumer consumer) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] Schedule read of {} messages", name, 
consumer, messagesToRead);
             }
-            havePendingRead = true;
-            if (consumer.readCompacted()) {
-                topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, 
messagesToRead, isFirstRead,
-                        this, consumer);
-            } else {
-                ReadEntriesCtx readEntriesCtx =
-                        ReadEntriesCtx.create(consumer, 
consumer.getConsumerEpoch());
-                cursor.asyncReadEntriesOrWait(messagesToRead,
-                        bytesToRead, this, readEntriesCtx, 
topic.getMaxReadPosition());
+            synchronized (this) {
+                havePendingRead = true;
+                if (consumer.readCompacted()) {
+                    topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, 
messagesToRead, isFirstRead,
+                            this, consumer);
+                } else {
+                    ReadEntriesCtx readEntriesCtx =
+                            ReadEntriesCtx.create(consumer, 
consumer.getConsumerEpoch());
+                    cursor.asyncReadEntriesOrWait(messagesToRead,
+                            bytesToRead, this, readEntriesCtx, 
topic.getMaxReadPosition());
+                }

Review Comment:
   > it may use the new epoch and use the old readPostion
   
   I don't get what the `old readPosition` represents.
   
   I mean, why not
   
   ```java
       protected synchronized void readMoreEntries(Consumer consumer) {
           /* ... */
       }
   ```
   
   or
   
   ```java
       protected void readMoreEntries(Consumer consumer) {
           // consumer can be null when all consumers are disconnected from 
broker.
           // so skip reading more entries if currently there is no active 
consumer.
           if (null == consumer) {
               return;
           }
   
           synchronized (this) {
               if (consumer.getAvailablePermits() > 0) {
                   Pair<Integer, Long> calculateResult = 
calculateToRead(consumer);
                   /* ... */
               }
           }
   ```
   
   or synchronize other code block, but just the code block I pointed out 
before:
   
   ```java
               havePendingRead = true;
               if (consumer.readCompacted()) {
                   topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, 
messagesToRead, isFirstRead,
                           this, consumer);
               } else {
                   ReadEntriesCtx readEntriesCtx =
                           ReadEntriesCtx.create(consumer, 
consumer.getConsumerEpoch());
                   cursor.asyncReadEntriesOrWait(messagesToRead,
                           bytesToRead, this, readEntriesCtx, 
topic.getMaxReadPosition());
               }
   ```
   
   **Please point out which variables need synchronizing**. As far as I see:
   - `messagesToRead` and `bytesToRead` are computed by `calculateToRead`, 
which is not protected by a synchronized block.
   - `consumer`, `readEntriesCtx` are related to the `consumer` argument of 
`readMoreEntries`, and I think these code doesn't change any state of the 
consumer.
   - `CompactTopicImpl#asyncReadEntriesOrWait` is a synchronized method.
   -  `ManagedCursorImpl#asyncReadEntriesOrWait` is not a synchronized method 
but it's thread safe, otherwise there is no need to use an atomic reference 
(`WAITING_READ_OP_UPDATER`) in it.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to