This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit ca04e56014be1408856e94c56c53abaf02981f2b
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Thu Jun 5 18:39:39 2025 +0200

    Account for truncated epoch when computing the low bound
    
    Patch by Alex Petrov and Benedict Elliott Smith; reviewed by Alex Petrov 
and Benedict Elliott Smith for CASSANDRA-20702.
---
 .../src/main/java/accord/impl/progresslog/HomeState.java      |  3 ++-
 .../src/main/java/accord/impl/progresslog/WaitingState.java   | 10 +++++++---
 accord-core/src/main/java/accord/local/CommandStores.java     | 11 ++++++++---
 accord-core/src/main/java/accord/local/StoreParticipants.java |  2 +-
 .../src/main/java/accord/topology/TopologyManager.java        | 11 ++++-------
 5 files changed, 22 insertions(+), 15 deletions(-)

diff --git a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java 
b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
index 22a4e366..8c1885d8 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
@@ -152,7 +152,8 @@ abstract class HomeState extends WaitingState
         ProgressToken maxProgressToken = 
instance.savedProgressToken(txnId).merge(command);
 
         CallbackInvoker<ProgressToken, Outcome> invoker = 
invokeHomeCallback(instance, txnId, maxProgressToken, 
HomeState::recoverCallback);
-        long lowEpoch = 
safeStore.ranges().latestEarlierEpochThatFullyCovers(txnId.epoch(), 
command.participants().hasTouched());
+
+        long lowEpoch = 
safeStore.ranges().latestEarlierEpochThatFullyCovers(safeStore, txnId.epoch(), 
command.participants().hasTouched());
         long highEpoch = 
safeStore.ranges().earliestLaterEpochThatFullyCovers(command.executeAtIfKnownElseTxnId().epoch(),
 command.participants().hasTouched());
         instance.debugActive(MaybeRecover.maybeRecover(instance.node(), txnId, 
invalidIf(), command.route(), maxProgressToken, lowEpoch, highEpoch, invoker), 
invoker);
         set(safeStore, instance, ReadyToExecute, Querying);
diff --git 
a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java 
b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
index b919d0ad..37084d45 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
@@ -240,7 +240,7 @@ abstract class WaitingState extends BaseTxnState
         if (offset >= 3)
         {
             offset = 3;
-            lowEpoch = 
safeStore.ranges().latestEarlierEpochThatFullyCovers(lowEpoch, 
command.maxContactable());
+            lowEpoch = 
safeStore.ranges().latestEarlierEpochThatFullyCovers(safeStore, lowEpoch, 
command.maxContactable());
         }
         encodedState = encodedState & ~(0x3L << AWAIT_EPOCH_SHIFT);
         encodedState |= ((long)offset) << AWAIT_EPOCH_SHIFT;
@@ -255,11 +255,15 @@ abstract class WaitingState extends BaseTxnState
     long readLowEpoch(SafeCommandStore safeStore, TxnId txnId, Route<?> route)
     {
         int offset = (int) ((encodedState >>> AWAIT_EPOCH_SHIFT) & 0x3);
+        if (offset == 0)
+            return txnId.epoch();
+
         RangesForEpoch ranges = safeStore.ranges();
-        long epoch = ranges.epochAtIndex(Math.max(0, 
ranges.floorIndex(txnId.epoch())) - offset);
+        int i = ranges.floorIndex(txnId.epoch()) - (offset - 1);
+        long epoch = ranges.epochAtIndex(Math.max(0, i)) - 1;
         if (offset < 3)
             return epoch;
-        return safeStore.ranges().latestEarlierEpochThatFullyCovers(epoch, 
route);
+        return safeStore.ranges().latestEarlierEpochThatFullyCovers(safeStore, 
epoch, route);
     }
 
     boolean hasNewLowEpoch(SafeCommandStore safeStore, TxnId txnId, long 
prevLowEpoch, long newLowEpoch)
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java 
b/accord-core/src/main/java/accord/local/CommandStores.java
index 8e12ca1f..985c26e1 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -360,17 +360,22 @@ public abstract class CommandStores
             return Math.max(sinceEpoch, epochs[0]);
         }
 
-        public long latestEarlierEpochThatFullyCovers(long beforeEpoch, 
Unseekables<?> keysOrRanges)
+        public long latestEarlierEpochThatFullyCovers(SafeCommandStore 
safeStore, long beforeEpoch, Unseekables<?> keysOrRanges)
         {
             int i = ceilIndex(beforeEpoch);
+            if (i == 0)
+                return beforeEpoch;
             long latest = beforeEpoch;
             Ranges existing = i >= ranges.length ? Ranges.EMPTY : ranges[i];
-            while (--i >= 0)
+            long minEpoch = safeStore.node().topology().minEpoch();
+            while (--i >= 0 && minEpoch < epochs[i])
             {
                 if (ranges[i].without(existing).intersects(keysOrRanges))
-                    latest = epochs[i];
+                    latest = epochs[i + 1] - 1;
                 existing = existing.with(ranges[i]);
             }
+            if (latest < beforeEpoch)
+                latest = Math.max(latest, 
safeStore.node().topology().minEpoch());
             return latest;
         }
 
diff --git a/accord-core/src/main/java/accord/local/StoreParticipants.java 
b/accord-core/src/main/java/accord/local/StoreParticipants.java
index aad213c4..5f286beb 100644
--- a/accord-core/src/main/java/accord/local/StoreParticipants.java
+++ b/accord-core/src/main/java/accord/local/StoreParticipants.java
@@ -688,7 +688,7 @@ public class StoreParticipants
 
     private static long computeCoveringEpoch(SafeCommandStore safeStore, long 
txnIdEpoch, Participants<?> participants)
     {
-        long lowEpoch = 
safeStore.ranges().latestEarlierEpochThatFullyCovers(txnIdEpoch, participants);
+        long lowEpoch = 
safeStore.ranges().latestEarlierEpochThatFullyCovers(safeStore, txnIdEpoch, 
participants);
         return Math.min(lowEpoch, txnIdEpoch);
     }
 }
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java 
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index d815ac58..05888aa4 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -314,14 +314,11 @@ public class TopologyManager
             else
             {
                 this.epochs = Arrays.copyOf(epochs, truncateFrom);
-                if (logger.isDebugEnabled())
+                for (int i = truncateFrom; i < epochs.length; i++)
                 {
-                    for (int i = truncateFrom; i < epochs.length; i++)
-                    {
-                        EpochState state = epochs[i];
-                        Invariants.require(epochs[i].syncComplete());
-                        logger.debug("Retired epoch {} with added/removed 
ranges {}/{}. Topology: {}. Closed: {}", state.epoch(), state.addedRanges, 
state.removedRanges, state.global.ranges, state.closed);
-                    }
+                    EpochState state = epochs[i];
+                    Invariants.require(epochs[i].syncComplete());
+                    logger.info("Retired epoch {} with added/removed ranges 
{}/{}. Topology: {}. Closed: {}", state.epoch(), state.addedRanges, 
state.removedRanges, state.global.ranges, state.closed);
                 }
                 if (logger.isTraceEnabled())
                 {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to