This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f09a12da Improve  - Iterate LocalListeners in order, so can query more 
effectively on node  - Refine AbstractReplay.minReplay/shouldReplay
f09a12da is described below

commit f09a12da76bbc195ceb05ad859912aeb0a432dda
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Mon Dec 15 20:58:24 2025 +0000

    Improve
     - Iterate LocalListeners in order, so can query more effectively on node
     - Refine AbstractReplay.minReplay/shouldReplay
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21804
---
 .../src/main/java/accord/api/LocalListeners.java   |  4 +--
 .../main/java/accord/impl/AbstractReplayer.java    | 30 ++++++++++------------
 .../java/accord/impl/AbstractSafeCommandStore.java |  2 ++
 .../java/accord/impl/DefaultLocalListeners.java    | 21 ++++++++++-----
 .../java/accord/impl/InMemoryCommandStore.java     |  3 ++-
 .../src/main/java/accord/local/CommandStore.java   |  5 ++--
 .../src/main/java/accord/local/CommandStores.java  |  3 +--
 .../src/main/java/accord/local/Commands.java       | 16 +++++++++---
 .../java/accord/utils/ReducingIntervalMap.java     | 11 ++++++++
 .../main/java/accord/utils/ReducingRangeMap.java   | 10 ++++++++
 10 files changed, 71 insertions(+), 34 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/LocalListeners.java 
b/accord-core/src/main/java/accord/api/LocalListeners.java
index f7c08963..caa51ec7 100644
--- a/accord-core/src/main/java/accord/api/LocalListeners.java
+++ b/accord-core/src/main/java/accord/api/LocalListeners.java
@@ -84,9 +84,9 @@ public interface LocalListeners
     void notify(SafeCommandStore safeStore, SafeCommand safeCommand, Command 
prev);
 
     /**
-     * Erase all listeners for transactions with a lower {@codfe TxnId} than 
{@code clearBefore}.
+     * Erase all listeners for transactions with a lower {@code TxnId} than 
{@code clearBefore}.
      */
