This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 9d80b65f0e fixes clean shutdown bug in manager (#4248)
9d80b65f0e is described below
commit 9d80b65f0eec60ad6d137cc8400798380cc55fe7
Author: Keith Turner <[email protected]>
AuthorDate: Fri Feb 9 18:04:17 2024 -0500
fixes clean shutdown bug in manager (#4248)
This commit fixes #4209 by shutting down fate before unassigning any
tablets on which fate depends.
---
.../java/org/apache/accumulo/core/fate/Fate.java | 47 +++++++++++++++++-----
.../accumulo/core/fate/ReadOnlyFateStore.java | 2 +
.../org/apache/accumulo/core/fate/ZooStore.java | 5 +++
.../accumulo/core/fate/accumulo/AccumuloStore.java | 5 +++
.../apache/accumulo/core/logging/FateLogger.java | 6 +++
.../org/apache/accumulo/core/fate/TestStore.java | 5 +++
.../java/org/apache/accumulo/manager/Manager.java | 9 ++++-
.../java/org/apache/accumulo/test/fate/FateIT.java | 12 +++---
8 files changed, 74 insertions(+), 17 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 35807ee0fc..3828bb80c4 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -459,20 +459,45 @@ public class Fate<T> {
}
/**
- * Flags that FATE threadpool to clear out and end. Does not actively stop running FATE processes.
+ * Initiates shutdown of background threads and optionally waits on them.
*/
- public void shutdown() {
- keepRunning.set(false);
- fatePoolWatcher.shutdown();
- if (executor != null) {
+ public void shutdown(long timeout, TimeUnit timeUnit) {
+ if (keepRunning.compareAndSet(true, false)) {
+ fatePoolWatcher.shutdown();
executor.shutdown();
+ workFinder.interrupt();
}
- workFinder.interrupt();
- try {
- workFinder.join();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+
+ if (timeout > 0) {
+ long start = System.nanoTime();
+
+ while ((System.nanoTime() - start) < timeUnit.toNanos(timeout)
+ && (workFinder.isAlive() || !executor.isTerminated())) {
+ try {
+ if (!executor.awaitTermination(1, SECONDS)) {
+ log.debug("Fate {} is waiting for worker threads to terminate", store.type());
+ continue;
+ }
+
+ workFinder.join(1_000);
+ if (workFinder.isAlive()) {
+ log.debug("Fate {} is waiting for work finder thread to terminate", store.type());
+ workFinder.interrupt();
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ if (workFinder.isAlive() || !executor.isTerminated()) {
+ log.warn(
+ "Waited for {}ms for all fate {} background threads to stop, but some are still running. workFinder:{} executor:{}",
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), store.type(),
+ workFinder.isAlive(), !executor.isTerminated());
+ }
}
- }
+ // interrupt the background threads
+ executor.shutdownNow();
+ }
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
index deb79413c9..bdbb7739f9 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
@@ -158,4 +158,6 @@ public interface ReadOnlyFateStore<T> {
* @return the current number of transactions that have been deferred
*/
int getDeferredCount();
+
+ FateInstanceType type();
}
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
index d0ef960054..6813e727c5 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
@@ -118,6 +118,11 @@ public class ZooStore<T> extends AbstractFateStore<T> {
return new Pair<>(node.status, node.fateKey);
}
+ @Override
+ public FateInstanceType type() {
+ return fateInstanceType;
+ }
+
private class FateTxStoreImpl extends AbstractFateTxStoreImpl<T> {
private FateTxStoreImpl(FateId fateId, boolean isReserved) {
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
index 328560b150..7fd4b967cb 100644
---
a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
+++
b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
@@ -251,6 +251,11 @@ public class AccumuloStore<T> extends AbstractFateStore<T>
{
}
}
+ @Override
+ public FateInstanceType type() {
+ return fateInstanceType;
+ }
+
private class FateTxStoreImpl extends AbstractFateTxStoreImpl<T> {
private FateTxStoreImpl(FateId fateId, boolean isReserved) {
diff --git
a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
index 0879fbaea8..d646389f92 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
@@ -27,6 +27,7 @@ import java.util.stream.Stream;
import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.FateStore.FateTxStore;
@@ -138,6 +139,11 @@ public class FateLogger {
return store.getDeferredCount();
}
+ @Override
+ public FateInstanceType type() {
+ return store.type();
+ }
+
@Override
public boolean isDeferredOverflow() {
return store.isDeferredOverflow();
diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
index 18089848df..6c69de60ef 100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
@@ -226,6 +226,11 @@ public class TestStore implements FateStore<String> {
return 0;
}
+ @Override
+ public FateInstanceType type() {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public boolean isDeferredOverflow() {
return false;
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 8c1114df78..09b40386bb 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -648,9 +648,16 @@ public class Manager extends AbstractServer
case CLEAN_STOP:
switch (getManagerState()) {
case NORMAL:
+ // USER fate stores its data in a user table and its operations may interact with
+ // all tables, need to completely shut it down before unloading user tablets
+ fate(FateInstanceType.USER).shutdown(1, MINUTES);
setManagerState(ManagerState.SAFE_MODE);
break;
case SAFE_MODE: {
+ // META fate stores its data in Zookeeper and its operations interact with
+ // metadata and root tablets, need to completely shut it down before unloading
+ // metadata and root tablets
+ fate(FateInstanceType.META).shutdown(1, MINUTES);
int count = nonMetaDataTabletsAssignedOrHosted();
log.debug(
String.format("There are %d non-metadata tablets assigned or hosted", count));
@@ -1145,7 +1152,7 @@ public class Manager extends AbstractServer
sleepUninterruptibly(500, MILLISECONDS);
}
log.info("Shutting down fate.");
- getFateRefs().keySet().forEach(type -> fate(type).shutdown());
+ getFateRefs().keySet().forEach(type -> fate(type).shutdown(0, MINUTES));
splitter.stop();
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
index a373a58c73..7bd350c577 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
@@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -176,7 +177,7 @@ public abstract class FateIT extends SharedMiniClusterBase
implements FateTestRu
Wait.waitFor(() -> getTxStatus(sctx, fateId) == UNKNOWN);
} finally {
- fate.shutdown();
+ fate.shutdown(10, TimeUnit.MINUTES);
}
}
@@ -210,7 +211,7 @@ public abstract class FateIT extends SharedMiniClusterBase
implements FateTestRu
fate.delete(fateId);
assertEquals(UNKNOWN, getTxStatus(sctx, fateId));
} finally {
- fate.shutdown();
+ fate.shutdown(10, TimeUnit.MINUTES);
}
}
@@ -245,7 +246,7 @@ public abstract class FateIT extends SharedMiniClusterBase
implements FateTestRu
fate.delete(fateId);
assertEquals(UNKNOWN, getTxStatus(sctx, fateId));
} finally {
- fate.shutdown();
+ fate.shutdown(10, TimeUnit.MINUTES);
}
}
@@ -275,8 +276,9 @@ public abstract class FateIT extends SharedMiniClusterBase
implements FateTestRu
callStarted.await();
// cancel the transaction
assertFalse(fate.cancel(fateId));
+ finishCall.countDown();
} finally {
- fate.shutdown();
+ fate.shutdown(10, TimeUnit.MINUTES);
}
}
@@ -348,7 +350,7 @@ public abstract class FateIT extends SharedMiniClusterBase
implements FateTestRu
});
} finally {
- fate.shutdown();
+ fate.shutdown(10, TimeUnit.MINUTES);
}
}