This is an automated email from the ASF dual-hosted git repository.

absurdfarce pushed a commit to branch 3.x
in repository https://gitbox.apache.org/repos/asf/cassandra-java-driver.git


The following commit(s) were added to refs/heads/3.x by this push:
     new 1876645da JAVA-3125: Match broadcast RPC for control connection and 
Astra events
1876645da is described below

commit 1876645dab79c9e08607683fe119086fa2400ece
Author: Lukasz Antoniak <[email protected]>
AuthorDate: Wed Oct 30 09:28:14 2024 +0100

    JAVA-3125: Match broadcast RPC for control connection and Astra events
    
    patch by Lukasz Antoniak; reviewed by Bret McGuire and Andrew Tolbert for 
JAVA-3125
    reference: https://github.com/apache/cassandra-java-driver/pull/1981
    
    Fix to address Scassandra test issues
---
 .../datastax/driver/core/ControlConnection.java    | 13 +++++--
 .../driver/core/ControlConnectionTest.java         | 45 ++++++++++++++++++++++
 .../datastax/driver/core/ScassandraCluster.java    | 35 +++++++++++++----
 3 files changed, 83 insertions(+), 10 deletions(-)

diff --git 
a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java 
b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java
index 5ad1ee9d9..b5fd7689d 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java
@@ -626,7 +626,11 @@ class ControlConnection implements Connection.Owner {
       broadcastRpcAddress = new InetSocketAddress(nativeAddress, nativePort);
     } else if (row.getColumnDefinitions().contains("rpc_address")) {
       InetAddress rpcAddress = row.getInet("rpc_address");
-      broadcastRpcAddress = new InetSocketAddress(rpcAddress, 
cluster.connectionFactory.getPort());
+      int nativePort = cluster.connectionFactory.getPort();
+      if (row.getColumnDefinitions().contains("rpc_port")) {
+        nativePort = row.getInt("rpc_port");
+      }
+      broadcastRpcAddress = new InetSocketAddress(rpcAddress, nativePort);
     }
     // Before CASSANDRA-9436, system.local doesn't have rpc_address, so this 
might be null. It's not
     // a big deal because we only use this for server events, and the control 
node doesn't receive
@@ -854,8 +858,11 @@ class ControlConnection implements Connection.Owner {
         broadcastRpcAddress = new InetSocketAddress(nativeAddress, nativePort);
       } else {
         InetAddress rpcAddress = row.getInet("rpc_address");
-        broadcastRpcAddress =
-            new InetSocketAddress(rpcAddress, 
cluster.connectionFactory.getPort());
+        int nativePort = cluster.connectionFactory.getPort();
+        if (row.getColumnDefinitions().contains("rpc_port")) {
+          nativePort = row.getInt("rpc_port");
+        }
+        broadcastRpcAddress = new InetSocketAddress(rpcAddress, nativePort);
       }
       broadcastRpcAddresses.add(broadcastRpcAddress);
 
diff --git 
a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java 
b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java
index abe4982e3..90edb28b1 100644
--- 
a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java
+++ 
b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java
@@ -20,6 +20,7 @@ package com.datastax.driver.core;
 import static com.datastax.driver.core.Assertions.assertThat;
 import static com.datastax.driver.core.CreateCCM.TestMode.PER_METHOD;
 import static com.datastax.driver.core.ScassandraCluster.SELECT_LOCAL;
+import static 
com.datastax.driver.core.ScassandraCluster.SELECT_LOCAL_RPC_ADDRESS_AND_PORT;
 import static com.datastax.driver.core.ScassandraCluster.SELECT_PEERS;
 import static com.datastax.driver.core.ScassandraCluster.SELECT_PEERS_DSE68;
 import static com.datastax.driver.core.ScassandraCluster.SELECT_PEERS_V2;
@@ -659,6 +660,50 @@ public class ControlConnectionTest extends CCMTestsSupport 
{
     runPeerTest(state);
   }
 