-    void clearBefore(CommandStore safeStore, TxnId clearBefore);
+    void clearBefore(TxnId clearBefore);
 
     /**
      * Erase all listeners; used only for resetting state
diff --git a/accord-core/src/main/java/accord/impl/AbstractReplayer.java 
b/accord-core/src/main/java/accord/impl/AbstractReplayer.java
index 9fd37f57..8c5c0f90 100644
--- a/accord-core/src/main/java/accord/impl/AbstractReplayer.java
+++ b/accord-core/src/main/java/accord/impl/AbstractReplayer.java
@@ -18,11 +18,14 @@
 
 package accord.impl;
 
+import java.util.Objects;
+
+import javax.annotation.Nullable;
+
 import accord.api.Journal;
 import accord.local.Command;
 import accord.local.CommandStore;
 import accord.local.Commands;
-import accord.local.PreLoadContext;
 import accord.local.RedundantBefore;
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
@@ -42,13 +45,14 @@ import static accord.primitives.Txn.Kind.Write;
 
 public abstract class AbstractReplayer implements Journal.Replayer
 {
-    final RedundantBefore redundantBefore;
-    final TxnId minReplay;
+    public final RedundantBefore redundantBefore;
+    public final TxnId minReplay;
 
-    protected AbstractReplayer(RedundantBefore redundantBefore)
+    protected AbstractReplayer(CommandStore commandStore, @Nullable TxnId 
minReplay)
     {
-        this.redundantBefore = redundantBefore;
-        this.minReplay = TxnId.noneIfNull(redundantBefore.foldl((b, v) -> 
TxnId.nonNullOrMin(v, TxnId.min(b.maxBound(LOCALLY_DURABLE_TO_DATA_STORE), 
b.maxBound(LOCALLY_DURABLE_TO_COMMAND_STORE))), null, ignore -> false));
+        this.redundantBefore = commandStore.unsafeGetRedundantBefore();
+        
Invariants.require(redundantBefore.ranges(Objects::nonNull).containsAll(commandStore.unsafeGetRangesForEpoch().all()));
+        this.minReplay = TxnId.noneIfNull(redundantBefore.foldl((b, v) -> 
TxnId.nonNullOrMin(v, b.maxBoundBoth(LOCALLY_DURABLE_TO_DATA_STORE, 
LOCALLY_DURABLE_TO_COMMAND_STORE)), minReplay, ignore -> false));
     }
 
     protected boolean maybeShouldReplay(TxnId txnId)
@@ -60,7 +64,7 @@ public abstract class AbstractReplayer implements 
Journal.Replayer
     {
         Participants<?> search = participants.route();
         if (search == null) search = participants.hasTouched();
-        return redundantBefore.foldlWithDefault(search, (b, v, id) -> v || 
b.maxBoundBoth(LOCALLY_DURABLE_TO_COMMAND_STORE, 
LOCALLY_DURABLE_TO_DATA_STORE).compareTo(id) <= 0, RedundantBefore.Bounds.NONE, 
false, txnId, i -> i);
+        return redundantBefore.foldl(search, (b, v, id) -> v || 
b.maxBoundBoth(LOCALLY_DURABLE_TO_COMMAND_STORE, 
LOCALLY_DURABLE_TO_DATA_STORE).compareTo(id) <= 0, false, txnId, i -> i);
     }
 
     protected void initialiseState(SafeCommandStore safeStore, TxnId txnId)
@@ -76,16 +80,10 @@ public abstract class AbstractReplayer implements 
Journal.Replayer
             {
                 if (command.txnId().is(Write))
                 {
-                    CommandStore unsafeStore = safeStore.commandStore();
-                    Participants<?> executes = 
command.participants().stillExecutes();
-                    command.writes()
-                           .apply(safeStore, executes, command.partialTxn())
-                           .invoke(() -> 
unsafeStore.chain(PreLoadContext.contextFor(txnId, "Replay"), ss -> {
-                               Commands.postApply(ss, txnId, true);
-                           }))
-                           .begin(safeStore.agent());
+                    Commands.applyChain(safeStore, command)
+                            .begin(safeStore.agent());
                 }
-                else Invariants.expect(command.hasBeen(Applied));
+                else Invariants.expect(command.hasBeen(Applied), "%s is 
Applying but is not a Write transaction", txnId);
             }
         }
         safeCommand.update(safeStore, safeCommand.current(), true);
diff --git 
a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java 
b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
index 84fa5b67..24d52839 100644
--- a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
@@ -224,6 +224,8 @@ extends SafeCommandStore
     @Override
     public final void upsertRedundantBefore(RedundantBefore addRedundantBefore)
     {
+        // TODO (required): this is potentially unsafe: if the update is not 
persisted for some reason (due to some later exception)
+        //   we can continue with a stale redundantBefore
         // TODO (expected): fix RedundantBefore sorting issue and switch to 
upsert mode
         ensureFieldUpdates().newRedundantBefore = 
RedundantBefore.merge(redundantBefore(), addRedundantBefore);
         unsafeUpsertRedundantBefore(addRedundantBefore);
diff --git a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java 
b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
index bb56f13f..9081b6b9 100644
--- a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
+++ b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
@@ -123,6 +123,9 @@ public class DefaultLocalListeners implements LocalListeners
 
     /*
      * A list that allows duplicates and sorts and removes duplicates on 
notify and when the list would have to resize
+     * TODO (expected): save time and space by:
+     *      - encoding SaveStatus as byte
+     *      - encoding listeners as any of: single TxnId, array of TxnId (for 
small size), btree for a large collection
      */
     static class TxnListeners extends TxnId implements PreLoadContext
     {
@@ -158,13 +161,13 @@ public class DefaultLocalListeners implements 
LocalListeners
             return c;
         }
 
-        void notify(NotifySink notifySink, SafeCommandStore safeStore, 
SafeCommand safeCommand)
+        void notify(DefaultLocalListeners owner, SafeCommandStore safeStore, 
SafeCommand safeCommand)
         {
             trim();
             for (int i = 0 ; i < count ; ++i)
             {
                 TxnId listenerId = listeners[i];
-                notifySink.notify(safeStore, safeCommand, listenerId);
+                owner.notifySink.notify(safeStore, safeCommand, listenerId);
             }
         }
 
