This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 473cd7be91 Standardise Replay logic to ensure SafeCommandStore.update 
is called. Also Fix: - Initialise home state when calling waiting() if not 
already initialised; - Don't reportClosed/reportRetired for epochs that are 
already closed/retired. Also Improve: - Implement waitForQuiescence in 
AccordExecutor; - Permit replay parallelism.
473cd7be91 is described below

commit 473cd7be91dbdbadb78a9b85689fb87047b1ed09
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Sat Jul 5 09:58:59 2025 +0100

    Standardise Replay logic to ensure SafeCommandStore.update is called
    Also Fix:
     - Initialise home state when calling waiting() if not already initialised
     - Don't reportClosed/reportRetired for epochs that are already closed/retired
    Also Improve:
     - Implement waitForQuiescence in AccordExecutor
     - Permit replay parallelism
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20750
---
 .gitmodules                                        |  4 +-
 modules/accord                                     |  2 +-
 .../service/accord/AccordCommandStore.java         | 21 +++++-----
 .../service/accord/AccordCommandStores.java        | 38 +----------------
 .../cassandra/service/accord/AccordExecutor.java   | 37 +++++++++++++++++
 .../accord/AccordExecutorAbstractLockLoop.java     |  6 ++-
 .../service/accord/AccordExecutorSimple.java       |  4 +-
 .../cassandra/service/accord/AccordJournal.java    | 47 +++++++++++++++-------
 .../cassandra/service/accord/AccordTask.java       |  2 -
 9 files changed, 90 insertions(+), 71 deletions(-)

diff --git a/.gitmodules b/.gitmodules
index 616dacf610..3d5a373c79 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,4 +1,4 @@
 [submodule "modules/accord"]
        path = modules/accord
-       url = https://github.com/apache/cassandra-accord.git
-       branch = trunk
+       url = https://github.com/belliottsmith/cassandra-accord.git
+       branch = fixes-250705
diff --git a/modules/accord b/modules/accord
index 71d235d56c..41023ff057 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 71d235d56cb315fa5ae01ec24d3d9f08dd08ac6a
+Subproject commit 41023ff0573170210ecfb3327283c815a9091c4b
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index e166b94b87..7c1e66857e 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -45,7 +45,6 @@ import 
accord.impl.AbstractSafeCommandStore.CommandStoreCaches;
 import accord.local.Command;
 import accord.local.CommandStore;
 import accord.local.CommandStores;
-import accord.local.Commands;
 import accord.local.NodeCommandStoreService;
 import accord.local.PreLoadContext;
 import accord.local.RedundantBefore;
@@ -55,6 +54,7 @@ import accord.primitives.PartialTxn;
 import accord.primitives.RangeDeps;
 import accord.primitives.Ranges;
 import accord.primitives.RoutableKey;
+import accord.primitives.Route;
 import accord.primitives.Status;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
@@ -71,7 +71,6 @@ import org.apache.cassandra.utils.Clock;
 import static accord.api.Journal.CommandUpdate;
 import static accord.api.Journal.FieldUpdates;
 import static accord.api.Journal.Load.MINIMAL;
-import static accord.api.Journal.Loader;
 import static accord.utils.Invariants.require;
 
 public class AccordCommandStore extends CommandStore
@@ -153,7 +152,7 @@ public class AccordCommandStore extends CommandStore
 
     private AccordSafeCommandStore current;
 
-    private final CommandStoreLoader loader;
+    private final AccordCommandStoreLoader loader;
 
     public AccordCommandStore(int id,
                               NodeCommandStoreService node,
@@ -182,7 +181,7 @@ public class AccordCommandStore extends CommandStore
 
         this.exclusiveExecutor = sharedExecutor.executor();
         this.commandsForRanges = new CommandsForRanges.Manager(this);
-        this.loader = new CommandStoreLoader(this);
+        this.loader = new AccordCommandStoreLoader(this);
 
         maybeLoadRedundantBefore(journal.loadRedundantBefore(id()));
         maybeLoadBootstrapBeganAt(journal.loadBootstrapBeganAt(id()));
@@ -485,7 +484,7 @@ public class AccordCommandStore extends CommandStore
         return rangeSearcher;
     }
 
