This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cfd7e60497a [fix][txn] Catch and log runtime exceptions in async operations (#19258)
cfd7e60497a is described below

commit cfd7e60497afd0f050889641997fef17e460dd47
Author: Nicolò Boschi <[email protected]>
AuthorDate: Wed Jan 18 11:54:48 2023 +0100

    [fix][txn] Catch and log runtime exceptions in async operations (#19258)
---
 .../org/apache/pulsar/common/util/FutureUtil.java  | 22 +++++
 .../impl/MLTransactionMetadataStore.java           | 99 +++++++++++-----------
 .../coordinator/impl/TxnLogBufferedWriter.java     | 73 ++++++++--------
 3 files changed, 106 insertions(+), 88 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 19e3a62cc92..162ef1e52ff 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -26,6 +26,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -258,4 +259,25 @@ public class FutureUtil {
             return new CompletionException(throwable);
         }
     }
+
+    /**
+     * Executes an operation using the supplied {@link Executor} and notifies failures on the supplied future.
+     * If the executor rejects the task, the exception is thrown to the caller rather than set on the future.
+     *
+     * @param runnable the runnable to execute
+     * @param executor  the executor to use for executing the runnable
+     * @param completableFuture  the future to complete (exceptionally) if the runnable throws
+     */
+    public static void safeRunAsync(Runnable runnable,
+                                    Executor executor,
+                                    CompletableFuture<?> completableFuture) {
+        CompletableFuture
+                .runAsync(runnable, executor)
+                .exceptionally((throwable) -> {
+                    // runAsync wraps the task's failure in a CompletionException: propagate the real cause.
+                    completableFuture.completeExceptionally(throwable instanceof CompletionException
+                            && throwable.getCause() != null ? throwable.getCause() : throwable);
+                    return null;
+                });
+    }
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 53a515ff991..aa6afcee3ae 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -115,7 +115,7 @@ public class MLTransactionMetadataStore
                             + tcID.toString() + " change state to Initializing 
error when init it"));
         } else {
             recoverTime.setRecoverStartTime(System.currentTimeMillis());
-            internalPinnedExecutor.execute(() -> 
transactionLog.replayAsync(new TransactionLogReplayCallback() {
+            FutureUtil.safeRunAsync(() -> transactionLog.replayAsync(new 
TransactionLogReplayCallback() {
                 @Override
                 public void replayComplete() {
                     recoverTracker.appendOpenTransactionToTimeoutTracker();
@@ -203,7 +203,7 @@ public class MLTransactionMetadataStore
                         log.error(e.getMessage(), e);
                     }
                 }
-            }));
+            }), internalPinnedExecutor, completableFuture);
         }
         return completableFuture;
     }
