This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new c9211f0a05 Accord's ConfigService lock is held over large areas which 
cause deadlocks and performance issues
c9211f0a05 is described below

commit c9211f0a059644ed0fb14287c1b2e757cf1e6467
Author: David Capwell <[email protected]>
AuthorDate: Thu Oct 17 22:04:10 2024 +0100

    Accord's ConfigService lock is held over large areas which cause deadlocks 
and performance issues
    
    patch by David Capwell; reviewed by Benedict Elliott Smith for 
CASSANDRA-20065
---
 modules/accord                                     |   2 +-
 .../service/accord/AccordConfigurationService.java | 117 +++++++----
 .../test/accord/AccordBootstrapTest.java           |   2 +-
 .../cassandra/simulator/test/EpochStressTest.java  | 231 +++++++++++++++++++++
 4 files changed, 309 insertions(+), 43 deletions(-)

diff --git a/modules/accord b/modules/accord
index 8bd9d69803..a8f78552b9 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 8bd9d6980350fa68a1db676a7b10940cf0541fb5
+Subproject commit a8f78552b958a9d9b6fc9f2840753001cd1f6004
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java 
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index 08461249c3..42ac0325a8 100644
--- 
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++ 
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -26,11 +26,10 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import accord.impl.AbstractConfigurationService;
 import accord.local.Node;
@@ -67,15 +66,15 @@ import static 
org.apache.cassandra.utils.Simulate.With.MONITORS;
 @Simulate(with=MONITORS)
 public class AccordConfigurationService extends 
AbstractConfigurationService<AccordConfigurationService.EpochState, 
AccordConfigurationService.EpochHistory> implements ChangeListener, 
AccordEndpointMapper, AccordSyncPropagator.Listener, Shutdownable
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(AccordConfigurationService.class);
-
     private final AccordSyncPropagator syncPropagator;
     private final DiskStateManager diskStateManager;
 
+    @GuardedBy("this")
     private EpochDiskState diskState = EpochDiskState.EMPTY;
 
     private enum State { INITIALIZED, LOADING, STARTED, SHUTDOWN }
 
+    @GuardedBy("this")
     private State state = State.INITIALIZED;
     private volatile EndpointMapping mapping = EndpointMapping.EMPTY;
 
@@ -83,7 +82,7 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
 
     static class EpochState extends 
AbstractConfigurationService.AbstractEpochState
     {
-        SyncStatus syncStatus = SyncStatus.NOT_STARTED;
+        private volatile SyncStatus syncStatus = SyncStatus.NOT_STARTED;
         protected final AsyncResult.Settable<Void> localSyncNotified = 
AsyncResults.settable();
 
         public EpochState(long epoch)
@@ -303,7 +302,7 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
     }
 
     @VisibleForTesting
-    EpochDiskState diskState()
+    synchronized EpochDiskState diskState()
     {
         return diskState;
     }
@@ -325,12 +324,12 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
         Stage.MISC.submit(() -> reportMetadataInternal(metadata));
     }
 
-    synchronized void reportMetadataInternal(ClusterMetadata metadata)
+    void reportMetadataInternal(ClusterMetadata metadata)
     {
         reportMetadataInternal(metadata, false);
     }
 
-    synchronized void reportMetadataInternal(ClusterMetadata metadata, boolean 
isLoad)
+    void reportMetadataInternal(ClusterMetadata metadata, boolean isLoad)
     {
         updateMapping(metadata);
         Topology topology = AccordTopology.createAccordTopology(metadata);
@@ -342,12 +341,19 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
                     throw new IllegalStateException("Epoch " + 
topology.epoch() + " has node " + node + " but mapping does not!");
             }
         }
-        Topology current = isEmpty() ? Topology.EMPTY : currentTopology();
         reportTopology(topology);