-    public Loader loader()
+    public AccordCommandStoreLoader loader()
     {
         return loader;
     }
@@ -496,23 +495,21 @@ public class AccordCommandStore extends CommandStore
         super.unsafeUpsertRedundantBefore(addRedundantBefore);
     }
 
-    private static class CommandStoreLoader extends AbstractLoader
+    public static class AccordCommandStoreLoader extends AbstractLoader
     {
         private final AccordCommandStore store;
 
-        private CommandStoreLoader(AccordCommandStore store)
+        private AccordCommandStoreLoader(AccordCommandStore store)
         {
             this.store = store;
         }
 
         @Override
-        public AsyncChain<Command> load(TxnId txnId)
+        public AsyncChain<Route> load(TxnId txnId)
         {
             return store.submit(txnId, safeStore -> {
-                maybeApplyWrites(txnId, safeStore, (safeCommand, cmd) -> {
-                    Commands.applyWrites(safeStore, txnId, 
cmd).begin(store.agent);
-                });
-                return safeStore.unsafeGet(txnId).current();
+                maybeApplyWrites(safeStore, txnId);
+                return safeStore.unsafeGet(txnId).current().route();
             });
         }
     }
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
index 11276a7a6c..3fa4bb0bdc 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
@@ -17,11 +17,8 @@
  */
 package org.apache.cassandra.service.accord;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import accord.api.Agent;
@@ -46,7 +43,6 @@ import org.apache.cassandra.metrics.CacheSizeMetrics;
 import org.apache.cassandra.schema.TableId;
 import 
org.apache.cassandra.service.accord.AccordExecutor.AccordExecutorFactory;
 import org.apache.cassandra.service.accord.api.TokenKey;
-import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static 
org.apache.cassandra.config.AccordSpec.QueueShardModel.THREAD_PER_SHARD;
 import static 