@@ -173,6 +176,9 @@ public class DefaultLocalListeners implements LocalListeners
          */
         private int trim()
         {
+            if (count == 0)
+                return 0;
+
             Arrays.sort(listeners, 0, count);
             int removedCount = 0;
             for (int i = 1 ; i < count ; ++i)
@@ -487,7 +493,7 @@ public class DefaultLocalListeners implements LocalListeners
         {
             TxnListeners notify = BTree.findByIndex(txnListeners, start);
             Invariants.require(txnId.equals(notify));
-            notify.notify(notifySink, safeStore, safeCommand);
+            notify.notify(this, safeStore, safeCommand);
             if (this.txnListeners != txnListeners)
             {
                 // listener registrations were changed by this listener's 
notify invocation, so reset our cursor
@@ -519,7 +525,7 @@ public class DefaultLocalListeners implements LocalListeners
     }
 
     @Override
-    public void clearBefore(CommandStore commandStore, TxnId clearBefore)
+    public void clearBefore(TxnId clearBefore)
     {
         while (!BTree.isEmpty(txnListeners))
         {
@@ -532,7 +538,7 @@ public class DefaultLocalListeners implements LocalListeners
                 Command command = safeCommand.current();
                 SaveStatus saveStatus = command.saveStatus();
                 Invariants.require(saveStatus.compareTo(entry.await) >= 0 || 
command.participants().stillOwns().isEmpty());
-                entry.notify(notifySink, safeStore, safeCommand);
+                entry.notify(this, safeStore, safeCommand);
             }, commandStore.agent());
             txnListeners = BTreeRemoval.remove(txnListeners, 
TxnListeners::compareListeners, entry);
         }
@@ -598,8 +604,8 @@ public class DefaultLocalListeners implements LocalListeners
         return () -> {
             return new Iterator<>()
             {
-                Object[] snapshot = txnListeners;
-                Iterator<TxnListeners> iter = BTree.slice(snapshot, 
TxnListeners::compareListeners, BTree.Dir.ASC);
+                Object[] snapshot;
+                Iterator<TxnListeners> iter;
                 TxnListeners cur;
                 TxnId[] buffer = TxnId.NO_TXNIDS;
                 int bufferIndex, bufferCount, maxBufferCount;
@@ -630,6 +636,7 @@ public class DefaultLocalListeners implements LocalListeners
 
                         cur = iter.next();
                         bufferIndex = 0;
+                        cur.trim();
                         bufferCount = cur.count;
                         if (bufferCount == 0)
                             continue;
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 13a688c9..0e3b7956 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -1228,7 +1228,8 @@ public abstract class InMemoryCommandStore extends 
CommandStore
 
         private CommandReplayer(InMemoryCommandStore commandStore)
         {
-            super(commandStore.unsafeGetRedundantBefore());
+            // TODO (required): we shouldn't be providing TxnId.NONE here, we 
need to standardise on querying journal for data missing from 
InMemoryCommandStore
+            super(commandStore, TxnId.NONE);
             this.commandStore = commandStore;
         }
 
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index 13835a6e..d321bc67 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -115,6 +115,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
         }
     }
 
+    // TODO (required): we only REMOVE ranges now, so it should be possible to 
simplify this
     public static class EpochUpdateHolder extends AtomicReference<EpochUpdate>
     {
         // TODO (desired): can better encapsulate by accepting only the 
newRangesForEpoch and deriving the add/remove ranges
@@ -371,7 +372,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
 
     protected void unsafeUpsertRedundantBefore(RedundantBefore 
addRedundantBefore)
     {
-        redundantBefore = RedundantBefore.merge(redundantBefore, 
addRedundantBefore);
+        unsafeSetRedundantBefore(RedundantBefore.merge(redundantBefore, 
addRedundantBefore));
     }
 
     @VisibleForTesting
@@ -883,7 +884,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
         TxnId clearWaitingBefore = 
redundantBefore.minShardAndLocallyAppliedBefore();
         TxnId clearAllBefore = TxnId.min(clearWaitingBefore, 
durableBefore().min.quorumBefore);
         progressLog.clearBefore(safeStore, clearWaitingBefore, clearAllBefore);
-        listeners.clearBefore(this, clearWaitingBefore);
+        listeners.clearBefore(clearWaitingBefore);
     }
 
     @VisibleForTesting
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java 
b/accord-core/src/main/java/accord/local/CommandStores.java
index ca959c6a..9a6e2d29 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -1044,13 +1044,12 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
         return update.bootstrap;
     }
 
