This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6d30414 Fix deadlock by consumer and reader (#6728)
6d30414 is described below
commit 6d304140b6205c6a2e94ad34bfd3cc5d16aca5d1
Author: k2la <[email protected]>
AuthorDate: Tue Apr 14 09:11:51 2020 +0900
Fix deadlock by consumer and reader (#6728)
### Motivation
Brokers were unable to serve client connections when a consumer and a reader
connected to the same topic at almost the same time.
This happened on v2.4.2 and on the master branch.
The following thread dump was captured at that time:
```
"bookkeeper-ml-workers-OrderedExecutor-5-0" #52 prio=5 os_prio=0
tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000750c51a00> (a
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
at
java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
at
java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
at
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
at
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
at
org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638)
at
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712)
- locked <0x0000000750c53a00> (a
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
at
org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631)
at
org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578)
at
org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
at
org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown
Source)
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at
org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687)
at
org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown
Source)
at
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
at
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
at
java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
at
org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680)
at
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328)
at
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown
Source)
at
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
at
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316)
at
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464)
at
org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276)
at
org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249)
at
org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241)
at
org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown
Source)
at
org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
at
org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
...
"ForkJoinPool.commonPool-worker-36" #1043 daemon prio=5 os_prio=0
tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000]
java.lang.Thread.State: BLOCKED (on object monitor)
at
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856)
- waiting to lock <0x0000000750c53a00> (a
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
at
org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684)
at
org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown
Source)
at
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274)
at
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
at
org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667)
at
org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579)
at
org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
at
org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown
Source)
at
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
at
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
at
org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
at
org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown
Source)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at
java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at
java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
```
`PersistentTopic#getDurableSubscription` tried to lock the `ConcurrentOpenHashMap`
(the `subscriptions` map) while already holding the `ManagedLedgerImpl` monitor.
(lock order: `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)
On the other hand, `PersistentTopic#getNonDurableSubscription` tried to
lock `ManagedLedgerImpl` while already holding the `ConcurrentOpenHashMap` lock.
(lock order: `ConcurrentOpenHashMap` => `ManagedLedgerImpl`)
Because the two paths acquire the same two locks in opposite order, each thread
waits for the lock held by the other, which is a deadlock.
### Modifications
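`PersistentTopic#getNonDurableSubscription` now enters `synchronized (ledger)` before
calling `subscriptions.computeIfAbsent`. It therefore acquires the `ManagedLedgerImpl`
monitor before the `ConcurrentOpenHashMap` lock, so both paths use the same lock order
(`ManagedLedgerImpl` => `ConcurrentOpenHashMap`).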
---
.../broker/service/persistent/PersistentTopic.java | 76 +++++++++++-----------
1 file changed, 39 insertions(+), 37 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1b2822f..c5c5cee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -692,49 +692,51 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
CompletableFuture<Subscription> subscriptionFuture = new
CompletableFuture<>();
log.info("[{}][{}] Creating non-durable subscription at msg id {}",
topic, subscriptionName, startMessageId);
- // Create a new non-durable cursor only for the first consumer that
connects
- Subscription subscription =
subscriptions.computeIfAbsent(subscriptionName, name -> {
- MessageIdImpl msgId = startMessageId != null ? (MessageIdImpl)
startMessageId
- : (MessageIdImpl) MessageId.latest;
-
- long ledgerId = msgId.getLedgerId();
- long entryId = msgId.getEntryId();
- if (ledgerId >= 0
- && msgId instanceof BatchMessageIdImpl) {
- // When the start message is relative to a batch, we need to
take one step back on the previous message,
- // because the "batch" might not have been consumed in its
entirety.
- // The client will then be able to discard the first messages
if needed.
- entryId = msgId.getEntryId() - 1;
- }
+ synchronized (ledger) {
+ // Create a new non-durable cursor only for the first consumer
that connects
+ Subscription subscription =
subscriptions.computeIfAbsent(subscriptionName, name -> {
+ MessageIdImpl msgId = startMessageId != null ? (MessageIdImpl)
startMessageId
+ : (MessageIdImpl) MessageId.latest;
+
+ long ledgerId = msgId.getLedgerId();
+ long entryId = msgId.getEntryId();
+ if (ledgerId >= 0
+ && msgId instanceof BatchMessageIdImpl) {
+ // When the start message is relative to a batch, we need
to take one step back on the previous message,
+ // because the "batch" might not have been consumed in its
entirety.
+ // The client will then be able to discard the first
messages if needed.
+ entryId = msgId.getEntryId() - 1;
+ }
- Position startPosition = new PositionImpl(ledgerId, entryId);
- ManagedCursor cursor = null;
- try {
- cursor = ledger.newNonDurableCursor(startPosition,
subscriptionName);
- } catch (ManagedLedgerException e) {
- subscriptionFuture.completeExceptionally(e);
- }
+ Position startPosition = new PositionImpl(ledgerId, entryId);
+ ManagedCursor cursor = null;
+ try {
+ cursor = ledger.newNonDurableCursor(startPosition,
subscriptionName);
+ } catch (ManagedLedgerException e) {
+ subscriptionFuture.completeExceptionally(e);
+ }
- return new PersistentSubscription(this, subscriptionName, cursor,
false);
- });
+ return new PersistentSubscription(this, subscriptionName,
cursor, false);
+ });
- if (!subscriptionFuture.isDone()) {
- if (startMessageRollbackDurationSec > 0) {
- long timestamp = System.currentTimeMillis() -
TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
- subscription.resetCursor(timestamp).handle((s, ex) -> {
- if (ex != null) {
- log.warn("[{}] Failed to reset cursor {} position at
timestamp {}", topic, subscriptionName,
- startMessageRollbackDurationSec);
- }
+ if (!subscriptionFuture.isDone()) {
+ if (startMessageRollbackDurationSec > 0) {
+ long timestamp = System.currentTimeMillis() -
TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
+ subscription.resetCursor(timestamp).handle((s, ex) -> {
+ if (ex != null) {
+ log.warn("[{}] Failed to reset cursor {} position
at timestamp {}", topic, subscriptionName,
+ startMessageRollbackDurationSec);
+ }
+ subscriptionFuture.complete(subscription);
+ return null;
+ });
+ } else {
subscriptionFuture.complete(subscription);
- return null;
- });
+ }
} else {
- subscriptionFuture.complete(subscription);
+ // failed to initialize managed-cursor: clean up created
subscription
+ subscriptions.remove(subscriptionName);
}
- } else {
- // failed to initialize managed-cursor: clean up created
subscription
- subscriptions.remove(subscriptionName);
}
return subscriptionFuture;