This is an automated email from the ASF dual-hosted git repository.

edimitrova pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4f232324ba Fix HandshakeTest initiated twice for the outbound 
connection
4f232324ba is described below

commit 4f232324ba030ff640a790b4029d9f722e96872b
Author: Maxim Muzafarov <[email protected]>
AuthorDate: Tue Aug 1 22:01:55 2023 +0200

    Fix HandshakeTest initiated twice for the outbound connection
    
    patch by Maxim Muzafarov; reviewed by Jon Meredith and Mchael Semb Wever 
for CASSANDRA-18704
---
 .../org/apache/cassandra/net/HandshakeTest.java    | 47 +++++++++++++++++++++-
 1 file changed, 46 insertions(+), 1 deletion(-)

diff --git a/test/unit/org/apache/cassandra/net/HandshakeTest.java 
b/test/unit/org/apache/cassandra/net/HandshakeTest.java
index 7c536f4baf..46e399259d 100644
--- a/test/unit/org/apache/cassandra/net/HandshakeTest.java
+++ b/test/unit/org/apache/cassandra/net/HandshakeTest.java
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import com.google.common.net.InetAddresses;
 
@@ -40,6 +42,7 @@ import org.apache.cassandra.utils.concurrent.AsyncPromise;
 
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import io.netty.channel.EventLoop;
@@ -52,6 +55,7 @@ import static 
org.apache.cassandra.net.OutboundConnectionInitiator.*;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
 
 // TODO: test failure due to exception, timeout, etc
 public class HandshakeTest
@@ -59,6 +63,7 @@ public class HandshakeTest
     private static final SocketFactory factory = new SocketFactory();
     static final InetAddressAndPort TO_ADDR = 
InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"),
 7012);
     static final InetAddressAndPort FROM_ADDR = 
InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"),
 7012);
+    private volatile Throwable handshakeEx;
 
     @BeforeClass
     public static void startup()
@@ -73,6 +78,12 @@ public class HandshakeTest
         factory.shutdownNow();
     }
 
+    @Before
+    public void setup()
+    {
+        handshakeEx = null;
+    }
+
     private Result handshake(int outMin, int outMax) throws 
ExecutionException, InterruptedException
     {
         return handshake(new AcceptVersions(outMin, outMax), null);
@@ -298,13 +309,13 @@ public class HandshakeTest
         .withAcceptVersions(new AcceptVersions(minimum_version, 
current_version))
         .withDefaults(ConnectionCategory.MESSAGING)
         .withEncryption(getServerEncryptionOptions(connectionType, optional))
+        .withDebugCallbacks(new HandshakeAcknowledgeChecker(t -> handshakeEx = 
t))
         .withFrom(FROM_ADDR);
         OutboundConnections outboundConnections = 
OutboundConnections.tryRegister(new ConcurrentHashMap<>(), TO_ADDR, settings);
         GossipDigestSyn syn = new GossipDigestSyn("cluster", "partitioner", 
new ArrayList<>(0));
         Message<GossipDigestSyn> message = Message.out(Verb.GOSSIP_DIGEST_SYN, 
syn);
         OutboundConnection outboundConnection = 
outboundConnections.connectionFor(message);
         outboundConnection.enqueue(message);
-        outboundConnection.initiate();
         return outboundConnection;
     }
 
@@ -322,6 +333,7 @@ public class HandshakeTest
             OutboundConnection outboundConnection = initiateOutbound(endpoint, 
fromConnectionType, fromOptional);
             waitForConnection(outboundConnection);
             assertTrue(outboundConnection.isConnected());
+            assertNull(handshakeEx);
         }
         finally
         {
@@ -337,4 +349,37 @@ public class HandshakeTest
             Thread.sleep(1000);
         }
     }
+
+    private static class HandshakeAcknowledgeChecker implements 
OutboundDebugCallbacks
+    {
+        private final AtomicInteger acks = new AtomicInteger(0);
+        private final Consumer<Throwable> fail;
+
+        private HandshakeAcknowledgeChecker(Consumer<Throwable> fail)
+        {
+            this.fail = fail;
+        }
+
+        @Override
+        public void onSendSmallFrame(int messageCount, int payloadSizeInBytes)
+        {
+        }
+
+        @Override
+        public void onSentSmallFrame(int messageCount, int payloadSizeInBytes)
+        {
+        }
+
+        @Override
+        public void onFailedSmallFrame(int messageCount, int 
payloadSizeInBytes)
+        {
+        }
+
+        @Override
+        public void onConnect(int messagingVersion, OutboundConnectionSettings 
settings)
+        {
+            if (acks.incrementAndGet() > 1)
+                fail.accept(new AssertionError("Handshake was acknowledged 
more than once"));
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to