This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f30ee9  [fix][txn]: fix pending ack is recovering throw 
CursorAlreadyClosedxception (#14781)
9f30ee9 is described below

commit 9f30ee909a683458cb0c0ba7b6cf0c5bd874f4ea
Author: congbo <[email protected]>
AuthorDate: Wed Mar 23 11:08:04 2022 +0800

    [fix][txn]: fix pending ack is recovering throw CursorAlreadyClosedxception 
(#14781)
    
    ### Motivation
    When Transaction PendingAck recover fail throw 
CursorAlreadyClosedException, we should stop the recover op. the cursor was 
been closed, the pendingAck was been closed, so we should stop the recover op, 
in order to release thread resources
    
    ```
    02:03:00.072 [pulsar-transaction-executor-4-1] ERROR 
org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore - 
MLPendingAckStore of topic 
[public/default/persistent/source-topic-partition-13-test__transaction_pending_ack]
 stat reply fail!
    
org.apache.bookkeeper.mledger.ManagedLedgerException$CursorAlreadyClosedException:
 Cursor was already closed
    ```
    
    ### Modifications
    When recover fail by CursorAlreadyClosedException, comeplete recover
    ### Verifying this change
    add test for it
---
 .../broker/transaction/pendingack/impl/MLPendingAckStore.java |  3 ++-
 .../org/apache/pulsar/broker/transaction/TransactionTest.java | 11 +++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index 36b775b..f0a5ae6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -398,7 +398,8 @@ public class MLPendingAckStore implements PendingAckStore {
         public void readEntriesFailed(ManagedLedgerException exception, Object 
ctx) {
             if (managedLedger.getConfig().isAutoSkipNonRecoverableData()
                     && exception instanceof 
ManagedLedgerException.NonRecoverableLedgerException
-                    || exception instanceof 
ManagedLedgerException.ManagedLedgerFencedException) {
+                    || exception instanceof 
ManagedLedgerException.ManagedLedgerFencedException
+                    || exception instanceof 
ManagedLedgerException.CursorAlreadyClosedException) {
                 isReadable = false;
             }
             log.error("MLPendingAckStore of topic [{}] stat reply fail!", 
managedLedger.getName(), exception);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 9dd48ab..5ab770d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -620,6 +620,17 @@ public class TransactionTest extends TransactionTestBase {
         PendingAckHandleImpl pendingAckHandle2 = new 
PendingAckHandleImpl(persistentSubscription);
         Awaitility.await().untilAsserted(() ->
                 assertEquals(pendingAckHandle2.getStats().state, "Ready"));
+
+        doAnswer(invocation -> {
+            AsyncCallbacks.ReadEntriesCallback callback = 
invocation.getArgument(1);
+            callback.readEntriesFailed(new 
ManagedLedgerException.CursorAlreadyClosedException("test"), null);
+            return null;
+        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
+
+        PendingAckHandleImpl pendingAckHandle3 = new 
PendingAckHandleImpl(persistentSubscription);
+
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(pendingAckHandle3.getStats().state, "Ready"));
     }
 
     @Test

Reply via email to