This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 312a70372a6cfa416a38f9fd978387e424042554
Author: Alex Petrov <[email protected]>
AuthorDate: Fri Oct 4 11:35:01 2024 +0200

    Fix the condition under which we shut down Accord; defer scheduled executor 
shutdown until after MessagingService (MS) has shut down.
    
    Wake up threads waiting on segmentPrepared after shutting down the allocator, 
as no new segments will ever be allocated.
    
    Shut down flusher slightly differently: we do not signal from fsync 
complete, since all blocks should have been fsynced by then, but we add an 
invariant check to notice runaway threads.
    
    Wait for quiescence
    
    Truncate blocking
    
    Wait for scheduler shutdown before shutting down command store
    
    Shut down accord after shutting down messaging
    
    Truncate caches before replay
---
 src/java/org/apache/cassandra/journal/Flusher.java |  8 +++-
 src/java/org/apache/cassandra/journal/Journal.java | 32 +++++++++++++-
 .../apache/cassandra/service/StorageService.java   | 12 +++--
 .../service/accord/AccordCommandStore.java         |  5 +++
 .../service/accord/AccordCommandStores.java        | 38 ++++++++++++++++
 .../cassandra/service/accord/AccordJournal.java    | 51 ++++++++++------------
 .../cassandra/service/accord/AccordKeyspace.java   | 17 +++++---
 .../cassandra/service/accord/AccordService.java    |  7 ++-
 .../service/accord/AccordVerbHandler.java          |  6 +++
 .../org/apache/cassandra/utils/ExecutorUtils.java  | 28 ++++++++++++
 10 files changed, 162 insertions(+), 42 deletions(-)