+  @Test(groups = "short")
+  @CCMConfig(createCcm = false)
+  public void should_extract_hosts_port_using_rpc_port_from_local() throws 
UnknownHostException {
+    InetAddress expectedAddress = InetAddress.getByName("1.2.3.4");
+    int expectedPort = 29042;
+    PeerRowState state =
+        PeerRowState.builder()
+            .local("rpc_address", expectedAddress)
+            .local("rpc_port", expectedPort)
+            .build();
+
+    ScassandraCluster scassandras =
+        
ScassandraCluster.builder().withNodes(2).withPeersV2(state.usePeersV2()).build();
+    scassandras.init();
+
+    Cluster cluster = null;
+    try {
+      scassandras.node(1).primingClient().clearAllPrimes();
+      PrimingClient primingClient = scassandras.node(1).primingClient();
+      primingClient.prime(
+          PrimingRequest.queryBuilder()
+              .withQuery("SELECT * FROM system.local WHERE key='local'")
+              .withThen(
+                  then()
+                      .withColumnTypes(SELECT_LOCAL_RPC_ADDRESS_AND_PORT)
+                      .withRows(state.getLocalRow())
+                      .build())
+              .build());
+      cluster =
+          Cluster.builder()
+              .addContactPoints(scassandras.address(1).getAddress())
+              .withPort(scassandras.getBinaryPort())
+              .withNettyOptions(nonQuietClusterCloseOptions)
+              .build();
+      cluster.connect();
+
+      
assertThat(cluster.manager.getControlConnection().connectedHost().getBroadcastRpcAddress())
+          .isEqualTo(new InetSocketAddress(expectedAddress, expectedPort));
+    } finally {
+      if (cluster != null) cluster.close();
+      scassandras.stop();
+    }
+  }
+
   private void runPeerTest(PeerRowState state) {
 
     ScassandraCluster scassandras =
diff --git 
a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java 
b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java
index b07a01b97..aa3a46b14 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java
@@ -658,14 +658,14 @@ public class ScassandraCluster {
     column("cluster_name", TEXT),
     column("cql_version", TEXT),
     column("data_center", TEXT),
+    column("graph", BOOLEAN),
+    column("host_id", UUID),
     column("listen_address", INET),
     column("partitioner", TEXT),
     column("rack", TEXT),
     column("release_version", TEXT),
-    column("tokens", set(TEXT)),
-    column("graph", BOOLEAN),
-    column("host_id", UUID),
-    column("schema_version", UUID)
+    column("schema_version", UUID),
+    column("tokens", set(TEXT))
   };
 
   public static final org.scassandra.http.client.types.ColumnMetadata[] 
SELECT_LOCAL_V2 = {
@@ -676,15 +676,36 @@ public class ScassandraCluster {
     column("cluster_name", TEXT),
     column("cql_version", TEXT),
     column("data_center", TEXT),
+    column("graph", BOOLEAN),
+    column("host_id", UUID),
     column("listen_address", INET),
     column("listen_port", INT),
     column("partitioner", TEXT),
     column("rack", TEXT),
     column("release_version", TEXT),
-    column("tokens", set(TEXT)),
-    column("graph", BOOLEAN),
+    column("schema_version", UUID),
+    column("tokens", set(TEXT))
+  };
+
+  public static final org.scassandra.http.client.types.ColumnMetadata[]
+      SELECT_LOCAL_RPC_ADDRESS_AND_PORT = {
+    column("key", TEXT),
+    column("bootstrapped", TEXT),
+    column("broadcast_address", INET),
+    column("broadcast_port", INT),
+    column("cluster_name", TEXT),
+    column("cql_version", TEXT),
+    column("data_center", TEXT),
     column("host_id", UUID),
-    column("schema_version", UUID)
+    column("listen_address", INET),
+    column("listen_port", INT),
+    column("partitioner", TEXT),
+    column("rack", TEXT),
+    column("release_version", TEXT),
+    column("rpc_address", INET),
+    column("rpc_port", INT),
+    column("schema_version", UUID),
+    column("tokens", set(TEXT))
   };
 
   static final org.scassandra.http.client.types.ColumnMetadata[] 
SELECT_CLUSTER_NAME = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to