org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueShardCount;
@@ -229,38 +225,8 @@ public class AccordCommandStores extends CommandStores 
implements CacheSize
 
     public void waitForQuiescense()
     {
-        boolean runAgain = true;
-        try
-        {
-            while (true)
-            {
-                boolean hasTasks = false;
-                List<Future<?>> futures = new ArrayList<>();
-                for (AccordExecutor executor : this.executors)
-                {
-                    hasTasks |= executor.hasTasks();
-                    hasTasks |= 
Stage.MUTATION.executor().getPendingTaskCount() > 0;
-                    hasTasks |= Stage.MUTATION.executor().getActiveTaskCount() 
> 0;
-                    futures.add(executor.submit(() -> {}));
-                }
-                for (Future<?> future : futures)
-                    future.get();
-                futures.clear();
-
-                if (!runAgain)
-                    return;
-                
-                runAgain = hasTasks;
-            }
-        }
-        catch (ExecutionException e)
-        {
-            throw new IllegalStateException("Should have never been thrown", 
e);
-        }
-        catch (InterruptedException e)
-        {
-            throw new UncheckedInterruptedException(e);
-        }
+        for (AccordExecutor executor : this.executors)
+            executor.waitForQuiescence();
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java 
b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
index c61f5e7a9b..398585f2a0 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.service.accord;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.TimeUnit;
@@ -62,6 +64,7 @@ import org.apache.cassandra.concurrent.Shutdownable;
 import org.apache.cassandra.metrics.AccordCacheMetrics;
 import org.apache.cassandra.utils.MonotonicClock;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Condition;
 import org.apache.cassandra.utils.concurrent.Future;
 
 import static 
org.apache.cassandra.service.accord.AccordCache.CommandAdapter.COMMAND_ADAPTER;
@@ -138,6 +141,8 @@ public abstract class AccordExecutor implements CacheSize, 
AccordCacheEntry.OnLo
     private final AccordCacheEntry.OnLoaded onRangeLoaded = 
this::onRangeLoaded;
     private final ExclusiveGlobalCaches caches;
 
+    private List<Condition> waitingForQuiescence;
+
     /**
      * The maximum total number of loads we can queue at once - this includes 
loads for range transactions,
      * which are subject to this limit as well as that imposed by {@link 
#maxQueuedRangeLoads}
@@ -224,6 +229,38 @@ public abstract class AccordExecutor implements CacheSize, 
AccordCacheEntry.OnLo
         return Stream.of();
     }
 
+    public void waitForQuiescence()
+    {
+        Condition condition;
+        lock.lock();
+        try
+        {
+            if (tasks == 0)
+            {
+                Invariants.require(running == 0);
+                return;
+            }
+            if (waitingForQuiescence == null)
+                waitingForQuiescence = new ArrayList<>();
+            condition = Condition.newOneTimeCondition();
+            waitingForQuiescence.add(condition);
+        }
+        finally
+        {
+            lock.unlock();
+        }
+        condition.awaitThrowUncheckedOnInterrupt();
+    }
+
+    protected void signalQuiescentExclusive()
+    {
+        if (waitingForQuiescence != null)
+        {
+            waitingForQuiescence.forEach(Condition::signalAll);
+            waitingForQuiescence = null;
+        }
+    }
+
     void maybeUnpauseLoading()
     {
         if (!hasPausedLoading)
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java
 
b/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java
index f442473a46..63fe294439 100644
--- 
a/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java
+++ 
b/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java
@@ -125,7 +125,8 @@ abstract class AccordExecutorAbstractLockLoop extends 
AccordExecutor
     private void pauseExclusive()
     {
         isHeldByExecutor = false;
-        --running;
+        if (--running == 0 && tasks == 0)
+            signalQuiescentExclusive();
     }
 
     private void resumeExclusive()
@@ -234,6 +235,8 @@ abstract class AccordExecutorAbstractLockLoop extends 
AccordExecutor
                                 break;
                             }
 
+                            pauseExclusive();
+
                             if (shutdown)
                             {
                                 exitLockExclusive();
@@ -241,7 +244,6 @@ abstract class AccordExecutorAbstractLockLoop extends 
AccordExecutor
                                 return;
                             }
 
-                            pauseExclusive();
                             awaitExclusive();
                             resumeExclusive();
                         }
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordExecutorSimple.java 
b/src/java/org/apache/cassandra/service/accord/AccordExecutorSimple.java
index ac0069fdc9..67449aac86 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutorSimple.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutorSimple.java
@@ -70,7 +70,6 @@ class AccordExecutorSimple extends AccordExecutor
         Invariants.requireArgument(threads == 1);
         this.lock = lock;
         this.executor = executorFactory().sequential(name.apply(0));
-
     }
 
     @Override
@@ -89,7 +88,10 @@ class AccordExecutorSimple extends AccordExecutor
             {
                 Task task = pollWaitingToRunExclusive();
                 if (task == null)
+                {
+                    signalQuiescentExclusive();
                     return;
+                }
 
                 --tasks;
                 try { task.preRunExclusive(null); task.run(); }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index c597c11f40..a2fe6d98cb 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
@@ -52,7 +53,6 @@ import accord.primitives.TxnId;
 import accord.utils.Invariants;
 import accord.utils.PersistentField;
 import accord.utils.UnhandledEnum;
-import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
 import org.apache.cassandra.concurrent.Shutdownable;
@@ -70,6 +70,7 @@ import org.apache.cassandra.journal.RecordPointer;
 import org.apache.cassandra.journal.SegmentCompactor;
 import org.apache.cassandra.journal.StaticSegment;
 import org.apache.cassandra.journal.ValueSerializer;
+import 
org.apache.cassandra.service.accord.AccordCommandStore.AccordCommandStoreLoader;
 import 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightImage;
 import 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.IdentityAccumulator;
 import org.apache.cassandra.service.accord.JournalKey.JournalKeySupport;
@@ -82,6 +83,8 @@ import 
org.apache.cassandra.service.accord.serializers.Version;
 import org.apache.cassandra.service.accord.serializers.WaitingOnSerializer;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.ExecutorUtils;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Semaphore;
 
 import static accord.impl.CommandChange.Field.CLEANUP;
 import static accord.impl.CommandChange.anyFieldChanged;
@@ -484,6 +487,9 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
     @Override
     public void replay(CommandStores commandStores)
     {
+        final Semaphore concurrency = 
Semaphore.newSemaphore(FBUtilities.getAvailableProcessors());
+        final ConcurrentLinkedQueue<Throwable> failures = new 
ConcurrentLinkedQueue<>();
+
         try (CloseableIterator<Journal.KeyRefs<JournalKey>> iter = 
journalTable.keyIterator())
         {
             JournalKey prev = null;
@@ -493,8 +499,9 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
 
                 if (ref.key().type != JournalKey.Type.COMMAND_DIFF)
                     continue;
+
                 CommandStore commandStore = 
commandStores.forId(ref.key().commandStoreId);
-                Loader loader = commandStore.loader();
+                AccordCommandStoreLoader loader = (AccordCommandStoreLoader) 
commandStore.loader();
                 TxnId txnId = ref.key().id;
                 try
                 {
@@ -503,19 +510,29 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
                                        ref.key().id.compareTo(prev.id) != 0,
                                        "duplicate key detected %s == %s", 
ref.key(), prev);
                     prev = ref.key();
-                    AsyncChains.getUnchecked(loader.load(txnId)
-                                                   .map(command -> {
-                                                       if 
(journalTable.shouldIndex(ref.key())
-                                                           && 
command.participants() != null
-                                                           && 
command.participants().route() != null)
-                                                       {
-                                                           
ref.segments(segment -> {
-                                                               
journalTable.safeNotify(index -> index.update(segment, 
ref.key().commandStoreId, txnId, command.participants().route()));
-                                                           });
-                                                       }
-                                                       return command;
-                                                   })
-                                                   .beginAsResult());
+
+                    Throwable rethrow = failures.poll();
+                    if (rethrow != null)
+                        throw rethrow;
+
+                    concurrency.acquireThrowUncheckedOnInterrupt(1);
+                    loader.load(txnId)
+                          .map(route -> {
+                              if (journalTable.shouldIndex(ref.key()))
+                              {
+                                  ref.segments(segment -> {
+                                      journalTable.safeNotify(index -> 
index.update(segment, ref.key().commandStoreId, txnId, route));
+                                  });
+                              }
+                              return null;
+                          }).begin((success, fail) -> {
+                              concurrency.release(1);
+                              if (fail != null)
+                              {
+                                  try { journal.handleError("Could not replay 
command " + ref.key().id, fail); }
+                                  catch (Throwable fail2) { 
failures.add(fail2); }
+                              }
+                          });
                 }
                 catch (Throwable t)
                 {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTask.java 
b/src/java/org/apache/cassandra/service/accord/AccordTask.java
index f037ab8df1..3d54a289b1 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTask.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTask.java
@@ -42,7 +42,6 @@ import accord.api.Journal;
 import accord.api.RoutingKey;
 import accord.local.Command;
 import accord.local.CommandStore;
-import accord.local.MaxDecidedRX;
 import accord.local.PreLoadContext;
 import accord.local.SafeCommandStore;
 import accord.local.cfk.CommandsForKey;
@@ -1092,7 +1091,6 @@ public abstract class AccordTask<R> extends 
SubmittableTask implements Runnable,
 
         void startInternal(Caches caches)
         {
-            MaxDecidedRX maxDecidedRX = commandStore.unsafeGetMaxDecidedRX();
             summaryLoader = 
commandStore.commandsForRanges().loader(preLoadContext.primaryTxnId(), 
preLoadContext.keyHistory(), keysOrRanges);
             summaryLoader.forEachInCache(keysOrRanges, summary -> 
summaries.put(summary.txnId, summary), caches);
             caches.commands().register(commandWatcher);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to