This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new a98cb15476 Handle partitioned nodes in DiscoverySimulationTest
a98cb15476 is described below
commit a98cb154763341773eba44e2c0d465f5980565d7
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Fri Dec 13 11:24:04 2024 +0000
Handle partitioned nodes in DiscoverySimulationTest
Patch by Sam Tunnicliffe; reviewed by Brandon Williams for CASSANDRA-19505
---
.../test/tcm/CMSPlacementAfterMoveTest.java | 1 +
.../cassandra/tcm/DiscoverySimulationTest.java | 62 ++++++++++++++++------
2 files changed, 48 insertions(+), 15 deletions(-)
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterMoveTest.java
b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterMoveTest.java
index 8b66e97c41..99b33984e5 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterMoveTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterMoveTest.java
@@ -39,6 +39,7 @@ public class CMSPlacementAfterMoveTest extends TestBaseImpl
{
try (Cluster cluster = init(Cluster.build(4)
.withConfig(c ->
c.with(Feature.GOSSIP, Feature.NETWORK))
+ .withoutVNodes()
.start()))
{
cluster.get(1).nodetoolResult("cms", "reconfigure",
"3").asserts().success();
diff --git a/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java
b/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java
index f8380a6cb6..564eb98f84 100644
--- a/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java
+++ b/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java
@@ -19,19 +19,18 @@
package org.apache.cassandra.tcm;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.NotImplementedException;
-import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -49,6 +48,9 @@ import org.apache.cassandra.tcm.log.LocalLog;
import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
import org.apache.cassandra.utils.concurrent.Future;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
public class DiscoverySimulationTest
{
static
@@ -84,6 +86,9 @@ public class DiscoverySimulationTest
Set<InetAddressAndPort> seeds = new HashSet<>();
seeds.add(InetAddressAndPort.getByName("127.0.100.1"));
seeds.add(InetAddressAndPort.getByName("127.0.100.100")); // add an
unreachable node
+ // Thread per-node to try and avoid nodes which start first finishing
+ // before later nodes have made first contact with the seed.
+ Executor executor = Executors.newFixedThreadPool(10);
for (int i = 1; i <= 10; i++)
{
InetAddressAndPort addr =
InetAddressAndPort.getByName("127.0.100." + i);
@@ -94,15 +99,33 @@ public class DiscoverySimulationTest
messaging.handlers.put(Verb.TCM_DISCOVER_REQ,
discovery.requestHandler);
}
- List<CompletableFuture<Discovery.DiscoveredNodes>> futures = new
ArrayList<>();
- for (Discovery value : nodes.values())
- futures.add(CompletableFuture.supplyAsync(() ->
value.discover(5)));
-
- for (CompletableFuture<Discovery.DiscoveredNodes> future : futures)
- future.get();
-
- for (CompletableFuture<Discovery.DiscoveredNodes> future : futures)
- Assert.assertEquals(nodes.keySet(), future.get().nodes());
+ Map<InetAddressAndPort, CompletableFuture<Discovery.DiscoveredNodes>>
futures = new HashMap<>();
+ nodes.forEach((addr, discovery) -> {
+ futures.put(addr, CompletableFuture.supplyAsync(() ->
discovery.discover(5), executor));
+ });
+
+ Map<InetAddressAndPort, Discovery.DiscoveredNodes> discovered = new
HashMap<>();
+ for (Map.Entry<InetAddressAndPort,
CompletableFuture<Discovery.DiscoveredNodes>> future : futures.entrySet())
+ discovered.put(future.getKey(), future.getValue().get());
+
+ // It's possible that some node(s) in the cluster were completely
unable to contact any seed. Therefore, these
+ // nodes are undiscoverable by the rest of the cluster so we exclude
them from the expected results. We should
+ // also expect those partitioned nodes to have failed to discover any
of their peers.
+ Set<InetAddressAndPort> connected = new HashSet<>();
+ cluster.forEach((addr, messaging) -> {
+ if (messaging.hasSentAtLeastOneRequest)
+ connected.add(addr);
+ });
+
+ for (Map.Entry<InetAddressAndPort, Discovery.DiscoveredNodes> result :
discovered.entrySet())
+ {
+ InetAddressAndPort node = result.getKey();
+ Set<InetAddressAndPort> peersDiscovered =
result.getValue().nodes();
+ if (connected.contains(node))
+ assertEquals(node + " was able to contact seed, but didn't
discover expected peers", connected, peersDiscovered);
+ else
+ assertTrue(node + " was unable to contact seed, but discovered
" + peersDiscovered, peersDiscovered.isEmpty());
+ }
}
/**
@@ -113,12 +136,17 @@ public class DiscoverySimulationTest
*/
public static class FakeMessageDelivery implements MessageDelivery
{
- private static AtomicInteger counter = new AtomicInteger();
+ private final AtomicInteger counter = new AtomicInteger();
private final Map<InetAddressAndPort, FakeMessageDelivery> cluster;
private final Map<Long, RequestCallback<?>> callbacks;
private final Map<Verb, IVerbHandler<?>> handlers;
private final InetAddressAndPort addr;
+ // If we're unlucky, every attempt by this node to contact the single
live peer may
+ // be randomly chosen to fail. If this happens, no other node is able
to discover
+ // that this one exists and so it should be excluded from the expected
set when
+ // checking results.
+ public boolean hasSentAtLeastOneRequest = false;
public FakeMessageDelivery(Map<InetAddressAndPort,
FakeMessageDelivery> cluster,
InetAddressAndPort addr)
@@ -139,6 +167,7 @@ public class DiscoverySimulationTest
{
if (message.verb().isResponse())
{
+ logger.info("{} sending response to {}", addr, to);
cluster.get(to).deliverResponse(Message.forgeIdentityForTests(message, addr));
return;
}
@@ -154,14 +183,17 @@ public class DiscoverySimulationTest
FakeMessageDelivery node = cluster.get(to);
if (node != null &&
// Inject some failures
- counter.incrementAndGet() % 2 != 0)
+ counter.incrementAndGet() % 5 != 0)
{
+ hasSentAtLeastOneRequest = true;
+ logger.info("{} sending request to {}", addr, to);
callbacks.put(message.id(), cb);
IVerbHandler<REQ> handler = (IVerbHandler<REQ>)
node.handlers.get(message.verb());
handler.doVerb(message);
}
else
{
+ logger.info("{} simulating failure sending request to {}",
addr, to);
cb.onFailure(to, RequestFailureReason.TIMEOUT);
}
}
@@ -186,4 +218,4 @@ public class DiscoverySimulationTest
throw new NotImplementedException();
}
}
-}
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]