This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit ca04e56014be1408856e94c56c53abaf02981f2b Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Thu Jun 5 18:39:39 2025 +0200 Account for truncated epoch when computing the low bound Patch by Alex Petrov and Benedict Elliott Smith; reviewed by Alex Petrov and Benedict Elliott Smith for CASSANDRA-20702. --- .../src/main/java/accord/impl/progresslog/HomeState.java | 3 ++- .../src/main/java/accord/impl/progresslog/WaitingState.java | 10 +++++++--- accord-core/src/main/java/accord/local/CommandStores.java | 11 ++++++++--- accord-core/src/main/java/accord/local/StoreParticipants.java | 2 +- .../src/main/java/accord/topology/TopologyManager.java | 11 ++++------- 5 files changed, 22 insertions(+), 15 deletions(-) diff --git a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java index 22a4e366..8c1885d8 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java +++ b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java @@ -152,7 +152,8 @@ abstract class HomeState extends WaitingState ProgressToken maxProgressToken = instance.savedProgressToken(txnId).merge(command); CallbackInvoker<ProgressToken, Outcome> invoker = invokeHomeCallback(instance, txnId, maxProgressToken, HomeState::recoverCallback); - long lowEpoch = safeStore.ranges().latestEarlierEpochThatFullyCovers(txnId.epoch(), command.participants().hasTouched()); + + long lowEpoch = safeStore.ranges().latestEarlierEpochThatFullyCovers(safeStore, txnId.epoch(), command.participants().hasTouched()); long highEpoch = safeStore.ranges().earliestLaterEpochThatFullyCovers(command.executeAtIfKnownElseTxnId().epoch(), command.participants().hasTouched()); instance.debugActive(MaybeRecover.maybeRecover(instance.node(), txnId, invalidIf(), command.route(), maxProgressToken, lowEpoch, highEpoch, invoker), invoker); set(safeStore, instance, ReadyToExecute, Querying); diff --git 
a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java index b919d0ad..37084d45 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java +++ b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java @@ -240,7 +240,7 @@ abstract class WaitingState extends BaseTxnState if (offset >= 3) { offset = 3; - lowEpoch = safeStore.ranges().latestEarlierEpochThatFullyCovers(lowEpoch, command.maxContactable()); + lowEpoch = safeStore.ranges().latestEarlierEpochThatFullyCovers(safeStore, lowEpoch, command.maxContactable()); } encodedState = encodedState & ~(0x3L << AWAIT_EPOCH_SHIFT); encodedState |= ((long)offset) << AWAIT_EPOCH_SHIFT; @@ -255,11 +255,15 @@ abstract class WaitingState extends BaseTxnState long readLowEpoch(SafeCommandStore safeStore, TxnId txnId, Route<?> route) { int offset = (int) ((encodedState >>> AWAIT_EPOCH_SHIFT) & 0x3); + if (offset == 0) + return txnId.epoch(); + RangesForEpoch ranges = safeStore.ranges(); - long epoch = ranges.epochAtIndex(Math.max(0, ranges.floorIndex(txnId.epoch())) - offset); + int i = ranges.floorIndex(txnId.epoch()) - (offset - 1); + long epoch = ranges.epochAtIndex(Math.max(0, i)) - 1; if (offset < 3) return epoch; - return safeStore.ranges().latestEarlierEpochThatFullyCovers(epoch, route); + return safeStore.ranges().latestEarlierEpochThatFullyCovers(safeStore, epoch, route); } boolean hasNewLowEpoch(SafeCommandStore safeStore, TxnId txnId, long prevLowEpoch, long newLowEpoch) diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 8e12ca1f..985c26e1 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -360,17 +360,22 @@ public abstract class CommandStores return Math.max(sinceEpoch, epochs[0]); } - public long 
latestEarlierEpochThatFullyCovers(long beforeEpoch, Unseekables<?> keysOrRanges) + public long latestEarlierEpochThatFullyCovers(SafeCommandStore safeStore, long beforeEpoch, Unseekables<?> keysOrRanges) { int i = ceilIndex(beforeEpoch); + if (i == 0) + return beforeEpoch; long latest = beforeEpoch; Ranges existing = i >= ranges.length ? Ranges.EMPTY : ranges[i]; - while (--i >= 0) + long minEpoch = safeStore.node().topology().minEpoch(); + while (--i >= 0 && minEpoch < epochs[i]) { if (ranges[i].without(existing).intersects(keysOrRanges)) - latest = epochs[i]; + latest = epochs[i + 1] - 1; existing = existing.with(ranges[i]); } + if (latest < beforeEpoch) + latest = Math.max(latest, safeStore.node().topology().minEpoch()); return latest; } diff --git a/accord-core/src/main/java/accord/local/StoreParticipants.java b/accord-core/src/main/java/accord/local/StoreParticipants.java index aad213c4..5f286beb 100644 --- a/accord-core/src/main/java/accord/local/StoreParticipants.java +++ b/accord-core/src/main/java/accord/local/StoreParticipants.java @@ -688,7 +688,7 @@ public class StoreParticipants private static long computeCoveringEpoch(SafeCommandStore safeStore, long txnIdEpoch, Participants<?> participants) { - long lowEpoch = safeStore.ranges().latestEarlierEpochThatFullyCovers(txnIdEpoch, participants); + long lowEpoch = safeStore.ranges().latestEarlierEpochThatFullyCovers(safeStore, txnIdEpoch, participants); return Math.min(lowEpoch, txnIdEpoch); } } diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index d815ac58..05888aa4 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -314,14 +314,11 @@ public class TopologyManager else { this.epochs = Arrays.copyOf(epochs, truncateFrom); - if (logger.isDebugEnabled()) + for (int i = truncateFrom; i < epochs.length; i++) { - for (int i = 
truncateFrom; i < epochs.length; i++) - { - EpochState state = epochs[i]; - Invariants.require(epochs[i].syncComplete()); - logger.debug("Retired epoch {} with added/removed ranges {}/{}. Topology: {}. Closed: {}", state.epoch(), state.addedRanges, state.removedRanges, state.global.ranges, state.closed); - } + EpochState state = epochs[i]; + Invariants.require(epochs[i].syncComplete()); + logger.info("Retired epoch {} with added/removed ranges {}/{}. Topology: {}. Closed: {}", state.epoch(), state.addedRanges, state.removedRanges, state.global.ranges, state.closed); } if (logger.isTraceEnabled()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org