diff --git a/src/java/org/apache/cassandra/journal/Flusher.java 
b/src/java/org/apache/cassandra/journal/Flusher.java
index 709c68615b..5dfa76e894 100644
--- a/src/java/org/apache/cassandra/journal/Flusher.java
+++ b/src/java/org/apache/cassandra/journal/Flusher.java
@@ -489,7 +489,13 @@ final class Flusher<K, V>
         {
             WaitQueue.Signal signal = fsyncComplete.register(context, 
Timer.Context::stop);
             if (fsyncFinishedFor < flushTime)
-                signal.awaitUninterruptibly();
+            {
+                signal.awaitThrowUncheckedOnInterrupt();
+
+                Journal.State state = journal.state.get();
+                Invariants.checkState(state == Journal.State.NORMAL,
+                                      "Thread %s outlived journal, which is in 
%s state", Thread.currentThread(), state);
+            }
             else
                 signal.cancel();
         }
diff --git a/src/java/org/apache/cassandra/journal/Journal.java 
b/src/java/org/apache/cassandra/journal/Journal.java
index ba4bf503b2..5501146d8d 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -111,7 +111,10 @@ public class Journal<K, V> implements Shutdownable
 
     private final AtomicReference<Segments<K, V>> segments = new 
AtomicReference<>();
 
+    final AtomicReference<State> state = new 
AtomicReference<>(State.UNINITIALIZED);
+
     Interruptible allocator;
+    // TODO (required): we do not need wait queues here, we can just wait on a 
signal on a segment while its byte buffer is being allocated
     private final WaitQueue segmentPrepared = newWaitQueue();
     private final WaitQueue allocatorThreadWaitQueue = newWaitQueue();
     private final BooleanSupplier allocatorThreadWaitCondition = () -> 
(availableSegment == null);
@@ -210,6 +213,8 @@ public class Journal<K, V> implements Shutdownable
 
     public void start()
     {
+        Invariants.checkState(state.compareAndSet(State.UNINITIALIZED, 
State.INITIALIZING),
+                              "Unexpected journal state during 
initialization", state);
         metrics.register(flusher);
 
         deleteTmpFiles();
@@ -228,6 +233,8 @@ public class Journal<K, V> implements Shutdownable
         advanceSegment(null);
         flusher.start();
         compactor.start();
+        Invariants.checkState(state.compareAndSet(State.INITIALIZING, 
State.NORMAL),
+                              "Unexpected journal state after initialization", 
state);
     }
 
     @VisibleForTesting
@@ -253,16 +260,19 @@ public class Journal<K, V> implements Shutdownable
     @Override
     public boolean isTerminated()
     {
-        return false;
+        return state.get() == State.TERMINATED;
     }
 
     public void shutdown()
     {
         try
         {
+            Invariants.checkState(state.compareAndSet(State.NORMAL, 
State.SHUTDOWN),
+                                  "Unexpected journal state while trying to 
shut down", state);
             allocator.shutdown();
             wakeAllocator(); // Wake allocator to force it into shutdown
             allocator.awaitTermination(1, TimeUnit.MINUTES);
+            segmentPrepared.signalAll(); // Wake up all threads waiting on the 
new segment
             compactor.shutdown();
             compactor.awaitTermination(1, TimeUnit.MINUTES);
             flusher.shutdown();
@@ -270,6 +280,8 @@ public class Journal<K, V> implements Shutdownable
             closer.awaitTermination(1, TimeUnit.MINUTES);
             closeAllSegments();
             metrics.deregister();
+            Invariants.checkState(state.compareAndSet(State.SHUTDOWN, 
State.TERMINATED),
+                                  "Unexpected journal state while trying to 
shut down", state);
         }
         catch (InterruptedException e)
         {
@@ -574,7 +586,14 @@ public class Journal<K, V> implements Shutdownable
         {
             WaitQueue.Signal prepared = 
segmentPrepared.register(metrics.waitingOnSegmentAllocation.time(), 
Context::stop);
             if (availableSegment == null && currentSegment == 
currentActiveSegment)
-                prepared.awaitUninterruptibly();
+            {
+                prepared.awaitThrowUncheckedOnInterrupt();
+
+                // In case we woke up due to shutdown signal or interrupt, 
check mode
+                State state = this.state.get();
+                if (state.ordinal() > State.NORMAL.ordinal())
+                    throw new IllegalStateException("Can not obtain allocated 
segment due to shutdown " + state);
+            }
             else
                 prepared.cancel();
         }
@@ -1024,4 +1043,13 @@ public class Journal<K, V> implements Shutdownable
             segments.close();
         }
     }
+
+    enum State
+    {
+        UNINITIALIZED,
+        INITIALIZING,
+        NORMAL,
+        SHUTDOWN,
+        TERMINATED
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 54ffa99fe3..ee92ba9054 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -173,6 +173,7 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.schema.ViewMetadata;
 import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.AccordVerbHandler;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationTarget;
 import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
@@ -3961,9 +3962,6 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                 logger.debug(msg);
             transientMode = Optional.of(Mode.DRAINING);
 
-            if (DatabaseDescriptor.getAccordTransactionsEnabled())
-                AccordService.instance().shutdownAndWait(1, MINUTES);
-
             try
             {
                 /* not clear this is reasonable time, but propagated from 
prior embedded behaviour */
@@ -3979,7 +3977,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
             if (daemon != null)
                 shutdownClientServers();
-            ScheduledExecutors.optionalTasks.shutdown();
+
             Gossiper.instance.stop();
             ActiveRepairService.instance().stop();
 
@@ -3989,6 +3987,9 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                 transientMode = Optional.of(Mode.DRAINING);
             }
 
+            if (AccordService.isSetup())
+                AccordService.instance().shutdownAndWait(1, MINUTES);
+
             // In-progress writes originating here could generate hints to be 
written,
             // which is currently scheduled on the mutation stage. So shut 
down MessagingService
             // before mutation stage, so we can get all the hints saved before 
shutting down.
@@ -4003,6 +4004,9 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                 logger.error("Messaging service timed out shutting down", t);
             }
 
+            // ScheduledExecutors shuts down after MessagingService, as 
MessagingService may issue tasks to it.
+            ScheduledExecutors.optionalTasks.shutdown();
+
             if (!isFinalShutdown)
             {
                 logger.debug("clearing mutation stage");
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 51bc4d85f6..91e4030d5d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -694,6 +694,11 @@ public class AccordCommandStore extends CommandStore
             this.threadId = threadId;
         }
 
+        public boolean hasTasks()
+        {
+            return delegate.getPendingTaskCount() > 0 || 
delegate.getActiveTaskCount() > 0;
+        }
+
         CommandStoreExecutor(AccordStateCache stateCache, 
SequentialExecutorPlus delegate)
         {
             this.stateCache = stateCache;
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
index 6d9744310f..620bad8b16 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
@@ -17,6 +17,10 @@
  */
 package org.apache.cassandra.service.accord;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
@@ -41,6 +45,7 @@ import org.apache.cassandra.metrics.CacheSizeMetrics;
 import org.apache.cassandra.schema.TableId;
 import 
org.apache.cassandra.service.accord.AccordCommandStore.CommandStoreExecutor;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 
@@ -154,6 +159,39 @@ public class AccordCommandStores extends CommandStores 
implements CacheSize
         };
     }
 
+    public void waitForQuiescense()
+    {
+        boolean hadPending;
+        try
+        {
+            do
+            {
+                hadPending = false;
+                List<Future<?>> futures = new ArrayList<>();
+                for (CommandStoreExecutor executor : executors)
+                {
+                    if (executor.hasTasks())
+                    {
+                        futures.add(executor.submit(() -> {}));
+                        hadPending = true;
+                    }
+                }
+                for (Future<?> future : futures)
+                    future.get();
+                futures.clear();
+            }
+            while (hadPending);
+        }
+        catch (ExecutionException e)
+        {
+            throw new IllegalStateException("Should have never been thrown", 
e);
+        }
+        catch (InterruptedException e)
+        {
+            throw new UncheckedInterruptedException(e);
+        }
+    }
+
     @Override
     public synchronized void shutdown()
     {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 49af8042a8..38bd9f9101 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -26,7 +26,6 @@ import java.util.NavigableMap;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -40,9 +39,10 @@ import accord.local.CommandStores.RangesForEpoch;
 import accord.local.DurableBefore;
 import accord.local.Node;
 import accord.local.RedundantBefore;
-import accord.primitives.SaveStatus;
+import accord.local.cfk.CommandsForKey;
 import accord.primitives.Deps;
 import accord.primitives.Ranges;
+import accord.primitives.SaveStatus;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.utils.Invariants;
@@ -73,9 +73,6 @@ import static 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.
 
 public class AccordJournal implements IJournal, Shutdownable
 {
-
-    private final AtomicBoolean isReplay = new AtomicBoolean(false);
-
     static
     {
         // make noise early if we forget to update our version mappings
@@ -93,7 +90,7 @@ public class AccordJournal implements IJournal, Shutdownable
     private final Params params;
     Node node;
 
-    enum Status { INITIALIZED, STARTING, STARTED, TERMINATING, TERMINATED }
+    enum Status { INITIALIZED, STARTING, REPLAY, STARTED, TERMINATING, 
TERMINATED }
     private volatile Status status = Status.INITIALIZED;
 
     @VisibleForTesting
@@ -127,10 +124,14 @@ public class AccordJournal implements IJournal, 
Shutdownable
         this.node = node;
         status = Status.STARTING;
         journal.start();
-        status = Status.STARTED;
         return this;
     }
 
+    public boolean started()
+    {
+        return status == Status.STARTED;
+    }
+
     public Params configuration()
     {
         return params;
@@ -150,7 +151,7 @@ public class AccordJournal implements IJournal, Shutdownable
     @Override
     public void shutdown()
     {
-        Invariants.checkState(status == Status.STARTED);
+        Invariants.checkState(status == Status.REPLAY || status == 
Status.STARTED);
         status = Status.TERMINATING;
         journal.shutdown();
         status = Status.TERMINATED;
@@ -230,7 +231,7 @@ public class AccordJournal implements IJournal, Shutdownable
     @Override
     public void appendCommand(int store, SavedCommand.DiffWriter value, 
Runnable onFlush)
     {
-        if (value == null || isReplay.get())
+        if (value == null || status == Status.REPLAY)
         {
             if (onFlush != null)
                 onFlush.run();
@@ -252,7 +253,7 @@ public class AccordJournal implements IJournal, Shutdownable
             @Override
             public AsyncResult<?> persist(DurableBefore addDurableBefore, 
DurableBefore newDurableBefore)
             {
-                if (isReplay.get())
+                if (status == Status.REPLAY)
                     return AsyncResults.success(null);
 
                 AsyncResult.Settable<Void> result = AsyncResults.settable();
@@ -306,18 +307,6 @@ public class AccordJournal implements IJournal, 
Shutdownable
         return builder;
     }
 
-    public List<SavedCommand.Builder> loadSeparateDiffs(int commandStoreId, 
TxnId txnId)
-    {
-        JournalKey key = new JournalKey(txnId, JournalKey.Type.COMMAND_DIFF, 
commandStoreId);
-        List<SavedCommand.Builder> builders = new ArrayList<>();
-        journalTable.readAll(key, (in, version) -> {
-            SavedCommand.Builder builder = new SavedCommand.Builder(txnId);
-            builder.deserializeNext(in, version);
-            builders.add(builder);
-        });
-        return builders;
-    }
-
     private <BUILDER> BUILDER readAll(JournalKey key)
     {
         BUILDER builder = (BUILDER) key.type.serializer.mergerFor(key);
@@ -367,6 +356,10 @@ public class AccordJournal implements IJournal, 
Shutdownable
 
     public void replay()
     {
+        logger.info("Starting journal replay.");
+        CommandsForKey.disableLinearizabilityViolationsReporting();
+        AccordKeyspace.truncateAllCaches();
+
         // TODO (expected): optimize replay memory footprint
         class ToApply
         {
@@ -383,8 +376,6 @@ public class AccordJournal implements IJournal, Shutdownable
         List<ToApply> toApply = new ArrayList<>();
         try (AccordJournalTable.KeyOrderIterator<JournalKey> iter = 
journalTable.readAll())
         {
-            isReplay.set(true);
-
             JournalKey key;
             SavedCommand.Builder builder = new SavedCommand.Builder();
             while ((key = iter.key()) != null)
@@ -425,17 +416,20 @@ public class AccordJournal implements IJournal, 
Shutdownable
             for (ToApply apply : toApply)
             {
                 AccordCommandStore commandStore = (AccordCommandStore) 
node.commandStores().forId(apply.key.commandStoreId);
+                logger.info("Apply {}", apply.command);
                 commandStore.loader().apply(apply.command);
             }
+
+            logger.info("Waiting for command stores to quiesce.");
+            ((AccordCommandStores)node.commandStores()).waitForQuiescense();
+            CommandsForKey.enableLinearizabilityViolationsReporting();
+            logger.info("Finished journal replay.");
+            status = Status.STARTED;
         }
         catch (Throwable t)
         {
             throw new RuntimeException("Can not replay journal.", t);
         }
-        finally
-        {
-            isReplay.set(false);
-        }
     }
 
     // TODO: this is here temporarily; for debugging purposes
@@ -492,7 +486,6 @@ public class AccordJournal implements IJournal, Shutdownable
                                                t);
                 }
             }
-
         }
     }
 }
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java 
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index 0a8cb085fb..bddf73f0aa 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -37,23 +37,22 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
-import org.apache.cassandra.tcm.ClusterMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.api.RoutingKey;
-import accord.local.StoreParticipants;
-import accord.local.cfk.CommandsForKey;
 import accord.impl.TimestampsForKey;
 import accord.local.Command;
 import accord.local.CommandStore;
 import accord.local.Node;
 import accord.local.RedundantBefore;
+import accord.local.StoreParticipants;
+import accord.local.cfk.CommandsForKey;
+import accord.primitives.Ranges;
+import accord.primitives.Route;
 import accord.primitives.SaveStatus;
 import accord.primitives.Status;
 import accord.primitives.Status.Durability;
-import accord.primitives.Ranges;
-import accord.primitives.Route;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.topology.Topology;
@@ -138,6 +137,7 @@ import 
org.apache.cassandra.service.accord.serializers.AccordRoutingKeyByteSourc
 import org.apache.cassandra.service.accord.serializers.CommandSerializers;
 import 
org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer;
 import org.apache.cassandra.service.accord.serializers.KeySerializers;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.utils.Clock.Global;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.btree.BTree;
@@ -747,6 +747,13 @@ public class AccordKeyspace
         return Tables.of(Commands, TimestampsForKeys, CommandsForKeys, 
Topologies, EpochMetadata, Journal);
     }
 
+    public static void truncateAllCaches()
+    {
+        Keyspace ks = Keyspace.open(ACCORD_KEYSPACE_NAME);
+        for (String table : new String[]{ TimestampsForKeys.name, 
CommandsForKeys.name })
+            ks.getColumnFamilyStore(table).truncateBlocking();
+    }
+
     private static <T> ByteBuffer serialize(T obj, LocalVersionedSerializer<T> 
serializer) throws IOException
     {
         int size = (int) serializer.serializedSize(obj);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 6fdf9e8586..f132840c82 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -201,6 +201,7 @@ public class AccordService implements IAccordService, 
Shutdownable
     private final CoordinateDurabilityScheduling durabilityScheduling;
     private final AccordVerbHandler<? extends Request> requestHandler;
     private final LocalConfig configuration;
+
     @GuardedBy("this")
     private State state = State.INIT;
 
@@ -381,6 +382,10 @@ public class AccordService implements IAccordService, 
Shutdownable
         i.shutdownAndWait(timeout, unit);
     }
 
+    public boolean shouldAcceptMessages()
+    {
+        return state == State.STARTED && journal.started();
+    }
     public static IAccordService instance()
     {
         if (!DatabaseDescriptor.getAccordTransactionsEnabled())
@@ -964,7 +969,7 @@ public class AccordService implements IAccordService, 
Shutdownable
     {
         if (state != State.STARTED)
             return;
-        ExecutorUtils.shutdown(shutdownableSubsystems());
+        ExecutorUtils.shutdownSequentiallyAndWait(shutdownableSubsystems(), 1, 
TimeUnit.MINUTES);
         state = State.SHUTDOWN;
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java 
b/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
index 5d8747d4a5..34c7b26bd9 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
@@ -43,6 +43,12 @@ public class AccordVerbHandler<T extends Request> implements 
IVerbHandler<T>
     @Override
     public void doVerb(Message<T> message) throws IOException
     {
+        if (!((AccordService)AccordService.instance()).shouldAcceptMessages())
+        {
+            logger.debug("Dropping message {} from {}", message.verb(), 
message.from());
+            return;
+        }
+
         logger.trace("Receiving {} from {}", message.payload, message.from());
         T request = message.payload;
 
diff --git a/src/java/org/apache/cassandra/utils/ExecutorUtils.java 
b/src/java/org/apache/cassandra/utils/ExecutorUtils.java
index 5bb841f32b..5b449e3096 100644
--- a/src/java/org/apache/cassandra/utils/ExecutorUtils.java
+++ b/src/java/org/apache/cassandra/utils/ExecutorUtils.java
@@ -79,6 +79,34 @@ public class ExecutorUtils
         }
     }
 
+    public static void shutdownSequentiallyAndWait(Iterable<?> executors, long 
timeout, TimeUnit unit)
+    {
+        long deadline = System.nanoTime() + unit.toNanos(timeout);
+
+        for (Object executor : executors)
+        {
+            try
+            {
+                if (executor instanceof ExecutorService)
+                {
+                    ((ExecutorService) executor).shutdown();
+                    ((ExecutorService) executor).awaitTermination(Math.max(0, 
deadline - System.nanoTime()), NANOSECONDS);
+                }
+                else if (executor instanceof Shutdownable)
+                {
+                    ((Shutdownable) executor).shutdown();
+                    ((Shutdownable) executor).awaitTermination(Math.max(0, 
deadline - System.nanoTime()), NANOSECONDS);
+                }
+                else
+                    throw new IllegalArgumentException(executor.toString());
+            }
+            catch (Throwable t)
+            {
+                throw new IllegalStateException("Caught interrupt while 
shutting down " + executor);
+            }
+        }
+    }
+
     public static void shutdown(ExecutorService ... executors)
     {
         shutdown(Arrays.asList(executors));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to