This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a98cb15476 Handle partitioned nodes in DiscoverySimulationTest
a98cb15476 is described below

commit a98cb154763341773eba44e2c0d465f5980565d7
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Fri Dec 13 11:24:04 2024 +0000

    Handle partitioned nodes in DiscoverySimulationTest
    
    Patch by Sam Tunnicliffe; reviewed by Brandon Williams for CASSANDRA-19505
---
 .../test/tcm/CMSPlacementAfterMoveTest.java        |  1 +
 .../cassandra/tcm/DiscoverySimulationTest.java     | 62 ++++++++++++++++------
 2 files changed, 48 insertions(+), 15 deletions(-)

diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterMoveTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterMoveTest.java
index 8b66e97c41..99b33984e5 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterMoveTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/CMSPlacementAfterMoveTest.java
@@ -39,6 +39,7 @@ public class CMSPlacementAfterMoveTest extends TestBaseImpl
     {
         try (Cluster cluster = init(Cluster.build(4)
                                            .withConfig(c -> 
c.with(Feature.GOSSIP, Feature.NETWORK))
+                                           .withoutVNodes()
                                            .start()))
         {
             cluster.get(1).nodetoolResult("cms", "reconfigure", 
"3").asserts().success();
diff --git a/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java b/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java
index f8380a6cb6..564eb98f84 100644
--- a/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java
+++ b/test/unit/org/apache/cassandra/tcm/DiscoverySimulationTest.java
@@ -19,19 +19,18 @@
 package org.apache.cassandra.tcm;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.NotImplementedException;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -49,6 +48,9 @@ import org.apache.cassandra.tcm.log.LocalLog;
 import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
 import org.apache.cassandra.utils.concurrent.Future;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class DiscoverySimulationTest
 {
     static
@@ -84,6 +86,9 @@ public class DiscoverySimulationTest
         Set<InetAddressAndPort> seeds = new HashSet<>();
         seeds.add(InetAddressAndPort.getByName("127.0.100.1"));
         seeds.add(InetAddressAndPort.getByName("127.0.100.100")); // add an 
unreachable node
+        // Thread per-node to try and avoid nodes which start first finishing
+        // before later nodes have made first contact with the seed.
+        Executor executor = Executors.newFixedThreadPool(10);
         for (int i = 1; i <= 10; i++)
         {
             InetAddressAndPort addr = 
InetAddressAndPort.getByName("127.0.100." + i);
@@ -94,15 +99,33 @@ public class DiscoverySimulationTest
             messaging.handlers.put(Verb.TCM_DISCOVER_REQ, 
discovery.requestHandler);
         }
 
-        List<CompletableFuture<Discovery.DiscoveredNodes>> futures = new 
ArrayList<>();
-        for (Discovery value : nodes.values())
-            futures.add(CompletableFuture.supplyAsync(() -> 
value.discover(5)));
-
-        for (CompletableFuture<Discovery.DiscoveredNodes> future : futures)
-            future.get();
-
-        for (CompletableFuture<Discovery.DiscoveredNodes> future : futures)
-            Assert.assertEquals(nodes.keySet(), future.get().nodes());
+        Map<InetAddressAndPort, CompletableFuture<Discovery.DiscoveredNodes>> 
futures = new HashMap<>();
+        nodes.forEach((addr, discovery) -> {
+            futures.put(addr, CompletableFuture.supplyAsync(() -> 
discovery.discover(5), executor));
+        });
+
+        Map<InetAddressAndPort, Discovery.DiscoveredNodes> discovered = new 
HashMap<>();
+        for (Map.Entry<InetAddressAndPort, 
CompletableFuture<Discovery.DiscoveredNodes>> future : futures.entrySet())
+            discovered.put(future.getKey(), future.getValue().get());
+
+        // It's possible that some node(s) in the cluster were completely 
unable to contact any seed. Therefore, these
+        // nodes are undiscoverable by the rest of the cluster so we exclude 
them from the expected results. We should
+        // also expect those partitioned nodes to have failed to discover any 
of their peers.
+        Set<InetAddressAndPort> connected = new HashSet<>();
+        cluster.forEach((addr, messaging) -> {
+            if (messaging.hasSentAtLeastOneRequest)
+                connected.add(addr);
+        });
+
+        for (Map.Entry<InetAddressAndPort, Discovery.DiscoveredNodes> result : 
discovered.entrySet())
+        {
+            InetAddressAndPort node = result.getKey();
+            Set<InetAddressAndPort> peersDiscovered = 
result.getValue().nodes();
+            if (connected.contains(node))
+                assertEquals(node + " was able to contact seed, but didn't 
discover expected peers", connected, peersDiscovered);
+            else
+                assertTrue(node + " was unable to contact seed, but discovered 
" + peersDiscovered, peersDiscovered.isEmpty());
+        }
     }
 
     /**
@@ -113,12 +136,17 @@ public class DiscoverySimulationTest
      */
     public static class FakeMessageDelivery implements MessageDelivery
     {
-        private static AtomicInteger counter = new AtomicInteger();
+        private final AtomicInteger counter = new AtomicInteger();
 
         private final Map<InetAddressAndPort, FakeMessageDelivery> cluster;
         private final Map<Long, RequestCallback<?>> callbacks;
         private final Map<Verb, IVerbHandler<?>> handlers;
         private final InetAddressAndPort addr;
+        // If we're unlucky, every attempt by this node to contact the single 
live peer may
+        // be randomly chosen to fail. If this happens, no other node is able 
to discover
+        // that this one exists and so it should be excluded from the expected 
set when
+        // checking results.
+        public boolean hasSentAtLeastOneRequest = false;
 
         public FakeMessageDelivery(Map<InetAddressAndPort, 
FakeMessageDelivery> cluster,
                                    InetAddressAndPort addr)
@@ -139,6 +167,7 @@ public class DiscoverySimulationTest
         {
             if (message.verb().isResponse())
             {
+                logger.info("{} sending response to {}", addr, to);
                 
cluster.get(to).deliverResponse(Message.forgeIdentityForTests(message, addr));
                 return;
             }
@@ -154,14 +183,17 @@ public class DiscoverySimulationTest
                 FakeMessageDelivery node = cluster.get(to);
                 if (node != null &&
                     // Inject some failures
-                    counter.incrementAndGet() % 2 != 0)
+                    counter.incrementAndGet() % 5 != 0)
                 {
+                    hasSentAtLeastOneRequest = true;
+                    logger.info("{} sending request to {}", addr, to);
                     callbacks.put(message.id(), cb);
                     IVerbHandler<REQ> handler = (IVerbHandler<REQ>) 
node.handlers.get(message.verb());
                     handler.doVerb(message);
                 }
                 else
                 {
+                    logger.info("{} simulating failure sending request to {}", 
addr, to);
                     cb.onFailure(to, RequestFailureReason.TIMEOUT);
                 }
             }
@@ -186,4 +218,4 @@ public class DiscoverySimulationTest
             throw new NotImplementedException();
         }
     }
-}
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to