-    public synchronized void shutdown()
+    public void shutdown()
     {
         for (ShardHolder shard : current.shards)
             shard.store.shutdown();
     }
 
-
     @Override
     public AsyncExecutor someExecutor()
     {
diff --git a/accord-core/src/main/java/accord/local/Commands.java 
b/accord-core/src/main/java/accord/local/Commands.java
index f183d29c..3154e7de 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -710,7 +710,7 @@ public class Commands
         @Override public String reason() { return "Post Apply"; }
     }
 
-    public static AsyncChain<Void> applyChain(SafeCommandStore safeStore, 
Command.Executed command)
+    public static AsyncChain<Void> applyChain(SafeCommandStore safeStore, 
Command command)
     {
         // TODO (required): make sure we are correctly handling (esp. C* side 
with validation logic) executing a transaction
         //  that was pre-bootstrap for some range (so redundant and we may 
have gone ahead of), but had to be executed locally
@@ -720,9 +720,17 @@ public class Commands
         //noinspection DataFlowIssue
         safeStore = safeStore; // disable reuse
         Participants<?> executes = command.participants().stillExecutes(); // 
including any keys we aren't writing
-        return command.writes()
-                      .apply(safeStore, executes, command.partialTxn())
-                      .then(head -> new PostApply<>(head, unsafeStore, txnId, 
executes, false));
+        if (executes.isEmpty())
+        {
+            postApply(safeStore, txnId, false);
+            return AsyncChains.success(null);
+        }
+        else
+        {
+            return command.writes()
+                          .apply(safeStore, executes, command.partialTxn())
+                          .then(head -> new PostApply<>(head, unsafeStore, 
txnId, executes, false));
+        }
     }
 
     public static boolean maybeExecute(SafeCommandStore safeStore, SafeCommand 
safeCommand, boolean alwaysNotifyListeners, boolean notifyWaitingOn)
diff --git a/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java 
b/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java
index b4281242..bb88c0f0 100644
--- a/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java
+++ b/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java
@@ -107,6 +107,17 @@ public class ReducingIntervalMap<K extends Comparable<? 
super K>, V>
         return accumulator;
     }
 
+    public <V2> V2 foldlWithDefault(BiFunction<V, V2, V2> reduce, V ifNull, V2 
accumulator, Predicate<V2> terminate)
+    {
+        for (V value : values)
+        {
+            accumulator = reduce.apply(value == null ? ifNull : value, 
accumulator);
+            if (terminate.test(accumulator))
+                break;
+        }
+        return accumulator;
+    }
+
     public <V2> V2 foldlWithBounds(QuadFunction<V, V2, K, K, V2> fold, V2 
accumulator, Predicate<V2> terminate)
     {
         for (int i = 0 ; i < values.length ; ++i)
diff --git a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java 
b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
index 1b6eb4c4..409eb17d 100644
--- a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
+++ b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
@@ -22,6 +22,8 @@ import accord.primitives.*;
 
 import java.util.Arrays;
 import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.IntFunction;
 import java.util.function.Predicate;
 
 import static accord.utils.SortedArrays.Search.FAST;
@@ -488,4 +490,12 @@ public class ReducingRangeMap<V> extends 
ReducingIntervalMap<RoutingKey, V>
             ranges = Arrays.copyOf(ranges, count);
         return Ranges.ofSortedAndDeoverlapped(ranges);
     }
+
+    public <V2> ReducingRangeMap<V2> map(Function<V, V2> map, 
IntFunction<V2[]> allocator)
+    {
+        V2[] output = allocator.apply(values.length);
+        for (int i = 0 ; i < values.length ; ++i)
+            output[i] = map.apply(values[i]);
+        return new ReducingRangeMap<>(inclusiveEnds, starts, output);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to