@@ -227,60 +227,59 @@ public class MLTransactionMetadataStore
 
     @Override
     public CompletableFuture<TxnID> newTransaction(long timeOut) {
-        if (this.maxActiveTransactionsPerCoordinator == 0
-                || this.maxActiveTransactionsPerCoordinator > 
txnMetaMap.size()) {
-            CompletableFuture<TxnID> completableFuture = new 
CompletableFuture<>();
-            internalPinnedExecutor.execute(() -> {
-                if (!checkIfReady()) {
-                    completableFuture.completeExceptionally(new 
CoordinatorException
-                            .TransactionMetadataStoreStateException(tcID, 
State.Ready, getState(), "new Transaction"));
-                    return;
-                }
-
-                long mostSigBits = tcID.getId();
-                long leastSigBits = sequenceIdGenerator.generateSequenceId();
-                TxnID txnID = new TxnID(mostSigBits, leastSigBits);
-                long currentTimeMillis = System.currentTimeMillis();
-                TransactionMetadataEntry transactionMetadataEntry = new 
TransactionMetadataEntry()
-                        .setTxnidMostBits(mostSigBits)
-                        .setTxnidLeastBits(leastSigBits)
-                        .setStartTime(currentTimeMillis)
-                        .setTimeoutMs(timeOut)
-                        
.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
-                        .setLastModificationTime(currentTimeMillis)
-                        
.setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
-                transactionLog.append(transactionMetadataEntry)
-                        .whenComplete((position, throwable) -> {
-                            if (throwable != null) {
-                                
completableFuture.completeExceptionally(throwable);
-                            } else {
-                                appendLogCount.increment();
-                                TxnMeta txn = new TxnMetaImpl(txnID, 
currentTimeMillis, timeOut);
-                                List<Position> positions = new ArrayList<>();
-                                positions.add(position);
-                                Pair<TxnMeta, List<Position>> pair = 
MutablePair.of(txn, positions);
-                                txnMetaMap.put(leastSigBits, pair);
-                                
this.timeoutTracker.addTransaction(leastSigBits, timeOut);
-                                createdTransactionCount.increment();
-                                completableFuture.complete(txnID);
-                            }
-                        });
-            });
-            return completableFuture;
-        } else {
+        if (this.maxActiveTransactionsPerCoordinator != 0
+                && this.maxActiveTransactionsPerCoordinator <= 
txnMetaMap.size()) {
             return FutureUtil.failedFuture(new 
CoordinatorException.ReachMaxActiveTxnException("New txn op "
                     + "reach max active txn! tcId : " + 
getTransactionCoordinatorID().getId()));
         }
+        CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
+        FutureUtil.safeRunAsync(() -> {
+            if (!checkIfReady()) {
+                completableFuture.completeExceptionally(new 
CoordinatorException
+                        .TransactionMetadataStoreStateException(tcID, 
State.Ready, getState(), "new Transaction"));
+                return;
+            }
+
+            long mostSigBits = tcID.getId();
+            long leastSigBits = sequenceIdGenerator.generateSequenceId();
+            TxnID txnID = new TxnID(mostSigBits, leastSigBits);
+            long currentTimeMillis = System.currentTimeMillis();
+            TransactionMetadataEntry transactionMetadataEntry = new 
TransactionMetadataEntry()
+                    .setTxnidMostBits(mostSigBits)
+                    .setTxnidLeastBits(leastSigBits)
+                    .setStartTime(currentTimeMillis)
+                    .setTimeoutMs(timeOut)
+                    
.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
+                    .setLastModificationTime(currentTimeMillis)
+                    
.setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+            transactionLog.append(transactionMetadataEntry)
+                    .whenComplete((position, throwable) -> {
+                        if (throwable != null) {
+                            completableFuture.completeExceptionally(throwable);
+                        } else {
+                            appendLogCount.increment();
+                            TxnMeta txn = new TxnMetaImpl(txnID, 
currentTimeMillis, timeOut);
+                            List<Position> positions = new ArrayList<>();
+                            positions.add(position);
+                            Pair<TxnMeta, List<Position>> pair = 
MutablePair.of(txn, positions);
+                            txnMetaMap.put(leastSigBits, pair);
+                            this.timeoutTracker.addTransaction(leastSigBits, 
timeOut);
+                            createdTransactionCount.increment();
+                            completableFuture.complete(txnID);
+                        }
+                    });
+        }, internalPinnedExecutor, completableFuture);
+        return completableFuture;
     }
 
     @Override
     public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, 
List<String> partitions) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        internalPinnedExecutor.execute(() -> {
+        FutureUtil.safeRunAsync(() -> {
             if (!checkIfReady()) {
                 promise
                         .completeExceptionally(new 
CoordinatorException.TransactionMetadataStoreStateException(tcID,
-                        State.Ready, getState(), "add produced partition"));
+                                State.Ready, getState(), "add produced 
partition"));
                 return;
             }
             getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
@@ -313,7 +312,7 @@ public class MLTransactionMetadataStore
                 promise.completeExceptionally(ex);
                 return null;
             });
-        });
+        }, internalPinnedExecutor, promise);
         return promise;
     }
 
@@ -321,7 +320,7 @@ public class MLTransactionMetadataStore
     public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
                                                           
List<TransactionSubscription> txnSubscriptions) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        internalPinnedExecutor.execute(() -> {
+        FutureUtil.safeRunAsync(() -> {
             if (!checkIfReady()) {
                 promise.completeExceptionally(new CoordinatorException
                         .TransactionMetadataStoreStateException(tcID, 
State.Ready, getState(), "add acked partition"));
@@ -357,7 +356,7 @@ public class MLTransactionMetadataStore
                 promise.completeExceptionally(ex);
                 return null;
             });
-        });
+        }, internalPinnedExecutor, promise);
         return promise;
     }
 
@@ -365,7 +364,7 @@ public class MLTransactionMetadataStore
     public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus 
newStatus,
                                                                 TxnStatus 
