This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 473cd7be91 Standardise Replay logic to ensure SafeCommandStore.update
is called. Also Fix: (1) Initialise home state when calling waiting() if not
already initialised; (2) Don't reportClosed/reportRetired for epochs that are
already closed/retired. Also Improve: (1) Implement waitForQuiescence in
AccordExecutor; (2) Permit replay parallelism.
473cd7be91 is described below
commit 473cd7be91dbdbadb78a9b85689fb87047b1ed09
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Sat Jul 5 09:58:59 2025 +0100
Standardise Replay logic to ensure SafeCommandStore.update is called
Also Fix:
- Initialise home state when calling waiting() if not already initialised
- Don't reportClosed/reportRetired for epochs that are already
closed/retired
Also Improve:
- Implement waitForQuiescence in AccordExecutor
- Permit replay parallelism
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20750
---
.gitmodules | 4 +-
modules/accord | 2 +-
.../service/accord/AccordCommandStore.java | 21 +++++-----
.../service/accord/AccordCommandStores.java | 38 +----------------
.../cassandra/service/accord/AccordExecutor.java | 37 +++++++++++++++++
.../accord/AccordExecutorAbstractLockLoop.java | 6 ++-
.../service/accord/AccordExecutorSimple.java | 4 +-
.../cassandra/service/accord/AccordJournal.java | 47 +++++++++++++++-------
.../cassandra/service/accord/AccordTask.java | 2 -
9 files changed, 90 insertions(+), 71 deletions(-)
diff --git a/.gitmodules b/.gitmodules
index 616dacf610..3d5a373c79 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,4 +1,4 @@
[submodule "modules/accord"]
path = modules/accord
- url = https://github.com/apache/cassandra-accord.git
- branch = trunk
+ url = https://github.com/belliottsmith/cassandra-accord.git
+ branch = fixes-250705
diff --git a/modules/accord b/modules/accord
index 71d235d56c..41023ff057 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 71d235d56cb315fa5ae01ec24d3d9f08dd08ac6a
+Subproject commit 41023ff0573170210ecfb3327283c815a9091c4b
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index e166b94b87..7c1e66857e 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -45,7 +45,6 @@ import
accord.impl.AbstractSafeCommandStore.CommandStoreCaches;
import accord.local.Command;
import accord.local.CommandStore;
import accord.local.CommandStores;
-import accord.local.Commands;
import accord.local.NodeCommandStoreService;
import accord.local.PreLoadContext;
import accord.local.RedundantBefore;
@@ -55,6 +54,7 @@ import accord.primitives.PartialTxn;
import accord.primitives.RangeDeps;
import accord.primitives.Ranges;
import accord.primitives.RoutableKey;
+import accord.primitives.Route;
import accord.primitives.Status;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
@@ -71,7 +71,6 @@ import org.apache.cassandra.utils.Clock;
import static accord.api.Journal.CommandUpdate;
import static accord.api.Journal.FieldUpdates;
import static accord.api.Journal.Load.MINIMAL;
-import static accord.api.Journal.Loader;
import static accord.utils.Invariants.require;
public class AccordCommandStore extends CommandStore
@@ -153,7 +152,7 @@ public class AccordCommandStore extends CommandStore
private AccordSafeCommandStore current;
- private final CommandStoreLoader loader;
+ private final AccordCommandStoreLoader loader;
public AccordCommandStore(int id,
NodeCommandStoreService node,
@@ -182,7 +181,7 @@ public class AccordCommandStore extends CommandStore
this.exclusiveExecutor = sharedExecutor.executor();
this.commandsForRanges = new CommandsForRanges.Manager(this);
- this.loader = new CommandStoreLoader(this);
+ this.loader = new AccordCommandStoreLoader(this);
maybeLoadRedundantBefore(journal.loadRedundantBefore(id()));
maybeLoadBootstrapBeganAt(journal.loadBootstrapBeganAt(id()));
@@ -485,7 +484,7 @@ public class AccordCommandStore extends CommandStore
return rangeSearcher;
}
- public Loader loader()
+ public AccordCommandStoreLoader loader()
{
return loader;
}
@@ -496,23 +495,21 @@ public class AccordCommandStore extends CommandStore
super.unsafeUpsertRedundantBefore(addRedundantBefore);
}
- private static class CommandStoreLoader extends AbstractLoader
+ public static class AccordCommandStoreLoader extends AbstractLoader
{
private final AccordCommandStore store;
- private CommandStoreLoader(AccordCommandStore store)
+ private AccordCommandStoreLoader(AccordCommandStore store)
{
this.store = store;
}
@Override
- public AsyncChain<Command> load(TxnId txnId)
+ public AsyncChain<Route> load(TxnId txnId)
{
return store.submit(txnId, safeStore -> {
- maybeApplyWrites(txnId, safeStore, (safeCommand, cmd) -> {
- Commands.applyWrites(safeStore, txnId,
cmd).begin(store.agent);
- });
- return safeStore.unsafeGet(txnId).current();
+ maybeApplyWrites(safeStore, txnId);
+ return safeStore.unsafeGet(txnId).current().route();
});
}
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
index 11276a7a6c..3fa4bb0bdc 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
@@ -17,11 +17,8 @@
*/
package org.apache.cassandra.service.accord;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import accord.api.Agent;
@@ -46,7 +43,6 @@ import org.apache.cassandra.metrics.CacheSizeMetrics;
import org.apache.cassandra.schema.TableId;
import
org.apache.cassandra.service.accord.AccordExecutor.AccordExecutorFactory;
import org.apache.cassandra.service.accord.api.TokenKey;
-import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import static
org.apache.cassandra.config.AccordSpec.QueueShardModel.THREAD_PER_SHARD;
import static
org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueShardCount;
@@ -229,38 +225,8 @@ public class AccordCommandStores extends CommandStores
implements CacheSize
public void waitForQuiescense()
{
- boolean runAgain = true;
- try
- {
- while (true)
- {
- boolean hasTasks = false;
- List<Future<?>> futures = new ArrayList<>();
- for (AccordExecutor executor : this.executors)
- {
- hasTasks |= executor.hasTasks();
- hasTasks |=
Stage.MUTATION.executor().getPendingTaskCount() > 0;
- hasTasks |= Stage.MUTATION.executor().getActiveTaskCount()
> 0;
- futures.add(executor.submit(() -> {}));
- }
- for (Future<?> future : futures)
- future.get();
- futures.clear();
-
- if (!runAgain)
- return;
-
- runAgain = hasTasks;
- }
- }
- catch (ExecutionException e)
- {
- throw new IllegalStateException("Should have never been thrown",
e);
- }
- catch (InterruptedException e)
- {
- throw new UncheckedInterruptedException(e);
- }
+ for (AccordExecutor executor : this.executors)
+ executor.waitForQuiescence();
}
@Override
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
index c61f5e7a9b..398585f2a0 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.service.accord;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
@@ -62,6 +64,7 @@ import org.apache.cassandra.concurrent.Shutdownable;
import org.apache.cassandra.metrics.AccordCacheMetrics;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Condition;
import org.apache.cassandra.utils.concurrent.Future;
import static
org.apache.cassandra.service.accord.AccordCache.CommandAdapter.COMMAND_ADAPTER;
@@ -138,6 +141,8 @@ public abstract class AccordExecutor implements CacheSize,
AccordCacheEntry.OnLo
private final AccordCacheEntry.OnLoaded onRangeLoaded =
this::onRangeLoaded;
private final ExclusiveGlobalCaches caches;
+ private List<Condition> waitingForQuiescence;
+
/**
* The maximum total number of loads we can queue at once - this includes
loads for range transactions,
* which are subject to this limit as well as that imposed by {@link
#maxQueuedRangeLoads}
@@ -224,6 +229,38 @@ public abstract class AccordExecutor implements CacheSize,
AccordCacheEntry.OnLo
return Stream.of();
}
+ public void waitForQuiescence()
+ {
+ Condition condition;
+ lock.lock();
+ try
+ {
+ if (tasks == 0)
+ {
+ Invariants.require(running == 0);
+ return;
+ }
+ if (waitingForQuiescence == null)
+ waitingForQuiescence = new ArrayList<>();
+ condition = Condition.newOneTimeCondition();
+ waitingForQuiescence.add(condition);
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ condition.awaitThrowUncheckedOnInterrupt();
+ }
+
+ protected void signalQuiescentExclusive()
+ {
+ if (waitingForQuiescence != null)
+ {
+ waitingForQuiescence.forEach(Condition::signalAll);
+ waitingForQuiescence = null;
+ }
+ }
+
void maybeUnpauseLoading()
{
if (!hasPausedLoading)
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java
b/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java
index f442473a46..63fe294439 100644
---
a/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java
+++
b/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java
@@ -125,7 +125,8 @@ abstract class AccordExecutorAbstractLockLoop extends
AccordExecutor
private void pauseExclusive()
{
isHeldByExecutor = false;
- --running;
+ if (--running == 0 && tasks == 0)
+ signalQuiescentExclusive();
}
private void resumeExclusive()
@@ -234,6 +235,8 @@ abstract class AccordExecutorAbstractLockLoop extends
AccordExecutor
break;
}
+ pauseExclusive();
+
if (shutdown)
{
exitLockExclusive();
@@ -241,7 +244,6 @@ abstract class AccordExecutorAbstractLockLoop extends
AccordExecutor
return;
}
- pauseExclusive();
awaitExclusive();
resumeExclusive();
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordExecutorSimple.java
b/src/java/org/apache/cassandra/service/accord/AccordExecutorSimple.java
index ac0069fdc9..67449aac86 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutorSimple.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutorSimple.java
@@ -70,7 +70,6 @@ class AccordExecutorSimple extends AccordExecutor
Invariants.requireArgument(threads == 1);
this.lock = lock;
this.executor = executorFactory().sequential(name.apply(0));
-
}
@Override
@@ -89,7 +88,10 @@ class AccordExecutorSimple extends AccordExecutor
{
Task task = pollWaitingToRunExclusive();
if (task == null)
+ {
+ signalQuiescentExclusive();
return;
+ }
--tasks;
try { task.preRunExclusive(null); task.run(); }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index c597c11f40..a2fe6d98cb 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
@@ -52,7 +53,6 @@ import accord.primitives.TxnId;
import accord.utils.Invariants;
import accord.utils.PersistentField;
import accord.utils.UnhandledEnum;
-import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
import org.apache.cassandra.concurrent.Shutdownable;
@@ -70,6 +70,7 @@ import org.apache.cassandra.journal.RecordPointer;
import org.apache.cassandra.journal.SegmentCompactor;
import org.apache.cassandra.journal.StaticSegment;
import org.apache.cassandra.journal.ValueSerializer;
+import
org.apache.cassandra.service.accord.AccordCommandStore.AccordCommandStoreLoader;
import
org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightImage;
import
org.apache.cassandra.service.accord.AccordJournalValueSerializers.IdentityAccumulator;
import org.apache.cassandra.service.accord.JournalKey.JournalKeySupport;
@@ -82,6 +83,8 @@ import
org.apache.cassandra.service.accord.serializers.Version;
import org.apache.cassandra.service.accord.serializers.WaitingOnSerializer;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.ExecutorUtils;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Semaphore;
import static accord.impl.CommandChange.Field.CLEANUP;
import static accord.impl.CommandChange.anyFieldChanged;
@@ -484,6 +487,9 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
@Override
public void replay(CommandStores commandStores)
{
+ final Semaphore concurrency =
Semaphore.newSemaphore(FBUtilities.getAvailableProcessors());
+ final ConcurrentLinkedQueue<Throwable> failures = new
ConcurrentLinkedQueue<>();
+
try (CloseableIterator<Journal.KeyRefs<JournalKey>> iter =
journalTable.keyIterator())
{
JournalKey prev = null;
@@ -493,8 +499,9 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
if (ref.key().type != JournalKey.Type.COMMAND_DIFF)
continue;
+
CommandStore commandStore =
commandStores.forId(ref.key().commandStoreId);
- Loader loader = commandStore.loader();
+ AccordCommandStoreLoader loader = (AccordCommandStoreLoader)
commandStore.loader();
TxnId txnId = ref.key().id;
try
{
@@ -503,19 +510,29 @@ public class AccordJournal implements accord.api.Journal,
RangeSearcher.Supplier
ref.key().id.compareTo(prev.id) != 0,
"duplicate key detected %s == %s",
ref.key(), prev);
prev = ref.key();
- AsyncChains.getUnchecked(loader.load(txnId)
- .map(command -> {
- if
(journalTable.shouldIndex(ref.key())
- &&
command.participants() != null
- &&
command.participants().route() != null)
- {
-
ref.segments(segment -> {
-
journalTable.safeNotify(index -> index.update(segment,
ref.key().commandStoreId, txnId, command.participants().route()));
- });
- }
- return command;
- })
- .beginAsResult());
+
+ Throwable rethrow = failures.poll();
+ if (rethrow != null)
+ throw rethrow;
+
+ concurrency.acquireThrowUncheckedOnInterrupt(1);
+ loader.load(txnId)
+ .map(route -> {
+ if (journalTable.shouldIndex(ref.key()))
+ {
+ ref.segments(segment -> {
+ journalTable.safeNotify(index ->
index.update(segment, ref.key().commandStoreId, txnId, route));
+ });
+ }
+ return null;
+ }).begin((success, fail) -> {
+ concurrency.release(1);
+ if (fail != null)
+ {
+ try { journal.handleError("Could not replay
command " + ref.key().id, fail); }
+ catch (Throwable fail2) {
failures.add(fail2); }
+ }
+ });
}
catch (Throwable t)
{
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTask.java
b/src/java/org/apache/cassandra/service/accord/AccordTask.java
index f037ab8df1..3d54a289b1 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTask.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTask.java
@@ -42,7 +42,6 @@ import accord.api.Journal;
import accord.api.RoutingKey;
import accord.local.Command;
import accord.local.CommandStore;
-import accord.local.MaxDecidedRX;
import accord.local.PreLoadContext;
import accord.local.SafeCommandStore;
import accord.local.cfk.CommandsForKey;
@@ -1092,7 +1091,6 @@ public abstract class AccordTask<R> extends
SubmittableTask implements Runnable,
void startInternal(Caches caches)
{
- MaxDecidedRX maxDecidedRX = commandStore.unsafeGetMaxDecidedRX();
summaryLoader =
commandStore.commandsForRanges().loader(preLoadContext.primaryTxnId(),
preLoadContext.keyHistory(), keysOrRanges);
summaryLoader.forEachInCache(keysOrRanges, summary ->
summaries.put(summary.txnId, summary), caches);
caches.commands().register(commandWatcher);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org