This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 756f2a1f (Accord) Fix:  - Attempt to fix CommandsForKey 
StackOverflowError (presumed to be reachable via SaveStatus->InternalStatus map 
returning null)  - Bound recursion with SafeCommandStore.tryRecurse()  - 
IndexOutOfBoundsException in CINTIA checkpoint list encoding bounds logic  - 
Maintain MaxDecidedRX to save majority of work when deciding RX that are newer 
than those we have previously agreed  - Introduce IntervalBTree and use in 
CommandsForRanges to limit time spent in cri [...]
756f2a1f is described below

commit 756f2a1f88f079e74c34fd8e0f3bb5aa98760bef
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Sun Jun 22 12:14:55 2025 +0100

    (Accord) Fix:
     - Attempt to fix CommandsForKey StackOverflowError (presumed to be
       reachable via SaveStatus->InternalStatus map returning null)
     - Bound recursion with SafeCommandStore.tryRecurse()
     - IndexOutOfBoundsException in CINTIA checkpoint list encoding bounds logic
     - Maintain MaxDecidedRX to save the majority of work when deciding RX that
       are newer than those we have previously agreed
     - Introduce IntervalBTree and use in CommandsForRanges to limit time spent
       in critical section when there are millions of RX
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20727
---
 .../accord/coordinate/CoordinateTransaction.java   | 12 +--
 .../accord/impl/AbstractConfigurationService.java  |  6 +-
 .../java/accord/impl/DefaultLocalListeners.java    |  6 +-
 .../java/accord/impl/InMemoryCommandStore.java     |  1 -
 .../src/main/java/accord/local/CommandStore.java   | 20 ++++-
 .../main/java/accord/local/CommandSummaries.java   |  8 +-
 .../src/main/java/accord/local/MaxDecidedRX.java   | 66 ++++++++++++++++
 .../main/java/accord/local/SafeCommandStore.java   | 22 ++++++
 .../main/java/accord/local/cfk/CommandsForKey.java | 91 +++++++++++++---------
 .../src/main/java/accord/local/cfk/NotifySink.java | 17 +++-
 .../main/java/accord/local/cfk/PostProcess.java    | 23 ++++--
 .../src/main/java/accord/local/cfk/Updating.java   | 12 ++-
 .../main/java/accord/topology/TopologyManager.java |  2 +-
 .../java/accord/utils/BTreeReducingRangeMap.java   | 31 ++++++++
 .../java/accord/utils/CheckpointIntervalArray.java | 15 +++-
 .../utils/CheckpointIntervalArrayBuilder.java      | 19 +++--
 16 files changed, 274 insertions(+), 77 deletions(-)

diff --git 
a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java 
b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
index 70b4884a..696f487c 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
@@ -195,15 +195,17 @@ public class CoordinateTransaction extends 
CoordinatePreAccept<Result>
                     case PENDING:
                         this.cancel = cancel;
                         this.timeout = timeout;
-                        return;
+                        break;
                     case TIMEOUT:
-                        timeout = null;
+                        if (cancel != null)
+                            cancel.cancel();
+                        break;
                     case SUCCESS:
-                        cancel = null;
+                        if (timeout != null)
+                            timeout.cancel();
+                        break;
                 }
             }
-            if (cancel != null) cancel.cancel();
-            if (timeout != null) timeout.cancel();
         }
 
         @Override
diff --git 
a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java 
b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
index 72b8119d..0c4d0e56 100644
--- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -345,9 +345,9 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
         if (epochs.wasTruncated(ready.epoch))
             return;
 
-        ready.metadata.invokeIfSuccess(() -> epochs.acknowledge(ready));
-        ready.coordinate.invokeIfSuccess(() -> 
localSyncComplete(epochs.getOrCreate(ready.epoch).topology, startSync));
-        ready.reads.invokeIfSuccess(() ->  
localBootstrapsComplete(epochs.getOrCreate(ready.epoch).topology));
+        ready.metadata.invokeIfSuccess(() -> 
epochs.acknowledge(ready)).begin(agent);
+        ready.coordinate.invokeIfSuccess(() -> 
localSyncComplete(epochs.getOrCreate(ready.epoch).topology, 
startSync)).begin(agent);
+        ready.reads.invokeIfSuccess(() ->  
localBootstrapsComplete(epochs.getOrCreate(ready.epoch).topology)).begin(agent);
     }
 
     protected void topologyUpdatePostListenerNotify(Topology topology) {}