+        if (epochs.lastAcknowledged() >= topology.epoch()) 
checkIfNodesRemoved(topology);
+        else epochs.acknowledgeFuture(topology.epoch()).addCallback(() -> 
checkIfNodesRemoved(topology));
+    }
+
+    private void checkIfNodesRemoved(Topology topology)
+    {
+        if (epochs.minEpoch() == topology.epoch()) return;
+        Topology previous = getTopologyForEpoch(topology.epoch() - 1);
         // for all nodes removed, or pending removal, mark them as removed so 
we don't wait on their replies
-        Sets.SetView<Node.Id> removedNodes = Sets.difference(current.nodes(), 
topology.nodes());
+        Sets.SetView<Node.Id> removedNodes = Sets.difference(previous.nodes(), 
topology.nodes());
         if (!removedNodes.isEmpty())
-            onNodesRemoved(topology.epoch(), current, removedNodes);
+            onNodesRemoved(topology.epoch(), previous, removedNodes);
     }
 
     private static boolean shareShard(Topology current, Node.Id target, 
Node.Id self)
@@ -360,7 +366,7 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
         return false;
     }
 
-    public synchronized void onNodesRemoved(long epoch, Topology current, 
Set<Node.Id> removed)
+    public void onNodesRemoved(long epoch, Topology current, Set<Node.Id> 
removed)
     {
         if (removed.isEmpty()) return;
         syncPropagator.onNodesRemoved(removed);
@@ -381,11 +387,14 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
     private long[] nonCompletedEpochsBefore(long max)
     {
         LongArrayList notComplete = new LongArrayList();
-        for (long epoch = epochs.minEpoch(); epoch <= max && epoch <= 
epochs.maxEpoch(); epoch++)
+        synchronized (epochs)
         {
-            EpochSnapshot snapshot = getEpochSnapshot(epoch);
-            if (snapshot.syncStatus != SyncStatus.COMPLETED)
-                notComplete.add(epoch);
+            for (long epoch = epochs.minEpoch(), maxKnown = epochs.maxEpoch(); 
epoch <= max && epoch <= maxKnown; epoch++)
+            {
+                EpochSnapshot snapshot = getEpochSnapshot(epoch);
+                if (snapshot.syncStatus != SyncStatus.COMPLETED)
+                    notComplete.add(epoch);
+            }
         }
         return notComplete.toLongArray();
     }
@@ -394,17 +403,17 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
     void maybeReportMetadata(ClusterMetadata metadata)
     {
         // don't report metadata until the previous one has been acknowledged
-        synchronized (this)
+        long epoch = metadata.epoch.getEpoch();
+        synchronized (epochs)
         {
-            long epoch = metadata.epoch.getEpoch();
             if (epochs.maxEpoch() == 0)
             {
                 getOrCreateEpochState(epoch);  // touch epoch state so 
subsequent calls see it
                 reportMetadata(metadata);
                 return;
             }
-            getOrCreateEpochState(epoch - 1).acknowledged().addCallback(() -> 
reportMetadata(metadata));
         }
+        getOrCreateEpochState(epoch - 1).acknowledged().addCallback(() -> 
reportMetadata(metadata));
     }
 
     @Override
@@ -439,16 +448,21 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
     }
 
     @Override
-    protected synchronized void localSyncComplete(Topology topology, boolean 
startSync)
+    protected void localSyncComplete(Topology topology, boolean startSync)
     {
         long epoch = topology.epoch();
         EpochState epochState = getOrCreateEpochState(epoch);
-        if (!startSync ||epochState.syncStatus != SyncStatus.NOT_STARTED)
+        if (!startSync || epochState.syncStatus != SyncStatus.NOT_STARTED)
             return;
 
         Set<Node.Id> notify = topology.nodes().stream().filter(i -> 
!localId.equals(i)).collect(Collectors.toSet());
-        diskState = diskStateManager.setNotifyingLocalSync(epoch, notify, 
diskState);
-        epochState.setSyncStatus(SyncStatus.NOTIFYING);
+        synchronized (this)
+        {
+            if (epochState.syncStatus != SyncStatus.NOT_STARTED)
+                return;
+            diskState = diskStateManager.setNotifyingLocalSync(epoch, notify, 
diskState);
+            epochState.setSyncStatus(SyncStatus.NOTIFYING);
+        }
         syncPropagator.reportSyncComplete(epoch, notify, localId);
     }
 
