This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 36771383d66 [fix][txn] Catch and log runtime exceptions in async operations (#19258)
36771383d66 is described below

commit 36771383d66ccd8c6c44d75b384cc377fc26509c
Author: Nicolò Boschi <[email protected]>
AuthorDate: Wed Jan 18 11:54:48 2023 +0100

    [fix][txn] Catch and log runtime exceptions in async operations (#19258)
    
    (cherry picked from commit cfd7e60497afd0f050889641997fef17e460dd47)
---
 .../org/apache/pulsar/common/util/FutureUtil.java  | 22 +++++++++++++++++++++
 .../impl/MLTransactionMetadataStore.java           | 23 +++++++++++-----------
 2 files changed, 33 insertions(+), 12 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index b8cc5a99e4c..2d9efa6e0cb 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -25,6 +25,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -274,4 +275,25 @@ public class FutureUtil {
             return new CompletionException(throwable);
         }
     }
+
+    /**
+     * Runs the given task on the supplied {@link Executor} and reports any
+     * failure of that task on the supplied {@link CompletableFuture}.
+     * The future is only touched on failure; it is never completed on success.
+     *
+     * @param runnable the runnable to execute
+     * @param executor the executor to use for executing the runnable
+     * @param completableFuture the future to complete exceptionally if the runnable throws
+     */
+    public static void safeRunAsync(Runnable runnable,
+                                    Executor executor,
+                                    CompletableFuture<?> completableFuture) {
+        CompletableFuture
+                .runAsync(runnable, executor)
+                .exceptionally((throwable) -> {
+                    // Forward the task's failure to the caller-visible future.
+                    completableFuture.completeExceptionally(throwable);
+                    return null;
+                });
+    }
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 6c88d27cc22..273a01850cb 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -107,8 +107,7 @@ public class MLTransactionMetadataStore
                     .CoordinatorNotFoundException("transaction metadata store 
with tcId "
                             + tcID.toString() + " change state to Initializing 
error when init it"));
         } else {
-            internalPinnedExecutor.execute(() -> 
transactionLog.replayAsync(new TransactionLogReplayCallback() {
-
+            FutureUtil.safeRunAsync(() -> transactionLog.replayAsync(new 
TransactionLogReplayCallback() {
                 @Override
                 public void replayComplete() {
                     recoverTracker.appendOpenTransactionToTimeoutTracker();
@@ -195,7 +194,7 @@ public class MLTransactionMetadataStore
                         log.error(e.getMessage(), e);
                     }
                 }
-            }));
+            }), internalPinnedExecutor, completableFuture);
         }
         return completableFuture;
     }
@@ -220,7 +219,7 @@ public class MLTransactionMetadataStore
     @Override
     public CompletableFuture<TxnID> newTransaction(long timeOut) {
         CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
-        internalPinnedExecutor.execute(() -> {
+        FutureUtil.safeRunAsync(() -> {
             if (!checkIfReady()) {
                 completableFuture.completeExceptionally(new 
CoordinatorException
                         .TransactionMetadataStoreStateException(tcID, 
State.Ready, getState(), "new Transaction"));
@@ -255,18 +254,18 @@ public class MLTransactionMetadataStore
                             completableFuture.complete(txnID);
                         }
                     });
-        });
+        }, internalPinnedExecutor, completableFuture);
         return completableFuture;
     }
 
     @Override
     public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, 
List<String> partitions) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        internalPinnedExecutor.execute(() -> {
+        FutureUtil.safeRunAsync(() -> {
             if (!checkIfReady()) {
                 promise
                         .completeExceptionally(new 
CoordinatorException.TransactionMetadataStoreStateException(tcID,
-                        State.Ready, getState(), "add produced partition"));
+                                State.Ready, getState(), "add produced 
partition"));
                 return;
             }
             getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
@@ -299,7 +298,7 @@ public class MLTransactionMetadataStore
                 promise.completeExceptionally(ex);
                 return null;
             });
-        });
+        }, internalPinnedExecutor, promise);
         return promise;
     }
 
@@ -307,7 +306,7 @@ public class MLTransactionMetadataStore
     public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
                                                           
List<TransactionSubscription> txnSubscriptions) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        internalPinnedExecutor.execute(() -> {
+        FutureUtil.safeRunAsync(() -> {
             if (!checkIfReady()) {
                 promise.completeExceptionally(new CoordinatorException
                         .TransactionMetadataStoreStateException(tcID, 
State.Ready, getState(), "add acked partition"));
@@ -343,7 +342,7 @@ public class MLTransactionMetadataStore
                 promise.completeExceptionally(ex);
                 return null;
             });
-        });
+        }, internalPinnedExecutor, promise);
         return promise;
     }
 
@@ -351,7 +350,7 @@ public class MLTransactionMetadataStore
     public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus 
newStatus,
                                                                 TxnStatus 
expectedStatus, boolean isTimeout) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        internalPinnedExecutor.execute(() -> {
+        FutureUtil.safeRunAsync(() -> {
             if (!checkIfReady()) {
                 promise.completeExceptionally(new CoordinatorException
                         .TransactionMetadataStoreStateException(tcID,
@@ -412,7 +411,7 @@ public class MLTransactionMetadataStore
                 promise.completeExceptionally(ex);
                 return null;
             });
-        });
+        }, internalPinnedExecutor, promise);
        return promise;
     }
 

Reply via email to