This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 35ea4712a60 [fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask in dispatcher (#22279)
35ea4712a60 is described below

commit 35ea4712a60992deeb93171c8b0e6d48c84e530a
Author: Yunze Xu <[email protected]>
AuthorDate: Sat Mar 16 14:56:34 2024 +0800

    [fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask in dispatcher (#22279)
---
 .../PersistentDispatcherSingleActiveConsumer.java  | 26 +++++++++++++---------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 387ba83d9cd..7494d11421a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -74,6 +74,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
     protected volatile int readBatchSize;
     protected final Backoff readFailureBackoff;
     private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
+    private final Object lockForReadOnActiveConsumerTask = new Object();
 
     private final RedeliveryTracker redeliveryTracker;
 
@@ -123,18 +124,23 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
             return;
         }
 
-        readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
-                        serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+        synchronized (lockForReadOnActiveConsumerTask) {
+            if (readOnActiveConsumerTask != null) {
+                return;
             }
-            Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-            cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
+            readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
+                            serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+                }
+                Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+                cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
 
-            notifyActiveConsumerChanged(activeConsumer);
-            readMoreEntries(activeConsumer);
-            readOnActiveConsumerTask = null;
-        }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
+                notifyActiveConsumerChanged(activeConsumer);
+                readMoreEntries(activeConsumer);
+                readOnActiveConsumerTask = null;
+            }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
+        }
     }
 
     @Override

Reply via email to