BewareMyPower commented on code in PR #14716:
URL: https://github.com/apache/pulsar/pull/14716#discussion_r913413162


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -352,15 +357,17 @@ protected void readMoreEntries(Consumer consumer) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
             }
-            havePendingRead = true;
-            if (consumer.readCompacted()) {
-                topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
-                        this, consumer);
-            } else {
-                ReadEntriesCtx readEntriesCtx =
-                        ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
-                cursor.asyncReadEntriesOrWait(messagesToRead,
-                        bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());
+            synchronized (this) {
+                havePendingRead = true;
+                if (consumer.readCompacted()) {
+                    topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
+                            this, consumer);
+                } else {
+                    ReadEntriesCtx readEntriesCtx =
+                            ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
+                    cursor.asyncReadEntriesOrWait(messagesToRead,
+                            bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());
+                }

Review Comment:
   Could you explain why you added the `synchronized` block here? It looks
redundant: the only field of this object that the block modifies is
`havePendingRead`, and that field is volatile, so the write is already thread
safe. The logic that follows neither depends on the value of `havePendingRead`
nor modifies any state of `PersistentDispatcherSingleActiveConsumer` itself.
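
   To make the volatile argument concrete, here is a minimal standalone
sketch. It is not Pulsar code: `PendingReadSketch` and all of its methods are
hypothetical names used only for illustration. It shows that a plain write to
a volatile flag becomes visible to another thread without a lock, and that
`synchronized` only starts to matter for a check-then-act sequence, which the
diff above does not contain.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the dispatcher; only the flag is modeled.
public class PendingReadSketch {
    private volatile boolean havePendingRead = false;

    // A single volatile write: visible to other threads without a lock.
    public void markPending() {
        havePendingRead = true;
    }

    public boolean isPending() {
        return havePendingRead;
    }

    // Check-then-act: here the test and the write must happen atomically,
    // so a lock (or a compare-and-set) is actually needed.
    public synchronized boolean tryMarkPending() {
        if (havePendingRead) {
            return false;
        }
        havePendingRead = true;
        return true;
    }

    // Returns true if a reader thread observed the unsynchronized write
    // within the timeout.
    public static boolean writeIsVisibleToReader() {
        PendingReadSketch sketch = new PendingReadSketch();
        CountDownLatch seen = new CountDownLatch(1);
        Thread reader = new Thread(() -> {
            while (!sketch.isPending()) {
                Thread.yield();
            }
            seen.countDown();
        });
        reader.setDaemon(true);
        reader.start();
        sketch.markPending();
        try {
            return seen.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("visible: " + writeIsVisibleToReader());
        PendingReadSketch sketch = new PendingReadSketch();
        System.out.println("first claim: " + sketch.tryMarkPending());
        System.out.println("second claim: " + sketch.tryMarkPending());
    }
}
```

   If the block in the PR is meant to guard something like `tryMarkPending`,
that is, to keep two threads from both scheduling a read, then the check of
`havePendingRead` would need to sit inside the same lock as the write.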


