This is an automated email from the ASF dual-hosted git repository.
absurdfarce pushed a commit to branch 3.x
in repository https://gitbox.apache.org/repos/asf/cassandra-java-driver.git
The following commit(s) were added to refs/heads/3.x by this push:
new 1876645da JAVA-3125: Match broadcast RPC for control connection and Astra events
1876645da is described below
commit 1876645dab79c9e08607683fe119086fa2400ece
Author: Lukasz Antoniak <[email protected]>
AuthorDate: Wed Oct 30 09:28:14 2024 +0100
JAVA-3125: Match broadcast RPC for control connection and Astra events
patch by Lukasz Antoniak; reviewed by Bret McGuire and Andrew Tolbert for JAVA-3125
reference: https://github.com/apache/cassandra-java-driver/pull/1981
Fix to address Scassandra test issues
---
.../datastax/driver/core/ControlConnection.java | 13 +++++--
.../driver/core/ControlConnectionTest.java | 45 ++++++++++++++++++++++
.../datastax/driver/core/ScassandraCluster.java | 35 +++++++++++++----
3 files changed, 83 insertions(+), 10 deletions(-)
diff --git a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java
index 5ad1ee9d9..b5fd7689d 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java
@@ -626,7 +626,11 @@ class ControlConnection implements Connection.Owner {
broadcastRpcAddress = new InetSocketAddress(nativeAddress, nativePort);
} else if (row.getColumnDefinitions().contains("rpc_address")) {
InetAddress rpcAddress = row.getInet("rpc_address");
- broadcastRpcAddress = new InetSocketAddress(rpcAddress, cluster.connectionFactory.getPort());
+ int nativePort = cluster.connectionFactory.getPort();
+ if (row.getColumnDefinitions().contains("rpc_port")) {
+ nativePort = row.getInt("rpc_port");
+ }
+ broadcastRpcAddress = new InetSocketAddress(rpcAddress, nativePort);
}
// Before CASSANDRA-9436, system.local doesn't have rpc_address, so this might be null. It's not
// a big deal because we only use this for server events, and the control node doesn't receive
@@ -854,8 +858,11 @@ class ControlConnection implements Connection.Owner {
broadcastRpcAddress = new InetSocketAddress(nativeAddress, nativePort);
} else {
InetAddress rpcAddress = row.getInet("rpc_address");
- broadcastRpcAddress =
- new InetSocketAddress(rpcAddress, cluster.connectionFactory.getPort());
+ int nativePort = cluster.connectionFactory.getPort();
+ if (row.getColumnDefinitions().contains("rpc_port")) {
+ nativePort = row.getInt("rpc_port");
+ }
+ broadcastRpcAddress = new InetSocketAddress(rpcAddress, nativePort);
}
broadcastRpcAddresses.add(broadcastRpcAddress);
diff --git a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java
index abe4982e3..90edb28b1 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/ControlConnectionTest.java
@@ -20,6 +20,7 @@ package com.datastax.driver.core;
import static com.datastax.driver.core.Assertions.assertThat;
import static com.datastax.driver.core.CreateCCM.TestMode.PER_METHOD;
import static com.datastax.driver.core.ScassandraCluster.SELECT_LOCAL;
+import static com.datastax.driver.core.ScassandraCluster.SELECT_LOCAL_RPC_ADDRESS_AND_PORT;
import static com.datastax.driver.core.ScassandraCluster.SELECT_PEERS;
import static com.datastax.driver.core.ScassandraCluster.SELECT_PEERS_DSE68;
import static com.datastax.driver.core.ScassandraCluster.SELECT_PEERS_V2;
@@ -659,6 +660,50 @@ public class ControlConnectionTest extends CCMTestsSupport {
runPeerTest(state);
}
+ @Test(groups = "short")
+ @CCMConfig(createCcm = false)
+ public void should_extract_hosts_port_using_rpc_port_from_local() throws UnknownHostException {
+ InetAddress expectedAddress = InetAddress.getByName("1.2.3.4");
+ int expectedPort = 29042;
+ PeerRowState state =
+ PeerRowState.builder()
+ .local("rpc_address", expectedAddress)
+ .local("rpc_port", expectedPort)
+ .build();
+
+ ScassandraCluster scassandras =
+ ScassandraCluster.builder().withNodes(2).withPeersV2(state.usePeersV2()).build();
+ scassandras.init();
+
+ Cluster cluster = null;
+ try {
+ scassandras.node(1).primingClient().clearAllPrimes();
+ PrimingClient primingClient = scassandras.node(1).primingClient();
+ primingClient.prime(
+ PrimingRequest.queryBuilder()
+ .withQuery("SELECT * FROM system.local WHERE key='local'")
+ .withThen(
+ then()
+ .withColumnTypes(SELECT_LOCAL_RPC_ADDRESS_AND_PORT)
+ .withRows(state.getLocalRow())
+ .build())
+ .build());
+ cluster =
+ Cluster.builder()
+ .addContactPoints(scassandras.address(1).getAddress())
+ .withPort(scassandras.getBinaryPort())
+ .withNettyOptions(nonQuietClusterCloseOptions)
+ .build();
+ cluster.connect();
+
+ assertThat(cluster.manager.getControlConnection().connectedHost().getBroadcastRpcAddress())
+ .isEqualTo(new InetSocketAddress(expectedAddress, expectedPort));
+ } finally {
+ if (cluster != null) cluster.close();
+ scassandras.stop();
+ }
+ }
+
private void runPeerTest(PeerRowState state) {
ScassandraCluster scassandras =
diff --git a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java
index b07a01b97..aa3a46b14 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/ScassandraCluster.java
@@ -658,14 +658,14 @@ public class ScassandraCluster {
column("cluster_name", TEXT),
column("cql_version", TEXT),
column("data_center", TEXT),
+ column("graph", BOOLEAN),
+ column("host_id", UUID),
column("listen_address", INET),
column("partitioner", TEXT),
column("rack", TEXT),
column("release_version", TEXT),
- column("tokens", set(TEXT)),
- column("graph", BOOLEAN),
- column("host_id", UUID),
- column("schema_version", UUID)
+ column("schema_version", UUID),
+ column("tokens", set(TEXT))
};
public static final org.scassandra.http.client.types.ColumnMetadata[] SELECT_LOCAL_V2 = {
@@ -676,15 +676,36 @@ public class ScassandraCluster {
column("cluster_name", TEXT),
column("cql_version", TEXT),
column("data_center", TEXT),
+ column("graph", BOOLEAN),
+ column("host_id", UUID),
column("listen_address", INET),
column("listen_port", INT),
column("partitioner", TEXT),
column("rack", TEXT),
column("release_version", TEXT),
- column("tokens", set(TEXT)),
- column("graph", BOOLEAN),
+ column("schema_version", UUID),
+ column("tokens", set(TEXT))
+ };
+
+ public static final org.scassandra.http.client.types.ColumnMetadata[]
+ SELECT_LOCAL_RPC_ADDRESS_AND_PORT = {
+ column("key", TEXT),
+ column("bootstrapped", TEXT),
+ column("broadcast_address", INET),
+ column("broadcast_port", INT),
+ column("cluster_name", TEXT),
+ column("cql_version", TEXT),
+ column("data_center", TEXT),
column("host_id", UUID),
- column("schema_version", UUID)
+ column("listen_address", INET),
+ column("listen_port", INT),
+ column("partitioner", TEXT),
+ column("rack", TEXT),
+ column("release_version", TEXT),
+ column("rpc_address", INET),
+ column("rpc_port", INT),
+ column("schema_version", UUID),
+ column("tokens", set(TEXT))
};
static final org.scassandra.http.client.types.ColumnMetadata[] SELECT_CLUSTER_NAME = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]