This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new e587fc4c Standardise Replay logic to ensure SafeCommandStore.update is
called Also Fix: - Initialise home state when calling waiting() if not already
initialised - Don't reportClosed/reportRetired for epochs that are already
closed/retired
e587fc4c is described below
commit e587fc4c090aeb74a363af43295e0f7c7825445a
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Sat Jul 5 09:57:13 2025 +0100
Standardise Replay logic to ensure SafeCommandStore.update is called
Also Fix:
- Initialise home state when calling waiting() if not already initialised
- Don't reportClosed/reportRetired for epochs that are already
closed/retired
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20750
---
accord-core/src/main/java/accord/api/Journal.java | 2 +-
.../src/main/java/accord/impl/AbstractLoader.java | 36 +++++----
.../java/accord/impl/InMemoryCommandStore.java | 43 +++++-----
.../impl/progresslog/DefaultProgressLog.java | 91 +++++++++++++---------
.../src/main/java/accord/local/Commands.java | 4 +-
.../src/main/java/accord/local/SafeCommand.java | 2 +-
.../main/java/accord/topology/TopologyManager.java | 28 ++++---
.../java/accord/impl/basic/InMemoryJournal.java | 2 +-
8 files changed, 113 insertions(+), 95 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/Journal.java
b/accord-core/src/main/java/accord/api/Journal.java
index 99806e2d..f491aafd 100644
--- a/accord-core/src/main/java/accord/api/Journal.java
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -175,7 +175,7 @@ public interface Journal
*/
interface Loader
{
- AsyncChain<Command> load(TxnId txnId);
+ AsyncChain<?> load(TxnId txnId);
}
diff --git a/accord-core/src/main/java/accord/impl/AbstractLoader.java
b/accord-core/src/main/java/accord/impl/AbstractLoader.java
index 9020b96a..3e0828d7 100644
--- a/accord-core/src/main/java/accord/impl/AbstractLoader.java
+++ b/accord-core/src/main/java/accord/impl/AbstractLoader.java
@@ -18,18 +18,16 @@
package accord.impl;
-import java.util.function.BiConsumer;
-
import accord.api.Journal;
-import accord.local.Cleanup;
import accord.local.Command;
+import accord.local.CommandStore;
import accord.local.Commands;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
+import accord.primitives.Participants;
import accord.primitives.SaveStatus;
import accord.primitives.TxnId;
-import static accord.local.Cleanup.Input.FULL;
import static accord.primitives.SaveStatus.PreApplied;
import static accord.primitives.Status.Stable;
import static accord.primitives.Status.Truncated;
@@ -37,27 +35,31 @@ import static accord.primitives.Txn.Kind.Write;
public abstract class AbstractLoader implements Journal.Loader
{
- protected Command loadInternal(Command command, SafeCommandStore safeStore)
- {
- TxnId txnId = command.txnId();
- Cleanup cleanup = Cleanup.shouldCleanup(FULL, safeStore, command,
command.participants());
- if (cleanup != Cleanup.NO)
- command = Commands.purge(safeStore, command, cleanup);
-
- return safeStore.unsafeGetNoCleanup(txnId).update(safeStore, command);
- }
-
- protected void maybeApplyWrites(TxnId txnId, SafeCommandStore safeStore,
BiConsumer<SafeCommand, Command> apply)
+ protected void maybeApplyWrites(SafeCommandStore safeStore, TxnId txnId)
{
SafeCommand safeCommand = safeStore.unsafeGet(txnId);
Command command = safeCommand.current();
if (command.is(Stable) || command.saveStatus() == PreApplied)
{
- Commands.maybeExecute(safeStore, safeCommand, command, true, true);
+ if (Commands.maybeExecute(safeStore, safeCommand, command, true,
true))
+ return;
}
else if (command.txnId().is(Write) &&
command.saveStatus().compareTo(SaveStatus.Stable) >= 0 &&
!command.hasBeen(Truncated))
{
- apply.accept(safeCommand, command);
+ CommandStore unsafeStore = safeStore.commandStore();
+ Command.Executed executed = command.asExecuted();
+ Participants<?> executes = executed.participants().stillExecutes();
+ if (!executes.isEmpty())
+ {
+ command.writes()
+ .apply(safeStore, executes, command.partialTxn())
+ .invoke(() -> unsafeStore.build(txnId, ss -> {
+ Commands.postApply(ss, txnId, -1, true);
+ }))
+ .begin(safeStore.agent());
+ return;
+ }
}
+ safeCommand.update(safeStore, command, true);
}
}
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 31c7dfbf..08d59d83 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -49,13 +49,13 @@ import accord.api.Journal;
import accord.api.LocalListeners;
import accord.api.ProgressLog;
import accord.api.RoutingKey;
-import accord.api.Write;
import accord.impl.progresslog.DefaultProgressLog;
import accord.local.Cleanup;
import accord.local.Command;
import accord.local.CommandStore;
import accord.local.CommandStores.RangesForEpoch;
import accord.local.CommandSummaries;
+import accord.local.Commands;
import accord.local.KeyHistory;
import accord.local.NodeCommandStoreService;
import accord.local.PreLoadContext;
@@ -84,7 +84,6 @@ import accord.primitives.Unseekables;
import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
-import accord.utils.async.AsyncResult;
import accord.utils.async.Cancellable;
import org.agrona.collections.ObjectHashSet;
@@ -1169,46 +1168,42 @@ public abstract class InMemoryCommandStore extends
CommandStore
return txnId;
}
- private AsyncChain<Command> load(Command command)
+ protected TxnId loadInternal(Command command, SafeCommandStore
safeStore)
+ {
+ TxnId txnId = command.txnId();
+ Cleanup cleanup = Cleanup.shouldCleanup(FULL, safeStore, command,
command.participants());
+ if (cleanup != Cleanup.NO)
+ command = Commands.purge(safeStore, command, cleanup);
+
+ safeStore.unsafeGetNoCleanup(txnId).update(safeStore, command);
+ return txnId;
+ }
+
+ private AsyncChain<TxnId> load(Command command)
{
return
AsyncChains.success(commandStore.executeInContext(commandStore,
context(command, ASYNC),
(SafeCommandStore safeStore) -> loadInternal(command, safeStore)));
}
- private AsyncChain<Command> apply(Command command)
+ private AsyncChain<Void> apply(TxnId txnId)
{
return
AsyncChains.success(commandStore.executeInContext(commandStore,
-
context(command, SYNC),
+ txnId,
(SafeCommandStore safeStore) -> {
-
maybeApplyWrites(command.txnId(), safeStore, (safeCommand, cmd) -> {
-
applyWritesSync(safeStore, safeCommand, cmd);
- });
-
return command;
+
maybeApplyWrites(safeStore, txnId);
+
return null;
}));
}
- private void applyWritesSync(SafeCommandStore safeStore, SafeCommand
safeCommand, Command command)
- {
- Command.Executed executed = command.asExecuted();
- Participants<?> executes = executed.participants().stillExecutes();
- if (!executes.isEmpty())
- {
- AsyncResult<Void> result =
command.writes().apply(Write.InMemoryWrite::applySync,
safeStore.commandStore(), executes, command.partialTxn()).beginAsResult();
- Invariants.require(result.isDone());
- safeCommand.applied(safeStore);
- safeStore.notifyListeners(safeCommand, command);
- }
- }
-
@Override
- public AsyncChain<Command> load(TxnId txnId)
+ public AsyncChain<Void> load(TxnId txnId)
{
// TODO (required): consider this race condition some more:
// - can we avoid double-applying?
// - is this definitely safe?
if (commandStore.hasCommand(txnId))
- return apply(commandStore.command(txnId).value());
+ return apply(txnId);
Command command =
commandStore.journal.loadCommand(commandStore.id, txnId,
commandStore.unsafeGetRedundantBefore(), commandStore.durableBefore());
if (command == null)
diff --git
a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
index 469f551d..f349e8e0 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
@@ -195,41 +195,8 @@ public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStor
TxnState state = null;
Route<?> beforeRoute = before.route();
Route<?> afterRoute = after.route();
- if (afterRoute != null && (beforeRoute == null || force))
- {
- RoutingKey homeKey = afterRoute.homeKey();
- Ranges coordinateRanges = safeStore.coordinateRanges(txnId);
- boolean isHome = coordinateRanges.contains(homeKey);
- state = get(txnId);
- if (isHome)
- {
- if (state == null)
- state = insert(txnId);
-
- if (after.durability().isDurableOrInvalidated())
- {
- state.setHomeDoneAndMaybeRemove(this);
- state = maybeFetch(safeStore, txnId, after, state);
- }
- else
- {
- state.set(safeStore, this, Undecided, Queued);
- }
- }
- else if (state != null)
- {
- // not home shard
- state.setHomeDone(this);
- }
- }
- else if (after.durability().isDurableOrInvalidated() && (force ||
!before.durability().isDurableOrInvalidated()))
- {
- state = get(txnId);
- if (state != null)
- state.setHomeDoneAndMaybeRemove(this);
-
- state = maybeFetch(safeStore, txnId, after, state);
- }
+ if (force || (afterRoute != null && beforeRoute == null) ||
(after.durability().isDurableOrInvalidated() &&
!before.durability().isDurableOrInvalidated()))
+ state = updateHomeState(safeStore, after, get(txnId));
SaveStatus beforeSaveStatus = before.saveStatus();
SaveStatus afterSaveStatus = after.saveStatus();
@@ -260,7 +227,51 @@ public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStor
}
}
- private TxnState maybeFetch(SafeCommandStore safeStore, TxnId txnId,
Command after, TxnState state)
+ private TxnState updateHomeState(SafeCommandStore safeStore, Command
after, @Nullable TxnState state)
+ {
+ Route<?> route = after.route();
+ if (after.durability().isDurableOrInvalidated())
+ {
+ // command is durable, so we don't need to coordinate it - whether
we're the home shard or not
+ if (state != null)
+ state.setHomeDone(this);
+
+ // ... and we should be able to fetch its outcome if we need it
+ state = maybeFetch(safeStore, after, state);
+
+ if (state != null && state.maybeRemove(this))
+ state = null;
+
+ return state;
+ }
+
+ if (route == null)
+ return state; // we don't know if we're the home shard
+
+ TxnId txnId = after.txnId();
+ RoutingKey homeKey = route.homeKey();
+ Ranges coordinateRanges = safeStore.coordinateRanges(txnId);
+ boolean isHome = coordinateRanges.contains(homeKey);
+ state = get(txnId);
+ if (isHome)
+ {
+ if (state == null)
+ state = insert(txnId);
+
+ Invariants.require(!after.durability().isDurableOrInvalidated());
+ if (!state.isHomeInitialised())
+ state.set(safeStore, this, Undecided, Queued); // initialise
+ }
+ else if (state != null)
+ {
+ // not home shard
+ state.setHomeDone(this);
+ }
+
+ return state;
+ }
+
+ private TxnState maybeFetch(SafeCommandStore safeStore, Command after,
TxnState state)
{
if (after.hasBeen(PreApplied))
return state;
@@ -273,7 +284,7 @@ public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStor
{
// this command should be ready to apply locally, so fetch it
if (state == null)
- state = insert(txnId);
+ state = insert(after.txnId());
state.waiting().setBlockedUntil(safeStore, this, CanApply);
}
return state;
@@ -388,6 +399,7 @@ public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStor
if (!blockedBy.txnId().isVisible())
return;
+
Command command = blockedBy.current();
if (command == null) command = uninitialised(blockedBy.txnId());
SaveStatus saveStatus = command.saveStatus();
@@ -419,7 +431,7 @@ public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStor
if (blockedOnStoreParticipants != null) updatedParticipants =
updatedParticipants.supplementOrMerge(saveStatus, blockedOnStoreParticipants);
if (blockedOnStoreParticipants2 != null) updatedParticipants =
updatedParticipants.supplementOrMerge(saveStatus, blockedOnStoreParticipants2);
if (participants != updatedParticipants)
- update = update.updateParticipants(updatedParticipants);
+ update = command.updateParticipants(updatedParticipants);
if (update != command)
command = blockedBy.incidentalUpdate(update);
@@ -438,6 +450,9 @@ public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStor
// later topology that wasn't covered by
its coordination
TxnState state = ensure(blockedBy.txnId());
state.waiting().setBlockedUntil(safeStore, this, blockedUntil);
+ // in case progress log hasn't been updated (e.g. bug on replay),
force an update to the command's state since we're about to wait on it
+ if (!state.isHomeInitialised() && command.route() != null)
+ updateHomeState(safeStore, command, state);
}
@Override
diff --git a/accord-core/src/main/java/accord/local/Commands.java
b/accord-core/src/main/java/accord/local/Commands.java
index 7cbb938b..72bcd3d5 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -618,7 +618,7 @@ public class Commands
}
}
- protected static void postApply(SafeCommandStore safeStore, TxnId txnId,
long t0, boolean forceApply)
+ public static void postApply(SafeCommandStore safeStore, TxnId txnId, long
t0, boolean forceApply)
{
SafeCommand safeCommand = safeStore.get(txnId);
Command command = safeCommand.current();
@@ -662,7 +662,7 @@ public class Commands
}
@VisibleForImplementation
- public static AsyncChain<Void> applyWrites(SafeCommandStore safeStore,
PreLoadContext context, Command command)
+ public static AsyncChain<Void> replayWrites(SafeCommandStore safeStore,
PreLoadContext context, Command command)
{
CommandStore unsafeStore = safeStore.commandStore();
Command.Executed executed = command.asExecuted();
diff --git a/accord-core/src/main/java/accord/local/SafeCommand.java
b/accord-core/src/main/java/accord/local/SafeCommand.java
index fe4024e1..dcccb4c3 100644
--- a/accord-core/src/main/java/accord/local/SafeCommand.java
+++ b/accord-core/src/main/java/accord/local/SafeCommand.java
@@ -64,7 +64,7 @@ public abstract class SafeCommand
return update(safeStore, update, false);
}
- private <C extends Command> C update(SafeCommandStore safeStore, C update,
boolean force)
+ public <C extends Command> C update(SafeCommandStore safeStore, C update,
boolean force)
{
Command prev = current();
if (prev == update)
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 31f57430..fc30ef31 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -193,24 +193,28 @@ public class TopologyManager
}
}
- boolean recordClosed(Ranges ranges)
+ // returns those ranges that weren't already closed, so that they can
be propagated to lower epochs
+ Ranges recordClosed(Ranges ranges)
{
- if (closed.containsAll(ranges))
- return false;
+ ranges = ranges.without(closed);
+ if (ranges.isEmpty())
+ return ranges;
closed = closed.union(MERGE_ADJACENT, ranges);
Invariants.require(closed.mergeTouching() == closed);
- return true;
+ return ranges.without(addedRanges);
}
- boolean recordRetired(Ranges ranges)
+ // returns those ranges that weren't already retired, so that they can
be propagated to lower epochs
+ Ranges recordRetired(Ranges ranges)
{
- if (retired.containsAll(ranges))
- return false;
+ ranges = ranges.without(retired);
+ if (ranges.isEmpty())
+ return ranges;
closed = closed.union(MERGE_ADJACENT, ranges);
retired = retired.union(MERGE_ADJACENT, ranges);
Invariants.require(closed.mergeTouching() == closed);
Invariants.require(retired.mergeTouching() == retired);
- return true;
+ return ranges.without(addedRanges);
}
Topology global()
@@ -434,7 +438,9 @@ public class TopologyManager
Invariants.require(epoch < minEpoch(), "Could not find epoch
%d. Min: %d, current: %d", epoch, minEpoch(), currentEpoch);
return; // notification came for an already truncated epoch
}
- while (epochs[i].recordClosed(ranges) && ++i < epochs.length) {}
+
+ while (!ranges.isEmpty() && i < epochs.length)
+ ranges = epochs[i++].recordClosed(ranges);
}
/**
@@ -458,8 +464,8 @@ public class TopologyManager
return;
}
- for (int i = retiredIdx; i < epochs.length; i++)
- epochs[i].recordRetired(ranges);
+ for (int i = retiredIdx; !ranges.isEmpty() && i < epochs.length;
i++)
+ ranges = epochs[i].recordRetired(ranges);
}
private Notifications pending(long epoch)
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index 220d61f4..8adf14ac 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -614,7 +614,7 @@ public class InMemoryJournal implements Journal
{
if (e.getValue().isEmpty()) continue;
- AsyncResult<Command> res =
loader.load(e.getKey()).beginAsResult();
+ AsyncResult<?> res = loader.load(e.getKey()).beginAsResult();
AsyncChains.getUnchecked(res);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]