This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new b29e3110bd Fate and FateExecutor improvements (#5817) b29e3110bd is described below commit b29e3110bd4bbfc3384aebd9a3552490df634a3e Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Aug 25 15:25:45 2025 -0400 Fate and FateExecutor improvements (#5817) Modifications to Fate and FateExecutor to improve correctness. 1. Modified Fate and removed the `startFateExecutors` method which may have created additional FateExecutor instances, but may not have added them to the `fateExecutors` set if the `FatePoolsWatcher` thread added them first. 2. Modified `FateExecutor.resizeFateExecutor` to add the TransactionRunner instances to the `runningTxRunners` when they are created, not when they start running, so that the shutdown code has a complete view of all TransactionRunner instances. 3. Added additional logging. 4. Added check in UserFateStore constructor that the transaction table exists. Co-authored-by: Keith Turner <ktur...@apache.org> Co-authored-by: Kevin Rathbun <kevinrr...@gmail.com> --- .../java/org/apache/accumulo/core/fate/Fate.java | 47 ++++++++++------------ .../apache/accumulo/core/fate/FateExecutor.java | 41 ++++++++++--------- .../accumulo/core/fate/user/UserFateStore.java | 2 + .../accumulo/test/fate/FatePoolsWatcherITBase.java | 6 +-- .../org/apache/accumulo/test/fate/FlakyFate.java | 5 --- 5 files changed, 48 insertions(+), 53 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 4149bc8539..42b30d88b7 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -83,7 +83,8 @@ public class Fate<T> { private static final Duration POOL_WATCHER_DELAY = Duration.ofSeconds(30); private final AtomicBoolean keepRunning = new AtomicBoolean(true); - private final Set<FateExecutor<T>> fateExecutors = new HashSet<>(); + 
// Visible for FlakyFate test object + protected final Set<FateExecutor<T>> fateExecutors = new HashSet<>(); public enum TxInfo { FATE_OP, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE @@ -170,8 +171,9 @@ public class Fate<T> { public void run() { // Read from the config here and here only. Must avoid reading the same property from the // config more than once since it can change at any point in this execution - var poolConfigs = getPoolConfigurations(conf); - var idleCheckIntervalMillis = conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL); + final var poolConfigs = getPoolConfigurations(conf); + final var idleCheckIntervalMillis = + conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL); // shutdown task: shutdown fate executors whose set of fate operations are no longer present // in the config @@ -183,15 +185,17 @@ public class Fate<T> { // if this fate executors set of fate ops is no longer present in the config... if (!poolConfigs.containsKey(fateExecutor.getFateOps())) { if (!fateExecutor.isShutdown()) { - log.debug("The config for {} has changed invalidating {}. Gracefully shutting down " - + "the FateExecutor.", getFateConfigProp(), fateExecutor); + log.debug( + "[{}] The config for {} has changed invalidating {}. 
Gracefully shutting down " + + "the FateExecutor.", + store.type(), getFateConfigProp(), fateExecutor); fateExecutor.initiateShutdown(); } else if (fateExecutor.isShutdown() && fateExecutor.isAlive()) { - log.debug("{} has been shutdown, but is still actively working on transactions.", - fateExecutor); + log.debug("[{}] {} has been shutdown, but is still actively working on transactions.", + store.type(), fateExecutor); } else if (fateExecutor.isShutdown() && !fateExecutor.isAlive()) { - log.debug("{} has been shutdown and all threads have safely terminated.", - fateExecutor); + log.debug("[{}] {} has been shutdown and all threads have safely terminated.", + store.type(), fateExecutor); fateExecutorsIter.remove(); } } @@ -207,6 +211,7 @@ public class Fate<T> { synchronized (fateExecutors) { if (fateExecutors.stream().map(FateExecutor::getFateOps) .noneMatch(fo -> fo.equals(configFateOps))) { + log.debug("[{}] Adding FateExecutor for {}", store.type(), configFateOps); fateExecutors .add(new FateExecutor<>(Fate.this, environment, configFateOps, configPoolSize)); } @@ -268,16 +273,6 @@ public class Fate<T> { } this.deadResCleanerExecutor = deadResCleanerExecutor; - startFateExecutors(environment, conf, fateExecutors); - } - - protected void startFateExecutors(T environment, AccumuloConfiguration conf, - Set<FateExecutor<T>> fateExecutors) { - for (var poolConf : getPoolConfigurations(conf).entrySet()) { - // no fate threads are running at this point; fine not to synchronize - fateExecutors - .add(new FateExecutor<>(this, environment, poolConf.getKey(), poolConf.getValue())); - } } /** @@ -382,7 +377,7 @@ public class Fate<T> { // multiple times for a transaction... 
but it will only seed once public void seedTransaction(FateOperation fateOp, FateId fateId, Repo<T> repo, boolean autoCleanUp, String goalMessage) { - log.info("Seeding {} {}", fateId, goalMessage); + log.info("[{}] Seeding {} {} {}", store.type(), fateOp, fateId, goalMessage); store.seedTransaction(fateOp, fateId, repo, autoCleanUp); } @@ -405,16 +400,18 @@ public class Fate<T> { var txStore = optionalTxStore.orElseThrow(); try { TStatus status = txStore.getStatus(); - log.info("status is: {}", status); + log.info("[{}] status is: {}", store.type(), status); if (status == NEW || status == SUBMITTED) { txStore.setTransactionInfo(TxInfo.EXCEPTION, new TApplicationException( TApplicationException.INTERNAL_ERROR, "Fate transaction cancelled by user")); txStore.setStatus(FAILED_IN_PROGRESS); - log.info("Updated status for {} to FAILED_IN_PROGRESS because it was cancelled by user", - fateId); + log.info( + "[{}] Updated status for {} to FAILED_IN_PROGRESS because it was cancelled by user", + store.type(), fateId); return true; } else { - log.info("{} cancelled by user but already in progress or finished state", fateId); + log.info("[{}] {} cancelled by user but already in progress or finished state", + store.type(), fateId); return false; } } finally { @@ -425,7 +422,7 @@ public class Fate<T> { UtilWaitThread.sleep(500); } } - log.info("Unable to reserve transaction {} to cancel it", fateId); + log.info("[{}] Unable to reserve transaction {} to cancel it", store.type(), fateId); return false; } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java index 5ce288d3e2..56a6c17f44 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java @@ -22,7 +22,6 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterrup import static 
java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED; import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS; import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS; @@ -49,6 +48,7 @@ import java.util.stream.Collectors; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; import org.apache.accumulo.core.fate.Fate.TxInfo; import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.util.ShutdownUtil; import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.threads.ThreadPoolNames; @@ -57,7 +57,6 @@ import org.apache.accumulo.core.util.threads.Threads; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** @@ -107,30 +106,23 @@ public class FateExecutor<T> { */ protected void resizeFateExecutor(Map<Set<Fate.FateOperation>,Integer> poolConfigs, long idleCheckIntervalMillis) { - final var pool = transactionExecutor; final int configured = poolConfigs.get(fateOps); - ThreadPools.resizePool(pool, () -> configured, poolName); + ThreadPools.resizePool(transactionExecutor, () -> configured, poolName); synchronized (runningTxRunners) { final int running = runningTxRunners.size(); final int needed = configured - running; log.trace("resizing pools configured:{} running:{} needed:{} fateOps:{}", configured, running, needed, fateOps); - if (needed > 0) { // If the pool grew, then ensure that there is a TransactionRunner for each thread for (int i = 0; i < needed; i++) { + final TransactionRunner tr = new 
TransactionRunner(); try { - pool.execute(new TransactionRunner()); + runningTxRunners.add(tr); + transactionExecutor.execute(tr); } catch (RejectedExecutionException e) { - // RejectedExecutionException could be shutting down - if (pool.isShutdown()) { - // The exception is expected in this case, no need to spam the logs. - log.trace("Expected error adding transaction runner to FaTE executor pool. " - + "The pool is shutdown.", e); - } else { - // This is bad, FaTE may no longer work! - log.error("Unexpected error adding transaction runner to FaTE executor pool.", e); - } + runningTxRunners.remove(tr); + log.error("Unexpected error adding transaction runner to FaTE executor pool.", e); break; } } @@ -203,7 +195,6 @@ public class FateExecutor<T> { /** * @return the number of currently running transaction runners */ - @VisibleForTesting protected int getNumRunningTxRunners() { return runningTxRunners.size(); } @@ -343,6 +334,7 @@ public class FateExecutor<T> { } protected class TransactionRunner implements Runnable { + // used to signal a TransactionRunner to stop in the case where there are too many running // i.e., // 1. the property for the pool size decreased so we have to stop excess TransactionRunners @@ -350,7 +342,6 @@ public class FateExecutor<T> { // 2. 
this FateExecutor is no longer valid from config changes so we need to shutdown this // FateExecutor private final AtomicBoolean stop = new AtomicBoolean(false); - private volatile Long threadId = null; private Optional<FateTxStore<T>> reserveFateTx() throws InterruptedException { @@ -378,12 +369,11 @@ public class FateExecutor<T> { @Override public void run() { - runningTxRunners.add(this); runnerLog.trace("A TransactionRunner is starting for {} {} ", fate.getStore().type(), fateOps); threadId = Thread.currentThread().getId(); try { - while (fate.getKeepRunning().get() && !stop.get()) { + while (fate.getKeepRunning().get() && !isShutdown() && !stop.get()) { FateTxStore<T> txStore = null; ExecutionState state = new ExecutionState(); try { @@ -395,6 +385,8 @@ public class FateExecutor<T> { } state.status = txStore.getStatus(); state.op = txStore.top(); + runnerLog.trace("Processing FATE transaction {} id: {} status: {}", state.op.getName(), + txStore.getID(), state.status); if (state.status == FAILED_IN_PROGRESS) { processFailed(txStore, state.op); } else if (state.status == SUBMITTED || state.status == IN_PROGRESS) { @@ -426,9 +418,18 @@ public class FateExecutor<T> { } } } catch (Exception e) { - runnerLog.error("Uncaught exception in FATE runner thread.", e); + String name = state.op == null ? null : state.op.getName(); + FateId txid = txStore == null ? null : txStore.getID(); + runnerLog.error( + "Uncaught exception in FATE runner thread processing {} id: {} status: {}", name, + txid, state.status, e); } finally { if (txStore != null) { + if (runnerLog.isTraceEnabled()) { + String name = state.op == null ? 
null : state.op.getName(); + runnerLog.trace("Completed FATE transaction {} id: {} status: {}", name, + txStore.getID(), state.status); + } txStore.unreserve(Duration.ofMillis(state.deferTime)); } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index f38c50a2e9..5f71550a32 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -112,6 +112,8 @@ public class UserFateStore<T> extends AbstractFateStore<T> { super(lockID, isLockHeld, maxDeferred, fateIdGenerator); this.context = Objects.requireNonNull(context); this.tableName = Objects.requireNonNull(tableName); + Preconditions.checkArgument(this.context.tableOperations().exists(tableName), + "user fate store table " + tableName + " does not exist."); this.writer = Suppliers.memoize(() -> { try { return createConditionalWriterForFateTable(this.tableName); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FatePoolsWatcherITBase.java b/test/src/main/java/org/apache/accumulo/test/fate/FatePoolsWatcherITBase.java index 9a562a7c76..66b80d41dd 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FatePoolsWatcherITBase.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FatePoolsWatcherITBase.java @@ -19,7 +19,6 @@ package org.apache.accumulo.test.fate; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Map; import java.util.Set; @@ -716,8 +715,9 @@ public abstract class FatePoolsWatcherITBase extends SharedMiniClusterBase @Override public long isReady(FateId fateId, PoolResizeTestEnv environment) throws Exception { environment.numWorkers.incrementAndGet(); - assertTrue(environment.isReadyLatch.await(2, TimeUnit.MINUTES), - "Timed out waiting for isReady latch"); + if 
(!environment.isReadyLatch.await(2, TimeUnit.MINUTES)) { + throw new IllegalStateException("Timed out waiting for env latch to be ready."); + } return 0; } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java index 15e429d581..2e79fc4729 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java @@ -39,11 +39,6 @@ public class FlakyFate<T> extends Fate<T> { public FlakyFate(T environment, FateStore<T> store, Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) { super(environment, store, false, toLogStrFunc, conf, new ScheduledThreadPoolExecutor(2)); - } - - @Override - protected void startFateExecutors(T environment, AccumuloConfiguration conf, - Set<FateExecutor<T>> fateExecutors) { for (var poolConfig : getPoolConfigurations(conf).entrySet()) { fateExecutors.add( new FlakyFateExecutor<>(this, environment, poolConfig.getKey(), poolConfig.getValue()));