This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e0c145c89a [fix][broker] Fix wrong double-checked locking for 
readOnActiveConsumerTask in dispatcher (#22279)
4e0c145c89a is described below

commit 4e0c145c89a35ec9b41fa22862edac59e28d892d
Author: Yunze Xu <[email protected]>
AuthorDate: Sat Mar 16 14:56:34 2024 +0800

    [fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask 
in dispatcher (#22279)
---
 .../PersistentDispatcherSingleActiveConsumer.java  | 26 +++++++++++++---------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 0f43eb6c5cc..637ede8a41f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -74,6 +74,7 @@ public class PersistentDispatcherSingleActiveConsumer extends 
AbstractDispatcher
     protected volatile int readBatchSize;
     protected final Backoff readFailureBackoff;
     private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
+    private final Object lockForReadOnActiveConsumerTask = new Object();
 
     private final RedeliveryTracker redeliveryTracker;
 
@@ -123,18 +124,23 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
             return;
         }
 
-        readOnActiveConsumerTask = 
topic.getBrokerService().executor().schedule(() -> {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Rewind cursor and read more entries after {} 
ms delay", name,
-                        
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+        synchronized (lockForReadOnActiveConsumerTask) {
+            if (readOnActiveConsumerTask != null) {
+                return;
             }
-            Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-            cursor.rewind(activeConsumer != null && 
activeConsumer.readCompacted());
+            readOnActiveConsumerTask = 
topic.getBrokerService().executor().schedule(() -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Rewind cursor and read more entries after 
{} ms delay", name,
+                            
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+                }
+                Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+                cursor.rewind(activeConsumer != null && 
activeConsumer.readCompacted());
 
-            notifyActiveConsumerChanged(activeConsumer);
-            readMoreEntries(activeConsumer);
-            readOnActiveConsumerTask = null;
-        }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), 
TimeUnit.MILLISECONDS);
+                notifyActiveConsumerChanged(activeConsumer);
+                readMoreEntries(activeConsumer);
+                readOnActiveConsumerTask = null;
+            }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), 
TimeUnit.MILLISECONDS);
+        }
     }
 
     @Override

Reply via email to