This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2e78d93f1e1e0dd1c61aa59224178783d0321ec7 Author: wenbingshen <[email protected]> AuthorDate: Fri Jan 14 10:45:55 2022 +0800 fix no response to client when handleSubscribe because PendingAckHandleImpl init fail (#13655) Fixes #13654 ### Modifications When the initialization of `PendingAckHandleImpl` fails, `pendingAckHandleCompletableFuture` is never completed, either normally or exceptionally, so `org.apache.pulsar.broker.service.persistent.PersistentSubscription#addConsumer` never returns a response to the client. This change completes the future exceptionally when pending ack replay or initialization fails. ``` public CompletableFuture<Void> addConsumer(Consumer consumer) { return pendingAckHandle.pendingAckHandleFuture().thenCompose(future -> ...) } ``` (cherry picked from commit 528c972eff6d9c41adb40a6dad42e9432487d187) --- .../broker/transaction/pendingack/PendingAckReplyCallBack.java | 6 ++++++ .../transaction/pendingack/impl/MLPendingAckReplyCallBack.java | 7 +++++++ .../broker/transaction/pendingack/impl/MLPendingAckStore.java | 3 +++ .../broker/transaction/pendingack/impl/PendingAckHandleImpl.java | 7 +++++++ 4 files changed, 23 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java index 3f2cc51..fed9add 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java @@ -36,4 +36,10 @@ public interface PendingAckReplyCallBack { * @param pendingAckMetadataEntry {@link PendingAckMetadataEntry} the metadata entry of pending ack */ void handleMetadataEntry(PendingAckMetadataEntry pendingAckMetadataEntry); + + /** + * Pending ack replay failed callback for pending ack store. 
+ */ + void replayFailed(Throwable t); + } \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java index 6bcc344..b98e64e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java @@ -61,6 +61,13 @@ public class MLPendingAckReplyCallBack implements PendingAckReplyCallBack { } @Override + public void replayFailed(Throwable t) { + synchronized (pendingAckHandle) { + pendingAckHandle.exceptionHandleFuture(t); + } + } + + @Override public void handleMetadataEntry(PendingAckMetadataEntry pendingAckMetadataEntry) { TxnID txnID = new TxnID(pendingAckMetadataEntry.getTxnidMostBits(), pendingAckMetadataEntry.getTxnidLeastBits()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java index 1592318..1ed0992 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java @@ -303,6 +303,8 @@ public class MLPendingAckStore implements PendingAckStore { public void run() { try { if (cursor.isClosed()) { + pendingAckReplyCallBack.replayFailed(new ManagedLedgerException + .CursorAlreadyClosedException("MLPendingAckStore cursor have been closed.")); log.warn("[{}] MLPendingAckStore cursor have been closed, close replay thread.", cursor.getManagedLedger().getName()); return; @@ -350,6 +352,7 @@ public class MLPendingAckStore implements PendingAckStore { } } } catch 
(Exception e) { + pendingAckReplyCallBack.replayFailed(e); log.error("[{}] Pending ack recover fail!", subManagedCursor.getManagedLedger().getName(), e); return; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index d92793a..ba74d78 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -141,6 +141,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi }).exceptionally(e -> { acceptQueue.clear(); changeToErrorState(); + exceptionHandleFuture(e.getCause()); log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e); return null; }); @@ -889,6 +890,12 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi } } + public synchronized void exceptionHandleFuture(Throwable t) { + if (!this.pendingAckHandleCompletableFuture.isDone()) { + this.pendingAckHandleCompletableFuture.completeExceptionally(t); + } + } + @Override public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID) { TransactionInPendingAckStats transactionInPendingAckStats = new TransactionInPendingAckStats();
