This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 523a0da9d12 [fix][broker] Fix deadlock in PendingAckHandleImpl (#18989)
523a0da9d12 is described below

commit 523a0da9d1207086493fe30c1d851227b249b76e
Author: Nicolò Boschi <[email protected]>
AuthorDate: Tue Dec 20 11:57:19 2022 +0100

    [fix][broker] Fix deadlock in PendingAckHandleImpl (#18989)
    
    (cherry picked from commit 22866bd19c231e85ddff4acee4dad1f895cbbc72)
---
 .../pendingack/impl/PendingAckHandleImpl.java      | 22 ++++++++++------------
 1 file changed, 10 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 64652faa6e9..4f4c61d372b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -146,20 +146,20 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
                         .getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
 
         pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
-                .thenAccept(init -> {
+                .thenAcceptAsync(init -> {
                     if (init) {
                         initPendingAckStore();
                     } else {
                         completeHandleFuture();
                     }
-                })
-                .exceptionally(e -> {
+                }, internalPinnedExecutor)
+                .exceptionallyAsync(e -> {
                     Throwable t = FutureUtil.unwrapCompletionException(e);
                     changeToErrorState();
                     exceptionHandleFuture(t);
                     this.pendingAckStoreFuture.completeExceptionally(t);
                     return null;
-                });
+                }, internalPinnedExecutor);
     }
 
     private void initPendingAckStore() {
@@ -925,18 +925,16 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
         return transactionPendingAckStats;
     }
 
-    public synchronized void completeHandleFuture() {
-        if (!this.pendingAckHandleCompletableFuture.isDone()) {
-            this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
-        }
-        if (recoverTime.getRecoverStartTime() != 0L) {
+    public void completeHandleFuture() {
+        this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
+        if (recoverTime.getRecoverStartTime() != 0L && recoverTime.getRecoverEndTime() == 0L) {
             recoverTime.setRecoverEndTime(System.currentTimeMillis());
         }
     }
 
-    public synchronized void exceptionHandleFuture(Throwable t) {
-        if (!this.pendingAckHandleCompletableFuture.isDone()) {
-            this.pendingAckHandleCompletableFuture.completeExceptionally(t);
+    public void exceptionHandleFuture(Throwable t) {
+        final boolean completedNow = this.pendingAckHandleCompletableFuture.completeExceptionally(t);
+        if (completedNow) {
             recoverTime.setRecoverEndTime(System.currentTimeMillis());
         }
     }

Reply via email to