This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a4a8255b Fix AccordMigrationTest, preclude possible races in topology propagation
a4a8255b is described below

commit a4a8255bebb6804b8f55289f2fbc846a5253e66c
Author: Alex Petrov <[email protected]>
AuthorDate: Fri Feb 7 19:03:21 2025 +0100

    Fix AccordMigrationTest, preclude possible races in topology propagation
    
    Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20316
---
 .../accord/impl/AbstractConfigurationService.java  | 44 +++++-----------------
 accord-core/src/main/java/accord/local/Node.java   | 11 +++---
 .../main/java/accord/topology/TopologyManager.java | 20 ++++++++--
 .../accord/burn/BurnTestConfigurationService.java  | 15 +++++++-
 .../impl/AbstractConfigurationServiceTest.java     |  2 +-
 5 files changed, 47 insertions(+), 45 deletions(-)

diff --git a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
index 92048bd9..fe8046fe 100644
--- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -21,7 +21,6 @@ package accord.impl;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
-
 import javax.annotation.concurrent.GuardedBy;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -285,35 +284,8 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo
         return epochs.topologyFor(epoch);
     }
 
-    protected abstract void fetchTopologyInternal(long epoch);
-
-    @GuardedBy("this")
-    private long maxRequestedEpoch;
     @Override
-    public void fetchTopologyForEpoch(long epoch)
-    {
-        synchronized (this)
-        {
-            if (epoch <= maxRequestedEpoch)
-                return;
-
-            maxRequestedEpoch = epoch;
-        }
-
-        try
-        {
-            fetchTopologyInternal(epoch);
-        }
-        catch (Throwable t)
-        {
-            // This epoch will not be fetched, so we need to reset it back
-            synchronized (this)
-            {
-                maxRequestedEpoch = 0;
-            }
-            throw t;
-        }
-    }
+    public abstract void fetchTopologyForEpoch(long epoch);
 
     // TODO (expected): rename, sync is too ambiguous
     protected abstract void localSyncComplete(Topology topology, boolean 
startSync);
@@ -339,8 +311,8 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo
         if (lastReceived > 0 && topology.epoch() > lastReceived + 1)
         {
             logger.debug("Epoch {} received; waiting to receive {} before 
reporting", topology.epoch(), lastReceived + 1);
-            fetchTopologyForEpoch(lastReceived + 1);
             epochs.receiveFuture(lastReceived + 1).addCallback(() -> 
reportTopology(topology, isLoad, startSync));
+            fetchTopologyForEpoch(lastReceived + 1);
             return;
         }
 
@@ -404,13 +376,17 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo
         epochs.truncateUntil(epoch);
     }
 
+    // synchronized because state.reads is written
     public AsyncChain<Void> epochReady(long epoch)
     {
-        EpochState state = epochs.getOrCreate(epoch);
-        if (state.reads != null)
-            return state.reads;
+        synchronized (this)
+        {
+            EpochState state = epochs.getOrCreate(epoch);
+            if (state.reads != null)
+                return state.reads;
 
-        return state.acknowledged.flatMap(r -> state.reads);
+            return state.acknowledged.flatMap(r -> state.reads);
+        }
     }
 
     public abstract static class Minimal extends 
AbstractConfigurationService<Minimal.EpochState, Minimal.EpochHistory>
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index 21b57487..c89c3544 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -96,8 +96,8 @@ import accord.utils.async.AsyncResults;
 import accord.utils.async.Cancellable;
 import net.nicoulaj.compilecommand.annotations.Inline;
 
-import static accord.api.ProtocolModifiers.Toggles.ensurePermitted;
 import static accord.api.ProtocolModifiers.Toggles.defaultMediumPath;
+import static accord.api.ProtocolModifiers.Toggles.ensurePermitted;
 import static accord.api.ProtocolModifiers.Toggles.usePrivilegedCoordinator;
 import static accord.primitives.Routable.Domain.Key;
 import static accord.primitives.TxnId.Cardinality.Any;
@@ -402,8 +402,8 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ
         }
         else
         {
-            configService.fetchTopologyForEpoch(epoch);
             topology.awaitEpoch(epoch).begin(callback);
+            configService.fetchTopologyForEpoch(epoch);
         }
     }
 
@@ -415,11 +415,11 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ
         }
         else
         {
-            configService.fetchTopologyForEpoch(epoch);
             topology.awaitEpoch(epoch).begin((success, fail) -> {
                 if (fail != null) ifFailure.accept(null, fail);
                 else ifSuccess.run();
             });
+            configService.fetchTopologyForEpoch(epoch);
         }
     }
 
