This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new a4a8255b Fix AccordMigrationTest, preclude possible races in topology propagation
a4a8255b is described below
commit a4a8255bebb6804b8f55289f2fbc846a5253e66c
Author: Alex Petrov <[email protected]>
AuthorDate: Fri Feb 7 19:03:21 2025 +0100
Fix AccordMigrationTest, preclude possible races in topology propagation
Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20316
---
.../accord/impl/AbstractConfigurationService.java | 44 +++++-----------------
accord-core/src/main/java/accord/local/Node.java | 11 +++---
.../main/java/accord/topology/TopologyManager.java | 20 ++++++++--
.../accord/burn/BurnTestConfigurationService.java | 15 +++++++-
.../impl/AbstractConfigurationServiceTest.java | 2 +-
5 files changed, 47 insertions(+), 45 deletions(-)
diff --git
a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
index 92048bd9..fe8046fe 100644
--- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -21,7 +21,6 @@ package accord.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-
import javax.annotation.concurrent.GuardedBy;
import com.google.common.annotations.VisibleForTesting;
@@ -285,35 +284,8 @@ public abstract class
AbstractConfigurationService<EpochState extends AbstractCo
return epochs.topologyFor(epoch);
}
- protected abstract void fetchTopologyInternal(long epoch);
-
- @GuardedBy("this")
- private long maxRequestedEpoch;
@Override
- public void fetchTopologyForEpoch(long epoch)
- {
- synchronized (this)
- {
- if (epoch <= maxRequestedEpoch)
- return;
-
- maxRequestedEpoch = epoch;
- }
-
- try
- {
- fetchTopologyInternal(epoch);
- }
- catch (Throwable t)
- {
- // This epoch will not be fetched, so we need to reset it back
- synchronized (this)
- {
- maxRequestedEpoch = 0;
- }
- throw t;
- }
- }
+ public abstract void fetchTopologyForEpoch(long epoch);
// TODO (expected): rename, sync is too ambiguous
protected abstract void localSyncComplete(Topology topology, boolean
startSync);
@@ -339,8 +311,8 @@ public abstract class
AbstractConfigurationService<EpochState extends AbstractCo
if (lastReceived > 0 && topology.epoch() > lastReceived + 1)
{
logger.debug("Epoch {} received; waiting to receive {} before
reporting", topology.epoch(), lastReceived + 1);
- fetchTopologyForEpoch(lastReceived + 1);
epochs.receiveFuture(lastReceived + 1).addCallback(() ->
reportTopology(topology, isLoad, startSync));
+ fetchTopologyForEpoch(lastReceived + 1);
return;
}
@@ -404,13 +376,17 @@ public abstract class
AbstractConfigurationService<EpochState extends AbstractCo
epochs.truncateUntil(epoch);
}
+ // synchronized because state.reads is written
public AsyncChain<Void> epochReady(long epoch)
{
- EpochState state = epochs.getOrCreate(epoch);
- if (state.reads != null)
- return state.reads;
+ synchronized (this)
+ {
+ EpochState state = epochs.getOrCreate(epoch);
+ if (state.reads != null)
+ return state.reads;
- return state.acknowledged.flatMap(r -> state.reads);
+ return state.acknowledged.flatMap(r -> state.reads);
+ }
}
public abstract static class Minimal extends
AbstractConfigurationService<Minimal.EpochState, Minimal.EpochHistory>
diff --git a/accord-core/src/main/java/accord/local/Node.java
b/accord-core/src/main/java/accord/local/Node.java
index 21b57487..c89c3544 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -96,8 +96,8 @@ import accord.utils.async.AsyncResults;
import accord.utils.async.Cancellable;
import net.nicoulaj.compilecommand.annotations.Inline;
-import static accord.api.ProtocolModifiers.Toggles.ensurePermitted;
import static accord.api.ProtocolModifiers.Toggles.defaultMediumPath;
+import static accord.api.ProtocolModifiers.Toggles.ensurePermitted;
import static accord.api.ProtocolModifiers.Toggles.usePrivilegedCoordinator;
import static accord.primitives.Routable.Domain.Key;
import static accord.primitives.TxnId.Cardinality.Any;
@@ -402,8 +402,8 @@ public class Node implements ConfigurationService.Listener,
NodeCommandStoreServ
}
else
{
- configService.fetchTopologyForEpoch(epoch);
topology.awaitEpoch(epoch).begin(callback);
+ configService.fetchTopologyForEpoch(epoch);
}
}
@@ -415,11 +415,11 @@ public class Node implements
ConfigurationService.Listener, NodeCommandStoreServ
}
else
{
- configService.fetchTopologyForEpoch(epoch);
topology.awaitEpoch(epoch).begin((success, fail) -> {
if (fail != null) ifFailure.accept(null, fail);
else ifSuccess.run();
});
+ configService.fetchTopologyForEpoch(epoch);
}
}
@@ -431,11 +431,11 @@ public class Node implements
ConfigurationService.Listener, NodeCommandStoreServ
}
else
{
- configService.fetchTopologyForEpoch(epoch);
topology.awaitEpoch(epoch).begin((success, fail) -> {
if (fail != null) ifFailure.accept(null,
onFailure.apply(fail));
else ifSuccess.run();
});
+ configService.fetchTopologyForEpoch(epoch);
}
}
@@ -448,8 +448,9 @@ public class Node implements ConfigurationService.Listener,
NodeCommandStoreServ
}
else
{
+ AsyncChain<T> res = topology.awaitEpoch(epoch).flatMap(ignore ->
supplier.get());
configService.fetchTopologyForEpoch(epoch);
- return topology.awaitEpoch(epoch).flatMap(ignore ->
supplier.get());
+ return res;
}
}
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 999b6ee4..88460f2e 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -34,6 +34,9 @@ import javax.annotation.concurrent.GuardedBy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import accord.api.Agent;
import accord.api.ConfigurationService;
import accord.api.ConfigurationService.EpochReady;
@@ -88,6 +91,7 @@ import static accord.utils.Invariants.nonNull;
public class TopologyManager
{
private static final FutureEpoch SUCCESS;
+ private static final Logger logger =
LoggerFactory.getLogger(TopologyManager.class);
static
{
@@ -250,6 +254,11 @@ public class TopologyManager
this.currentEpoch = epochs.length > 0 ? epochs[0].epoch() : 0;
this.pending = pending;
this.futureEpochs = futureEpochs;
+ if (!futureEpochs.isEmpty())
+ Invariants.require(futureEpochs.get(0).epoch == currentEpoch +
1);
+
+ for (int i = 1; i < futureEpochs.size(); i++)
+ Invariants.requireArgument(futureEpochs.get(i).epoch ==
futureEpochs.get(i - 1).epoch - 1);
for (int i=1; i<epochs.length; i++)
Invariants.requireArgument(epochs[i].epoch() == epochs[i -
1].epoch() - 1);
this.epochs = epochs;
@@ -619,7 +628,6 @@ public class TopologyManager
public synchronized EpochReady onTopologyUpdate(Topology topology,
Supplier<EpochReady> bootstrap)
{
Epochs current = epochs;
-
Invariants.requireArgument(topology.epoch == current.nextEpoch() ||
epochs == Epochs.EMPTY,
"Expected topology update %d to be %d",
topology.epoch, current.nextEpoch());
EpochState[] nextEpochs = new EpochState[current.epochs.length + 1];
@@ -1071,12 +1079,18 @@ public class TopologyManager
public boolean hasEpoch(long epoch)
{
- return epochs.get(epoch) != null;
+ synchronized (this)
+ {
+ return epochs.get(epoch) != null;
+ }
}
public boolean hasAtLeastEpoch(long epoch)
{
- return epochs.currentEpoch >= epoch;
+ synchronized (this)
+ {
+ return epochs.currentEpoch >= epoch;
+ }
}
public Topology localForEpoch(long epoch)
diff --git
a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
index 7e3b23e9..a506b9de 100644
--- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
+++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
+import javax.annotation.concurrent.GuardedBy;
import accord.api.TestableConfigurationService;
import accord.impl.AbstractConfigurationService;
@@ -165,9 +166,19 @@ public class BurnTestConfigurationService extends
AbstractConfigurationService.M
else node.scheduler().selfRecurring(() ->
super.reportTopology(topology, isLoad, startSync), 0, TimeUnit.MILLISECONDS);
}
- @Override
- protected void fetchTopologyInternal(long epoch)
+ @GuardedBy("this")
+ private long maxRequestedEpoch;
+
+ public void fetchTopologyForEpoch(long epoch)
{
+ synchronized (this)
+ {
+ if (epoch <= maxRequestedEpoch)
+ return;
+
+ maxRequestedEpoch = epoch;
+ }
+
pendingEpochs.computeIfAbsent(epoch, FetchTopology::new);
}
diff --git
a/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
index 03b142d2..95ac1bc0 100644
---
a/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
+++
b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
@@ -143,7 +143,7 @@ public class AbstractConfigurationServiceTest
}
@Override
- protected void fetchTopologyInternal(long epoch)
+ public void fetchTopologyForEpoch(long epoch)
{
epochsFetched.add(epoch);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]