This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 10371ee  Future completed twice in the method of  impl.MLPendingAckStore#closeAsync (#12362)
10371ee is described below

commit 10371ee870d26a079d7d744154e7c4f67ac09fae
Author: chenlin <[email protected]>
AuthorDate: Fri Oct 22 16:23:42 2021 +0800

    Future completed twice in the method of  impl.MLPendingAckStore#closeAsync (#12362)
---
 .../pendingack/impl/MLPendingAckStore.java         | 23 ++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index b60f4ba..fb88878 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -120,12 +120,23 @@ public class MLPendingAckStore implements PendingAckStore {
         cursor.asyncClose(new AsyncCallbacks.CloseCallback() {
             @Override
             public void closeComplete(Object ctx) {
-                try {
-                    managedLedger.close();
-                } catch (Exception e) {
-                    completableFuture.completeExceptionally(e);
-                }
-                completableFuture.complete(null);
+                managedLedger.asyncClose(new AsyncCallbacks.CloseCallback() {
+
+                    @Override
+                    public void closeComplete(Object ctx) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}][{}] MLPendingAckStore closed 
successfully!", managedLedger.getName(), ctx);
+                        }
+                        completableFuture.complete(null);
+                    }
+
+                    @Override
+                    public void closeFailed(ManagedLedgerException exception, 
Object ctx) {
+                        log.error("[{}][{}] MLPendingAckStore closed 
failed,exception={}", managedLedger.getName(),
+                                ctx, exception);
+                        completableFuture.completeExceptionally(exception);
+                    }
+                }, ctx);
             }
 
             @Override

Reply via email to