This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 523a0da9d12 [fix][broker] Fix deadlock in PendingAckHandleImpl (#18989)
523a0da9d12 is described below
commit 523a0da9d1207086493fe30c1d851227b249b76e
Author: Nicolò Boschi <[email protected]>
AuthorDate: Tue Dec 20 11:57:19 2022 +0100
[fix][broker] Fix deadlock in PendingAckHandleImpl (#18989)
(cherry picked from commit 22866bd19c231e85ddff4acee4dad1f895cbbc72)
---
.../pendingack/impl/PendingAckHandleImpl.java | 22 ++++++++++------------
1 file changed, 10 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 64652faa6e9..4f4c61d372b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -146,20 +146,20 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
.getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
- .thenAccept(init -> {
+ .thenAcceptAsync(init -> {
if (init) {
initPendingAckStore();
} else {
completeHandleFuture();
}
- })
- .exceptionally(e -> {
+ }, internalPinnedExecutor)
+ .exceptionallyAsync(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
changeToErrorState();
exceptionHandleFuture(t);
this.pendingAckStoreFuture.completeExceptionally(t);
return null;
- });
+ }, internalPinnedExecutor);
}
private void initPendingAckStore() {
@@ -925,18 +925,16 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
return transactionPendingAckStats;
}
- public synchronized void completeHandleFuture() {
- if (!this.pendingAckHandleCompletableFuture.isDone()) {
- this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
- }
- if (recoverTime.getRecoverStartTime() != 0L) {
+ public void completeHandleFuture() {
+ this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
+ if (recoverTime.getRecoverStartTime() != 0L && recoverTime.getRecoverEndTime() == 0L) {
recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}
- public synchronized void exceptionHandleFuture(Throwable t) {
- if (!this.pendingAckHandleCompletableFuture.isDone()) {
- this.pendingAckHandleCompletableFuture.completeExceptionally(t);
+ public void exceptionHandleFuture(Throwable t) {
+ final boolean completedNow = this.pendingAckHandleCompletableFuture.completeExceptionally(t);
+ if (completedNow) {
recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}