This is an automated email from the ASF dual-hosted git repository. konstantinov pushed a commit to branch fixes-260226 in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 3c6df14fa929b027e299e1a9915b78e00fe83add Author: Benedict Elliott Smith <[email protected]> AuthorDate: Sun Jan 11 17:41:41 2026 +0000 move and rename lots of things --- accord-core/src/main/java/accord/api/DataStore.java | 14 +++++++++++++- accord-core/src/main/java/accord/api/Journal.java | 19 +++++++++---------- .../src/main/java/accord/local/CommandStore.java | 4 ++-- .../src/main/java/accord/local/SafeCommandStore.java | 4 ++++ .../src/test/java/accord/impl/basic/Cluster.java | 2 +- .../test/java/accord/impl/basic/InMemoryJournal.java | 2 +- .../test/java/accord/impl/basic/LoggingJournal.java | 4 ++-- .../src/test/java/accord/impl/list/ListStore.java | 12 ++---------- .../src/test/java/accord/impl/mock/MockStore.java | 4 ++-- .../src/main/java/accord/maelstrom/Cluster.java | 2 +- .../main/java/accord/maelstrom/MaelstromStore.java | 4 ++-- 11 files changed, 39 insertions(+), 32 deletions(-) diff --git a/accord-core/src/main/java/accord/api/DataStore.java b/accord-core/src/main/java/accord/api/DataStore.java index 45eb231f..74dd68c6 100644 --- a/accord-core/src/main/java/accord/api/DataStore.java +++ b/accord-core/src/main/java/accord/api/DataStore.java @@ -140,10 +140,22 @@ public interface DataStore */ FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback, FetchKind kind); + /** + * Logical fsync-like operation: anything within the provided ranges written to the store prior to the + * invocation of this method must be durable once the AsyncResult completes successfully. That is, a restart of the node must + * restore the DataStore to a state on or after the point at which snapshot was invoked. + * + * TODO (desired): clunky to pass integer flags around; is there a neater implementation-agnostic alternative? + */ + default void ensureDurable(CommandStore commandStore, Ranges ranges, RedundantBefore reportOnSuccess, int flags) + { + ensureDurable(commandStore, reportOnSuccess, flags); + } + /** * Logical fsync-like operation: anything written to the store prior to the invocation of this method * must be durable once the AsyncResult completes successfully. That is, a restart of the node must * restore the DataStore to a state on or after the point at which snapshot was invoked. */ - void ensureDurable(CommandStore commandStore, Ranges ranges, RedundantBefore reportOnSuccess); + void ensureDurable(CommandStore commandStore, RedundantBefore reportOnSuccess, int flags); } diff --git a/accord-core/src/main/java/accord/api/Journal.java b/accord-core/src/main/java/accord/api/Journal.java index d9533576..3b7c5dbe 100644 --- a/accord-core/src/main/java/accord/api/Journal.java +++ b/accord-core/src/main/java/accord/api/Journal.java @@ -62,26 +62,25 @@ public interface Journal Command.MinimalWithDeps loadMinimalWithDeps(int store, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore); void saveCommand(int store, CommandUpdate value, Runnable onFlush); - List<? extends TopologyUpdate> replayTopologies(); + List<? extends TopologyUpdate> loadTopologies(); void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush); - void purge(CommandStores commandStores, EpochSupplier minEpoch); - - /** - * Replays all messages from journal to rehydrate CommandStores state. Returns whether it has seen (and ignored) - * any exceptions during replay. - */ - boolean replay(CommandStores commandStores, Object param); - RedundantBefore loadRedundantBefore(int store); NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int store); NavigableMap<Timestamp, Ranges> loadSafeToRead(int store); CommandStores.RangesForEpoch loadRangesForEpoch(int store); + void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush); Persister<DurableBefore, DurableBefore> durableBeforePersister(); - void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush); + void purge(CommandStores commandStores, EpochSupplier minEpoch); + /** + * Replays all messages from journal to rehydrate CommandStores state. Returns whether it has seen (and ignored) + * any exceptions during replay. + */ + boolean replay(CommandStores commandStores, Object param); + class TopologyUpdate { public final Int2ObjectHashMap<CommandStores.RangesForEpoch> commandStores; diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index ac9e7d98..94f0742b 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -538,7 +538,7 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA safeStore.upsertRedundantBefore(addNow); RedundantBefore addOnDataStoreDurable = RedundantBefore.create(ranges, txnId, LOCALLY_DURABLE_TO_DATA_STORE_ONLY); RedundantBefore addOnCommandStoreDurable = RedundantBefore.create(ranges, txnId, LOCALLY_DURABLE_TO_COMMAND_STORE_ONLY); - dataStore.ensureDurable(this, ranges, addOnDataStoreDurable); + dataStore.ensureDurable(this, ranges, addOnDataStoreDurable, 0); ensureDurable(ranges, addOnCommandStoreDurable); } @@ -878,7 +878,7 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA if (globalSyncId.is(HLC_BOUND) || !requiresUniqueHlcs()) { RedundantBefore addOnDataStoreDurable = RedundantBefore.create(slicedRanges, globalSyncId, GC_BEFORE_AND_LOCALLY_DURABLE); - dataStore.ensureDurable(this, slicedRanges, addOnDataStoreDurable); + dataStore.ensureDurable(this, slicedRanges, addOnDataStoreDurable, 0); } } diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index 06296c65..ba0ecca6 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -561,6 +561,10 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund */ public abstract void upsertRedundantBefore(RedundantBefore addRedundantBefore); + public void reportDurable(RedundantBefore addRedundantBefore, int flags) + { + upsertRedundantBefore(addRedundantBefore); + } protected void unsafeSetRedundantBefore(RedundantBefore newRedundantBefore) { diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index 2f72ef1f..42d18f93 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -751,7 +751,7 @@ public class Cluster Journal.TopologyUpdate lastUpdate = null; { - Iterator<? extends Journal.TopologyUpdate> iter = journal.replayTopologies().iterator(); + Iterator<? extends Journal.TopologyUpdate> iter = journal.loadTopologies().iterator(); while (iter.hasNext()) { Journal.TopologyUpdate update = iter.next(); diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java index c838c891..5d087354 100644 --- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java @@ -252,7 +252,7 @@ public class InMemoryJournal implements Journal } @Override - public List<TopologyUpdate> replayTopologies() + public List<TopologyUpdate> loadTopologies() { return new ArrayList<>(topologyUpdates); } diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java index b55c6c2a..5c342440 100644 --- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java @@ -112,10 +112,10 @@ public class LoggingJournal implements Journal } @Override - public List<? extends TopologyUpdate> replayTopologies() + public List<? extends TopologyUpdate> loadTopologies() { log("REPLAY TOPOLOGIES\n"); - return delegate.replayTopologies(); + return delegate.loadTopologies(); } @Override diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java b/accord-core/src/test/java/accord/impl/list/ListStore.java index 1458e0eb..aaeb0468 100644 --- a/accord-core/src/test/java/accord/impl/list/ListStore.java +++ b/accord-core/src/test/java/accord/impl/list/ListStore.java @@ -164,20 +164,12 @@ public class ListStore extends Snapshotter<ListStore.Snapshot> implements DataSt } } - private Scheduler.Scheduled scheduled; - - /** - * Logical fsync-like operation: anything written to the store prior to the invocation of this method - * must be durable once the AsyncResult completes successfully. That is, a restart of the node must - * restore the DataStore to a state on or after the point at which snapshot was invoked. - */ - @Override - public void ensureDurable(CommandStore commandStore, Ranges ranges, RedundantBefore onSuccess) + public void ensureDurable(CommandStore commandStore, RedundantBefore onSuccess, int flags) { if (commandStore.node().isReplaying()) return; snapshot(false).invoke((success, fail) -> { - if (fail == null) commandStore.execute((PreLoadContext.Empty)()->"Report DataStore Durable", safeStore -> safeStore.upsertRedundantBefore(onSuccess)); + if (fail == null) commandStore.execute((PreLoadContext.Empty)()->"Report DataStore Durable", safeStore -> safeStore.reportDurable(onSuccess, flags)); }); } diff --git a/accord-core/src/test/java/accord/impl/mock/MockStore.java b/accord-core/src/test/java/accord/impl/mock/MockStore.java index 22a49886..e75b1362 100644 --- a/accord-core/src/test/java/accord/impl/mock/MockStore.java +++ b/accord-core/src/test/java/accord/impl/mock/MockStore.java @@ -159,8 +159,8 @@ public class MockStore implements DataStore } @Override - public void ensureDurable(CommandStore commandStore, Ranges ranges, RedundantBefore reportOnSuccess) + public void ensureDurable(CommandStore commandStore, RedundantBefore reportOnSuccess, int flags) { - commandStore.execute((PreLoadContext.Empty)() -> "Report CommandStore Durable", safeStore -> safeStore.upsertRedundantBefore(reportOnSuccess)); + commandStore.execute((PreLoadContext.Empty)() -> "Report CommandStore Durable", safeStore -> safeStore.reportDurable(reportOnSuccess, flags)); } } diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java index 7e58289e..aa2d9892 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java @@ -394,7 +394,7 @@ public class Cluster implements Scheduler @Override public Command.Minimal loadMinimal(int commandStoreId, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore) { throw new IllegalStateException("Not impelemented"); } @Override public Command.MinimalWithDeps loadMinimalWithDeps(int store, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore) { throw new IllegalStateException("Not impelemented"); } @Override public void saveCommand(int store, CommandUpdate value, Runnable onFlush) { throw new IllegalStateException("Not impelemented"); } - @Override public List<TopologyUpdate> replayTopologies() { throw new IllegalStateException("Not impelemented"); } + @Override public List<TopologyUpdate> loadTopologies() { throw new IllegalStateException("Not impelemented"); } @Override public void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush) { throw new IllegalStateException("Not impelemented"); } @Override public void purge(CommandStores commandStores, EpochSupplier minEpoch) { throw new IllegalStateException("Not impelemented"); } @Override public boolean replay(CommandStores commandStores, Object param) { throw new IllegalStateException("Not impelemented"); } diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java index 241100c5..75ad833b 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java @@ -62,7 +62,7 @@ public class MaelstromStore implements DataStore } @Override - public void ensureDurable(CommandStore commandStore, Ranges ranges, RedundantBefore reportOnSuccess) + public void ensureDurable(CommandStore commandStore, RedundantBefore reportOnSuccess, int flags) { - commandStore.execute((PreLoadContext.Empty)() -> "Report CommandStore Durable", safeStore -> safeStore.upsertRedundantBefore(reportOnSuccess)); + commandStore.execute((PreLoadContext.Empty)() -> "Report CommandStore Durable", safeStore -> safeStore.reportDurable(reportOnSuccess, flags)); }} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