@@ -462,11 +476,14 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
     }
 
     @Override
-    public synchronized void onComplete(long epoch)
+    public void onComplete(long epoch)
     {
         EpochState epochState = getOrCreateEpochState(epoch);
-        epochState.setSyncStatus(SyncStatus.COMPLETED);
-        diskState = diskStateManager.setCompletedLocalSync(epoch, diskState);
+        synchronized (this)
+        {
+            epochState.setSyncStatus(SyncStatus.COMPLETED);
+            diskState = diskStateManager.setCompletedLocalSync(epoch, 
diskState);
+        }
     }
 
     @Override
@@ -484,20 +501,21 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
     }
 
     @Override
-    public synchronized void reportEpochClosed(Ranges ranges, long epoch)
+    public void reportEpochClosed(Ranges ranges, long epoch)
     {
         checkStarted();
         Topology topology = getTopologyForEpoch(epoch);
         syncPropagator.reportClosed(epoch, topology.nodes(), ranges);
     }
 
+    @VisibleForTesting
     public AccordSyncPropagator syncPropagator()
     {
         return syncPropagator;
     }
 
     @Override
-    public synchronized void reportEpochRedundant(Ranges ranges, long epoch)
+    public void reportEpochRedundant(Ranges ranges, long epoch)
     {
         checkStarted();
         // TODO (expected): ensure we aren't fetching a truncated epoch; 
otherwise this should be non-null
@@ -506,21 +524,27 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
     }
 
     @Override
-    public synchronized void receiveClosed(Ranges ranges, long epoch)
+    public void receiveClosed(Ranges ranges, long epoch)
     {
-        diskState = diskStateManager.markClosed(ranges, epoch, diskState);
+        synchronized (this)
+        {
+            diskState = diskStateManager.markClosed(ranges, epoch, diskState);
+        }
         super.receiveClosed(ranges, epoch);
     }
 
     @Override
-    public synchronized void receiveRedundant(Ranges ranges, long epoch)
+    public void receiveRedundant(Ranges ranges, long epoch)
     {
-        diskState = diskStateManager.markClosed(ranges, epoch, diskState);
+        synchronized (this)
+        {
+            diskState = diskStateManager.markClosed(ranges, epoch, diskState);
+        }
         super.receiveRedundant(ranges, epoch);
     }
 
     @Override
-    protected synchronized void truncateTopologiesPreListenerNotify(long epoch)
+    protected void truncateTopologiesPreListenerNotify(long epoch)
     {
         checkStarted();
     }
@@ -532,7 +556,7 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
             diskState = diskStateManager.truncateTopologyUntil(epoch, 
diskState);
     }
 
