This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 0b81489340 Avoid client side errors during manager shutdown. (#5725)
0b81489340 is described below
commit 0b81489340b59d06fefb0d759cd3089a6180746c
Author: Keith Turner <[email protected]>
AuthorDate: Tue Jul 8 15:17:52 2025 -0400
Avoid client side errors during manager shutdown. (#5725)
Client-side table operations that used fate could see exceptions during
manager shutdown before this fix. After this fix, hopefully that is no longer
the case. This problem was occasionally seen in ShutdownIT.
Modified ShutdownIT to reproduce the problem more reliably before this fix;
after this fix the test passes consistently.
---
core/src/main/java/org/apache/accumulo/core/fate/Fate.java | 11 +++++++++--
.../src/main/java/org/apache/accumulo/manager/Manager.java | 12 ++++++++----
.../java/org/apache/accumulo/test/functional/ShutdownIT.java | 4 ++++
3 files changed, 21 insertions(+), 6 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 6a577333a1..c86e8168e9 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -487,7 +487,9 @@ public class Fate<T> {
}
/**
- * Initiates shutdown of background threads and optionally waits on them.
+ * Initiates shutdown of background threads that run fate operations and
cleanup fate data and
+ * optionally waits on them. Leaves the fate object in a state where it can
still update and read
+ * fate data, like add a new fate operation or get the status of an existing
fate operation.
*/
public void shutdown(long timeout, TimeUnit timeUnit) {
log.info("Shutting down {} FATE", store.type());
@@ -532,8 +534,13 @@ public class Fate<T> {
if (deadResCleanerExecutor != null) {
deadResCleanerExecutor.shutdownNow();
}
+ }
- // ensure store resources are cleaned up
+ /**
+ * Initiates shutdown of all fate threads and prevents reads and updates of
fate's persisted data.
+ */
+ public void close() {
+ shutdown(0, SECONDS);
store.close();
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index cc84a0fc29..0dec09e4a0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -101,6 +101,7 @@ import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.core.util.Timer;
+import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.core.util.time.SteadyTime;
@@ -1196,14 +1197,17 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener {
}
}
+ log.debug("Stopping Thrift Servers");
+ getThriftServer().stop();
+ while (getThriftServer().isServing()) {
+ UtilWaitThread.sleep(100);
+ }
+
log.debug("Shutting down fate.");
- getFateRefs().keySet().forEach(type -> fate(type).shutdown(0, MINUTES));
+ getFateRefs().keySet().forEach(type -> fate(type).close());
splitter.stop();
- log.debug("Stopping Thrift Servers");
- getThriftServer().stop();
-
final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME;
try {
statusThread.join(remaining(deadline));
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java
index 966c0a82ca..d024068732 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java
@@ -86,6 +86,7 @@ public class ShutdownIT extends ConfigurableMacBase {
Thread async = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
+ Thread.sleep(100);
c.tableOperations().delete("table" + i);
}
} catch (Exception ex) {
@@ -95,6 +96,9 @@ public class ShutdownIT extends ConfigurableMacBase {
async.start();
Thread.sleep(100);
assertEquals(0, cluster.exec(Admin.class,
"stopAll").getProcess().waitFor());
+ // give the background delete operations a bit of time to run
+ Thread.sleep(3000);
+ // The delete operations should get stuck or run, but should not throw
an exception
if (ref.get() != null) {
throw ref.get();
}