@@ -431,11 +431,11 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ
         }
         else
         {
-            configService.fetchTopologyForEpoch(epoch);
             topology.awaitEpoch(epoch).begin((success, fail) -> {
                 if (fail != null) ifFailure.accept(null, 
onFailure.apply(fail));
                 else ifSuccess.run();
             });
+            configService.fetchTopologyForEpoch(epoch);
         }
     }
 
@@ -448,8 +448,9 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ
         }
         else
         {
+            AsyncChain<T> res = topology.awaitEpoch(epoch).flatMap(ignore -> 
supplier.get());
             configService.fetchTopologyForEpoch(epoch);
-            return topology.awaitEpoch(epoch).flatMap(ignore -> 
supplier.get());
+            return res;
         }
     }
 
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 999b6ee4..88460f2e 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -34,6 +34,9 @@ import javax.annotation.concurrent.GuardedBy;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import accord.api.Agent;
 import accord.api.ConfigurationService;
 import accord.api.ConfigurationService.EpochReady;
@@ -88,6 +91,7 @@ import static accord.utils.Invariants.nonNull;
 public class TopologyManager
 {
     private static final FutureEpoch SUCCESS;
+    private static final Logger logger = 
LoggerFactory.getLogger(TopologyManager.class);
 
     static
     {
@@ -250,6 +254,11 @@ public class TopologyManager
             this.currentEpoch = epochs.length > 0 ? epochs[0].epoch() : 0;
             this.pending = pending;
             this.futureEpochs = futureEpochs;
+            if (!futureEpochs.isEmpty())
+                Invariants.require(futureEpochs.get(0).epoch == currentEpoch + 
1);
+
+            for (int i = 1; i < futureEpochs.size(); i++)
+                Invariants.requireArgument(futureEpochs.get(i).epoch == 
futureEpochs.get(i - 1).epoch - 1);
             for (int i=1; i<epochs.length; i++)
                 Invariants.requireArgument(epochs[i].epoch() == epochs[i - 
1].epoch() - 1);
             this.epochs = epochs;
@@ -619,7 +628,6 @@ public class TopologyManager
     public synchronized EpochReady onTopologyUpdate(Topology topology, 
Supplier<EpochReady> bootstrap)
     {
         Epochs current = epochs;
-
         Invariants.requireArgument(topology.epoch == current.nextEpoch() || 
epochs == Epochs.EMPTY,
                                    "Expected topology update %d to be %d", 
topology.epoch, current.nextEpoch());
         EpochState[] nextEpochs = new EpochState[current.epochs.length + 1];
@@ -1071,12 +1079,18 @@ public class TopologyManager
 
     public boolean hasEpoch(long epoch)
     {
-        return epochs.get(epoch) != null;
+        synchronized (this)
+        {
+            return epochs.get(epoch) != null;
+        }
     }
 
     public boolean hasAtLeastEpoch(long epoch)
     {
-        return epochs.currentEpoch >= epoch;
+        synchronized (this)
+        {
+            return epochs.currentEpoch >= epoch;
+        }
     }
 
     public Topology localForEpoch(long epoch)
diff --git a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
index 7e3b23e9..a506b9de 100644
--- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
+++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import javax.annotation.concurrent.GuardedBy;
 
 import accord.api.TestableConfigurationService;
 import accord.impl.AbstractConfigurationService;
@@ -165,9 +166,19 @@ public class BurnTestConfigurationService extends AbstractConfigurationService.M
         else node.scheduler().selfRecurring(() -> 
super.reportTopology(topology, isLoad, startSync), 0, TimeUnit.MILLISECONDS);
     }
 
-    @Override
-    protected void fetchTopologyInternal(long epoch)
+    @GuardedBy("this")
+    private long maxRequestedEpoch;
+
+    public void fetchTopologyForEpoch(long epoch)
     {
+        synchronized (this)
+        {
+            if (epoch <= maxRequestedEpoch)
+                return;
+
+            maxRequestedEpoch = epoch;
+        }
+
         pendingEpochs.computeIfAbsent(epoch, FetchTopology::new);
     }
 
diff --git a/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
index 03b142d2..95ac1bc0 100644
--- a/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
+++ b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
@@ -143,7 +143,7 @@ public class AbstractConfigurationServiceTest
         }
 
         @Override
-        protected void fetchTopologyInternal(long epoch)
+        public void fetchTopologyForEpoch(long epoch)
         {
             epochsFetched.add(epoch);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to