This is an automated email from the ASF dual-hosted git repository. konstantinov pushed a commit to branch fixes-260226 in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 87a203d578eaca0b75275227718c12b89efd5154 Author: Benedict Elliott Smith <[email protected]> AuthorDate: Sat Feb 28 09:40:15 2026 +0000 Fix prepend epoch race condition --- .../main/java/accord/topology/PendingEpochs.java | 49 +++++++++++++++++++--- .../main/java/accord/topology/TopologyManager.java | 23 +++++----- 2 files changed, 54 insertions(+), 18 deletions(-) diff --git a/accord-core/src/main/java/accord/topology/PendingEpochs.java b/accord-core/src/main/java/accord/topology/PendingEpochs.java index ad0009fe..2423be9d 100644 --- a/accord-core/src/main/java/accord/topology/PendingEpochs.java +++ b/accord-core/src/main/java/accord/topology/PendingEpochs.java @@ -32,6 +32,7 @@ class PendingEpochs final TopologyManager manager; private PendingEpoch[] epochs = new PendingEpoch[16]; private int start, end; + private long minPermittedEpoch = 0; PendingEpochs(TopologyManager manager) { @@ -48,6 +49,16 @@ class PendingEpochs return end == start; } + boolean isPermitted(long epoch) + { + return epoch >= minPermittedEpoch; + } + + void setMinPermitted(long minPrependEpoch) + { + this.minPermittedEpoch = minPrependEpoch; + } + private void append(PendingEpoch append) { if (end == epochs.length) @@ -94,7 +105,9 @@ class PendingEpochs void remoteReadyToCoordinate(Node.Id node, long epoch) { Invariants.requireArgument(epoch > 0); - getOrCreate(epoch).remoteReadyToCoordinate(node); + PendingEpoch pending = getOrCreateIfPermitted(epoch); + if (pending != null) + pending.remoteReadyToCoordinate(node); } /** @@ -103,7 +116,10 @@ class PendingEpochs */ Ranges closed(Ranges ranges, long epoch) { - return getOrCreate(epoch).closed(ranges); + PendingEpoch pending = getOrCreateIfPermitted(epoch); + if (pending != null) + return pending.closed(ranges); + return ranges; } /** @@ -112,7 +128,10 @@ class PendingEpochs */ Ranges retired(Ranges ranges, long epoch) { - return getOrCreate(epoch).retired(ranges); + PendingEpoch pending = getOrCreateIfPermitted(epoch); + if (pending != 
null) + return pending.retired(ranges); + return ranges; } PendingEpoch atIndex(int i) @@ -127,9 +146,16 @@ class PendingEpochs return isEmpty() ? 0 : epochs[end - 1].epoch; } - PendingEpoch getOrCreate(long epoch) + long minEpoch() + { + return isEmpty() ? 0 : epochs[start].epoch; + } + + PendingEpoch getOrCreateIfPermitted(long epoch) { - Invariants.require(manager.active().currentEpoch < epoch); + if (!isPermitted(epoch)) + return null; + if (isEmpty()) { append(new PendingEpoch(epoch, manager)); @@ -155,10 +181,21 @@ class PendingEpochs return epochs[end - 1]; } + PendingEpoch getOrCreateOrMin(long epoch) + { + PendingEpoch pending = getOrCreateIfPermitted(epoch); + return pending != null ? pending : atIndex(0); + } + + PendingEpoch getOrCreate(long epoch) + { + return Invariants.nonNull(getOrCreateIfPermitted(epoch)); + } + void removeFirst(long epoch) { Invariants.require(start < end); - Invariants.require(epochs[start].epoch == epoch); + Invariants.require(epochs[start].epoch == epoch, "%d != %d", epochs[start].epoch, epoch); epochs[start++] = null; } } diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 4ca397da..c67f570e 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -290,9 +290,9 @@ public class TopologyManager { long epoch = topology.epoch; // if active is empty, treat the earliest pending epoch as our low bound to avoid race conditions where we begin updating active but discover an earlier epoch - long currentEpoch = !active.isEmpty() ? active.currentEpoch : !pending.isEmpty() ? pending.atIndex(0).epoch - 1 : 0; - if (epoch <= currentEpoch) + if (!pending.isPermitted(epoch)) { + long currentEpoch = !active.isEmpty() ? 
active.currentEpoch : pending.minEpoch(); logger.debug("Ignoring topology for epoch {} which is behind our latest epoch {}", epoch, currentEpoch); return; } @@ -322,13 +322,15 @@ public class TopologyManager PendingEpoch pending; synchronized (this) { - if (this.pending.isEmpty() || (!this.active.isEmpty() && this.pending.atIndex(0).epoch > 1 + current().epoch())) + if (this.pending.isEmpty() || (!active.isEmpty() && active.currentEpoch + 1 != this.pending.minEpoch())) return; pending = this.pending.atIndex(0); topology = pending.topology(); if (topology == null) return; + + this.pending.setMinPermitted(topology.epoch + 1); } Supplier<EpochReady> bootstrap = node.commandStores().updateTopology(node, topology); @@ -428,7 +430,8 @@ public class TopologyManager if (epoch <= active.currentEpoch) return AsyncChains.success(null); - pendingEpoch = pending.getOrCreate(epoch); + + pendingEpoch = pending.getOrCreateOrMin(epoch); fetch = pendingEpoch.fetching == null; } @@ -445,8 +448,8 @@ public class TopologyManager if (epoch <= active.currentEpoch) break; - pendingEpoch = pending.getOrCreate(epoch); - if (pendingEpoch.fetching != null) + pendingEpoch = pending.getOrCreateIfPermitted(epoch); + if (pendingEpoch == null || pendingEpoch.fetching != null) break; } } @@ -487,13 +490,9 @@ public class TopologyManager synchronized (this) { if (active.hasAtLeastEpoch(epoch)) - { - if (!active.hasEpoch(epoch)) - return get.apply(EpochReady.done(epoch)); - return get.apply(active.getKnown(epoch).epochReady()); - } + return get.apply(active.epochReady(epoch)); - return pending.getOrCreate(epoch).whenActive().get().flatMap(r -> get.apply(active.epochReady(epoch))); + return pending.getOrCreateOrMin(epoch).whenActive().get().flatMap(r -> get.apply(active.epochReady(epoch))); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
