This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 38f055e813 Improve shutdown sequencing; wait for all stages to quiesce
38f055e813 is described below

commit 38f055e8131604f72d2815fb8f3a11323bc31a64
Author: Alex Petrov <[email protected]>
AuthorDate: Tue Oct 15 16:59:07 2024 +0200

    Improve shutdown sequencing; wait for all stages to quiesce
---
 .../service/accord/AccordCommandStores.java        | 24 +++++++++++++++-------
 .../cassandra/service/accord/AccordJournal.java    |  2 +-
 .../cassandra/service/accord/AccordService.java    |  8 ++++----
 .../org/apache/cassandra/utils/ExecutorUtils.java  | 11 +++++++---
 4 files changed, 30 insertions(+), 15 deletions(-)

diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
index 843cd37516..d51223243d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
@@ -22,10 +22,8 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
 
 import accord.api.Agent;
-import accord.api.ConfigurationService.EpochReady;
 import accord.api.DataStore;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
@@ -38,6 +36,7 @@ import accord.primitives.Range;
 import accord.topology.Topology;
 import accord.utils.RandomSource;
 import org.apache.cassandra.cache.CacheSize;
+import org.apache.cassandra.concurrent.ExecutorPlus;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.metrics.AccordStateCacheMetrics;
@@ -48,6 +47,10 @@ import org.apache.cassandra.service.accord.api.AccordRoutingKey;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static org.apache.cassandra.concurrent.Stage.ACCORD_MIGRATION;
+import static org.apache.cassandra.concurrent.Stage.ACCORD_RANGE_LOADER;
+import static org.apache.cassandra.concurrent.Stage.MUTATION;
+import static org.apache.cassandra.concurrent.Stage.READ;
 
 public class AccordCommandStores extends CommandStores implements CacheSize
 {
@@ -148,17 +151,24 @@ public class AccordCommandStores extends CommandStores implements CacheSize
         boolean hadPending;
         try
         {
+            List<ExecutorPlus> executors = new ArrayList<>();
+            for (CommandStoreExecutor executor : this.executors)
+                executors.add(executor.delegate);
+
+            executors.add(READ.executor());
+            executors.add(MUTATION.executor());
+            executors.add(ACCORD_MIGRATION.executor());
+            executors.add(ACCORD_RANGE_LOADER.executor());
+
             do
             {
                 hadPending = false;
                 List<Future<?>> futures = new ArrayList<>();
-                for (CommandStoreExecutor executor : executors)
+                for (ExecutorPlus executor : executors)
                 {
-                    if (executor.hasTasks())
-                    {
-                        futures.add(executor.submit(() -> {}));
+                    if (!hadPending && (executor.getPendingTaskCount() > 0 || executor.getActiveTaskCount() > 0))
                         hadPending = true;
-                    }
+                    futures.add(executor.submit(() -> {}));
                 }
                 for (Future<?> future : futures)
                     future.get();
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 330c78e32c..a575da3f8b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -419,7 +419,7 @@ public class AccordJournal implements IJournal, Shutdownable
                     AccordCommandStore.Loader loader = commandStore.loader();
                     loader.load(command).get();
                     if (command.saveStatus().compareTo(SaveStatus.Stable) >= 0 && !command.hasBeen(Truncated))
-                        loader.apply(command);
+                        loader.apply(command).get();
                 }
             }
 
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 183507e844..a080e93d14 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -987,7 +987,7 @@ public class AccordService implements IAccordService, Shutdownable
     {
         if (state != State.STARTED)
             return;
-        ExecutorUtils.shutdownSequentiallyAndWait(shutdownableSubsystems(), 1, TimeUnit.MINUTES);
+        shutdownAndWait(1, TimeUnit.MINUTES);
         state = State.SHUTDOWN;
     }
 
@@ -1019,10 +1019,10 @@ public class AccordService implements IAccordService, Shutdownable
 
     @VisibleForTesting
     @Override
-    public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
+    public void shutdownAndWait(long timeout, TimeUnit unit)
     {
-        shutdown();
-        ExecutorUtils.shutdownAndWait(timeout, unit, this);
+        if (!ExecutorUtils.shutdownSequentiallyAndWait(shutdownableSubsystems(), timeout, unit))
+            logger.error("One or more subsystems did not shut down cleanly.");
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/utils/ExecutorUtils.java b/src/java/org/apache/cassandra/utils/ExecutorUtils.java
index 83fc72530a..b37c8ac962 100644
--- a/src/java/org/apache/cassandra/utils/ExecutorUtils.java
+++ b/src/java/org/apache/cassandra/utils/ExecutorUtils.java
@@ -79,10 +79,11 @@ public class ExecutorUtils
         }
     }
 
-    public static void shutdownSequentiallyAndWait(Iterable<?> executors, long timeout, TimeUnit unit)
+    public static boolean shutdownSequentiallyAndWait(Iterable<?> executors, long timeout, TimeUnit unit)
     {
         long deadline = nanoTime() + unit.toNanos(timeout);
 
+        boolean shutdown = true;
         for (Object executor : executors)
         {
             try
@@ -90,12 +91,14 @@ public class ExecutorUtils
                 if (executor instanceof ExecutorService)
                 {
                     ((ExecutorService) executor).shutdown();
-                    ((ExecutorService) executor).awaitTermination(Math.max(0, deadline - nanoTime()), NANOSECONDS);
+                    if (!((ExecutorService) executor).awaitTermination(Math.max(0, deadline - nanoTime()), NANOSECONDS))
+                        shutdown = false;
                 }
                 else if (executor instanceof Shutdownable)
                 {
                     ((Shutdownable) executor).shutdown();
-                    ((Shutdownable) executor).awaitTermination(Math.max(0, deadline - nanoTime()), NANOSECONDS);
+                    if (!((Shutdownable) executor).awaitTermination(Math.max(0, deadline - nanoTime()), NANOSECONDS))
+                        shutdown = false;
                 }
                 else
                     throw new IllegalArgumentException(executor.toString());
@@ -105,6 +108,8 @@ public class ExecutorUtils
                 throw new IllegalStateException("Caught interrupt while shutting down " + executor);
             }
         }
+
+        return shutdown;
     }
 
     public static void shutdown(ExecutorService ... executors)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to