diff --git a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java 
b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
index 25e8b6ea..af7a2b02 100644
--- a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
+++ b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
@@ -85,7 +85,11 @@ public class DefaultLocalListeners implements LocalListeners
         public void notify(SafeCommandStore safeStore, SafeCommand 
safeCommand, TxnId listenerId)
         {
             SafeCommand listener = 
safeStore.ifLoadedAndInitialised(listenerId);
-            if (listener != null) Commands.listenerUpdate(safeStore, listener, 
safeCommand);
+            if (listener != null && safeStore.tryRecurse())
+            {
+                try { Commands.listenerUpdate(safeStore, listener, 
safeCommand); }
+                finally { safeStore.unrecurse(); }
+            }
             else
             {
                 //noinspection SillyAssignment,ConstantConditions
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 5cd5174e..950a7e3f 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -739,7 +739,6 @@ public abstract class InMemoryCommandStore extends 
CommandStore
             final Kinds kinds = new Kinds(Read, ExclusiveSyncPoint);
             return commandsForRanges = new ByTxnIdSnapshot()
             {
-                @Override public boolean mayContainAny(Txn.Kind kind) { return 
kinds.test(kind); }
                 @Override public NavigableMap<Timestamp, Summary> byTxnId() { 
return summaries; }
             };
         }
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index d02f0828..96830653 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -146,6 +146,7 @@ public abstract class CommandStore implements 
SequentialAsyncExecutor
     private transient NavigableMap<TxnId, Ranges> bootstrapBeganAt = 
emptyBootstrapBeganAt(); // additive (i.e. once inserted, rolled-over until 
invalidated, and the floor entry contains additions)
     private RedundantBefore redundantBefore = RedundantBefore.EMPTY;
     private MaxConflicts maxConflicts = MaxConflicts.EMPTY;
+    private MaxDecidedRX maxDecidedRX = MaxDecidedRX.EMPTY;
     private int maxConflictsUpdates = 0;
     protected RangesForEpoch rangesForEpoch;
 
@@ -245,6 +246,11 @@ public abstract class CommandStore implements 
SequentialAsyncExecutor
         return rangesForEpoch;
     }
 