-    private void checkStarted()
+    private synchronized void checkStarted()
     {
         State state = this.state;
         Invariants.checkState(state == State.STARTED, "Expected state to be 
STARTED but was %s", state);
@@ -614,28 +638,39 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
     }
 
     @VisibleForTesting
-    public synchronized EpochSnapshot getEpochSnapshot(long epoch)
+    public EpochSnapshot getEpochSnapshot(long epoch)
     {
-        if (epoch < epochs.minEpoch() || epoch > epochs.maxEpoch())
-            return null;
+        EpochState state;
+        // If epoch truncate happens then getting the epoch again will 
recreate an empty one
+        synchronized (epochs)
+        {
+            if (epoch < epochs.minEpoch() || epoch > epochs.maxEpoch())
+                return null;
 
-        return new EpochSnapshot(getOrCreateEpochState(epoch));
+            state = getOrCreateEpochState(epoch);
+        }
+        return new EpochSnapshot(state);
     }
 
     @VisibleForTesting
-    public synchronized long minEpoch()
+    public long minEpoch()
     {
         return epochs.minEpoch();
     }
 
     @VisibleForTesting
-    public synchronized long maxEpoch()
+    public long maxEpoch()
     {
         return epochs.maxEpoch();
     }
 
+    /**
+     * The callback is resolved while holding the object lock, which can cause 
the future chain to resolve while also
+     * holding the lock!  This behavior is exposed for tests and is unsafe due 
to the lock being held while resolving
+     * the callback
+     */
     @VisibleForTesting
-    public synchronized Future<Void> localSyncNotified(long epoch)
+    public Future<Void> unsafeLocalSyncNotified(long epoch)
     {
         AsyncPromise<Void> promise = new AsyncPromise<>();
         getOrCreateEpochState(epoch).localSyncNotified().addCallback((result, 
failure) -> {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
index 519fee327d..57e452009d 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
@@ -123,7 +123,7 @@ public class AccordBootstrapTest extends TestBaseImpl
         try
         {
             AccordConfigurationService configService = 
service().configurationService();
-            boolean completed = 
configService.localSyncNotified(epoch).await(30, TimeUnit.SECONDS);
+            boolean completed = 
configService.unsafeLocalSyncNotified(epoch).await(30, TimeUnit.SECONDS);
             Assert.assertTrue(String.format("Local sync notification for epoch 
%s did not become ready within timeout on %s",
                                             epoch, 
FBUtilities.getBroadcastAddressAndPort()), completed);
         }
diff --git 
a/test/simulator/test/org/apache/cassandra/simulator/test/EpochStressTest.java 
b/test/simulator/test/org/apache/cassandra/simulator/test/EpochStressTest.java
new file mode 100644
index 0000000000..1474f4da6e
--- /dev/null
+++ 
b/test/simulator/test/org/apache/cassandra/simulator/test/EpochStressTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.simulator.test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.ConfigurationService;
+import accord.local.Node;
+import accord.primitives.Ranges;
+import accord.topology.TopologyManager;
+import org.apache.cassandra.service.accord.AccordConfigurationService;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.consensus.TransactionalMode;
+import org.apache.cassandra.simulator.Action;
+import org.apache.cassandra.simulator.ActionList;
+import org.apache.cassandra.simulator.Debug;
+import org.apache.cassandra.simulator.RandomSource;
+import org.apache.cassandra.simulator.cluster.ClusterActionListener;
+import org.apache.cassandra.simulator.cluster.ClusterActions;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.utils.Clock;
+
+import static 
org.apache.cassandra.simulator.cluster.ClusterActions.InitialConfiguration.initializeAll;
+import static 
org.apache.cassandra.simulator.cluster.ClusterActions.Options.noActions;
+
+/**
+ * In order to run these tests in your IDE, you need to first build a 
simulator jar
+ *
+ *    ant simulator-jars
+ *
+ * And then run your test using the following settings (omit add-* if you are 
running on jdk8):
+ *
+ 
-Dstorage-config=/Users/dcapwell/src/github/apache/cassandra/cep-15-accord/test/conf
+ -Djava.awt.headless=true
+ 
-javaagent:/Users/dcapwell/src/github/apache/cassandra/cep-15-accord/lib/jamm-0.4.0.jar
+ -ea
+ -Dcassandra.debugrefcount=true
+ -Xss384k
+ -XX:SoftRefLRUPolicyMSPerMB=0
+ -XX:ActiveProcessorCount=2
+ -XX:HeapDumpPath=build/test
+ -Dcassandra.test.driver.connection_timeout_ms=10000
+ -Dcassandra.test.driver.read_timeout_ms=24000
+ -Dcassandra.memtable_row_overhead_computation_step=100
+ -Dcassandra.test.use_prepared=true
+ -Dcassandra.test.sstableformatdevelopment=true
+ -Djava.security.egd=file:/dev/urandom
+ -Dcassandra.testtag=.jdk11
+ -Dcassandra.keepBriefBrief=true
+ -Dcassandra.allow_simplestrategy=true
+ -Dcassandra.strict.runtime.checks=true
+ -Dcassandra.reads.thresholds.coordinator.defensive_checks_enabled=true
+ -Dcassandra.test.flush_local_schema_changes=false
+ -Dcassandra.test.messagingService.nonGracefulShutdown=true
+ -Dcassandra.use_nix_recursive_delete=true
+ -Dcie-cassandra.disable_schema_drop_log=true
+ 
-Dlogback.configurationFile=file:///Users/dcapwell/src/github/apache/cassandra/cep-15-accord/test/conf/logback-simulator.xml
+ -Dcassandra.ring_delay_ms=10000
+ -Dcassandra.tolerate_sstable_size=true
+ -Dcassandra.skip_sync=true
+ -Dcassandra.debugrefcount=false
+ -Dcassandra.test.simulator.determinismcheck=strict
+ -Dcassandra.test.simulator.print_asm=none
+ 
-javaagent:/Users/dcapwell/src/github/apache/cassandra/cep-15-accord/build/test/lib/jars/simulator-asm.jar
+ 
-Xbootclasspath/a:/Users/dcapwell/src/github/apache/cassandra/cep-15-accord/build/test/lib/jars/simulator-bootstrap.jar
+ -XX:ActiveProcessorCount=4
+ -XX:-TieredCompilation
+ -XX:-BackgroundCompilation
+ -XX:CICompilerCount=1
+ -XX:Tier4CompileThreshold=1000
+ -XX:ReservedCodeCacheSize=256M
+ -Xmx16G
+ -Xmx4G
+ --add-exports java.base/jdk.internal.misc=ALL-UNNAMED
+ --add-exports java.base/jdk.internal.ref=ALL-UNNAMED
+ --add-exports java.base/sun.nio.ch=ALL-UNNAMED
+ --add-exports java.management.rmi/com.sun.jmx.remote.internal.rmi=ALL-UNNAMED
+ --add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED
+ --add-exports java.rmi/sun.rmi.server=ALL-UNNAMED
+ --add-exports java.sql/java.sql=ALL-UNNAMED
+ --add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED
+ --add-opens java.base/java.lang.module=ALL-UNNAMED
+ --add-opens java.base/java.net=ALL-UNNAMED
+ --add-opens java.base/jdk.internal.loader=ALL-UNNAMED
+ --add-opens java.base/jdk.internal.ref=ALL-UNNAMED
+ --add-opens java.base/jdk.internal.reflect=ALL-UNNAMED
+ --add-opens java.base/jdk.internal.math=ALL-UNNAMED
+ --add-opens java.base/jdk.internal.module=ALL-UNNAMED
+ --add-opens java.base/jdk.internal.util.jar=ALL-UNNAMED
+ --add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED
+ --add-opens jdk.management.jfr/jdk.management.jfr=ALL-UNNAMED
+ --add-opens java.desktop/com.sun.beans.introspect=ALL-UNNAMED
+ */
+public class EpochStressTest extends SimulationTestBase
+{
+    @Test
+    public void manyEpochsAndAccordConverges() throws IOException
+    {
+        simulate(simulation -> {
+                     // setup
+                     ClusterActions.Options options = 
noActions(simulation.cluster.size());
+                     ClusterActions clusterActions = new 
ClusterActions(simulation.simulated, simulation.cluster,
+                                                                        
options, new ClusterActionListener.NoOpListener(), new Debug(new 
EnumMap<>(Debug.Info.class), new int[0]));
+                     return 
ActionList.of(clusterActions.initializeCluster(initializeAll(simulation.cluster.size())),
+                                          simulation.schemaChange(1, "CREATE 
KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 
'replication_factor' : 3}"),
+                                          simulation.schemaChange(1, "CREATE 
TABLE IF NOT EXISTS ks.tbl (pk int PRIMARY KEY, v int) WITH " + 
TransactionalMode.full.asCqlParam()));
+                 },
+                 simulation -> {
+                     // test
+                     RandomSource random = simulation.simulated.random;
+                     int numEpochs = 100;
+                     List<Action> actions = new ArrayList<>(numEpochs);
+                     for (int i = 0; i < numEpochs; i++)
+                     {
+                         int node = random.uniform(1, 
simulation.cluster.size() + 1);
+                         actions.add(simulation.schemaChange(node, "ALTER 
TABLE ks.tbl WITH comment = 'step=" + i + "'"));
+                     }
+                     return ActionList.of(actions);
+                 },
+                 simulation -> {
+                     // teardown
+                     List<Action> actions = new 
ArrayList<>(simulation.cluster.size());
+                     for (int i = 0; i < simulation.cluster.size(); i++)
+                         
actions.add(HarrySimulatorTest.lazy(simulation.simulated, 
simulation.cluster.get(i + 1), EpochStressTest::validate));
+                     return ActionList.of(actions);
+                 },
+                 config -> config.nodes(3, 3)
+                                 .dcs(1, 1)
+                                 .threadCount(100));
+    }
+
+    private static void validate()
+    {
+        Logger logger = LoggerFactory.getLogger(EpochStressTest.class);
+        NodeId nodeId = ClusterMetadata.current().myNodeId();
+        long maxEpoch = 
ClusterMetadataService.instance().log().waitForHighestConsecutive().epoch.getEpoch();
+        long startNano = Clock.Global.nanoTime();
+        long deadlineNanos = startNano + TimeUnit.MINUTES.toNanos(10);
+
+        AccordService accord = (AccordService) AccordService.instance();
+        Node node = accord.node();
+        AccordConfigurationService configService = 
(AccordConfigurationService) node.configService();
+        TopologyManager tm = node.topology();
+        long minEpoch = tm.minEpoch();
+
+        logger.info("Starting validation on node {} for epochs {} -> {}", 
nodeId, minEpoch, maxEpoch);
+
+        Consumer<Supplier<String>> sleep = msg -> {
+            long now = Clock.Global.nanoTime();
+            if (now > deadlineNanos)
+                throw new AssertionError(msg.get());
+            logger.debug("Step is not ready yet: {}", msg.get());
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        };
+
+        for (long epoch = minEpoch; epoch <= maxEpoch; epoch++)
+        {
+            long finalEpoch = epoch;
+            ConfigurationService.EpochReady ready = tm.epochReady(epoch);
+            while (!isDone(ready))
+                sleep.accept(() -> "Epoch " + finalEpoch + "'s EpochReady is 
not done; " + ready);
+
+            AccordConfigurationService.EpochSnapshot snapshot = 
configService.getEpochSnapshot(epoch);
+            while (!isDone(snapshot))
+            {
+                AccordConfigurationService.SyncStatus status = 
snapshot.syncStatus;
+                sleep.accept(() -> "Epoch " + finalEpoch + "'s SyncStatus is 
not done; " + status);
+                snapshot = configService.getEpochSnapshot(epoch);
+            }
+
+            Ranges expected = 
tm.globalForEpoch(epoch).ranges().mergeTouching();
+            Ranges synced = tm.syncComplete(epoch).mergeTouching();
+            while (!isDone(synced, expected))
+            {
+                Ranges finalSynced = synced;
+                sleep.accept(() -> "Epoch " + finalEpoch + "'s syncComplete is 
not done; missing " + expected.without(finalSynced));
+                synced = tm.syncComplete(epoch).mergeTouching();
+            }
+        }
+        logger.info("All epochs completed in {}", 
Duration.ofNanos(Clock.Global.nanoTime() - startNano));
+    }
+
+    private static boolean isDone(Ranges synced, Ranges expected)
+    {
+        return synced.equals(expected);
+    }
+
+    private static boolean isDone(AccordConfigurationService.EpochSnapshot 
snapshot)
+    {
+        return snapshot.syncStatus == 
AccordConfigurationService.SyncStatus.COMPLETED;
+    }
+
+    private static boolean isDone(ConfigurationService.EpochReady ready)
+    {
+        return ready.metadata.isDone()
+               && ready.fastPath.isDone()
+               && ready.data.isDone()
+               && ready.reads.isDone();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to