This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e2070a87d556816a831a60c8056fb6cbddfb61c1
Author: Yunze Xu <[email protected]>
AuthorDate: Sat Mar 16 14:56:34 2024 +0800

    [fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask in dispatcher (#22279)

    (cherry picked from commit 4e0c145c89a35ec9b41fa22862edac59e28d892d)
---
 .../PersistentDispatcherSingleActiveConsumer.java  | 26 +++++++++++++---------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index bf6482bda01..cc7b6841e5c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -71,6 +71,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
     protected volatile int readBatchSize;
     protected final Backoff readFailureBackoff;
     private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
+    private final Object lockForReadOnActiveConsumerTask = new Object();
 
     private final RedeliveryTracker redeliveryTracker;
 
@@ -120,18 +121,23 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
             return;
         }
 
-        readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
-                        serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+        synchronized (lockForReadOnActiveConsumerTask) {
+            if (readOnActiveConsumerTask != null) {
+                return;
             }
-            Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-            cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
+            readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
+                            serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+                }
+                Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+                cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
 
-            notifyActiveConsumerChanged(activeConsumer);
-            readMoreEntries(activeConsumer);
-            readOnActiveConsumerTask = null;
-        }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
+                notifyActiveConsumerChanged(activeConsumer);
+                readMoreEntries(activeConsumer);
+                readOnActiveConsumerTask = null;
+            }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
+        }
     }
 
     @Override
