This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 36771383d66 [fix][txn] Catch and log runtime exceptions in async
operations (#19258)
36771383d66 is described below
commit 36771383d66ccd8c6c44d75b384cc377fc26509c
Author: Nicolò Boschi <[email protected]>
AuthorDate: Wed Jan 18 11:54:48 2023 +0100
[fix][txn] Catch and log runtime exceptions in async operations (#19258)
(cherry picked from commit cfd7e60497afd0f050889641997fef17e460dd47)
---
.../org/apache/pulsar/common/util/FutureUtil.java | 22 +++++++++++++++++++++
.../impl/MLTransactionMetadataStore.java | 23 +++++++++++-----------
2 files changed, 33 insertions(+), 12 deletions(-)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index b8cc5a99e4c..2d9efa6e0cb 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -25,6 +25,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -274,4 +275,25 @@ public class FutureUtil {
return new CompletionException(throwable);
}
}
+
+ /**
+ * Executes an operation using the supplied {@link Executor}
+ * and notify failures on the supplied {@link CompletableFuture}.
+ *
+ * @param runnable the runnable to execute
+ * @param executor the executor to use for executing the runnable
+ * @param completableFuture the future to complete in case of exceptions
+ * @return
+ */
+
+ public static void safeRunAsync(Runnable runnable,
+ Executor executor,
+ CompletableFuture completableFuture) {
+ CompletableFuture
+ .runAsync(runnable, executor)
+ .exceptionally((throwable) -> {
+ completableFuture.completeExceptionally(throwable);
+ return null;
+ });
+ }
}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 6c88d27cc22..273a01850cb 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -107,8 +107,7 @@ public class MLTransactionMetadataStore
.CoordinatorNotFoundException("transaction metadata store
with tcId "
+ tcID.toString() + " change state to Initializing
error when init it"));
} else {
- internalPinnedExecutor.execute(() ->
transactionLog.replayAsync(new TransactionLogReplayCallback() {
-
+ FutureUtil.safeRunAsync(() -> transactionLog.replayAsync(new
TransactionLogReplayCallback() {
@Override
public void replayComplete() {
recoverTracker.appendOpenTransactionToTimeoutTracker();
@@ -195,7 +194,7 @@ public class MLTransactionMetadataStore
log.error(e.getMessage(), e);
}
}
- }));
+ }), internalPinnedExecutor, completableFuture);
}
return completableFuture;
}
@@ -220,7 +219,7 @@ public class MLTransactionMetadataStore
@Override
public CompletableFuture<TxnID> newTransaction(long timeOut) {
CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
- internalPinnedExecutor.execute(() -> {
+ FutureUtil.safeRunAsync(() -> {
if (!checkIfReady()) {
completableFuture.completeExceptionally(new
CoordinatorException
.TransactionMetadataStoreStateException(tcID,
State.Ready, getState(), "new Transaction"));
@@ -255,18 +254,18 @@ public class MLTransactionMetadataStore
completableFuture.complete(txnID);
}
});
- });
+ }, internalPinnedExecutor, completableFuture);
return completableFuture;
}
@Override
public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID,
List<String> partitions) {
CompletableFuture<Void> promise = new CompletableFuture<>();
- internalPinnedExecutor.execute(() -> {
+ FutureUtil.safeRunAsync(() -> {
if (!checkIfReady()) {
promise
.completeExceptionally(new
CoordinatorException.TransactionMetadataStoreStateException(tcID,
- State.Ready, getState(), "add produced partition"));
+ State.Ready, getState(), "add produced
partition"));
return;
}
getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
@@ -299,7 +298,7 @@ public class MLTransactionMetadataStore
promise.completeExceptionally(ex);
return null;
});
- });
+ }, internalPinnedExecutor, promise);
return promise;
}
@@ -307,7 +306,7 @@ public class MLTransactionMetadataStore
public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
List<TransactionSubscription> txnSubscriptions) {
CompletableFuture<Void> promise = new CompletableFuture<>();
- internalPinnedExecutor.execute(() -> {
+ FutureUtil.safeRunAsync(() -> {
if (!checkIfReady()) {
promise.completeExceptionally(new CoordinatorException
.TransactionMetadataStoreStateException(tcID,
State.Ready, getState(), "add acked partition"));
@@ -343,7 +342,7 @@ public class MLTransactionMetadataStore
promise.completeExceptionally(ex);
return null;
});
- });
+ }, internalPinnedExecutor, promise);
return promise;
}
@@ -351,7 +350,7 @@ public class MLTransactionMetadataStore
public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus
newStatus,
TxnStatus
expectedStatus, boolean isTimeout) {
CompletableFuture<Void> promise = new CompletableFuture<>();
- internalPinnedExecutor.execute(() -> {
+ FutureUtil.safeRunAsync(() -> {
if (!checkIfReady()) {
promise.completeExceptionally(new CoordinatorException
.TransactionMetadataStoreStateException(tcID,
@@ -412,7 +411,7 @@ public class MLTransactionMetadataStore
promise.completeExceptionally(ex);
return null;
});
- });
+ }, internalPinnedExecutor, promise);
return promise;
}