expectedStatus, boolean isTimeout) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        internalPinnedExecutor.execute(() -> {
+        FutureUtil.safeRunAsync(() -> {
             if (!checkIfReady()) {
                 promise.completeExceptionally(new CoordinatorException
                         .TransactionMetadataStoreStateException(tcID,
@@ -426,7 +425,7 @@ public class MLTransactionMetadataStore
                 promise.completeExceptionally(ex);
                 return null;
             });
-        });
+        }, internalPinnedExecutor, promise);
        return promise;
     }
 
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
index a87c040031f..5ad50088ffb 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
@@ -39,6 +39,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.util.FutureUtil;
 
 /***
  * See PIP-160: https://github.com/apache/pulsar/issues/15516.
@@ -214,13 +215,13 @@ public class TxnLogBufferedWriter<T> {
                     AsyncAddArgs.newInstance(callback, ctx, 
System.currentTimeMillis(), byteBuf));
             return;
         }
-        singleThreadExecutorForWrite.execute(() -> {
-            try {
-                internalAsyncAddData(data, callback, ctx);
-            } catch (Exception e){
-                log.warn("Execute 'internalAsyncAddData' fail", e);
-            }
-        });
+        CompletableFuture
+                .runAsync(
+                        () -> internalAsyncAddData(data, callback, ctx), 
singleThreadExecutorForWrite)
+                .exceptionally(e -> {
+                    log.warn("Execute 'internalAsyncAddData' fail", e);
+                    return null;
+                });
     }
 
     /**
@@ -271,21 +272,21 @@ public class TxnLogBufferedWriter<T> {
     }
 
     private void trigFlushByTimingTask(){
-        singleThreadExecutorForWrite.execute(() -> {
-            try {
-                if (flushContext.asyncAddArgsList.isEmpty()) {
-                    return;
-                }
-                
metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(), 
bytesSize,
-                        System.currentTimeMillis() - 
flushContext.asyncAddArgsList.get(0).addedTime);
-                doFlush();
-            } catch (Exception e){
-                log.error("Trig flush by timing task fail.", e);
-            } finally {
-                // Start the next timing task.
-                nextTimingTrigger();
-            }
-        });
+        CompletableFuture
+                .runAsync(() -> {
+                    if (flushContext.asyncAddArgsList.isEmpty()) {
+                        return;
+                    }
+                    
metrics.triggerFlushByByMaxDelay(flushContext.asyncAddArgsList.size(), 
bytesSize,
+                            System.currentTimeMillis() - 
flushContext.asyncAddArgsList.get(0).addedTime);
+                    doFlush();
+                }, singleThreadExecutorForWrite)
+                .whenComplete((ignore, e) -> {
+                    if (e != null) {
+                        log.warn("Execute 'trigFlushByTimingTask' fail", e);
+                    }
+                    nextTimingTrigger();
+                });
     }
 
     /**
@@ -379,24 +380,20 @@ public class TxnLogBufferedWriter<T> {
         }
         CompletableFuture closeFuture = new CompletableFuture();
         // Cancel pending tasks and release resources.
-        singleThreadExecutorForWrite.execute(() -> {
-            try {
-                // If some requests are flushed, BK will trigger these 
callbacks, and the remaining requests in should
-                // fail.
-                failureCallbackByContextAndRecycle(flushContext,
-                        new 
ManagedLedgerException.ManagedLedgerFencedException(
+        FutureUtil.safeRunAsync(() -> {
+            // If some requests are flushed, BK will trigger these callbacks, 
and the remaining requests in should
+            // fail.
+            failureCallbackByContextAndRecycle(flushContext,
+                    new ManagedLedgerException.ManagedLedgerFencedException(
                             new Exception("Transaction log buffered write has 
closed")
-                        ));
-                // Cancel the timing task.
-                if (!timeout.isCancelled()){
-                    this.timeout.cancel();
-                }
-                STATE_UPDATER.set(this, State.CLOSED);
-                closeFuture.complete(null);
-            } catch (Exception e){
-                closeFuture.completeExceptionally(e);
+                    ));
+            // Cancel the timing task.
+            if (!timeout.isCancelled()) {
+                this.timeout.cancel();
             }
-        });
+            STATE_UPDATER.set(this, State.CLOSED);
+            closeFuture.complete(null);
+        }, singleThreadExecutorForWrite, closeFuture);
         return closeFuture;
     }
 

Reply via email to