This is an automated email from the ASF dual-hosted git repository.

konstantinov pushed a commit to branch fixes-260226
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 87a203d578eaca0b75275227718c12b89efd5154
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Sat Feb 28 09:40:15 2026 +0000

    Fix prepend epoch race condition
---
 .../main/java/accord/topology/PendingEpochs.java   | 49 +++++++++++++++++++---
 .../main/java/accord/topology/TopologyManager.java | 23 +++++-----
 2 files changed, 54 insertions(+), 18 deletions(-)

diff --git a/accord-core/src/main/java/accord/topology/PendingEpochs.java 
b/accord-core/src/main/java/accord/topology/PendingEpochs.java
index ad0009fe..2423be9d 100644
--- a/accord-core/src/main/java/accord/topology/PendingEpochs.java
+++ b/accord-core/src/main/java/accord/topology/PendingEpochs.java
@@ -32,6 +32,7 @@ class PendingEpochs
     final TopologyManager manager;
     private PendingEpoch[] epochs = new PendingEpoch[16];
     private int start, end;
+    private long minPermittedEpoch = 0;
 
     PendingEpochs(TopologyManager manager)
     {
@@ -48,6 +49,16 @@ class PendingEpochs
         return end == start;
     }
 
+    boolean isPermitted(long epoch)
+    {
+        return epoch >= minPermittedEpoch;
+    }
+
+    void setMinPermitted(long minPrependEpoch)
+    {
+        this.minPermittedEpoch = minPrependEpoch;
+    }
+
     private void append(PendingEpoch append)
     {
         if (end == epochs.length)
@@ -94,7 +105,9 @@ class PendingEpochs
     void remoteReadyToCoordinate(Node.Id node, long epoch)
     {
         Invariants.requireArgument(epoch > 0);
-        getOrCreate(epoch).remoteReadyToCoordinate(node);
+        PendingEpoch pending = getOrCreateIfPermitted(epoch);
+        if (pending != null)
+            pending.remoteReadyToCoordinate(node);
     }
 
     /**
@@ -103,7 +116,10 @@ class PendingEpochs
      */
     Ranges closed(Ranges ranges, long epoch)
     {
-        return getOrCreate(epoch).closed(ranges);
+        PendingEpoch pending = getOrCreateIfPermitted(epoch);
+        if (pending != null)
+            return pending.closed(ranges);
+        return ranges;
     }
 
     /**
@@ -112,7 +128,10 @@ class PendingEpochs
      */
     Ranges retired(Ranges ranges, long epoch)
     {
-        return getOrCreate(epoch).retired(ranges);
+        PendingEpoch pending = getOrCreateIfPermitted(epoch);
+        if (pending != null)
+            return pending.retired(ranges);
+        return ranges;
     }
 
     PendingEpoch atIndex(int i)
@@ -127,9 +146,16 @@ class PendingEpochs
         return isEmpty() ? 0 : epochs[end - 1].epoch;
     }
 
-    PendingEpoch getOrCreate(long epoch)
+    long minEpoch()
+    {
+        return isEmpty() ? 0 : epochs[start].epoch;
+    }
+
+    PendingEpoch getOrCreateIfPermitted(long epoch)
     {
-        Invariants.require(manager.active().currentEpoch < epoch);
+        if (!isPermitted(epoch))
+            return null;
+
         if (isEmpty())
         {
             append(new PendingEpoch(epoch, manager));
@@ -155,10 +181,21 @@ class PendingEpochs
         return epochs[end - 1];
     }
 
+    PendingEpoch getOrCreateOrMin(long epoch)
+    {
+        PendingEpoch pending = getOrCreateIfPermitted(epoch);
+        return pending != null ? pending : atIndex(0);
+    }
+
+    PendingEpoch getOrCreate(long epoch)
+    {
+        return Invariants.nonNull(getOrCreateIfPermitted(epoch));
+    }
+
     void removeFirst(long epoch)
     {
         Invariants.require(start < end);
-        Invariants.require(epochs[start].epoch == epoch);
+        Invariants.require(epochs[start].epoch == epoch, "% != %d", 
epochs[start].epoch, epoch);
         epochs[start++] = null;
     }
 }
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java 
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 4ca397da..c67f570e 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -290,9 +290,9 @@ public class TopologyManager
         {
             long epoch = topology.epoch;
             // if active is empty, treat the earliest pending epoch as our low 
bound to avoid race conditions where we begin updating active but discover an 
earlier epoch
-            long currentEpoch = !active.isEmpty() ? active.currentEpoch : 
!pending.isEmpty() ? pending.atIndex(0).epoch - 1 : 0;
-            if (epoch <= currentEpoch)
+            if (!pending.isPermitted(epoch))
             {
+                long currentEpoch = !active.isEmpty() ? active.currentEpoch : 
pending.minEpoch();
                 logger.debug("Ignoring topology for epoch {} which is behind 
our latest epoch {}", epoch, currentEpoch);
                 return;
             }
@@ -322,13 +322,15 @@ public class TopologyManager
                 PendingEpoch pending;
                 synchronized (this)
                 {
-                    if (this.pending.isEmpty() || (!this.active.isEmpty() && 
this.pending.atIndex(0).epoch > 1 + current().epoch()))
+                    if (this.pending.isEmpty() || (!active.isEmpty() && 
active.currentEpoch + 1 != this.pending.minEpoch()))
                         return;
 
                     pending = this.pending.atIndex(0);
                     topology = pending.topology();
                     if (topology == null)
                         return;
+
+                    this.pending.setMinPermitted(topology.epoch + 1);
                 }
 
                 Supplier<EpochReady> bootstrap = 
node.commandStores().updateTopology(node, topology);
@@ -428,7 +430,8 @@ public class TopologyManager
             if (epoch <= active.currentEpoch)
                 return AsyncChains.success(null);
 
-            pendingEpoch = pending.getOrCreate(epoch);
+
+            pendingEpoch = pending.getOrCreateOrMin(epoch);
             fetch = pendingEpoch.fetching == null;
         }
 
@@ -445,8 +448,8 @@ public class TopologyManager
                     if (epoch <= active.currentEpoch)
                         break;
 
-                    pendingEpoch = pending.getOrCreate(epoch);
-                    if (pendingEpoch.fetching != null)
+                    pendingEpoch = pending.getOrCreateIfPermitted(epoch);
+                    if (pendingEpoch == null || pendingEpoch.fetching != null)
                         break;
                 }
             }
@@ -487,13 +490,9 @@ public class TopologyManager
         synchronized (this)
         {
             if (active.hasAtLeastEpoch(epoch))
-            {
-                if (!active.hasEpoch(epoch))
-                    return get.apply(EpochReady.done(epoch));
-                return get.apply(active.getKnown(epoch).epochReady());
-            }
+                return get.apply(active.epochReady(epoch));
 
-            return pending.getOrCreate(epoch).whenActive().get().flatMap(r -> 
get.apply(active.epochReady(epoch)));
+            return 
pending.getOrCreateOrMin(epoch).whenActive().get().flatMap(r -> 
get.apply(active.epochReady(epoch)));
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to