This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new f09a12da Improve - Iterate LocalListeners in order, so we can query more
effectively on node - Refine AbstractReplayer.minReplay/shouldReplay
f09a12da is described below
commit f09a12da76bbc195ceb05ad859912aeb0a432dda
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Mon Dec 15 20:58:24 2025 +0000
Improve
- Iterate LocalListeners in order, so we can query more effectively on node
- Refine AbstractReplayer.minReplay/shouldReplay
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21804
---
.../src/main/java/accord/api/LocalListeners.java | 4 +--
.../main/java/accord/impl/AbstractReplayer.java | 30 ++++++++++------------
.../java/accord/impl/AbstractSafeCommandStore.java | 2 ++
.../java/accord/impl/DefaultLocalListeners.java | 21 ++++++++++-----
.../java/accord/impl/InMemoryCommandStore.java | 3 ++-
.../src/main/java/accord/local/CommandStore.java | 5 ++--
.../src/main/java/accord/local/CommandStores.java | 3 +--
.../src/main/java/accord/local/Commands.java | 16 +++++++++---
.../java/accord/utils/ReducingIntervalMap.java | 11 ++++++++
.../main/java/accord/utils/ReducingRangeMap.java | 10 ++++++++
10 files changed, 71 insertions(+), 34 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/LocalListeners.java
b/accord-core/src/main/java/accord/api/LocalListeners.java
index f7c08963..caa51ec7 100644
--- a/accord-core/src/main/java/accord/api/LocalListeners.java
+++ b/accord-core/src/main/java/accord/api/LocalListeners.java
@@ -84,9 +84,9 @@ public interface LocalListeners
void notify(SafeCommandStore safeStore, SafeCommand safeCommand, Command
prev);
/**
- * Erase all listeners for transactions with a lower {@codfe TxnId} than
{@code clearBefore}.
+ * Erase all listeners for transactions with a lower {@code TxnId} than
{@code clearBefore}.
*/
- void clearBefore(CommandStore safeStore, TxnId clearBefore);
+ void clearBefore(TxnId clearBefore);
/**
* Erase all listeners; used only for resetting state
diff --git a/accord-core/src/main/java/accord/impl/AbstractReplayer.java
b/accord-core/src/main/java/accord/impl/AbstractReplayer.java
index 9fd37f57..8c5c0f90 100644
--- a/accord-core/src/main/java/accord/impl/AbstractReplayer.java
+++ b/accord-core/src/main/java/accord/impl/AbstractReplayer.java
@@ -18,11 +18,14 @@
package accord.impl;
+import java.util.Objects;
+
+import javax.annotation.Nullable;
+
import accord.api.Journal;
import accord.local.Command;
import accord.local.CommandStore;
import accord.local.Commands;
-import accord.local.PreLoadContext;
import accord.local.RedundantBefore;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
@@ -42,13 +45,14 @@ import static accord.primitives.Txn.Kind.Write;
public abstract class AbstractReplayer implements Journal.Replayer
{
- final RedundantBefore redundantBefore;
- final TxnId minReplay;
+ public final RedundantBefore redundantBefore;
+ public final TxnId minReplay;
- protected AbstractReplayer(RedundantBefore redundantBefore)
+ protected AbstractReplayer(CommandStore commandStore, @Nullable TxnId
minReplay)
{
- this.redundantBefore = redundantBefore;
- this.minReplay = TxnId.noneIfNull(redundantBefore.foldl((b, v) ->
TxnId.nonNullOrMin(v, TxnId.min(b.maxBound(LOCALLY_DURABLE_TO_DATA_STORE),
b.maxBound(LOCALLY_DURABLE_TO_COMMAND_STORE))), null, ignore -> false));
+ this.redundantBefore = commandStore.unsafeGetRedundantBefore();
+
Invariants.require(redundantBefore.ranges(Objects::nonNull).containsAll(commandStore.unsafeGetRangesForEpoch().all()));
+ this.minReplay = TxnId.noneIfNull(redundantBefore.foldl((b, v) ->
TxnId.nonNullOrMin(v, b.maxBoundBoth(LOCALLY_DURABLE_TO_DATA_STORE,
LOCALLY_DURABLE_TO_COMMAND_STORE)), minReplay, ignore -> false));
}
protected boolean maybeShouldReplay(TxnId txnId)
@@ -60,7 +64,7 @@ public abstract class AbstractReplayer implements
Journal.Replayer
{
Participants<?> search = participants.route();
if (search == null) search = participants.hasTouched();
- return redundantBefore.foldlWithDefault(search, (b, v, id) -> v ||
b.maxBoundBoth(LOCALLY_DURABLE_TO_COMMAND_STORE,
LOCALLY_DURABLE_TO_DATA_STORE).compareTo(id) <= 0, RedundantBefore.Bounds.NONE,
false, txnId, i -> i);
+ return redundantBefore.foldl(search, (b, v, id) -> v ||
b.maxBoundBoth(LOCALLY_DURABLE_TO_COMMAND_STORE,
LOCALLY_DURABLE_TO_DATA_STORE).compareTo(id) <= 0, false, txnId, i -> i);
}
protected void initialiseState(SafeCommandStore safeStore, TxnId txnId)
@@ -76,16 +80,10 @@ public abstract class AbstractReplayer implements
Journal.Replayer
{
if (command.txnId().is(Write))
{
- CommandStore unsafeStore = safeStore.commandStore();
- Participants<?> executes =
command.participants().stillExecutes();
- command.writes()
- .apply(safeStore, executes, command.partialTxn())
- .invoke(() ->
unsafeStore.chain(PreLoadContext.contextFor(txnId, "Replay"), ss -> {
- Commands.postApply(ss, txnId, true);
- }))
- .begin(safeStore.agent());
+ Commands.applyChain(safeStore, command)
+ .begin(safeStore.agent());
}
- else Invariants.expect(command.hasBeen(Applied));
+ else Invariants.expect(command.hasBeen(Applied), "%s is
Applying but is not a Write transaction", txnId);
}
}
safeCommand.update(safeStore, safeCommand.current(), true);
diff --git
a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
index 84fa5b67..24d52839 100644
--- a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
@@ -224,6 +224,8 @@ extends SafeCommandStore
@Override
public final void upsertRedundantBefore(RedundantBefore addRedundantBefore)
{
+ // TODO (required): this is potentially unsafe: if the update is not
persisted for some reason (due to some later exception)
+ // we can continue with a stale redundantBefore
// TODO (expected): fix RedundantBefore sorting issue and switch to
upsert mode
ensureFieldUpdates().newRedundantBefore =
RedundantBefore.merge(redundantBefore(), addRedundantBefore);
unsafeUpsertRedundantBefore(addRedundantBefore);
diff --git a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
index bb56f13f..9081b6b9 100644
--- a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
+++ b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
@@ -123,6 +123,9 @@ public class DefaultLocalListeners implements LocalListeners
/*
* A list that allows duplicates and sorts and removes duplicates on
notify and when the list would have to resize
+ * TODO (expected): save time and space by:
+ * - encoding SaveStatus as byte
+ * - encoding listeners as any of: single TxnId, array of TxnId (for
small size), btree for a large collection
*/
static class TxnListeners extends TxnId implements PreLoadContext
{
@@ -158,13 +161,13 @@ public class DefaultLocalListeners implements
LocalListeners
return c;
}
- void notify(NotifySink notifySink, SafeCommandStore safeStore,
SafeCommand safeCommand)
+ void notify(DefaultLocalListeners owner, SafeCommandStore safeStore,
SafeCommand safeCommand)
{
trim();
for (int i = 0 ; i < count ; ++i)
{
TxnId listenerId = listeners[i];
- notifySink.notify(safeStore, safeCommand, listenerId);
+ owner.notifySink.notify(safeStore, safeCommand, listenerId);
}
}
@@ -173,6 +176,9 @@ public class DefaultLocalListeners implements LocalListeners
*/
private int trim()
{
+ if (count == 0)
+ return 0;
+
Arrays.sort(listeners, 0, count);
int removedCount = 0;
for (int i = 1 ; i < count ; ++i)
@@ -487,7 +493,7 @@ public class DefaultLocalListeners implements LocalListeners
{
TxnListeners notify = BTree.findByIndex(txnListeners, start);
Invariants.require(txnId.equals(notify));
- notify.notify(notifySink, safeStore, safeCommand);
+ notify.notify(this, safeStore, safeCommand);
if (this.txnListeners != txnListeners)
{
// listener registrations were changed by this listener's
notify invocation, so reset our cursor
@@ -519,7 +525,7 @@ public class DefaultLocalListeners implements LocalListeners
}
@Override
- public void clearBefore(CommandStore commandStore, TxnId clearBefore)
+ public void clearBefore(TxnId clearBefore)
{
while (!BTree.isEmpty(txnListeners))
{
@@ -532,7 +538,7 @@ public class DefaultLocalListeners implements LocalListeners
Command command = safeCommand.current();
SaveStatus saveStatus = command.saveStatus();
Invariants.require(saveStatus.compareTo(entry.await) >= 0 ||
command.participants().stillOwns().isEmpty());
- entry.notify(notifySink, safeStore, safeCommand);
+ entry.notify(this, safeStore, safeCommand);
}, commandStore.agent());
txnListeners = BTreeRemoval.remove(txnListeners,
TxnListeners::compareListeners, entry);
}
@@ -598,8 +604,8 @@ public class DefaultLocalListeners implements LocalListeners
return () -> {
return new Iterator<>()
{
- Object[] snapshot = txnListeners;
- Iterator<TxnListeners> iter = BTree.slice(snapshot,
TxnListeners::compareListeners, BTree.Dir.ASC);
+ Object[] snapshot;
+ Iterator<TxnListeners> iter;
TxnListeners cur;
TxnId[] buffer = TxnId.NO_TXNIDS;
int bufferIndex, bufferCount, maxBufferCount;
@@ -630,6 +636,7 @@ public class DefaultLocalListeners implements LocalListeners
cur = iter.next();
bufferIndex = 0;
+ cur.trim();
bufferCount = cur.count;
if (bufferCount == 0)
continue;
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 13a688c9..0e3b7956 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -1228,7 +1228,8 @@ public abstract class InMemoryCommandStore extends
CommandStore
private CommandReplayer(InMemoryCommandStore commandStore)
{
- super(commandStore.unsafeGetRedundantBefore());
+ // TODO (required): we shouldn't be providing TxnId.NONE here, we
need to standardise on querying journal for data missing from
InMemoryCommandStore
+ super(commandStore, TxnId.NONE);
this.commandStore = commandStore;
}
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java
b/accord-core/src/main/java/accord/local/CommandStore.java
index 13835a6e..d321bc67 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -115,6 +115,7 @@ public abstract class CommandStore implements
AbstractAsyncExecutor, SequentialA
}
}
+ // TODO (required): we only REMOVE ranges now, so it should be possible to
simplify this
public static class EpochUpdateHolder extends AtomicReference<EpochUpdate>
{
// TODO (desired): can better encapsulate by accepting only the
newRangesForEpoch and deriving the add/remove ranges
@@ -371,7 +372,7 @@ public abstract class CommandStore implements
AbstractAsyncExecutor, SequentialA
protected void unsafeUpsertRedundantBefore(RedundantBefore
addRedundantBefore)
{
- redundantBefore = RedundantBefore.merge(redundantBefore,
addRedundantBefore);
+ unsafeSetRedundantBefore(RedundantBefore.merge(redundantBefore,
addRedundantBefore));
}
@VisibleForTesting
@@ -883,7 +884,7 @@ public abstract class CommandStore implements
AbstractAsyncExecutor, SequentialA
TxnId clearWaitingBefore =
redundantBefore.minShardAndLocallyAppliedBefore();
TxnId clearAllBefore = TxnId.min(clearWaitingBefore,
durableBefore().min.quorumBefore);
progressLog.clearBefore(safeStore, clearWaitingBefore, clearAllBefore);
- listeners.clearBefore(this, clearWaitingBefore);
+ listeners.clearBefore(clearWaitingBefore);
}
@VisibleForTesting
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java
b/accord-core/src/main/java/accord/local/CommandStores.java
index ca959c6a..9a6e2d29 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -1044,13 +1044,12 @@ public abstract class CommandStores implements
AsyncExecutorFactory
return update.bootstrap;
}
- public synchronized void shutdown()
+ public void shutdown()
{
for (ShardHolder shard : current.shards)
shard.store.shutdown();
}
-
@Override
public AsyncExecutor someExecutor()
{
diff --git a/accord-core/src/main/java/accord/local/Commands.java
b/accord-core/src/main/java/accord/local/Commands.java
index f183d29c..3154e7de 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -710,7 +710,7 @@ public class Commands
@Override public String reason() { return "Post Apply"; }
}
- public static AsyncChain<Void> applyChain(SafeCommandStore safeStore,
Command.Executed command)
+ public static AsyncChain<Void> applyChain(SafeCommandStore safeStore,
Command command)
{
// TODO (required): make sure we are correctly handling (esp. C* side
with validation logic) executing a transaction
// that was pre-bootstrap for some range (so redundant and we may
have gone ahead of), but had to be executed locally
@@ -720,9 +720,17 @@ public class Commands
//noinspection DataFlowIssue
safeStore = safeStore; // disable reuse
Participants<?> executes = command.participants().stillExecutes(); //
including any keys we aren't writing
- return command.writes()
- .apply(safeStore, executes, command.partialTxn())
- .then(head -> new PostApply<>(head, unsafeStore, txnId,
executes, false));
+ if (executes.isEmpty())
+ {
+ postApply(safeStore, txnId, false);
+ return AsyncChains.success(null);
+ }
+ else
+ {
+ return command.writes()
+ .apply(safeStore, executes, command.partialTxn())
+ .then(head -> new PostApply<>(head, unsafeStore,
txnId, executes, false));
+ }
}
public static boolean maybeExecute(SafeCommandStore safeStore, SafeCommand
safeCommand, boolean alwaysNotifyListeners, boolean notifyWaitingOn)
diff --git a/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java
b/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java
index b4281242..bb88c0f0 100644
--- a/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java
+++ b/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java
@@ -107,6 +107,17 @@ public class ReducingIntervalMap<K extends Comparable<?
super K>, V>
return accumulator;
}
+ public <V2> V2 foldlWithDefault(BiFunction<V, V2, V2> reduce, V ifNull, V2
accumulator, Predicate<V2> terminate)
+ {
+ for (V value : values)
+ {
+ accumulator = reduce.apply(value == null ? ifNull : value,
accumulator);
+ if (terminate.test(accumulator))
+ break;
+ }
+ return accumulator;
+ }
+
public <V2> V2 foldlWithBounds(QuadFunction<V, V2, K, K, V2> fold, V2
accumulator, Predicate<V2> terminate)
{
for (int i = 0 ; i < values.length ; ++i)
diff --git a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
index 1b6eb4c4..409eb17d 100644
--- a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
+++ b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
@@ -22,6 +22,8 @@ import accord.primitives.*;
import java.util.Arrays;
import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.IntFunction;
import java.util.function.Predicate;
import static accord.utils.SortedArrays.Search.FAST;
@@ -488,4 +490,12 @@ public class ReducingRangeMap<V> extends
ReducingIntervalMap<RoutingKey, V>
ranges = Arrays.copyOf(ranges, count);
return Ranges.ofSortedAndDeoverlapped(ranges);
}
+
+ public <V2> ReducingRangeMap<V2> map(Function<V, V2> map,
IntFunction<V2[]> allocator)
+ {
+ V2[] output = allocator.apply(values.length);
+ for (int i = 0 ; i < values.length ; ++i)
+ output[i] = map.apply(values[i]);
+ return new ReducingRangeMap<>(inclusiveEnds, starts, output);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]