This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2e78d93f1e1e0dd1c61aa59224178783d0321ec7
Author: wenbingshen <[email protected]>
AuthorDate: Fri Jan 14 10:45:55 2022 +0800

    Fix no response to client in handleSubscribe when PendingAckHandleImpl 
init fails (#13655)
    
    Fixes #13654
    
    ### Modifications
    
    When the initialization of `PendingAckHandleImpl` fails, 
`pendingAckHandleCompletableFuture` is never completed, either normally or 
exceptionally, so 
`org.apache.pulsar.broker.service.persistent.PersistentSubscription#addConsumer`
 never returns a response to the client.
    
    ```
    public CompletableFuture<Void> addConsumer(Consumer consumer) {
            return pendingAckHandle.pendingAckHandleFuture().thenCompose(future 
-> ...)
    }
    ```
    
    (cherry picked from commit 528c972eff6d9c41adb40a6dad42e9432487d187)
---
 .../broker/transaction/pendingack/PendingAckReplyCallBack.java     | 6 ++++++
 .../transaction/pendingack/impl/MLPendingAckReplyCallBack.java     | 7 +++++++
 .../broker/transaction/pendingack/impl/MLPendingAckStore.java      | 3 +++
 .../broker/transaction/pendingack/impl/PendingAckHandleImpl.java   | 7 +++++++
 4 files changed, 23 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java
index 3f2cc51..fed9add 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java
@@ -36,4 +36,10 @@ public interface PendingAckReplyCallBack {
      * @param pendingAckMetadataEntry {@link PendingAckMetadataEntry} the 
metadata entry of pending ack
      */
     void handleMetadataEntry(PendingAckMetadataEntry pendingAckMetadataEntry);
+
+    /**
+     * Pending ack replay failed callback for pending ack store.
+     */
+    void replayFailed(Throwable t);
+
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
index 6bcc344..b98e64e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
@@ -61,6 +61,13 @@ public class MLPendingAckReplyCallBack implements 
PendingAckReplyCallBack {
     }
 
     @Override
+    public void replayFailed(Throwable t) {
+        synchronized (pendingAckHandle) {
+            pendingAckHandle.exceptionHandleFuture(t);
+        }
+    }
+
+    @Override
     public void handleMetadataEntry(PendingAckMetadataEntry 
pendingAckMetadataEntry) {
         TxnID txnID = new TxnID(pendingAckMetadataEntry.getTxnidMostBits(),
                 pendingAckMetadataEntry.getTxnidLeastBits());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index 1592318..1ed0992 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -303,6 +303,8 @@ public class MLPendingAckStore implements PendingAckStore {
         public void run() {
             try {
                 if (cursor.isClosed()) {
+                    pendingAckReplyCallBack.replayFailed(new 
ManagedLedgerException
+                            .CursorAlreadyClosedException("MLPendingAckStore 
cursor have been closed."));
                     log.warn("[{}] MLPendingAckStore cursor have been closed, 
close replay thread.",
                             cursor.getManagedLedger().getName());
                     return;
@@ -350,6 +352,7 @@ public class MLPendingAckStore implements PendingAckStore {
                     }
                 }
             } catch (Exception e) {
+                pendingAckReplyCallBack.replayFailed(e);
                 log.error("[{}] Pending ack recover fail!", 
subManagedCursor.getManagedLedger().getName(), e);
                 return;
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index d92793a..ba74d78 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -141,6 +141,7 @@ public class PendingAckHandleImpl extends 
PendingAckHandleState implements Pendi
                     }).exceptionally(e -> {
                         acceptQueue.clear();
                         changeToErrorState();
+                        exceptionHandleFuture(e.getCause());
                         log.error("PendingAckHandleImpl init fail! TopicName : 
{}, SubName: {}", topicName, subName, e);
                         return null;
                     });
@@ -889,6 +890,12 @@ public class PendingAckHandleImpl extends 
PendingAckHandleState implements Pendi
         }
     }
 
+    public synchronized void exceptionHandleFuture(Throwable t) {
+        if (!this.pendingAckHandleCompletableFuture.isDone()) {
+            this.pendingAckHandleCompletableFuture.completeExceptionally(t);
+        }
+    }
+
     @Override
     public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID 
txnID) {
         TransactionInPendingAckStats transactionInPendingAckStats = new 
TransactionInPendingAckStats();

Reply via email to