This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new c27c9d8719 Fixes FateIT being flaky (#5738) c27c9d8719 is described below commit c27c9d8719704330f88af8db3deaff197d1e98af Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Jul 16 12:05:20 2025 -0400 Fixes FateIT being flaky (#5738) In FateIT, fate threads were hanging around after a test method completed and sometimes causing problems for subsequent tests. Made the test methods wait for all threads in Fate to stop. --- .../main/java/org/apache/accumulo/core/fate/Fate.java | 16 +++++++++++++--- .../main/java/org/apache/accumulo/manager/Manager.java | 2 +- .../org/apache/accumulo/test/fate/zookeeper/FateIT.java | 13 ++++++++----- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index aac1921914..f1c3bb9e64 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -419,9 +419,19 @@ public class Fate<T> { /** * Flags that FATE threadpool to clear out and end. Does not actively stop running FATE processes. 
*/ - public void shutdown() { + public void shutdown(boolean wait) { keepRunning.set(false); - executor.shutdown(); + if (wait) { + executor.shutdownNow(); + while (!executor.isTerminated()) { + try { + executor.awaitTermination(1, SECONDS); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + } else { + executor.shutdown(); + } } - } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index c4914fc0ff..a9b861fbd4 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1458,7 +1458,7 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener, sa.server.stop(); log.debug("Shutting down fate."); - fate().shutdown(); + fate().shutdown(false); final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME; try { diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java index 7cc90a26a9..155df5ab9c 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java @@ -42,6 +42,7 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; @@ -189,6 +190,8 @@ public class FateIT { private static CountDownLatch finishCall; private static CountDownLatch undoLatch; + private static AtomicInteger nextFateDir = new AtomicInteger(0); + private enum ExceptionLocation { CALL, IS_READY }; @@ -282,7 +285,7 @@ public class FateIT { return false; }, 
SECONDS.toMillis(30), 10); } finally { - fate.shutdown(); + fate.shutdown(true); } } @@ -325,7 +328,7 @@ public class FateIT { fate.delete(txid); assertThrows(KeeperException.NoNodeException.class, () -> getTxStatus(zk, txid)); } finally { - fate.shutdown(); + fate.shutdown(true); } } @@ -402,7 +405,7 @@ public class FateIT { fate.delete(txid); assertThrows(KeeperException.NoNodeException.class, () -> getTxStatus(zk, txid)); } finally { - fate.shutdown(); + fate.shutdown(true); } } @@ -443,7 +446,7 @@ public class FateIT { // cancel the transaction assertFalse(fate.cancel(txid)); } finally { - fate.shutdown(); + fate.shutdown(true); } } @@ -507,7 +510,7 @@ public class FateIT { assertEquals(FAILED, fate.waitForCompletion(txid)); assertTrue(fate.getException(txid).getMessage().contains("isReady() failed")); } finally { - fate.shutdown(); + fate.shutdown(true); } }