This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4a555f47ee Fix flaky testOutboundConnectionsAreRejectedWhenAuthFails
4a555f47ee is described below

commit 4a555f47ee943ce9fd70862cc8127d707e3507a2
Author: Jyothsna Konisa <[email protected]>
AuthorDate: Fri Feb 17 13:50:06 2023 -0800

    Fix flaky testOutboundConnectionsAreRejectedWhenAuthFails
    
    patch by Jyothsna Konisa; reviewed by Jon Meredith, Yifan Cai for CASSANDRA-17708
---
 .../cassandra/net/OutboundConnectionInitiator.java |  5 +-
 .../test/InternodeEncryptionEnforcementTest.java   | 72 ++++++++++------------
 2 files changed, 36 insertions(+), 41 deletions(-)

diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
index f8df49b598..ebd30f5406 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
@@ -147,7 +147,8 @@ public class OutboundConnectionInitiator<SuccessType 
extends OutboundConnectionI
         {
             // interrupt other connections, so they must attempt to 
re-authenticate
             MessagingService.instance().interruptOutbound(settings.to);
-            return ImmediateFuture.failure(new IOException("authentication 
failed to " + settings.connectToId()));
+            logger.error("Authentication failed to " + settings.connectToId());
+            return ImmediateFuture.failure(new IOException("Authentication 
failed to " + settings.connectToId()));
         }
 
 
@@ -281,7 +282,7 @@ public class OutboundConnectionInitiator<SuccessType 
extends OutboundConnectionI
             {
                 // interrupt other connections, so they must attempt to 
re-authenticate
                 MessagingService.instance().interruptOutbound(settings.to);
-                logger.error("authentication failed to " + 
settings.connectToId());
+                logger.error("Authentication failed to " + 
settings.connectToId());
 
                 // To release all the pending buffered data, replace 
authentication handler with discard handler.
                 // This avoids pending inbound data to be fired through the 
pipeline
diff --git a/test/distributed/org/apache/cassandra/distributed/test/InternodeEncryptionEnforcementTest.java b/test/distributed/org/apache/cassandra/distributed/test/InternodeEncryptionEnforcementTest.java
index d13e2e4a0c..27c26fb2c6 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/InternodeEncryptionEnforcementTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/InternodeEncryptionEnforcementTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.distributed.test;
 
+import java.io.Closeable;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -43,6 +44,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.net.InboundMessageHandlers;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.OutboundConnections;
+import org.awaitility.Awaitility;
 
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static org.hamcrest.Matchers.containsString;
@@ -58,26 +60,23 @@ public final class InternodeEncryptionEnforcementTest 
extends TestBaseImpl
     @Test
     public void testInboundConnectionsAreRejectedWhenAuthFails() throws 
IOException, TimeoutException
     {
+        // RejectInboundConnections authenticator is configured only for 
instance 1 of the cluster
         Cluster.Builder builder = 
createCluster(RejectInboundConnections.class);
 
         final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
-        try (Cluster cluster = builder.start())
+        try (Cluster cluster = builder.start(); Closeable es = 
executorService::shutdown)
         {
             executorService.submit(() -> openConnections(cluster));
 
             /*
-             * instance (1) should not connect to instance (2) as 
authentication fails;
-             * instance (2) should not connect to instance (1) as 
authentication fails.
+             * Instance (1) should be able to make outbound connections to 
instance (2) but Instance (1) should not be
+             * accepting any inbound connections. we should wait for the 
authentication failure log on Instance (1)
              */
             SerializableRunnable runnable = () ->
             {
-                // There should be no inbound handlers as authentication fails 
and we remove handlers.
+                // There should be no inbound handlers as authentication fails 
& we remove handlers.
                 assertEquals(0, 
MessagingService.instance().messageHandlers.values().size());
 
-                // There should be no outbound connections as authentication 
fails.
-                OutboundConnections outbound = 
getOnlyElement(MessagingService.instance().channelManagers.values());
-                assertTrue(!outbound.small.isConnected() && 
!outbound.large.isConnected() && !outbound.urgent.isConnected());
-
                 // Verify that the failure is due to authentication failure
                 final RejectInboundConnections authenticator = 
(RejectInboundConnections) DatabaseDescriptor.getInternodeAuthenticator();
                 assertTrue(authenticator.authenticationFailed);
@@ -86,10 +85,8 @@ public final class InternodeEncryptionEnforcementTest 
extends TestBaseImpl
             // Wait for authentication to fail
             cluster.get(1).logs().watchFor("Unable to authenticate peer");
             cluster.get(1).runOnInstance(runnable);
-            cluster.get(2).logs().watchFor("Unable to authenticate peer");
-            cluster.get(2).runOnInstance(runnable);
+
         }
-        executorService.shutdown();
     }
 
     @Test
@@ -98,21 +95,17 @@ public final class InternodeEncryptionEnforcementTest 
extends TestBaseImpl
         Cluster.Builder builder = 
createCluster(RejectOutboundAuthenticator.class);
 
         final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
-        try (Cluster cluster = builder.start())
+        try (Cluster cluster = builder.start(); Closeable es = 
executorService::shutdown)
         {
             executorService.submit(() -> openConnections(cluster));
 
             /*
-             * instance (1) should not connect to instance (2) as 
authentication fails;
-             * instance (2) should not connect to instance (1) as 
authentication fails.
+             * Instance (1) should not be able to make outbound connections to 
instance (2) but Instance (2) should be
+             * accepting outbound connections from Instance (1)
              */
             SerializableRunnable runnable = () ->
             {
-                // There should be no inbound connections as authentication 
fails.
-                InboundMessageHandlers inbound = 
getOnlyElement(MessagingService.instance().messageHandlers.values());
-                assertEquals(0, inbound.count());
-
-                // There should be no outbound connections as authentication 
fails.
+                // There should be no outbound connections as authentication 
fails on Instance (1).
                 OutboundConnections outbound = 
getOnlyElement(MessagingService.instance().channelManagers.values());
                 assertTrue(!outbound.small.isConnected() && 
!outbound.large.isConnected() && !outbound.urgent.isConnected());
 
@@ -122,31 +115,23 @@ public final class InternodeEncryptionEnforcementTest 
extends TestBaseImpl
             };
 
             // Wait for authentication to fail
-            cluster.get(1).logs().watchFor("authentication failed");
+            cluster.get(1).logs().watchFor("Authentication failed");
             cluster.get(1).runOnInstance(runnable);
-            cluster.get(2).logs().watchFor("authentication failed");
-            cluster.get(2).runOnInstance(runnable);
         }
-        executorService.shutdown();
     }
 
     @Test
     public void testOutboundConnectionsAreInterruptedWhenAuthFails() throws 
IOException, TimeoutException
     {
         Cluster.Builder builder = 
createCluster(AllowFirstAndRejectOtherOutboundAuthenticator.class);
-        try (Cluster cluster = builder.start())
+        final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+
+        try (Cluster cluster = builder.start(); Closeable es = 
executorService::shutdown)
         {
-            try
-            {
-                openConnections(cluster);
-            }
-            catch (RuntimeException ise)
-            {
-                assertThat(ise.getMessage(), containsString("agreement not 
reached"));
-            }
+            executorService.submit(() -> openConnections(cluster));
 
             // Verify that authentication is failed and Interrupt is called on 
outbound connections.
-            cluster.get(1).logs().watchFor("authentication failed to");
+            cluster.get(1).logs().watchFor("Authentication failed to");
             cluster.get(1).logs().watchFor("Interrupted outbound connections 
to");
 
             /*
@@ -161,7 +146,7 @@ public final class InternodeEncryptionEnforcementTest 
extends TestBaseImpl
 
                 // There should be no outbound connections as authentication 
fails.
                 OutboundConnections outbound = 
getOnlyElement(MessagingService.instance().channelManagers.values());
-                assertTrue(!outbound.small.isConnected() && 
!outbound.large.isConnected() && !outbound.urgent.isConnected());
+                Awaitility.await().until(() -> !outbound.small.isConnected() 
&& !outbound.large.isConnected() && !outbound.urgent.isConnected());
             };
             cluster.get(1).runOnInstance(runnable);
         }
@@ -341,7 +326,14 @@ public final class InternodeEncryptionEnforcementTest 
extends TestBaseImpl
                         encryption.put("internode_encryption", "all");
                         encryption.put("require_client_auth", "true");
                         c.set("server_encryption_options", encryption);
-                        c.set("internode_authenticator", 
authenticatorClass.getName());
+                        if (c.num() == 1)
+                        {
+                            c.set("internode_authenticator", 
authenticatorClass.getName());
+                        }
+                        else
+                        {
+                            c.set("internode_authenticator", 
AllowAllInternodeAuthenticator.class.getName());
+                        }
                     })
         .withNodeIdTopology(ImmutableMap.of(1, 
NetworkTopology.dcAndRack("dc1", "r1a"),
                                             2, 
NetworkTopology.dcAndRack("dc2", "r2a")));
@@ -434,14 +426,16 @@ public final class InternodeEncryptionEnforcementTest 
extends TestBaseImpl
         {
             if (connectionType == InternodeConnectionDirection.OUTBOUND)
             {
-                if(successfulOutbound.get() == 0) {
-                    successfulOutbound.incrementAndGet();
+                if (successfulOutbound.compareAndSet(0, 1))
+                {
                     return true;
-                } else {
+                }
+                else
+                {
                     failedOutbound.incrementAndGet();
+                    authenticationFailed = true;
                     return false;
                 }
-
             }
             return true;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to