This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new c9211f0a05 Accord's ConfigService lock is held over large areas which
cause deadlocks and performance issues
c9211f0a05 is described below
commit c9211f0a059644ed0fb14287c1b2e757cf1e6467
Author: David Capwell <[email protected]>
AuthorDate: Thu Oct 17 22:04:10 2024 +0100
Accord's ConfigService lock is held over large areas which cause deadlocks
and performance issues
patch by David Capwell; reviewed by Benedict Elliott Smith for
CASSANDRA-20065
---
modules/accord | 2 +-
.../service/accord/AccordConfigurationService.java | 117 +++++++----
.../test/accord/AccordBootstrapTest.java | 2 +-
.../cassandra/simulator/test/EpochStressTest.java | 231 +++++++++++++++++++++
4 files changed, 309 insertions(+), 43 deletions(-)
diff --git a/modules/accord b/modules/accord
index 8bd9d69803..a8f78552b9 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 8bd9d6980350fa68a1db676a7b10940cf0541fb5
+Subproject commit a8f78552b958a9d9b6fc9f2840753001cd1f6004
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index 08461249c3..42ac0325a8 100644
---
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -26,11 +26,10 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import accord.impl.AbstractConfigurationService;
import accord.local.Node;
@@ -67,15 +66,15 @@ import static
org.apache.cassandra.utils.Simulate.With.MONITORS;
@Simulate(with=MONITORS)
public class AccordConfigurationService extends
AbstractConfigurationService<AccordConfigurationService.EpochState,
AccordConfigurationService.EpochHistory> implements ChangeListener,
AccordEndpointMapper, AccordSyncPropagator.Listener, Shutdownable
{
- private static final Logger logger =
LoggerFactory.getLogger(AccordConfigurationService.class);
-
private final AccordSyncPropagator syncPropagator;
private final DiskStateManager diskStateManager;
+ @GuardedBy("this")
private EpochDiskState diskState = EpochDiskState.EMPTY;
private enum State { INITIALIZED, LOADING, STARTED, SHUTDOWN }
+ @GuardedBy("this")
private State state = State.INITIALIZED;
private volatile EndpointMapping mapping = EndpointMapping.EMPTY;
@@ -83,7 +82,7 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
static class EpochState extends
AbstractConfigurationService.AbstractEpochState
{
- SyncStatus syncStatus = SyncStatus.NOT_STARTED;
+ private volatile SyncStatus syncStatus = SyncStatus.NOT_STARTED;
protected final AsyncResult.Settable<Void> localSyncNotified =
AsyncResults.settable();
public EpochState(long epoch)
@@ -303,7 +302,7 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
}
@VisibleForTesting
- EpochDiskState diskState()
+ synchronized EpochDiskState diskState()
{
return diskState;
}
@@ -325,12 +324,12 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
Stage.MISC.submit(() -> reportMetadataInternal(metadata));
}
- synchronized void reportMetadataInternal(ClusterMetadata metadata)
+ void reportMetadataInternal(ClusterMetadata metadata)
{
reportMetadataInternal(metadata, false);
}
- synchronized void reportMetadataInternal(ClusterMetadata metadata, boolean
isLoad)
+ void reportMetadataInternal(ClusterMetadata metadata, boolean isLoad)
{
updateMapping(metadata);
Topology topology = AccordTopology.createAccordTopology(metadata);
@@ -342,12 +341,19 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
throw new IllegalStateException("Epoch " +
topology.epoch() + " has node " + node + " but mapping does not!");
}
}
- Topology current = isEmpty() ? Topology.EMPTY : currentTopology();
reportTopology(topology);
+ if (epochs.lastAcknowledged() >= topology.epoch())
checkIfNodesRemoved(topology);
+ else epochs.acknowledgeFuture(topology.epoch()).addCallback(() ->
checkIfNodesRemoved(topology));
+ }
+
+ private void checkIfNodesRemoved(Topology topology)
+ {
+ if (epochs.minEpoch() == topology.epoch()) return;
+ Topology previous = getTopologyForEpoch(topology.epoch() - 1);
// for all nodes removed, or pending removal, mark them as removed so
we don't wait on their replies
- Sets.SetView<Node.Id> removedNodes = Sets.difference(current.nodes(),
topology.nodes());
+ Sets.SetView<Node.Id> removedNodes = Sets.difference(previous.nodes(),
topology.nodes());
if (!removedNodes.isEmpty())
- onNodesRemoved(topology.epoch(), current, removedNodes);
+ onNodesRemoved(topology.epoch(), previous, removedNodes);
}
private static boolean shareShard(Topology current, Node.Id target,
Node.Id self)
@@ -360,7 +366,7 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
return false;
}
- public synchronized void onNodesRemoved(long epoch, Topology current,
Set<Node.Id> removed)
+ public void onNodesRemoved(long epoch, Topology current, Set<Node.Id>
removed)
{
if (removed.isEmpty()) return;
syncPropagator.onNodesRemoved(removed);
@@ -381,11 +387,14 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
private long[] nonCompletedEpochsBefore(long max)
{
LongArrayList notComplete = new LongArrayList();
- for (long epoch = epochs.minEpoch(); epoch <= max && epoch <=
epochs.maxEpoch(); epoch++)
+ synchronized (epochs)
{
- EpochSnapshot snapshot = getEpochSnapshot(epoch);
- if (snapshot.syncStatus != SyncStatus.COMPLETED)
- notComplete.add(epoch);
+ for (long epoch = epochs.minEpoch(), maxKnown = epochs.maxEpoch();
epoch <= max && epoch <= maxKnown; epoch++)
+ {
+ EpochSnapshot snapshot = getEpochSnapshot(epoch);
+ if (snapshot.syncStatus != SyncStatus.COMPLETED)
+ notComplete.add(epoch);
+ }
}
return notComplete.toLongArray();
}
@@ -394,17 +403,17 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
void maybeReportMetadata(ClusterMetadata metadata)
{
// don't report metadata until the previous one has been acknowledged
- synchronized (this)
+ long epoch = metadata.epoch.getEpoch();
+ synchronized (epochs)
{
- long epoch = metadata.epoch.getEpoch();
if (epochs.maxEpoch() == 0)
{
getOrCreateEpochState(epoch); // touch epoch state so
subsequent calls see it
reportMetadata(metadata);
return;
}
- getOrCreateEpochState(epoch - 1).acknowledged().addCallback(() ->
reportMetadata(metadata));
}
+ getOrCreateEpochState(epoch - 1).acknowledged().addCallback(() ->
reportMetadata(metadata));
}
@Override
@@ -439,16 +448,21 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
}
@Override
- protected synchronized void localSyncComplete(Topology topology, boolean
startSync)
+ protected void localSyncComplete(Topology topology, boolean startSync)
{
long epoch = topology.epoch();
EpochState epochState = getOrCreateEpochState(epoch);
- if (!startSync ||epochState.syncStatus != SyncStatus.NOT_STARTED)
+ if (!startSync || epochState.syncStatus != SyncStatus.NOT_STARTED)
return;
Set<Node.Id> notify = topology.nodes().stream().filter(i ->
!localId.equals(i)).collect(Collectors.toSet());
- diskState = diskStateManager.setNotifyingLocalSync(epoch, notify,
diskState);
- epochState.setSyncStatus(SyncStatus.NOTIFYING);
+ synchronized (this)
+ {
+ if (epochState.syncStatus != SyncStatus.NOT_STARTED)
+ return;
+ diskState = diskStateManager.setNotifyingLocalSync(epoch, notify,
diskState);
+ epochState.setSyncStatus(SyncStatus.NOTIFYING);
+ }
syncPropagator.reportSyncComplete(epoch, notify, localId);
}
@@ -462,11 +476,14 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
}
@Override
- public synchronized void onComplete(long epoch)
+ public void onComplete(long epoch)
{
EpochState epochState = getOrCreateEpochState(epoch);
- epochState.setSyncStatus(SyncStatus.COMPLETED);
- diskState = diskStateManager.setCompletedLocalSync(epoch, diskState);
+ synchronized (this)
+ {
+ epochState.setSyncStatus(SyncStatus.COMPLETED);
+ diskState = diskStateManager.setCompletedLocalSync(epoch,
diskState);
+ }
}
@Override
@@ -484,20 +501,21 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
}
@Override
- public synchronized void reportEpochClosed(Ranges ranges, long epoch)
+ public void reportEpochClosed(Ranges ranges, long epoch)
{
checkStarted();
Topology topology = getTopologyForEpoch(epoch);
syncPropagator.reportClosed(epoch, topology.nodes(), ranges);
}
+ @VisibleForTesting
public AccordSyncPropagator syncPropagator()
{
return syncPropagator;
}
@Override
- public synchronized void reportEpochRedundant(Ranges ranges, long epoch)
+ public void reportEpochRedundant(Ranges ranges, long epoch)
{
checkStarted();
// TODO (expected): ensure we aren't fetching a truncated epoch;
otherwise this should be non-null
@@ -506,21 +524,27 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
}
@Override
- public synchronized void receiveClosed(Ranges ranges, long epoch)
+ public void receiveClosed(Ranges ranges, long epoch)
{
- diskState = diskStateManager.markClosed(ranges, epoch, diskState);
+ synchronized (this)
+ {
+ diskState = diskStateManager.markClosed(ranges, epoch, diskState);
+ }
super.receiveClosed(ranges, epoch);
}
@Override
- public synchronized void receiveRedundant(Ranges ranges, long epoch)
+ public void receiveRedundant(Ranges ranges, long epoch)
{
- diskState = diskStateManager.markClosed(ranges, epoch, diskState);
+ synchronized (this)
+ {
+ diskState = diskStateManager.markClosed(ranges, epoch, diskState);
+ }
super.receiveRedundant(ranges, epoch);
}
@Override
- protected synchronized void truncateTopologiesPreListenerNotify(long epoch)
+ protected void truncateTopologiesPreListenerNotify(long epoch)
{
checkStarted();
}
@@ -532,7 +556,7 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
diskState = diskStateManager.truncateTopologyUntil(epoch,
diskState);
}
- private void checkStarted()
+ private synchronized void checkStarted()
{
State state = this.state;
Invariants.checkState(state == State.STARTED, "Expected state to be
STARTED but was %s", state);
@@ -614,28 +638,39 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
}
@VisibleForTesting
- public synchronized EpochSnapshot getEpochSnapshot(long epoch)
+ public EpochSnapshot getEpochSnapshot(long epoch)
{
- if (epoch < epochs.minEpoch() || epoch > epochs.maxEpoch())
- return null;
+ EpochState state;
+ // Check the bounds and fetch the state atomically: if the epoch were truncated in between,
getOrCreateEpochState would recreate an empty state for it
+ synchronized (epochs)
+ {
+ if (epoch < epochs.minEpoch() || epoch > epochs.maxEpoch())
+ return null;
- return new EpochSnapshot(getOrCreateEpochState(epoch));
+ state = getOrCreateEpochState(epoch);
+ }
+ return new EpochSnapshot(state);
}
@VisibleForTesting
- public synchronized long minEpoch()
+ public long minEpoch()
{
return epochs.minEpoch();
}
@VisibleForTesting
- public synchronized long maxEpoch()
+ public long maxEpoch()
{
return epochs.maxEpoch();
}
+ /**
+ * The callback is resolved while holding the object lock, which can cause
the future chain to resolve while also
+ * holding the lock! This behavior is exposed for tests and is unsafe due
to the lock being held while resolving
+ * the callback
+ */
@VisibleForTesting
- public synchronized Future<Void> localSyncNotified(long epoch)
+ public Future<Void> unsafeLocalSyncNotified(long epoch)
{
AsyncPromise<Void> promise = new AsyncPromise<>();
getOrCreateEpochState(epoch).localSyncNotified().addCallback((result,
failure) -> {
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
index 519fee327d..57e452009d 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
@@ -123,7 +123,7 @@ public class AccordBootstrapTest extends TestBaseImpl
try
{
AccordConfigurationService configService =
service().configurationService();
- boolean completed =
configService.localSyncNotified(epoch).await(30, TimeUnit.SECONDS);
+ boolean completed =
configService.unsafeLocalSyncNotified(epoch).await(30, TimeUnit.SECONDS);
Assert.assertTrue(String.format("Local sync notification for epoch
%s did not become ready within timeout on %s",
epoch,
FBUtilities.getBroadcastAddressAndPort()), completed);
}
diff --git
a/test/simulator/test/org/apache/cassandra/simulator/test/EpochStressTest.java
b/test/simulator/test/org/apache/cassandra/simulator/test/EpochStressTest.java
new file mode 100644
index 0000000000..1474f4da6e
--- /dev/null
+++
b/test/simulator/test/org/apache/cassandra/simulator/test/EpochStressTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.simulator.test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.ConfigurationService;
+import accord.local.Node;
+import accord.primitives.Ranges;
+import accord.topology.TopologyManager;
+import org.apache.cassandra.service.accord.AccordConfigurationService;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.consensus.TransactionalMode;
+import org.apache.cassandra.simulator.Action;
+import org.apache.cassandra.simulator.ActionList;
+import org.apache.cassandra.simulator.Debug;
+import org.apache.cassandra.simulator.RandomSource;
+import org.apache.cassandra.simulator.cluster.ClusterActionListener;
+import org.apache.cassandra.simulator.cluster.ClusterActions;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.utils.Clock;
+
+import static
org.apache.cassandra.simulator.cluster.ClusterActions.InitialConfiguration.initializeAll;
+import static
org.apache.cassandra.simulator.cluster.ClusterActions.Options.noActions;
+
+/**
+ * In order to run these tests in your IDE, you need to first build the
simulator jars
+ *
+ * ant simulator-jars
+ *
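+ * Then run your test with the following JVM settings (omit the --add-* flags if you are
running on JDK 8, and replace /Users/dcapwell/src/github/apache/cassandra/cep-15-accord with the path to your own checkout):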
+ *
+
-Dstorage-config=/Users/dcapwell/src/github/apache/cassandra/cep-15-accord/test/conf
+ -Djava.awt.headless=true
+
-javaagent:/Users/dcapwell/src/github/apache/cassandra/cep-15-accord/lib/jamm-0.4.0.jar
+ -ea
+ -Dcassandra.debugrefcount=true
+ -Xss384k
+ -XX:SoftRefLRUPolicyMSPerMB=0
+ -XX:ActiveProcessorCount=2
+ -XX:HeapDumpPath=build/test
+ -Dcassandra.test.driver.connection_timeout_ms=10000
+ -Dcassandra.test.driver.read_timeout_ms=24000
+ -Dcassandra.memtable_row_overhead_computation_step=100
+ -Dcassandra.test.use_prepared=true
+ -Dcassandra.test.sstableformatdevelopment=true
+ -Djava.security.egd=file:/dev/urandom
+ -Dcassandra.testtag=.jdk11
+ -Dcassandra.keepBriefBrief=true
+ -Dcassandra.allow_simplestrategy=true
+ -Dcassandra.strict.runtime.checks=true
+ -Dcassandra.reads.thresholds.coordinator.defensive_checks_enabled=true
+ -Dcassandra.test.flush_local_schema_changes=false
+ -Dcassandra.test.messagingService.nonGracefulShutdown=true
+ -Dcassandra.use_nix_recursive_delete=true
+ -Dcie-cassandra.disable_schema_drop_log=true
+
-Dlogback.configurationFile=file:///Users/dcapwell/src/github/apache/cassandra/cep-15-accord/test/conf/logback-simulator.xml
+ -Dcassandra.ring_delay_ms=10000
+ -Dcassandra.tolerate_sstable_size=true
+ -Dcassandra.skip_sync=true
+ -Dcassandra.debugrefcount=false
+ -Dcassandra.test.simulator.determinismcheck=strict
+ -Dcassandra.test.simulator.print_asm=none
+
-javaagent:/Users/dcapwell/src/github/apache/cassandra/cep-15-accord/build/test/lib/jars/simulator-asm.jar
+
-Xbootclasspath/a:/Users/dcapwell/src/github/apache/cassandra/cep-15-accord/build/test/lib/jars/simulator-bootstrap.jar
+ -XX:ActiveProcessorCount=4
+ -XX:-TieredCompilation
+ -XX:-BackgroundCompilation
+ -XX:CICompilerCount=1
+ -XX:Tier4CompileThreshold=1000
+ -XX:ReservedCodeCacheSize=256M
+ -Xmx16G
+ -Xmx4G
+ --add-exports java.base/jdk.internal.misc=ALL-UNNAMED
+ --add-exports java.base/jdk.internal.ref=ALL-UNNAMED
+ --add-exports java.base/sun.nio.ch=ALL-UNNAMED
+ --add-exports java.management.rmi/com.sun.jmx.remote.internal.rmi=ALL-UNNAMED
+ --add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED
+ --add-exports java.rmi/sun.rmi.server=ALL-UNNAMED
+ --add-exports java.sql/java.sql=ALL-UNNAMED
+ --add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED
+ --add-opens java.base/java.lang.module=ALL-UNNAMED
+ --add-opens java.base/java.net=ALL-UNNAMED
+ --add-opens java.base/jdk.internal.loader=ALL-UNNAMED
+ --add-opens java.base/jdk.internal.ref=ALL-UNNAMED
+ --add-opens java.base/jdk.internal.reflect=ALL-UNNAMED
+ --add-opens java.base/jdk.internal.math=ALL-UNNAMED
+ --add-opens java.base/jdk.internal.module=ALL-UNNAMED
+ --add-opens java.base/jdk.internal.util.jar=ALL-UNNAMED
+ --add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED
+ --add-opens jdk.management.jfr/jdk.management.jfr=ALL-UNNAMED
+ --add-opens java.desktop/com.sun.beans.introspect=ALL-UNNAMED
+ */
+public class EpochStressTest extends SimulationTestBase
+{
+ @Test
+ public void manyEpochsAndAccordConverges() throws IOException
+ {
+ simulate(simulation -> {
+ // setup
+ ClusterActions.Options options =
noActions(simulation.cluster.size());
+ ClusterActions clusterActions = new
ClusterActions(simulation.simulated, simulation.cluster,
+
options, new ClusterActionListener.NoOpListener(), new Debug(new
EnumMap<>(Debug.Info.class), new int[0]));
+ return
ActionList.of(clusterActions.initializeCluster(initializeAll(simulation.cluster.size())),
+ simulation.schemaChange(1, "CREATE
KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy',
'replication_factor' : 3}"),
+ simulation.schemaChange(1, "CREATE
TABLE IF NOT EXISTS ks.tbl (pk int PRIMARY KEY, v int) WITH " +
TransactionalMode.full.asCqlParam()));
+ },
+ simulation -> {
+ // test
+ RandomSource random = simulation.simulated.random;
+ int numEpochs = 100;
+ List<Action> actions = new ArrayList<>(numEpochs);
+ for (int i = 0; i < numEpochs; i++)
+ {
+ int node = random.uniform(1,
simulation.cluster.size() + 1);
+ actions.add(simulation.schemaChange(node, "ALTER
TABLE ks.tbl WITH comment = 'step=" + i + "'"));
+ }
+ return ActionList.of(actions);
+ },
+ simulation -> {
+ // teardown
+ List<Action> actions = new
ArrayList<>(simulation.cluster.size());
+ for (int i = 0; i < simulation.cluster.size(); i++)
+
actions.add(HarrySimulatorTest.lazy(simulation.simulated,
simulation.cluster.get(i + 1), EpochStressTest::validate));
+ return ActionList.of(actions);
+ },
+ config -> config.nodes(3, 3)
+ .dcs(1, 1)
+ .threadCount(100));
+ }
+
+ private static void validate()
+ {
+ Logger logger = LoggerFactory.getLogger(EpochStressTest.class);
+ NodeId nodeId = ClusterMetadata.current().myNodeId();
+ long maxEpoch =
ClusterMetadataService.instance().log().waitForHighestConsecutive().epoch.getEpoch();
+ long startNano = Clock.Global.nanoTime();
+ long deadlineNanos = startNano + TimeUnit.MINUTES.toNanos(10);
+
+ AccordService accord = (AccordService) AccordService.instance();
+ Node node = accord.node();
+ AccordConfigurationService configService =
(AccordConfigurationService) node.configService();
+ TopologyManager tm = node.topology();
+ long minEpoch = tm.minEpoch();
+
+ logger.info("Starting validation on node {} for epochs {} -> {}",
nodeId, minEpoch, maxEpoch);
+
+ Consumer<Supplier<String>> sleep = msg -> {
+ long now = Clock.Global.nanoTime();
+ if (now > deadlineNanos)
+ throw new AssertionError(msg.get());
+ logger.debug("Step is not ready yet: {}", msg.get());
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ };
+
+ for (long epoch = minEpoch; epoch <= maxEpoch; epoch++)
+ {
+ long finalEpoch = epoch;
+ ConfigurationService.EpochReady ready = tm.epochReady(epoch);
+ while (!isDone(ready))
+ sleep.accept(() -> "Epoch " + finalEpoch + "'s EpochReady is
not done; " + ready);
+
+ AccordConfigurationService.EpochSnapshot snapshot =
configService.getEpochSnapshot(epoch);
+ while (!isDone(snapshot))
+ {
+ AccordConfigurationService.SyncStatus status =
snapshot.syncStatus;
+ sleep.accept(() -> "Epoch " + finalEpoch + "'s SyncStatus is
not done; " + status);
+ snapshot = configService.getEpochSnapshot(epoch);
+ }
+
+ Ranges expected =
tm.globalForEpoch(epoch).ranges().mergeTouching();
+ Ranges synced = tm.syncComplete(epoch).mergeTouching();
+ while (!isDone(synced, expected))
+ {
+ Ranges finalSynced = synced;
+ sleep.accept(() -> "Epoch " + finalEpoch + "'s syncComplete is
not done; missing " + expected.without(finalSynced));
+ synced = tm.syncComplete(epoch).mergeTouching();
+ }
+ }
+ logger.info("All epochs completed in {}",
Duration.ofNanos(Clock.Global.nanoTime() - startNano));
+ }
+
+ private static boolean isDone(Ranges synced, Ranges expected)
+ {
+ return synced.equals(expected);
+ }
+
+ private static boolean isDone(AccordConfigurationService.EpochSnapshot
snapshot)
+ {
+ return snapshot.syncStatus ==
AccordConfigurationService.SyncStatus.COMPLETED;
+ }
+
+ private static boolean isDone(ConfigurationService.EpochReady ready)
+ {
+ return ready.metadata.isDone()
+ && ready.fastPath.isDone()
+ && ready.data.isDone()
+ && ready.reads.isDone();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]