This is an automated email from the ASF dual-hosted git repository.
edimitrova pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4f232324ba Fix HandshakeTest initiated twice for the outbound
connection
4f232324ba is described below
commit 4f232324ba030ff640a790b4029d9f722e96872b
Author: Maxim Muzafarov <[email protected]>
AuthorDate: Tue Aug 1 22:01:55 2023 +0200
Fix HandshakeTest initiated twice for the outbound connection
patch by Maxim Muzafarov; reviewed by Jon Meredith and Michael Semb Wever
for CASSANDRA-18704
---
.../org/apache/cassandra/net/HandshakeTest.java | 47 +++++++++++++++++++++-
1 file changed, 46 insertions(+), 1 deletion(-)
diff --git a/test/unit/org/apache/cassandra/net/HandshakeTest.java
b/test/unit/org/apache/cassandra/net/HandshakeTest.java
index 7c536f4baf..46e399259d 100644
--- a/test/unit/org/apache/cassandra/net/HandshakeTest.java
+++ b/test/unit/org/apache/cassandra/net/HandshakeTest.java
@@ -25,6 +25,8 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import com.google.common.net.InetAddresses;
@@ -40,6 +42,7 @@ import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import io.netty.channel.EventLoop;
@@ -52,6 +55,7 @@ import static
org.apache.cassandra.net.OutboundConnectionInitiator.*;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
// TODO: test failure due to exception, timeout, etc
public class HandshakeTest
@@ -59,6 +63,7 @@ public class HandshakeTest
private static final SocketFactory factory = new SocketFactory();
static final InetAddressAndPort TO_ADDR =
InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"),
7012);
static final InetAddressAndPort FROM_ADDR =
InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"),
7012);
+ private volatile Throwable handshakeEx;
@BeforeClass
public static void startup()
@@ -73,6 +78,12 @@ public class HandshakeTest
factory.shutdownNow();
}
+ @Before
+ public void setup()
+ {
+ handshakeEx = null;
+ }
+
private Result handshake(int outMin, int outMax) throws
ExecutionException, InterruptedException
{
return handshake(new AcceptVersions(outMin, outMax), null);
@@ -298,13 +309,13 @@ public class HandshakeTest
.withAcceptVersions(new AcceptVersions(minimum_version,
current_version))
.withDefaults(ConnectionCategory.MESSAGING)
.withEncryption(getServerEncryptionOptions(connectionType, optional))
+ .withDebugCallbacks(new HandshakeAcknowledgeChecker(t -> handshakeEx =
t))
.withFrom(FROM_ADDR);
OutboundConnections outboundConnections =
OutboundConnections.tryRegister(new ConcurrentHashMap<>(), TO_ADDR, settings);
GossipDigestSyn syn = new GossipDigestSyn("cluster", "partitioner",
new ArrayList<>(0));
Message<GossipDigestSyn> message = Message.out(Verb.GOSSIP_DIGEST_SYN,
syn);
OutboundConnection outboundConnection =
outboundConnections.connectionFor(message);
outboundConnection.enqueue(message);
- outboundConnection.initiate();
return outboundConnection;
}
@@ -322,6 +333,7 @@ public class HandshakeTest
OutboundConnection outboundConnection = initiateOutbound(endpoint,
fromConnectionType, fromOptional);
waitForConnection(outboundConnection);
assertTrue(outboundConnection.isConnected());
+ assertNull(handshakeEx);
}
finally
{
@@ -337,4 +349,37 @@ public class HandshakeTest
Thread.sleep(1000);
}
}
+
+ private static class HandshakeAcknowledgeChecker implements
OutboundDebugCallbacks
+ {
+ private final AtomicInteger acks = new AtomicInteger(0);
+ private final Consumer<Throwable> fail;
+
+ private HandshakeAcknowledgeChecker(Consumer<Throwable> fail)
+ {
+ this.fail = fail;
+ }
+
+ @Override
+ public void onSendSmallFrame(int messageCount, int payloadSizeInBytes)
+ {
+ }
+
+ @Override
+ public void onSentSmallFrame(int messageCount, int payloadSizeInBytes)
+ {
+ }
+
+ @Override
+ public void onFailedSmallFrame(int messageCount, int
payloadSizeInBytes)
+ {
+ }
+
+ @Override
+ public void onConnect(int messagingVersion, OutboundConnectionSettings
settings)
+ {
+ if (acks.incrementAndGet() > 1)
+ fail.accept(new AssertionError("Handshake was acknowledged
more than once"));
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]