This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cfd7e60497a [fix][txn] Catch and log runtime exceptions in async operations (#19258)
cfd7e60497a is described below
commit cfd7e60497afd0f050889641997fef17e460dd47
Author: Nicolò Boschi <[email protected]>
AuthorDate: Wed Jan 18 11:54:48 2023 +0100
[fix][txn] Catch and log runtime exceptions in async operations (#19258)
---
.../org/apache/pulsar/common/util/FutureUtil.java | 22 +++++
.../impl/MLTransactionMetadataStore.java | 99 +++++++++++-----------
.../coordinator/impl/TxnLogBufferedWriter.java | 73 ++++++++--------
3 files changed, 106 insertions(+), 88 deletions(-)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 19e3a62cc92..162ef1e52ff 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -26,6 +26,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -258,4 +259,25 @@ public class FutureUtil {
return new CompletionException(throwable);
}
}
+
+    /**
+     * Executes an operation using the supplied {@link Executor} and notifies
+     * failures on the supplied {@link CompletableFuture}.
+     *
+     * <p>This method only ever completes the supplied future exceptionally;
+     * completing it successfully remains the responsibility of the runnable.
+     * Failures are reported in two cases:
+     * <ul>
+     *   <li>the runnable throws: the future is completed with the failure as
+     *   observed by {@link CompletableFuture#exceptionally}, i.e. wrapped in a
+     *   {@link CompletionException};</li>
+     *   <li>the executor refuses the task (for example because it has been shut
+     *   down): the {@link RejectedExecutionException} is delivered to the future
+     *   instead of being thrown to the caller.</li>
+     * </ul>
+     *
+     * @param runnable the runnable to execute
+     * @param executor the executor to use for executing the runnable
+     * @param completableFuture the future to complete in case of exceptions
+     */
+    public static void safeRunAsync(Runnable runnable,
+                                    Executor executor,
+                                    CompletableFuture<?> completableFuture) {
+        try {
+            CompletableFuture
+                    .runAsync(runnable, executor)
+                    .exceptionally(throwable -> {
+                        completableFuture.completeExceptionally(throwable);
+                        return null;
+                    });
+        } catch (RejectedExecutionException e) {
+            // runAsync submits the task via executor.execute() on the calling thread,
+            // so a rejection surfaces here rather than through the returned stage.
+            completableFuture.completeExceptionally(e);
+        }
+    }
}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 53a515ff991..aa6afcee3ae 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -115,7 +115,7 @@ public class MLTransactionMetadataStore
+ tcID.toString() + " change state to Initializing
error when init it"));
} else {
recoverTime.setRecoverStartTime(System.currentTimeMillis());
- internalPinnedExecutor.execute(() ->
transactionLog.replayAsync(new TransactionLogReplayCallback() {
+ FutureUtil.safeRunAsync(() -> transactionLog.replayAsync(new
TransactionLogReplayCallback() {
@Override
public void replayComplete() {
recoverTracker.appendOpenTransactionToTimeoutTracker();
@@ -203,7 +203,7 @@ public class MLTransactionMetadataStore
log.error(e.getMessage(), e);
}
}
- }));
+ }), internalPinnedExecutor, completableFuture);
}
return completableFuture;
}
@@ -227,60 +227,59 @@ public class MLTransactionMetadataStore
@Override
public CompletableFuture<TxnID> newTransaction(long timeOut) {
- if (this.maxActiveTransactionsPerCoordinator == 0
- || this.maxActiveTransactionsPerCoordinator >
txnMetaMap.size()) {
- CompletableFuture<TxnID> completableFuture = new
CompletableFuture<>();
- internalPinnedExecutor.execute(() -> {
- if (!checkIfReady()) {
- completableFuture.completeExceptionally(new
CoordinatorException
- .TransactionMetadataStoreStateException(tcID,
State.Ready, getState(), "new Transaction"));
- return;
- }
-
- long mostSigBits = tcID.getId();
- long leastSigBits = sequenceIdGenerator.generateSequenceId();
- TxnID txnID = new TxnID(mostSigBits, leastSigBits);
- long currentTimeMillis = System.currentTimeMillis();
- TransactionMetadataEntry transactionMetadataEntry = new
TransactionMetadataEntry()
- .setTxnidMostBits(mostSigBits)
- .setTxnidLeastBits(leastSigBits)
- .setStartTime(currentTimeMillis)
- .setTimeoutMs(timeOut)
-
.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
- .setLastModificationTime(currentTimeMillis)
-
.setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
- transactionLog.append(transactionMetadataEntry)
- .whenComplete((position, throwable) -> {
- if (throwable != null) {
-
completableFuture.completeExceptionally(throwable);
- } else {
- appendLogCount.increment();
- TxnMeta txn = new TxnMetaImpl(txnID,
currentTimeMillis, timeOut);
- List<Position> positions = new ArrayList<>();
- positions.add(position);
- Pair<TxnMeta, List<Position>> pair =
MutablePair.of(txn, positions);
- txnMetaMap.put(leastSigBits, pair);
-
this.timeoutTracker.addTransaction(leastSigBits, timeOut);
- createdTransactionCount.increment();
- completableFuture.complete(txnID);
- }
- });
- });
- return completableFuture;
- } else {
+ if (this.maxActiveTransactionsPerCoordinator != 0
+ && this.maxActiveTransactionsPerCoordinator <=
txnMetaMap.size()) {
return FutureUtil.failedFuture(new
CoordinatorException.ReachMaxActiveTxnException("New txn op "
+ "reach max active txn! tcId : " +
getTransactionCoordinatorID().getId()));
}
+ CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
+ FutureUtil.safeRunAsync(() -> {
+ if (!checkIfReady()) {
+ completableFuture.completeExceptionally(new
CoordinatorException
+ .TransactionMetadataStoreStateException(tcID,
State.Ready, getState(), "new Transaction"));
+ return;
+ }
+
+ long mostSigBits = tcID.getId();
+ long leastSigBits = sequenceIdGenerator.generateSequenceId();
+ TxnID txnID = new TxnID(mostSigBits, leastSigBits);
+ long currentTimeMillis = System.currentTimeMillis();
+ TransactionMetadataEntry transactionMetadataEntry = new
TransactionMetadataEntry()
+ .setTxnidMostBits(mostSigBits)
+ .setTxnidLeastBits(leastSigBits)
+ .setStartTime(currentTimeMillis)
+ .setTimeoutMs(timeOut)
+
.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
+ .setLastModificationTime(currentTimeMillis)
+
.setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+ transactionLog.append(transactionMetadataEntry)
+ .whenComplete((position, throwable) -> {
+ if (throwable != null) {
+ completableFuture.completeExceptionally(throwable);
+ } else {
+ appendLogCount.increment();
+ TxnMeta txn = new TxnMetaImpl(txnID,
currentTimeMillis, timeOut);
+ List<Position> positions = new ArrayList<>();
+ positions.add(position);
+ Pair<TxnMeta, List<Position>> pair =
MutablePair.of(txn, positions);
+ txnMetaMap.put(leastSigBits, pair);
+ this.timeoutTracker.addTransaction(leastSigBits,
timeOut);
+ createdTransactionCount.increment();
+ completableFuture.complete(txnID);
+ }
+ });
+ }, internalPinnedExecutor, completableFuture);
+ return completableFuture;
}
@Override
public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID,
List<String> partitions) {
CompletableFuture<Void> promise = new CompletableFuture<>();
- internalPinnedExecutor.execute(() -> {
+ FutureUtil.safeRunAsync(() -> {
if (!checkIfReady()) {
promise
.completeExceptionally(new
CoordinatorException.TransactionMetadataStoreStateException(tcID,
- State.Ready, getState(), "add produced partition"));
+ State.Ready, getState(), "add produced
partition"));
return;
}
getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
@@ -313,7 +312,7 @@ public class MLTransactionMetadataStore
promise.completeExceptionally(ex);
return null;
});
- });
+ }, internalPinnedExecutor, promise);
return promise;
}
@@ -321,7 +320,7 @@ public class MLTransactionMetadataStore
public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
List<TransactionSubscription> txnSubscriptions) {
CompletableFuture<Void> promise = new CompletableFuture<>();
- internalPinnedExecutor.execute(() -> {
+ FutureUtil.safeRunAsync(() -> {
if (!checkIfReady()) {
promise.completeExceptionally(new CoordinatorException
.TransactionMetadataStoreStateException(tcID,
State.Ready, getState(), "add acked partition"));
@@ -357,7 +356,7 @@ public class MLTransactionMetadataStore
promise.completeExceptionally(ex);
return null;
});
- });
+ }, internalPinnedExecutor, promise);
return promise;
}
@@ -365,7 +364,7 @@ public class MLTransactionMetadataStore
public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus
newStatus,
TxnStatus
expectedStatus, boolean isTimeout) {
CompletableFuture<Void> promise = new CompletableFuture<>();
- internalPinnedExecutor.execute(() -> {
+ FutureUtil.safeRunAsync(() -> {
if (!checkIfReady()) {
promise.completeExceptionally(new CoordinatorException
.TransactionMetadataStoreStateException(tcID,
@@ -426,7 +425,7 @@ public class MLTransactionMetadataStore
promise.completeExceptionally(ex);
return null;
});
- });
+ }, internalPinnedExecutor, promise);
return promise;
}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
index a87c040031f..5ad50088ffb 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
@@ -39,6 +39,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.util.FutureUtil;
/***
* See PIP-160: https://github.com/apache/pulsar/issues/15516.
@@ -214,13 +215,13 @@ public class TxnLogBufferedWriter<T> {
AsyncAddArgs.newInstance(callback, ctx,
System.currentTimeMillis(), byteBuf));
return;
}
- singleThreadExecutorForWrite.execute(() -> {
- try {
- internalAsyncAddData(data, callback, ctx);
- } catch (Exception e){
- log.warn("Execute 'internalAsyncAddData' fail", e);
- }
- });
+ CompletableFuture
+ .runAsync(
+ () -> internalAsyncAddData(data, callback, ctx),
singleThreadExecutorForWrite)
+ .exceptionally(e -> {
+ log.warn("Execute 'internalAsyncAddData' fail", e);
+ return null;
+ });
}
/**
@@ -271,21 +272,21 @@ public class TxnLogBufferedWriter<T> {
}
private void trigFlushByTimingTask(){
- singleThreadExecutorForWrite.execute(() -> {
- try {
- if (flushContext.asyncAddArgsList.isEmpty()) {
- return;
- }
-
metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(),
bytesSize,
- System.currentTimeMillis() -
flushContext.asyncAddArgsList.get(0).addedTime);
- doFlush();
- } catch (Exception e){
- log.error("Trig flush by timing task fail.", e);
- } finally {
- // Start the next timing task.
- nextTimingTrigger();
- }
- });
+ CompletableFuture
+ .runAsync(() -> {
+ if (flushContext.asyncAddArgsList.isEmpty()) {
+ return;
+ }
+
metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(),
bytesSize,
+ System.currentTimeMillis() -
flushContext.asyncAddArgsList.get(0).addedTime);
+ doFlush();
+ }, singleThreadExecutorForWrite)
+ .whenComplete((ignore, e) -> {
+ if (e != null) {
+ log.warn("Execute 'trigFlushByTimingTask' fail", e);
+ }
+ nextTimingTrigger();
+ });
}
/**
@@ -379,24 +380,20 @@ public class TxnLogBufferedWriter<T> {
}
CompletableFuture closeFuture = new CompletableFuture();
// Cancel pending tasks and release resources.
- singleThreadExecutorForWrite.execute(() -> {
- try {
- // If some requests are flushed, BK will trigger these
callbacks, and the remaining requests in should
- // fail.
- failureCallbackByContextAndRecycle(flushContext,
- new
ManagedLedgerException.ManagedLedgerFencedException(
+ FutureUtil.safeRunAsync(() -> {
+ // If some requests are flushed, BK will trigger these callbacks,
and the remaining requests in should
+ // fail.
+ failureCallbackByContextAndRecycle(flushContext,
+ new ManagedLedgerException.ManagedLedgerFencedException(
new Exception("Transaction log buffered write has
closed")
- ));
- // Cancel the timing task.
- if (!timeout.isCancelled()){
- this.timeout.cancel();
- }
- STATE_UPDATER.set(this, State.CLOSED);
- closeFuture.complete(null);
- } catch (Exception e){
- closeFuture.completeExceptionally(e);
+ ));
+ // Cancel the timing task.
+ if (!timeout.isCancelled()) {
+ this.timeout.cancel();
}
- });
+ STATE_UPDATER.set(this, State.CLOSED);
+ closeFuture.complete(null);
+ }, singleThreadExecutorForWrite, closeFuture);
return closeFuture;
}