This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new b29e3110bd Fate and FateExecutor improvements (#5817)
b29e3110bd is described below

commit b29e3110bd4bbfc3384aebd9a3552490df634a3e
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Aug 25 15:25:45 2025 -0400

    Fate and FateExecutor improvements (#5817)
    
    Modifications to Fate and FateExecutor to improve
    correctness.
    
    1. Modified Fate and removed the `startFateExecutors`
        method, which could create additional FateExecutor
        instances but fail to add them to the `fateExecutors`
        set if the `FatePoolsWatcher` thread had already
        added them first.
    
    2. Modified `FateExecutor.resizeFateExecutor` to add
        the TransactionRunner instances to the `runningTxRunners`
        set when they are created, rather than when their `run()`
        method starts, so that the shutdown code has a complete
        view of all TransactionRunner instances.
    
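    3. Added additional logging (Fate log messages now include
        the store type, and TransactionRunner logs transaction
        processing at trace level).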
    
    4. Added a check in the UserFateStore constructor that the
        transaction table exists.
    
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
    Co-authored-by: Kevin Rathbun <kevinrr...@gmail.com>
---
 .../java/org/apache/accumulo/core/fate/Fate.java   | 47 ++++++++++------------
 .../apache/accumulo/core/fate/FateExecutor.java    | 41 ++++++++++---------
 .../accumulo/core/fate/user/UserFateStore.java     |  2 +
 .../accumulo/test/fate/FatePoolsWatcherITBase.java |  6 +--
 .../org/apache/accumulo/test/fate/FlakyFate.java   |  5 ---
 5 files changed, 48 insertions(+), 53 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java 
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 4149bc8539..42b30d88b7 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -83,7 +83,8 @@ public class Fate<T> {
   private static final Duration POOL_WATCHER_DELAY = Duration.ofSeconds(30);
 
   private final AtomicBoolean keepRunning = new AtomicBoolean(true);
-  private final Set<FateExecutor<T>> fateExecutors = new HashSet<>();
+  // Visible for FlakyFate test object
+  protected final Set<FateExecutor<T>> fateExecutors = new HashSet<>();
 
   public enum TxInfo {
     FATE_OP, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE
@@ -170,8 +171,9 @@ public class Fate<T> {
     public void run() {
       // Read from the config here and here only. Must avoid reading the same 
property from the
       // config more than once since it can change at any point in this 
execution
-      var poolConfigs = getPoolConfigurations(conf);
-      var idleCheckIntervalMillis = 
conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL);
+      final var poolConfigs = getPoolConfigurations(conf);
+      final var idleCheckIntervalMillis =
+          conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL);
 
       // shutdown task: shutdown fate executors whose set of fate operations 
are no longer present
       // in the config
@@ -183,15 +185,17 @@ public class Fate<T> {
           // if this fate executors set of fate ops is no longer present in 
the config...
           if (!poolConfigs.containsKey(fateExecutor.getFateOps())) {
             if (!fateExecutor.isShutdown()) {
-              log.debug("The config for {} has changed invalidating {}. 
Gracefully shutting down "
-                  + "the FateExecutor.", getFateConfigProp(), fateExecutor);
+              log.debug(
+                  "[{}] The config for {} has changed invalidating {}. 
Gracefully shutting down "
+                      + "the FateExecutor.",
+                  store.type(), getFateConfigProp(), fateExecutor);
               fateExecutor.initiateShutdown();
             } else if (fateExecutor.isShutdown() && fateExecutor.isAlive()) {
-              log.debug("{} has been shutdown, but is still actively working 
on transactions.",
-                  fateExecutor);
+              log.debug("[{}] {} has been shutdown, but is still actively 
working on transactions.",
+                  store.type(), fateExecutor);
             } else if (fateExecutor.isShutdown() && !fateExecutor.isAlive()) {
-              log.debug("{} has been shutdown and all threads have safely 
terminated.",
-                  fateExecutor);
+              log.debug("[{}] {} has been shutdown and all threads have safely 
terminated.",
+                  store.type(), fateExecutor);
               fateExecutorsIter.remove();
             }
           }
@@ -207,6 +211,7 @@ public class Fate<T> {
         synchronized (fateExecutors) {
           if (fateExecutors.stream().map(FateExecutor::getFateOps)
               .noneMatch(fo -> fo.equals(configFateOps))) {
+            log.debug("[{}] Adding FateExecutor for {}", store.type(), 
configFateOps);
             fateExecutors
                 .add(new FateExecutor<>(Fate.this, environment, configFateOps, 
configPoolSize));
           }
@@ -268,16 +273,6 @@ public class Fate<T> {
     }
     this.deadResCleanerExecutor = deadResCleanerExecutor;
 
-    startFateExecutors(environment, conf, fateExecutors);
-  }
-
-  protected void startFateExecutors(T environment, AccumuloConfiguration conf,
-      Set<FateExecutor<T>> fateExecutors) {
-    for (var poolConf : getPoolConfigurations(conf).entrySet()) {
-      // no fate threads are running at this point; fine not to synchronize
-      fateExecutors
-          .add(new FateExecutor<>(this, environment, poolConf.getKey(), 
poolConf.getValue()));
-    }
   }
 
   /**
@@ -382,7 +377,7 @@ public class Fate<T> {
   // multiple times for a transaction... but it will only seed once
   public void seedTransaction(FateOperation fateOp, FateId fateId, Repo<T> 
repo,
       boolean autoCleanUp, String goalMessage) {
-    log.info("Seeding {} {}", fateId, goalMessage);
+    log.info("[{}] Seeding {} {} {}", store.type(), fateOp, fateId, 
goalMessage);
     store.seedTransaction(fateOp, fateId, repo, autoCleanUp);
   }
 
@@ -405,16 +400,18 @@ public class Fate<T> {
         var txStore = optionalTxStore.orElseThrow();
         try {
           TStatus status = txStore.getStatus();
-          log.info("status is: {}", status);
+          log.info("[{}] status is: {}", store.type(), status);
           if (status == NEW || status == SUBMITTED) {
             txStore.setTransactionInfo(TxInfo.EXCEPTION, new 
TApplicationException(
                 TApplicationException.INTERNAL_ERROR, "Fate transaction 
cancelled by user"));
             txStore.setStatus(FAILED_IN_PROGRESS);
-            log.info("Updated status for {} to FAILED_IN_PROGRESS because it 
was cancelled by user",
-                fateId);
+            log.info(
+                "[{}] Updated status for {} to FAILED_IN_PROGRESS because it 
was cancelled by user",
+                store.type(), fateId);
             return true;
           } else {
-            log.info("{} cancelled by user but already in progress or finished 
state", fateId);
+            log.info("[{}] {} cancelled by user but already in progress or 
finished state",
+                store.type(), fateId);
             return false;
           }
         } finally {
@@ -425,7 +422,7 @@ public class Fate<T> {
         UtilWaitThread.sleep(500);
       }
     }
-    log.info("Unable to reserve transaction {} to cancel it", fateId);
+    log.info("[{}] Unable to reserve transaction {} to cancel it", 
store.type(), fateId);
     return false;
   }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java 
b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
index 5ce288d3e2..56a6c17f44 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
@@ -22,7 +22,6 @@ import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterrup
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED;
 import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS;
 import static 
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS;
@@ -49,6 +48,7 @@ import java.util.stream.Collectors;
 import 
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
 import org.apache.accumulo.core.fate.Fate.TxInfo;
 import org.apache.accumulo.core.fate.FateStore.FateTxStore;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 import org.apache.accumulo.core.util.ShutdownUtil;
 import org.apache.accumulo.core.util.Timer;
 import org.apache.accumulo.core.util.threads.ThreadPoolNames;
@@ -57,7 +57,6 @@ import org.apache.accumulo.core.util.threads.Threads;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -107,30 +106,23 @@ public class FateExecutor<T> {
    */
   protected void resizeFateExecutor(Map<Set<Fate.FateOperation>,Integer> 
poolConfigs,
       long idleCheckIntervalMillis) {
-    final var pool = transactionExecutor;
     final int configured = poolConfigs.get(fateOps);
-    ThreadPools.resizePool(pool, () -> configured, poolName);
+    ThreadPools.resizePool(transactionExecutor, () -> configured, poolName);
     synchronized (runningTxRunners) {
       final int running = runningTxRunners.size();
       final int needed = configured - running;
       log.trace("resizing pools configured:{} running:{} needed:{} 
fateOps:{}", configured, running,
           needed, fateOps);
-
       if (needed > 0) {
         // If the pool grew, then ensure that there is a TransactionRunner for 
each thread
         for (int i = 0; i < needed; i++) {
+          final TransactionRunner tr = new TransactionRunner();
           try {
-            pool.execute(new TransactionRunner());
+            runningTxRunners.add(tr);
+            transactionExecutor.execute(tr);
           } catch (RejectedExecutionException e) {
-            // RejectedExecutionException could be shutting down
-            if (pool.isShutdown()) {
-              // The exception is expected in this case, no need to spam the 
logs.
-              log.trace("Expected error adding transaction runner to FaTE 
executor pool. "
-                  + "The pool is shutdown.", e);
-            } else {
-              // This is bad, FaTE may no longer work!
-              log.error("Unexpected error adding transaction runner to FaTE 
executor pool.", e);
-            }
+            runningTxRunners.remove(tr);
+            log.error("Unexpected error adding transaction runner to FaTE 
executor pool.", e);
             break;
           }
         }
@@ -203,7 +195,6 @@ public class FateExecutor<T> {
   /**
    * @return the number of currently running transaction runners
    */
-  @VisibleForTesting
   protected int getNumRunningTxRunners() {
     return runningTxRunners.size();
   }
@@ -343,6 +334,7 @@ public class FateExecutor<T> {
   }
 
   protected class TransactionRunner implements Runnable {
+
     // used to signal a TransactionRunner to stop in the case where there are 
too many running
     // i.e.,
     // 1. the property for the pool size decreased so we have to stop excess 
TransactionRunners
@@ -350,7 +342,6 @@ public class FateExecutor<T> {
     // 2. this FateExecutor is no longer valid from config changes so we need 
to shutdown this
     // FateExecutor
     private final AtomicBoolean stop = new AtomicBoolean(false);
-
     private volatile Long threadId = null;
 
     private Optional<FateTxStore<T>> reserveFateTx() throws 
InterruptedException {
@@ -378,12 +369,11 @@ public class FateExecutor<T> {
 
     @Override
     public void run() {
-      runningTxRunners.add(this);
       runnerLog.trace("A TransactionRunner is starting for {} {} ", 
fate.getStore().type(),
           fateOps);
       threadId = Thread.currentThread().getId();
       try {
-        while (fate.getKeepRunning().get() && !stop.get()) {
+        while (fate.getKeepRunning().get() && !isShutdown() && !stop.get()) {
           FateTxStore<T> txStore = null;
           ExecutionState state = new ExecutionState();
           try {
@@ -395,6 +385,8 @@ public class FateExecutor<T> {
             }
             state.status = txStore.getStatus();
             state.op = txStore.top();
+            runnerLog.trace("Processing FATE transaction {} id: {} status: 
{}", state.op.getName(),
+                txStore.getID(), state.status);
             if (state.status == FAILED_IN_PROGRESS) {
               processFailed(txStore, state.op);
             } else if (state.status == SUBMITTED || state.status == 
IN_PROGRESS) {
@@ -426,9 +418,18 @@ public class FateExecutor<T> {
               }
             }
           } catch (Exception e) {
-            runnerLog.error("Uncaught exception in FATE runner thread.", e);
+            String name = state.op == null ? null : state.op.getName();
+            FateId txid = txStore == null ? null : txStore.getID();
+            runnerLog.error(
+                "Uncaught exception in FATE runner thread processing {} id: {} 
status: {}", name,
+                txid, state.status, e);
           } finally {
             if (txStore != null) {
+              if (runnerLog.isTraceEnabled()) {
+                String name = state.op == null ? null : state.op.getName();
+                runnerLog.trace("Completed FATE transaction {} id: {} status: 
{}", name,
+                    txStore.getID(), state.status);
+              }
               txStore.unreserve(Duration.ofMillis(state.deferTime));
             }
           }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
index f38c50a2e9..5f71550a32 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
@@ -112,6 +112,8 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
     super(lockID, isLockHeld, maxDeferred, fateIdGenerator);
     this.context = Objects.requireNonNull(context);
     this.tableName = Objects.requireNonNull(tableName);
+    
Preconditions.checkArgument(this.context.tableOperations().exists(tableName),
+        "user fate store table " + tableName + " does not exist.");
     this.writer = Suppliers.memoize(() -> {
       try {
         return createConditionalWriterForFateTable(this.tableName);
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/FatePoolsWatcherITBase.java 
b/test/src/main/java/org/apache/accumulo/test/fate/FatePoolsWatcherITBase.java
index 9a562a7c76..66b80d41dd 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/fate/FatePoolsWatcherITBase.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/fate/FatePoolsWatcherITBase.java
@@ -19,7 +19,6 @@
 package org.apache.accumulo.test.fate;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.Map;
 import java.util.Set;
@@ -716,8 +715,9 @@ public abstract class FatePoolsWatcherITBase extends 
SharedMiniClusterBase
     @Override
     public long isReady(FateId fateId, PoolResizeTestEnv environment) throws 
Exception {
       environment.numWorkers.incrementAndGet();
-      assertTrue(environment.isReadyLatch.await(2, TimeUnit.MINUTES),
-          "Timed out waiting for isReady latch");
+      if (!environment.isReadyLatch.await(2, TimeUnit.MINUTES)) {
+        throw new IllegalStateException("Timed out waiting for env latch to be 
ready.");
+      }
       return 0;
     }
 
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java 
b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
index 15e429d581..2e79fc4729 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
@@ -39,11 +39,6 @@ public class FlakyFate<T> extends Fate<T> {
   public FlakyFate(T environment, FateStore<T> store, Function<Repo<T>,String> 
toLogStrFunc,
       AccumuloConfiguration conf) {
     super(environment, store, false, toLogStrFunc, conf, new 
ScheduledThreadPoolExecutor(2));
-  }
-
-  @Override
-  protected void startFateExecutors(T environment, AccumuloConfiguration conf,
-      Set<FateExecutor<T>> fateExecutors) {
     for (var poolConfig : getPoolConfigurations(conf).entrySet()) {
       fateExecutors.add(
           new FlakyFateExecutor<>(this, environment, poolConfig.getKey(), 
poolConfig.getValue()));

Reply via email to