This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e2070a87d556816a831a60c8056fb6cbddfb61c1
Author: Yunze Xu <[email protected]>
AuthorDate: Sat Mar 16 14:56:34 2024 +0800

    [fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask in dispatcher (#22279)
    
    (cherry picked from commit 4e0c145c89a35ec9b41fa22862edac59e28d892d)
---
 .../PersistentDispatcherSingleActiveConsumer.java  | 26 +++++++++++++---------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index bf6482bda01..cc7b6841e5c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -71,6 +71,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
     protected volatile int readBatchSize;
     protected final Backoff readFailureBackoff;
     private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
+    private final Object lockForReadOnActiveConsumerTask = new Object();
 
     private final RedeliveryTracker redeliveryTracker;
 
@@ -120,18 +121,23 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
             return;
         }
 
-        readOnActiveConsumerTask = 
topic.getBrokerService().executor().schedule(() -> {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Rewind cursor and read more entries after {} 
ms delay", name,
-                        
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+        synchronized (lockForReadOnActiveConsumerTask) {
+            if (readOnActiveConsumerTask != null) {
+                return;
             }
-            Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-            cursor.rewind(activeConsumer != null && 
activeConsumer.readCompacted());
+            readOnActiveConsumerTask = 
topic.getBrokerService().executor().schedule(() -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Rewind cursor and read more entries after 
{} ms delay", name,
+                            
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+                }
+                Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+                cursor.rewind(activeConsumer != null && 
activeConsumer.readCompacted());
 
-            notifyActiveConsumerChanged(activeConsumer);
-            readMoreEntries(activeConsumer);
-            readOnActiveConsumerTask = null;
-        }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), 
TimeUnit.MILLISECONDS);
+                notifyActiveConsumerChanged(activeConsumer);
+                readMoreEntries(activeConsumer);
+                readOnActiveConsumerTask = null;
+            }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), 
TimeUnit.MILLISECONDS);
+        }
     }
 
     @Override

Reply via email to