+    public MaxDecidedRX unsafeGetMaxDecidedRX()
+    {
+        return maxDecidedRX;
+    }
+
     final void unsafeSetRangesForEpoch(RangesForEpoch newRangesForEpoch)
     {
         rangesForEpoch = nonNull(newRangesForEpoch);
@@ -309,6 +315,11 @@ public abstract class CommandStore implements 
SequentialAsyncExecutor
 
     protected abstract void registerTransitive(SafeCommandStore safeStore, 
RangeDeps deps);
 
+    protected void unsafeSetMaxDecidedRX(MaxDecidedRX newMaxDecidedRX)
+    {
+        this.maxDecidedRX = newMaxDecidedRX;
+    }
+
     protected void unsafeSetRejectBefore(RejectBefore newRejectBefore)
     {
         this.rejectBefore = newRejectBefore;
@@ -453,7 +464,7 @@ public abstract class CommandStore implements 
SequentialAsyncExecutor
         setMaxConflicts(updatedMaxConflicts);
     }
 
-    public final void markExclusiveSyncPoint(SafeCommandStore safeStore, TxnId 
txnId, Ranges ranges)
+    final void markExclusiveSyncPoint(SafeCommandStore safeStore, TxnId txnId, 
Ranges ranges)
     {
         // TODO (desired): narrow ranges to those that are owned
         Invariants.requireArgument(txnId.is(ExclusiveSyncPoint));
@@ -462,7 +473,12 @@ public abstract class CommandStore implements 
SequentialAsyncExecutor
         unsafeSetRejectBefore(newRejectBefore);
     }
 
-    public final void markExclusiveSyncPointLocallyApplied(SafeCommandStore 
safeStore, TxnId txnId, Ranges ranges)
+    final void markExclusiveSyncPointDecided(SafeCommandStore safeStore, TxnId 
txnId, Ranges ranges)
+    {
+        unsafeSetMaxDecidedRX(maxDecidedRX.update(ranges, txnId));
+    }
+
+    final void markExclusiveSyncPointLocallyApplied(SafeCommandStore 
safeStore, TxnId txnId, Ranges ranges)
     {
         // TODO (desired): narrow ranges to those that are owned
         Invariants.requireArgument(txnId.is(ExclusiveSyncPoint));
diff --git a/accord-core/src/main/java/accord/local/CommandSummaries.java 
b/accord-core/src/main/java/accord/local/CommandSummaries.java
index 24afd6e6..f9724155 100644
--- a/accord-core/src/main/java/accord/local/CommandSummaries.java
+++ b/accord-core/src/main/java/accord/local/CommandSummaries.java
@@ -31,7 +31,6 @@ import accord.primitives.Ranges;
 import accord.primitives.SaveStatus;
 import accord.primitives.Status;
 import accord.primitives.Timestamp;
-import accord.primitives.Txn;
 import accord.primitives.Txn.Kind.Kinds;
 import accord.primitives.TxnId;
 import accord.primitives.Unseekable;
@@ -107,7 +106,7 @@ public interface CommandSummaries
         {
             public interface Factory<L extends Loader>
             {
-                L create(Unseekables<?> searchKeysOrRanges, RedundantBefore 
redundantBefore, Kinds testKind, TxnId minTxnId, Timestamp maxTxnId, @Nullable 
TxnId findAsDep);
+                L create(@Nullable TxnId primaryTxnId, Unseekables<?> 
searchKeysOrRanges, RedundantBefore redundantBefore, Kinds testKind, TxnId 
minTxnId, Timestamp maxTxnId, @Nullable TxnId findAsDep);
             }
 
             protected final Unseekables<?> searchKeysOrRanges;
@@ -135,10 +134,10 @@ public interface CommandSummaries
                 Timestamp maxTxnId = primaryTxnId == null || keyHistory == 
KeyHistory.RECOVER || !primaryTxnId.is(ExclusiveSyncPoint) ? Timestamp.MAX : 
primaryTxnId;
                 TxnId findAsDep = primaryTxnId != null && keyHistory == 
KeyHistory.RECOVER ? primaryTxnId : null;
                 Kinds kinds = primaryTxnId == null ? AnyGloballyVisible : 
primaryTxnId.witnesses().or(keyHistory == KeyHistory.RECOVER ? 
primaryTxnId.witnessedBy() : Nothing);
-                return factory.create(keysOrRanges, redundantBefore, kinds, 
minTxnId, maxTxnId, findAsDep);
+                return factory.create(primaryTxnId, keysOrRanges, 
redundantBefore, kinds, minTxnId, maxTxnId, findAsDep);
             }
 
-            public Loader(Unseekables<?> searchKeysOrRanges, RedundantBefore 
redundantBefore, Kinds testKind, TxnId minTxnId, Timestamp maxTxnId, @Nullable 
TxnId findAsDep)
+            public Loader(@Nullable TxnId primaryTxnId, Unseekables<?> 
searchKeysOrRanges, RedundantBefore redundantBefore, Kinds testKind, TxnId 
minTxnId, Timestamp maxTxnId, @Nullable TxnId findAsDep)
             {
                 this.searchKeysOrRanges = searchKeysOrRanges;
                 this.redundantBefore = redundantBefore;
@@ -274,7 +273,6 @@ public interface CommandSummaries
     interface ByTxnIdSnapshot extends CommandSummaries
     {
         NavigableMap<Timestamp, Summary> byTxnId();
-        default boolean mayContainAny(Txn.Kind kind) { return true; }
 
         default boolean visit(Unseekables<?> keysOrRanges,
                               TxnId testTxnId,
diff --git a/accord-core/src/main/java/accord/local/MaxDecidedRX.java 
b/accord-core/src/main/java/accord/local/MaxDecidedRX.java
new file mode 100644
index 00000000..616df546
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/MaxDecidedRX.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package accord.local;
+
+import accord.api.RoutingKey;
+import accord.primitives.Routables;
+import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
+import accord.utils.BTreeReducingRangeMap;
+
+public class MaxDecidedRX extends BTreeReducingRangeMap<TxnId>
+{
+    public static final MaxDecidedRX EMPTY = new MaxDecidedRX();
+
+    private MaxDecidedRX()
+    {
+        super();
+    }
+
+    private MaxDecidedRX(boolean inclusiveEnds, Object[] tree)
+    {
+        super(inclusiveEnds, tree);
+    }
+
+    TxnId get(Routables<?> keysOrRanges)
+    {
+        return foldl(keysOrRanges, TxnId::max, TxnId.NONE);
+    }
+
+    public MaxDecidedRX update(Unseekables<?> keysOrRanges, TxnId maxId)
+    {
+        // note: we combine entries with TxnId::max, so each range keeps the
+        //  greatest TxnId supplied for it (NOTE(review): the previous wording
+        //  cited mergeMax/unique HLC propagation for bootstrap - confirm intent)
+        return update(this, keysOrRanges, maxId, TxnId::max, 
MaxDecidedRX::new, Builder::new);
+    }
+
+    private static class Builder extends AbstractBoundariesBuilder<RoutingKey, 
TxnId, MaxDecidedRX>
+    {
+        protected Builder(boolean inclusiveEnds, int capacity)
+        {
+            super(inclusiveEnds, capacity);
+        }
+
+        @Override
+        protected MaxDecidedRX buildInternal(Object[] tree)
+        {
+            return new MaxDecidedRX(inclusiveEnds, tree);
+        }
+    }
+}
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java 
b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 86fc58cd..8543e472 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -58,6 +58,7 @@ import static accord.local.cfk.UpdateUnmanagedMode.REGISTER;
 import static accord.primitives.Routable.Domain.Range;
 import static accord.primitives.Routables.Slice.Minimal;
 import static accord.primitives.SaveStatus.Applied;
+import static accord.primitives.SaveStatus.Committed;
 import static accord.primitives.SaveStatus.Uninitialised;
 import static accord.primitives.Timestamp.Flag.SHARD_BOUND;
 import static accord.utils.Invariants.illegalArgument;
@@ -71,6 +72,21 @@ import static accord.utils.Invariants.illegalState;
  */
 public abstract class SafeCommandStore implements RangesForEpochSupplier, 
RedundantBeforeSupplier, CommandSummaries
 {
+    private static final int MAX_REENTRANCY = 100;
+    private int reentrancyCounter;
+    public boolean tryRecurse()
+    {
+        if (reentrancyCounter == MAX_REENTRANCY)
+            return false;
+        ++reentrancyCounter;
+        return true;
+    }
+    public void unrecurse()
+    {
+        --reentrancyCounter;
+        Invariants.require(reentrancyCounter >= 0);
+    }
+
     /**
      * If the transaction exists (with some associated data) in the 
CommandStore, return it. Otherwise return null.
      *
@@ -259,6 +275,12 @@ public abstract class SafeCommandStore implements 
RangesForEpochSupplier, Redund
             commandStore().markExclusiveSyncPoint(this, updated.txnId(), 
ranges);
         }
 
+        if (newSaveStatus.compareTo(Committed) >= 0 && 
oldSaveStatus.compareTo(Committed) < 0 && 
!newSaveStatus.hasBeen(Status.Truncated))
+        {
+            Ranges ranges = updated.participants().owns().toRanges();
+            commandStore().markExclusiveSyncPointDecided(this, 
updated.txnId(), ranges);
+        }
+
         if (newSaveStatus == Applied && (force || oldSaveStatus != Applied))
         {
             Ranges ranges = updated.participants().touches().toRanges();
diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java 
b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
index 191c554c..5963902b 100644
--- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
@@ -19,7 +19,6 @@
 package accord.local.cfk;
 
 import java.util.Arrays;
-import java.util.EnumMap;
 import java.util.Iterator;
 import java.util.Objects;
 import java.util.function.Predicate;
@@ -307,11 +306,11 @@ public class CommandsForKey extends CommandsForKeyUpdate
 
     public static boolean needsUpdate(SafeCommandStore safeStore, Command 
prev, Command updated)
     {
-        InternalStatus newStatus = from(safeStore, updated);
+        InternalStatus newStatus = from(safeStore, updated, false);
         if (newStatus == null)
             return updated.known().is(ApplyAtKnown) && 
updated.executeAt().hasDistinctHlcAndUniqueHlc();
 
-        InternalStatus prevStatus = from(safeStore, prev);
+        InternalStatus prevStatus = from(safeStore, prev, false);
         if (prevStatus != newStatus)
             return true;
 
@@ -857,48 +856,64 @@ public class CommandsForKey extends CommandsForKeyUpdate
         APPLIED_NOT_EXECUTED /*(reserved)*/    (APPLIED,                       
       true, true, false, true),
         INVALIDATED                            (SummaryStatus.INVALIDATED,     
       false,false,false, false),
         ERASED                                 (SummaryStatus.NONE,            
       false,false,false, false),
-        PRUNED                                 (SummaryStatus.NONE,            
       false,false,false, false)
+        PRUNED                                 (SummaryStatus.NONE,            
       false,false,false, false),
         ;
 
-        static final EnumMap<SaveStatus, InternalStatus> FROM_SAVE_STATUS = 
new EnumMap<>(SaveStatus.class);
+        static final InternalStatus[] FROM_SAVE_STATUS = new 
InternalStatus[SaveStatus.values().length];
+        static final InternalStatus[] FROM_SAVE_STATUS_FILTERED = new 
InternalStatus[SaveStatus.values().length];
         static final InternalStatus[] VALUES = values();
         static final SummaryStatus[] TO_SUMMARY_STATUS;
 
+        private static void putFromSaveStatus(SaveStatus from, InternalStatus 
to)
+        {
+            putFromSaveStatus(from, to, false);
+        }
+        
+        private static void putFromSaveStatus(SaveStatus from, InternalStatus 
to, boolean intermediate)
+        {
+            FROM_SAVE_STATUS[from.ordinal()] = to;
+            if (!intermediate)
+                FROM_SAVE_STATUS_FILTERED[from.ordinal()] = to;
+        }
+        
         static
         {
-            FROM_SAVE_STATUS.put(SaveStatus.PreAccepted, 
PREACCEPTED_WITHOUT_DEPS);
-            FROM_SAVE_STATUS.put(SaveStatus.PreAcceptedWithVote, 
PREACCEPTED_WITHOUT_DEPS);
-            FROM_SAVE_STATUS.put(SaveStatus.PreAcceptedWithDeps, 
PREACCEPTED_WITH_DEPS);
-            FROM_SAVE_STATUS.put(SaveStatus.AcceptedInvalidate, NOTACCEPTED);
-            FROM_SAVE_STATUS.put(SaveStatus.AcceptedInvalidateWithDefinition, 
NOTACCEPTED);
-            FROM_SAVE_STATUS.put(SaveStatus.AcceptedMedium, ACCEPTED);
-            FROM_SAVE_STATUS.put(SaveStatus.AcceptedMediumWithDefinition, 
ACCEPTED);
-            FROM_SAVE_STATUS.put(SaveStatus.AcceptedMediumWithDefAndVote, 
ACCEPTED);
-            FROM_SAVE_STATUS.put(SaveStatus.AcceptedSlow, ACCEPTED);
-            FROM_SAVE_STATUS.put(SaveStatus.AcceptedSlowWithDefinition, 
ACCEPTED);
-            FROM_SAVE_STATUS.put(SaveStatus.AcceptedSlowWithDefAndVote, 
ACCEPTED);
-            FROM_SAVE_STATUS.put(SaveStatus.PreCommitted, 
PREACCEPTED_WITHOUT_DEPS);
-            FROM_SAVE_STATUS.put(SaveStatus.PreCommittedWithDefinition, 
PREACCEPTED_WITHOUT_DEPS);
-            FROM_SAVE_STATUS.put(SaveStatus.PreCommittedWithDeps, ACCEPTED);
-            FROM_SAVE_STATUS.put(SaveStatus.PreCommittedWithFixedDeps, 
ACCEPTED);
-            FROM_SAVE_STATUS.put(SaveStatus.PreCommittedWithDefAndDeps, 
ACCEPTED);
-            FROM_SAVE_STATUS.put(SaveStatus.PreCommittedWithDefAndFixedDeps, 
ACCEPTED);
-            FROM_SAVE_STATUS.put(SaveStatus.Committed, COMMITTED);
-            FROM_SAVE_STATUS.put(SaveStatus.Stable, STABLE);
-            FROM_SAVE_STATUS.put(SaveStatus.ReadyToExecute, STABLE);
-            FROM_SAVE_STATUS.put(SaveStatus.PreApplied, STABLE);
-            // SaveStatus.Applying is a transient state; let PreApplied and 
Applied handle updates
-            FROM_SAVE_STATUS.put(SaveStatus.Applied, APPLIED_NOT_DURABLE);
-            // We don't map TruncatedApplyX or Erased as we want to retain 
them as APPLIED
+            putFromSaveStatus(SaveStatus.PreAccepted, 
PREACCEPTED_WITHOUT_DEPS);
+            putFromSaveStatus(SaveStatus.PreAcceptedWithVote, 
PREACCEPTED_WITHOUT_DEPS);
+            putFromSaveStatus(SaveStatus.PreAcceptedWithDeps, 
PREACCEPTED_WITH_DEPS);
+            putFromSaveStatus(SaveStatus.AcceptedInvalidate, NOTACCEPTED);
+            putFromSaveStatus(SaveStatus.AcceptedInvalidateWithDefinition, 
NOTACCEPTED);
+            putFromSaveStatus(SaveStatus.AcceptedMedium, ACCEPTED);
+            putFromSaveStatus(SaveStatus.AcceptedMediumWithDefinition, 
ACCEPTED);
+            putFromSaveStatus(SaveStatus.AcceptedMediumWithDefAndVote, 
ACCEPTED);
+            putFromSaveStatus(SaveStatus.AcceptedSlow, ACCEPTED);
+            putFromSaveStatus(SaveStatus.AcceptedSlowWithDefinition, ACCEPTED);
+            putFromSaveStatus(SaveStatus.AcceptedSlowWithDefAndVote, ACCEPTED);
+            putFromSaveStatus(SaveStatus.PreCommitted, 
PREACCEPTED_WITHOUT_DEPS);
+            putFromSaveStatus(SaveStatus.PreCommittedWithDefinition, 
PREACCEPTED_WITHOUT_DEPS);
+            putFromSaveStatus(SaveStatus.PreCommittedWithDeps, ACCEPTED);
+            putFromSaveStatus(SaveStatus.PreCommittedWithFixedDeps, ACCEPTED);
+            putFromSaveStatus(SaveStatus.PreCommittedWithDefAndDeps, ACCEPTED);
+            putFromSaveStatus(SaveStatus.PreCommittedWithDefAndFixedDeps, 
ACCEPTED);
+            putFromSaveStatus(SaveStatus.Committed, COMMITTED);
+            putFromSaveStatus(SaveStatus.Stable, STABLE);
+            putFromSaveStatus(SaveStatus.ReadyToExecute, STABLE);
+            putFromSaveStatus(SaveStatus.PreApplied, STABLE);
+            putFromSaveStatus(SaveStatus.Applying, STABLE);
+            putFromSaveStatus(SaveStatus.Applied, APPLIED_NOT_DURABLE);
+            putFromSaveStatus(SaveStatus.TruncatedApplyWithOutcome, 
APPLIED_DURABLE, true);
+            putFromSaveStatus(SaveStatus.TruncatedApply, APPLIED_DURABLE, 
true);
+            putFromSaveStatus(SaveStatus.TruncatedUnapplied, APPLIED_DURABLE, 
true);
+            putFromSaveStatus(SaveStatus.Erased, ERASED, true);
             // esp. to support pruning where we expect the prunedBefore 
entr*ies* to be APPLIED
             // Note importantly that we have multiple logical pruned befores - 
the last APPLIED
             // write per epoch is retained to cleanly support
             // TODO (desired): can we improve our semantics here to at least 
PRUNE truncated commands if there's a superseding APPLIED command?
             // TODO (desired): move all truncated to ERASED, but we have to 
handle the case we erase our prunedBefore boundary
-            FROM_SAVE_STATUS.put(SaveStatus.Vestigial, ERASED);
-            FROM_SAVE_STATUS.put(SaveStatus.Invalidated, INVALIDATED);
+            putFromSaveStatus(SaveStatus.Vestigial, ERASED);
+            putFromSaveStatus(SaveStatus.Invalidated, INVALIDATED);
             for (SaveStatus saveStatus : SaveStatus.values())
-                Invariants.require(FROM_SAVE_STATUS.get(saveStatus) != null || 
saveStatus.is(Status.Truncated) || saveStatus.is(Status.NotDefined) || 
saveStatus == SaveStatus.Applying);
+                Invariants.require(FROM_SAVE_STATUS[saveStatus.ordinal()] != 
null || saveStatus.is(Status.NotDefined));
 
             SummaryStatus[] summaryStatuses = SummaryStatus.values();
             TO_SUMMARY_STATUS = Arrays.copyOf(summaryStatuses, 
summaryStatuses.length + 1);
@@ -964,7 +979,7 @@ public class CommandsForKey extends CommandsForKeyUpdate
         @VisibleForTesting
         public static InternalStatus from(SaveStatus status)
         {
-            return FROM_SAVE_STATUS.get(status);
+            return FROM_SAVE_STATUS[status.ordinal()];
         }
 
         @VisibleForTesting
@@ -984,9 +999,10 @@ public class CommandsForKey extends CommandsForKeyUpdate
             return result;
         }
 
-        public static InternalStatus from(SafeCommandStore safeStore, Command 
command)
+        public static InternalStatus from(SafeCommandStore safeStore, Command 
command, boolean includeIntermediate)
         {
-            InternalStatus status = from(command.saveStatus());
+            SaveStatus saveStatus = command.saveStatus();
+            InternalStatus status = (includeIntermediate ? FROM_SAVE_STATUS : 
FROM_SAVE_STATUS_FILTERED)[saveStatus.ordinal()];
             if (status == APPLIED_NOT_DURABLE && 
command.durability().isDurable())
                 status = APPLIED_DURABLE;
             if (status == PREACCEPTED_WITH_DEPS && 
command.txnId().node.equals(safeStore.node().id()) && 
!Ballot.ZERO.equals(command.promised()))
@@ -1345,7 +1361,6 @@ public class CommandsForKey extends CommandsForKeyUpdate
     {
         visitor.visitMaxAppliedHlc(maxUniqueHlc);
 
-        TxnId prunedBefore = prunedBefore();
         int end = insertPos(startedBefore);
         // We only filter durable transactions less than BOTH the txnId and 
executeAt of our max preceding write.
         // This is to avoid the following pre-bootstrap edge case, so this 
filtering can be made stricter in future:
@@ -1546,12 +1561,14 @@ public class CommandsForKey extends CommandsForKeyUpdate
     private CommandsForKeyUpdate update(SafeCommandStore safeStore, Command 
update, @Nullable LoadingPruned loading)
     {
         Invariants.require(manages(update.txnId()));
-        InternalStatus newStatus = from(safeStore, update);
+        InternalStatus newStatus = from(safeStore, update, true);
         if (newStatus == null)
         {
             if (loading == null)
             {
                 // handle replay of TruncatedApply with uniqueHlc
+                // note: this is no longer needed because we will replay 
TruncatedApply entries,
+                // but we keep the logic in case we modify that behaviour later
                 if (update.known().is(ApplyAtKnown))
                     return updateUniqueHlc(update.executeAt().uniqueHlc());
                 return this;
diff --git a/accord-core/src/main/java/accord/local/cfk/NotifySink.java 
b/accord-core/src/main/java/accord/local/cfk/NotifySink.java
index 3e929168..8a4df5a6 100644
--- a/accord-core/src/main/java/accord/local/cfk/NotifySink.java
+++ b/accord-core/src/main/java/accord/local/cfk/NotifySink.java
@@ -51,7 +51,11 @@ interface NotifySink
         public void notWaiting(SafeCommandStore safeStore, TxnId txnId, 
RoutingKey key, long uniqueHlc)
         {
             SafeCommand safeCommand = safeStore.ifLoadedAndInitialised(txnId);
-            if (safeCommand != null) notWaiting(safeStore, safeCommand, key, 
uniqueHlc);
+            if (safeCommand != null && safeStore.tryRecurse())
+            {
+                try { notWaiting(safeStore, safeCommand, key, uniqueHlc); }
+                finally { safeStore.unrecurse(); }
+            }
             else
             {
                 safeStore.commandStore().execute(txnId, safeStore0 -> {
@@ -70,7 +74,11 @@ interface NotifySink
         {
             TxnId txnId = notify.plainTxnId();
 
-            if (safeStore.canExecuteWith(txnId)) doNotifyWaitingOn(safeStore, 
txnId, key, waitingOnStatus, blockedUntil, notifyCfk);
+            if (safeStore.canExecuteWith(txnId) && safeStore.tryRecurse())
+            {
+                try { doNotifyWaitingOn(safeStore, txnId, key, 
waitingOnStatus, blockedUntil, notifyCfk); }
+                finally { safeStore.unrecurse(); }
+            }
             else safeStore.commandStore().execute(txnId, safeStore0 -> {
                 doNotifyWaitingOn(safeStore0, txnId, key, waitingOnStatus, 
blockedUntil, notifyCfk);
             }, safeStore.agent());
@@ -113,9 +121,10 @@ interface NotifySink
         private void doNotifyAlreadyReady(SafeCommandStore safeStore, TxnId 
txnId, RoutingKey key)
         {
             SafeCommandsForKey update = safeStore.ifLoadedAndInitialised(key);
-            if (update != null)
+            if (update != null && safeStore.tryRecurse())
             {
-                update.callback(safeStore, 
safeStore.unsafeGet(txnId).current());
+                try { update.callback(safeStore, 
safeStore.unsafeGet(txnId).current()); }
+                finally { safeStore.unrecurse(); }
             }
             else
             {
diff --git a/accord-core/src/main/java/accord/local/cfk/PostProcess.java 
b/accord-core/src/main/java/accord/local/cfk/PostProcess.java
index 4651bd4c..ec3046b6 100644
--- a/accord-core/src/main/java/accord/local/cfk/PostProcess.java
+++ b/accord-core/src/main/java/accord/local/cfk/PostProcess.java
@@ -97,7 +97,11 @@ abstract class PostProcess
             {
                 safeStore = safeStore; // make it unsafe for use in lambda
                 SafeCommand safeCommand = 
safeStore.ifLoadedAndInitialised(txnId);
-                if (safeCommand != null) load(safeStore, safeCommand, safeCfk, 
notifySink);
+                if (safeCommand != null && safeStore.tryRecurse())
+                {
+                    try { load(safeStore, safeCommand, safeCfk, notifySink); }
+                    finally { safeStore.unrecurse(); }
+                }
                 else safeStore.commandStore().execute(contextFor(txnId, 
RoutingKeys.of(key), SYNC), safeStore0 -> {
                     load(safeStore0, safeStore0.unsafeGet(txnId), 
safeStore0.get(key), notifySink);
                 }, safeStore.agent());
@@ -205,13 +209,20 @@ abstract class PostProcess
             for (TxnId txnId : notify)
             {
                 SafeCommand safeCommand = 
safeStore.ifLoadedAndInitialised(txnId);
-                if (safeCommand != null)
+                if (safeCommand != null && safeStore.tryRecurse())
                 {
-                    CommandsForKeyUpdate update = updateUnmanaged(cfk, 
safeCommand, UPDATE, addUnmanageds);
-                    if (update != cfk)
+                    try
+                    {
+                        CommandsForKeyUpdate update = updateUnmanaged(cfk, 
safeCommand, UPDATE, addUnmanageds);
+                        if (update != cfk)
+                        {
+                            Invariants.require(update.cfk() == cfk);
+                            nestedNotify.add(update.postProcess());
+                        }
+                    }
+                    finally
                     {
-                        Invariants.require(update.cfk() == cfk);
-                        nestedNotify.add(update.postProcess());
+                        safeStore.unrecurse();
                     }
                 }
                 else
diff --git a/accord-core/src/main/java/accord/local/cfk/Updating.java 
b/accord-core/src/main/java/accord/local/cfk/Updating.java
index d93f2735..fd3b0a43 100644
--- a/accord-core/src/main/java/accord/local/cfk/Updating.java
+++ b/accord-core/src/main/java/accord/local/cfk/Updating.java
@@ -287,20 +287,24 @@ class Updating
         return newMaxAppliedPreBootstrapWriteById;
     }
 
-    static Object computeInfoAndAdditions(CommandsForKey cfk, int insertPos, 
int updatePos, TxnId txnId, InternalStatus newStatus, boolean mayExecute, 
Command command)
+    static Object computeInfoAndAdditions(CommandsForKey cfk, int insertPos, 
int updatePos, TxnId plainTxnId, InternalStatus newStatus, boolean mayExecute, 
Command command)
     {
         Invariants.require(newStatus.hasDeps);
         Timestamp executeAt = command.executeAt();
-        if (!newStatus.hasExecuteAt || executeAt.equals(txnId)) executeAt = 
txnId;
+        if (!newStatus.hasExecuteAt || executeAt.equals(plainTxnId)) executeAt 
= plainTxnId;
         Ballot ballot = Ballot.ZERO;
         if (newStatus.hasBallot)
             ballot = command.acceptedOrCommitted();
 
-        Timestamp depsKnownBefore = newStatus.depsKnownBefore(txnId, 
executeAt);
+        // TODO (desired): is this the best place to encode this behaviour?
+        if (command.saveStatus().is(Status.Truncated))
+            return TxnInfo.create(plainTxnId, newStatus, mayExecute, 
executeAt, NO_TXNIDS, ballot);
+
+        Timestamp depsKnownBefore = newStatus.depsKnownBefore(plainTxnId, 
executeAt);
         MergeCursor<TxnId, DepList> deps = 
command.partialDeps().txnIds(cfk.key());
         deps.find(cfk.redundantBefore());
 
-        return computeInfoAndAdditions(cfk.byId, insertPos, updatePos, txnId, 
newStatus, mayExecute, ballot, executeAt, cfk.prunedBefore(), depsKnownBefore, 
deps);
+        return computeInfoAndAdditions(cfk.byId, insertPos, updatePos, 
plainTxnId, newStatus, mayExecute, ballot, executeAt, cfk.prunedBefore(), 
depsKnownBefore, deps);
     }
 
     /**
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java 
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index bad57fe2..4e055eb9 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -884,7 +884,7 @@ public class TopologyManager
     {
         Epochs epochs = this.epochs;
         // No epochs known to Accord
-        if (epochs.firstNonEmptyEpoch == -1)
+        if (epochs.firstNonEmptyEpoch == -1 || minEpoch > epochs.currentEpoch)
             return new TopologyRange(epochs.minEpoch(), epochs.currentEpoch, 
epochs.firstNonEmptyEpoch, Collections.emptyList());
 
         minEpoch = Math.max(minEpoch, epochs.minEpoch());
diff --git a/accord-core/src/main/java/accord/utils/BTreeReducingRangeMap.java 
b/accord-core/src/main/java/accord/utils/BTreeReducingRangeMap.java
index 159124fb..5729bbb6 100644
--- a/accord-core/src/main/java/accord/utils/BTreeReducingRangeMap.java
+++ b/accord-core/src/main/java/accord/utils/BTreeReducingRangeMap.java
@@ -156,6 +156,37 @@ public class BTreeReducingRangeMap<V> extends 
BTreeReducingIntervalMap<RoutingKe
         return accumulator;
     }
 
+    public <V2, P1, P2> V2 foldl(RoutingKey start, RoutingKey end, 
QuadFunction<V, V2, P1, P2, V2> fold, V2 accumulator, P1 p1, P2 p2)
+    {
+        if (isEmpty())
+            return accumulator; // empty map: the accumulator is returned unchanged
+
+        int treeSize = BTree.size(tree);
+        int startIdx = findIndex(start);
+        int startPos = startIdx < 0 ? (-1 - startIdx) : startIdx; // negative result decoded as (-1 - idx); presumably an insertion point - confirm against BTree.findIndex
+        if (startIdx == treeSize - 1 || startPos == treeSize)
+            return accumulator;  // is last or out of bounds -> we are done
+        if (startIdx < 0) startPos = Math.max(0, startPos - 1); // inclusive
+
+        int endIdx = findIndex(end);
+        int endPos = endIdx < 0 ? (-1 - endIdx) : endIdx; // same decoding as for startIdx
+        if (endPos == 0)
+            return accumulator; // end resolves to position 0: return without visiting any entry
+
+        endPos = Math.min(endPos - 1, treeSize - 2); // inclusive
+
+        Iterator<Entry<RoutingKey,V>> iterator = BTree.iterator(tree, 
startPos, endPos, BTree.Dir.ASC);
+
+        while (iterator.hasNext())
+        {
+            Entry<RoutingKey, V> entry = iterator.next();
+            if (entry.hasValue() && entry.value() != null) // entries with no value, or a null value, are skipped
+                accumulator = fold.apply(entry.value(), accumulator, p1, p2); // fold receives (value, accumulator so far, p1, p2)
+        }
+
+        return accumulator;
+    }
+
     public int findIndex(RoutableKey key)
     {
         return BTree.findIndex(tree, EntryComparator.instance(), key);
diff --git 
a/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java 
b/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java
index 7767ca12..92805dc3 100644
--- a/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java
+++ b/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java
@@ -172,13 +172,24 @@ public class CheckpointIntervalArray<Ranges, Range, Key>
                 }
                 else if ((ri & BIT29) != 0)
                 {
+                    // TODO (expected): remove this logic in near future; have 
disabled this optimisation in the builder
                     subStart = ri & 0x1fffffff;
-                    subEnd = Integer.MAX_VALUE;
+                    subEnd = checkpointStart;
+                    for (int prevCheckpoint = checkpoint - 1; prevCheckpoint 
>= 0; --prevCheckpoint)
+                    {
+                        int prevHeaderBaseIndex = (prevCheckpoint / 4) * 5;
+                        int prevHeaderSubIndex = prevCheckpoint & 3;
+                        int prevHeaderListIndex = prevHeaderBaseIndex + 1 + 
prevHeaderSubIndex;
+                        int prevCheckpointStart = 
checkpointLists[prevHeaderListIndex];
+                        if (prevCheckpointStart <= subStart)
+                            break;
+                        subEnd = prevCheckpointStart;
+                    }
                 }
                 else
                 {
                     int length = ri & 0x1fffffff;
-                    subStart = checkpointLists[++i];
+                    subStart = checkpointLists[++i] & 0x7fffffff;
                     subEnd = subStart + length;
                 }
 
diff --git 
a/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java 
b/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java
index d8df85c0..6f5aa394 100644
--- a/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java
+++ b/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java
@@ -85,6 +85,7 @@ public class CheckpointIntervalArrayBuilder<Ranges, Range, 
RoutingKey>
     private static final int BIT31 = 0x80000000;
     private static final int BIT30 = 0x40000000;
     private static final int BIT29 = 0x20000000;
+    private static final int BIT28 = 0x10000000;
     static final int MIN_INDIRECT_LINK_LENGTH = 2;
 
     final Accessor<Ranges, Range, RoutingKey> accessor;
@@ -448,18 +449,23 @@ public class CheckpointIntervalArrayBuilder<Ranges, 
Range, RoutingKey>
             {
                 int index = t.index & ~BIT31;
                 int length = t.linkLength & ~BIT31;
+                Invariants.require(length < BIT30);
                 if (length <= 0xff && index <= 0xfffff)
                 {
                     lists[listCount++] = BIT31 | BIT30 | (length << 20) | 
index;
                 }
-                else if (t.linkLength >= 0 && length < BIT30)
-                {
-                    lists[listCount++] = BIT31 | BIT29 | index;
-                }
+//                This logic has been disabled.
+//                It is possible for some pathological cases to have the 
checkpoint lists run into one another.
+//                So that the search is not guaranteed to stop based on 
comparison, and an end index can only be computed
+//                by scanning the checkpoint list array, which is fine but not 
worth the complexity or saving 4 bytes
+//                else if (t.linkLength >= 0)
+//                {
+//                    lists[listCount++] = BIT31 | BIT29 | index;
+//                }
                 else
                 {
                     lists[listCount++] = BIT31 | length;
-                    lists[listCount++] = BIT31 | pending.count();
+                    lists[listCount++] = BIT31 | index;
                 }
             }
         }
@@ -1053,7 +1059,8 @@ public class CheckpointIntervalArrayBuilder<Ranges, 
Range, RoutingKey>
                         // we may want to reference this list from there
                         // so track count and position of first one to make a 
determination
                         ++openDirectCount;
-                        if (firstOpenDirect < 0) firstOpenDirect = count;
+                        if (firstOpenDirect < 0)
+                            firstOpenDirect = count;
                     }
                     else hasClosedDirect = true;
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to