This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4e0c145c89a [fix][broker] Fix wrong double-checked locking for
readOnActiveConsumerTask in dispatcher (#22279)
4e0c145c89a is described below
commit 4e0c145c89a35ec9b41fa22862edac59e28d892d
Author: Yunze Xu <[email protected]>
AuthorDate: Sat Mar 16 14:56:34 2024 +0800
[fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask
in dispatcher (#22279)
---
.../PersistentDispatcherSingleActiveConsumer.java | 26 +++++++++++++---------
1 file changed, 16 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 0f43eb6c5cc..637ede8a41f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -74,6 +74,7 @@ public class PersistentDispatcherSingleActiveConsumer extends
AbstractDispatcher
protected volatile int readBatchSize;
protected final Backoff readFailureBackoff;
private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
+ private final Object lockForReadOnActiveConsumerTask = new Object();
private final RedeliveryTracker redeliveryTracker;
@@ -123,18 +124,23 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
return;
}
- readOnActiveConsumerTask =
topic.getBrokerService().executor().schedule(() -> {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Rewind cursor and read more entries after {}
ms delay", name,
-
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+ synchronized (lockForReadOnActiveConsumerTask) {
+ if (readOnActiveConsumerTask != null) {
+ return;
}
- Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
- cursor.rewind(activeConsumer != null &&
activeConsumer.readCompacted());
+ readOnActiveConsumerTask =
topic.getBrokerService().executor().schedule(() -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Rewind cursor and read more entries after
{} ms delay", name,
+
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+ }
+ Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ cursor.rewind(activeConsumer != null &&
activeConsumer.readCompacted());
- notifyActiveConsumerChanged(activeConsumer);
- readMoreEntries(activeConsumer);
- readOnActiveConsumerTask = null;
- }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(),
TimeUnit.MILLISECONDS);
+ notifyActiveConsumerChanged(activeConsumer);
+ readMoreEntries(activeConsumer);
+ readOnActiveConsumerTask = null;
+ }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(),
TimeUnit.MILLISECONDS);
+ }
}
@Override