This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4a555f47ee Fix flaky testOutboundConnectionsAreRejectedWhenAuthFails
4a555f47ee is described below
commit 4a555f47ee943ce9fd70862cc8127d707e3507a2
Author: Jyothsna Konisa <[email protected]>
AuthorDate: Fri Feb 17 13:50:06 2023 -0800
Fix flaky testOutboundConnectionsAreRejectedWhenAuthFails
patch by Jyothsna Konisa; reviewed by Jon Meredith, Yifan Cai for
CASSANDRA-17708
---
.../cassandra/net/OutboundConnectionInitiator.java | 5 +-
.../test/InternodeEncryptionEnforcementTest.java | 72 ++++++++++------------
2 files changed, 36 insertions(+), 41 deletions(-)
diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
index f8df49b598..ebd30f5406 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
@@ -147,7 +147,8 @@ public class OutboundConnectionInitiator<SuccessType
extends OutboundConnectionI
{
// interrupt other connections, so they must attempt to
re-authenticate
MessagingService.instance().interruptOutbound(settings.to);
- return ImmediateFuture.failure(new IOException("authentication
failed to " + settings.connectToId()));
+ logger.error("Authentication failed to " + settings.connectToId());
+ return ImmediateFuture.failure(new IOException("Authentication
failed to " + settings.connectToId()));
}
@@ -281,7 +282,7 @@ public class OutboundConnectionInitiator<SuccessType
extends OutboundConnectionI
{
// interrupt other connections, so they must attempt to
re-authenticate
MessagingService.instance().interruptOutbound(settings.to);
- logger.error("authentication failed to " +
settings.connectToId());
+ logger.error("Authentication failed to " +
settings.connectToId());
// To release all the pending buffered data, replace
authentication handler with discard handler.
// This avoids pending inbound data to be fired through the
pipeline
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/InternodeEncryptionEnforcementTest.java
b/test/distributed/org/apache/cassandra/distributed/test/InternodeEncryptionEnforcementTest.java
index d13e2e4a0c..27c26fb2c6 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/InternodeEncryptionEnforcementTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/InternodeEncryptionEnforcementTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.distributed.test;
+import java.io.Closeable;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -43,6 +44,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.net.InboundMessageHandlers;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.OutboundConnections;
+import org.awaitility.Awaitility;
import static com.google.common.collect.Iterables.getOnlyElement;
import static org.hamcrest.Matchers.containsString;
@@ -58,26 +60,23 @@ public final class InternodeEncryptionEnforcementTest
extends TestBaseImpl
@Test
public void testInboundConnectionsAreRejectedWhenAuthFails() throws
IOException, TimeoutException
{
+ // RejectInboundConnections authenticator is configured only for
instance 1 of the cluster
Cluster.Builder builder =
createCluster(RejectInboundConnections.class);
final ExecutorService executorService =
Executors.newSingleThreadExecutor();
- try (Cluster cluster = builder.start())
+ try (Cluster cluster = builder.start(); Closeable es =
executorService::shutdown)
{
executorService.submit(() -> openConnections(cluster));
/*
- * instance (1) should not connect to instance (2) as
authentication fails;
- * instance (2) should not connect to instance (1) as
authentication fails.
+ * Instance (1) should be able to make outbound connections to
instance (2) but Instance (1) should not be
+ * accepting any inbound connections. we should wait for the
authentication failure log on Instance (1)
*/
SerializableRunnable runnable = () ->
{
- // There should be no inbound handlers as authentication fails
and we remove handlers.
+ // There should be no inbound handlers as authentication fails
& we remove handlers.
assertEquals(0,
MessagingService.instance().messageHandlers.values().size());
- // There should be no outbound connections as authentication
fails.
- OutboundConnections outbound =
getOnlyElement(MessagingService.instance().channelManagers.values());
- assertTrue(!outbound.small.isConnected() &&
!outbound.large.isConnected() && !outbound.urgent.isConnected());
-
// Verify that the failure is due to authentication failure
final RejectInboundConnections authenticator =
(RejectInboundConnections) DatabaseDescriptor.getInternodeAuthenticator();
assertTrue(authenticator.authenticationFailed);
@@ -86,10 +85,8 @@ public final class InternodeEncryptionEnforcementTest
extends TestBaseImpl
// Wait for authentication to fail
cluster.get(1).logs().watchFor("Unable to authenticate peer");
cluster.get(1).runOnInstance(runnable);
- cluster.get(2).logs().watchFor("Unable to authenticate peer");
- cluster.get(2).runOnInstance(runnable);
+
}
- executorService.shutdown();
}
@Test
@@ -98,21 +95,17 @@ public final class InternodeEncryptionEnforcementTest
extends TestBaseImpl
Cluster.Builder builder =
createCluster(RejectOutboundAuthenticator.class);
final ExecutorService executorService =
Executors.newSingleThreadExecutor();
- try (Cluster cluster = builder.start())
+ try (Cluster cluster = builder.start(); Closeable es =
executorService::shutdown)
{
executorService.submit(() -> openConnections(cluster));
/*
- * instance (1) should not connect to instance (2) as
authentication fails;
- * instance (2) should not connect to instance (1) as
authentication fails.
+ * Instance (1) should not be able to make outbound connections to
instance (2) but Instance (2) should be
+ * accepting outbound connections from Instance (1)
*/
SerializableRunnable runnable = () ->
{
- // There should be no inbound connections as authentication
fails.
- InboundMessageHandlers inbound =
getOnlyElement(MessagingService.instance().messageHandlers.values());
- assertEquals(0, inbound.count());
-
- // There should be no outbound connections as authentication
fails.
+ // There should be no outbound connections as authentication
fails on Instance (1).
OutboundConnections outbound =
getOnlyElement(MessagingService.instance().channelManagers.values());
assertTrue(!outbound.small.isConnected() &&
!outbound.large.isConnected() && !outbound.urgent.isConnected());
@@ -122,31 +115,23 @@ public final class InternodeEncryptionEnforcementTest
extends TestBaseImpl
};
// Wait for authentication to fail
- cluster.get(1).logs().watchFor("authentication failed");
+ cluster.get(1).logs().watchFor("Authentication failed");
cluster.get(1).runOnInstance(runnable);
- cluster.get(2).logs().watchFor("authentication failed");
- cluster.get(2).runOnInstance(runnable);
}
- executorService.shutdown();
}
@Test
public void testOutboundConnectionsAreInterruptedWhenAuthFails() throws
IOException, TimeoutException
{
Cluster.Builder builder =
createCluster(AllowFirstAndRejectOtherOutboundAuthenticator.class);
- try (Cluster cluster = builder.start())
+ final ExecutorService executorService =
Executors.newSingleThreadExecutor();
+
+ try (Cluster cluster = builder.start(); Closeable es =
executorService::shutdown)
{
- try
- {
- openConnections(cluster);
- }
- catch (RuntimeException ise)
- {
- assertThat(ise.getMessage(), containsString("agreement not
reached"));
- }
+ executorService.submit(() -> openConnections(cluster));
// Verify that authentication is failed and Interrupt is called on
outbound connections.
- cluster.get(1).logs().watchFor("authentication failed to");
+ cluster.get(1).logs().watchFor("Authentication failed to");
cluster.get(1).logs().watchFor("Interrupted outbound connections
to");
/*
@@ -161,7 +146,7 @@ public final class InternodeEncryptionEnforcementTest
extends TestBaseImpl
// There should be no outbound connections as authentication
fails.
OutboundConnections outbound =
getOnlyElement(MessagingService.instance().channelManagers.values());
- assertTrue(!outbound.small.isConnected() &&
!outbound.large.isConnected() && !outbound.urgent.isConnected());
+ Awaitility.await().until(() -> !outbound.small.isConnected()
&& !outbound.large.isConnected() && !outbound.urgent.isConnected());
};
cluster.get(1).runOnInstance(runnable);
}
@@ -341,7 +326,14 @@ public final class InternodeEncryptionEnforcementTest
extends TestBaseImpl
encryption.put("internode_encryption", "all");
encryption.put("require_client_auth", "true");
c.set("server_encryption_options", encryption);
- c.set("internode_authenticator",
authenticatorClass.getName());
+ if (c.num() == 1)
+ {
+ c.set("internode_authenticator",
authenticatorClass.getName());
+ }
+ else
+ {
+ c.set("internode_authenticator",
AllowAllInternodeAuthenticator.class.getName());
+ }
})
.withNodeIdTopology(ImmutableMap.of(1,
NetworkTopology.dcAndRack("dc1", "r1a"),
2,
NetworkTopology.dcAndRack("dc2", "r2a")));
@@ -434,14 +426,16 @@ public final class InternodeEncryptionEnforcementTest
extends TestBaseImpl
{
if (connectionType == InternodeConnectionDirection.OUTBOUND)
{
- if(successfulOutbound.get() == 0) {
- successfulOutbound.incrementAndGet();
+ if (successfulOutbound.compareAndSet(0, 1))
+ {
return true;
- } else {
+ }
+ else
+ {
failedOutbound.incrementAndGet();
+ authenticationFailed = true;
return false;
}
-
}
return true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]