This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 38f055e813 Improve shutdown sequencing; wait for all stages to quiesce
38f055e813 is described below
commit 38f055e8131604f72d2815fb8f3a11323bc31a64
Author: Alex Petrov <[email protected]>
AuthorDate: Tue Oct 15 16:59:07 2024 +0200
Improve shutdown sequencing; wait for all stages to quiesce
---
.../service/accord/AccordCommandStores.java | 24 +++++++++++++++-------
.../cassandra/service/accord/AccordJournal.java | 2 +-
.../cassandra/service/accord/AccordService.java | 8 ++++----
.../org/apache/cassandra/utils/ExecutorUtils.java | 11 +++++++---
4 files changed, 30 insertions(+), 15 deletions(-)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
index 843cd37516..d51223243d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
@@ -22,10 +22,8 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
import accord.api.Agent;
-import accord.api.ConfigurationService.EpochReady;
import accord.api.DataStore;
import accord.api.LocalListeners;
import accord.api.ProgressLog;
@@ -38,6 +36,7 @@ import accord.primitives.Range;
import accord.topology.Topology;
import accord.utils.RandomSource;
import org.apache.cassandra.cache.CacheSize;
+import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.metrics.AccordStateCacheMetrics;
@@ -48,6 +47,10 @@ import org.apache.cassandra.service.accord.api.AccordRoutingKey;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import static
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static org.apache.cassandra.concurrent.Stage.ACCORD_MIGRATION;
+import static org.apache.cassandra.concurrent.Stage.ACCORD_RANGE_LOADER;
+import static org.apache.cassandra.concurrent.Stage.MUTATION;
+import static org.apache.cassandra.concurrent.Stage.READ;
public class AccordCommandStores extends CommandStores implements CacheSize
{
@@ -148,17 +151,24 @@ public class AccordCommandStores extends CommandStores implements CacheSize
boolean hadPending;
try
{
+ List<ExecutorPlus> executors = new ArrayList<>();
+ for (CommandStoreExecutor executor : this.executors)
+ executors.add(executor.delegate);
+
+ executors.add(READ.executor());
+ executors.add(MUTATION.executor());
+ executors.add(ACCORD_MIGRATION.executor());
+ executors.add(ACCORD_RANGE_LOADER.executor());
+
do
{
hadPending = false;
List<Future<?>> futures = new ArrayList<>();
- for (CommandStoreExecutor executor : executors)
+ for (ExecutorPlus executor : executors)
{
- if (executor.hasTasks())
- {
- futures.add(executor.submit(() -> {}));
+ if (!hadPending && (executor.getPendingTaskCount() > 0 ||
executor.getActiveTaskCount() > 0))
hadPending = true;
- }
+ futures.add(executor.submit(() -> {}));
}
for (Future<?> future : futures)
future.get();
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 330c78e32c..a575da3f8b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -419,7 +419,7 @@ public class AccordJournal implements IJournal, Shutdownable
AccordCommandStore.Loader loader = commandStore.loader();
loader.load(command).get();
if (command.saveStatus().compareTo(SaveStatus.Stable) >= 0
&& !command.hasBeen(Truncated))
- loader.apply(command);
+ loader.apply(command).get();
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 183507e844..a080e93d14 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -987,7 +987,7 @@ public class AccordService implements IAccordService, Shutdownable
{
if (state != State.STARTED)
return;
- ExecutorUtils.shutdownSequentiallyAndWait(shutdownableSubsystems(), 1,
TimeUnit.MINUTES);
+ shutdownAndWait(1, TimeUnit.MINUTES);
state = State.SHUTDOWN;
}
@@ -1019,10 +1019,10 @@ public class AccordService implements IAccordService, Shutdownable
@VisibleForTesting
@Override
- public void shutdownAndWait(long timeout, TimeUnit unit) throws
InterruptedException, TimeoutException
+ public void shutdownAndWait(long timeout, TimeUnit unit)
{
- shutdown();
- ExecutorUtils.shutdownAndWait(timeout, unit, this);
+ if
(!ExecutorUtils.shutdownSequentiallyAndWait(shutdownableSubsystems(), timeout,
unit))
+ logger.error("One or more subsystems did not shut down cleanly.");
}
@Override
diff --git a/src/java/org/apache/cassandra/utils/ExecutorUtils.java b/src/java/org/apache/cassandra/utils/ExecutorUtils.java
index 83fc72530a..b37c8ac962 100644
--- a/src/java/org/apache/cassandra/utils/ExecutorUtils.java
+++ b/src/java/org/apache/cassandra/utils/ExecutorUtils.java
@@ -79,10 +79,11 @@ public class ExecutorUtils
}
}
- public static void shutdownSequentiallyAndWait(Iterable<?> executors, long
timeout, TimeUnit unit)
+ public static boolean shutdownSequentiallyAndWait(Iterable<?> executors,
long timeout, TimeUnit unit)
{
long deadline = nanoTime() + unit.toNanos(timeout);
+ boolean shutdown = true;
for (Object executor : executors)
{
try
@@ -90,12 +91,14 @@ public class ExecutorUtils
if (executor instanceof ExecutorService)
{
((ExecutorService) executor).shutdown();
- ((ExecutorService) executor).awaitTermination(Math.max(0,
deadline - nanoTime()), NANOSECONDS);
+ if (!((ExecutorService)
executor).awaitTermination(Math.max(0, deadline - nanoTime()), NANOSECONDS))
+ shutdown = false;
}
else if (executor instanceof Shutdownable)
{
((Shutdownable) executor).shutdown();
- ((Shutdownable) executor).awaitTermination(Math.max(0,
deadline - nanoTime()), NANOSECONDS);
+ if (!((Shutdownable)
executor).awaitTermination(Math.max(0, deadline - nanoTime()), NANOSECONDS))
+ shutdown = false;
}
else
throw new IllegalArgumentException(executor.toString());
@@ -105,6 +108,8 @@ public class ExecutorUtils
throw new IllegalStateException("Caught interrupt while
shutting down " + executor);
}
}
+
+ return shutdown;
}
public static void shutdown(ExecutorService ... executors)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]