This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 35ea4712a60 [fix][broker] Fix wrong double-checked locking for
readOnActiveConsumerTask in dispatcher (#22279)
35ea4712a60 is described below
commit 35ea4712a60992deeb93171c8b0e6d48c84e530a
Author: Yunze Xu <[email protected]>
AuthorDate: Sat Mar 16 14:56:34 2024 +0800
[fix][broker] Fix wrong double-checked locking for readOnActiveConsumerTask
in dispatcher (#22279)
---
.../PersistentDispatcherSingleActiveConsumer.java | 26 +++++++++++++---------
1 file changed, 16 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 387ba83d9cd..7494d11421a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -74,6 +74,7 @@ public class PersistentDispatcherSingleActiveConsumer extends
AbstractDispatcher
protected volatile int readBatchSize;
protected final Backoff readFailureBackoff;
private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
+ private final Object lockForReadOnActiveConsumerTask = new Object();
private final RedeliveryTracker redeliveryTracker;
@@ -123,18 +124,23 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
return;
}
- readOnActiveConsumerTask =
topic.getBrokerService().executor().schedule(() -> {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Rewind cursor and read more entries after {}
ms delay", name,
-
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+ synchronized (lockForReadOnActiveConsumerTask) {
+ if (readOnActiveConsumerTask != null) {
+ return;
}
- Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
- cursor.rewind(activeConsumer != null &&
activeConsumer.readCompacted());
+ readOnActiveConsumerTask =
topic.getBrokerService().executor().schedule(() -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Rewind cursor and read more entries after
{} ms delay", name,
+
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
+ }
+ Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ cursor.rewind(activeConsumer != null &&
activeConsumer.readCompacted());
- notifyActiveConsumerChanged(activeConsumer);
- readMoreEntries(activeConsumer);
- readOnActiveConsumerTask = null;
- }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(),
TimeUnit.MILLISECONDS);
+ notifyActiveConsumerChanged(activeConsumer);
+ readMoreEntries(activeConsumer);
+ readOnActiveConsumerTask = null;
+ }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(),
TimeUnit.MILLISECONDS);
+ }
}
@Override