This is an automated email from the ASF dual-hosted git repository.
absurdfarce pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/cassandra-java-driver.git
The following commit(s) were added to refs/heads/4.x by this push:
new 7e2c6579a CASSANDRA-19352: Support native_transport_(address|port) + native_transport_port_ssl for DSE 6.8 (4.x edition)
7e2c6579a is described below
commit 7e2c6579af564be6d1b161ec4159ecf517c190b4
Author: Bret McGuire <[email protected]>
AuthorDate: Tue Feb 6 15:18:59 2024 -0600
CASSANDRA-19352: Support native_transport_(address|port) + native_transport_port_ssl for DSE 6.8 (4.x edition)
patch by absurdfarce; reviewed by absurdfarce and adutra for CASSANDRA-19352
---
.../core/metadata/DefaultTopologyMonitor.java | 76 +++++++--
.../core/metadata/DefaultTopologyMonitorTest.java | 180 +++++++++++++++++++--
2 files changed, 223 insertions(+), 33 deletions(-)
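For orientation, the patch below replaces the old two-step rpc_address/native_address lookup with an ordered probe over candidate address and port columns, plus a DSE-specific SSL port override. The following is a minimal standalone sketch of that lookup order; it is illustrative only, with a plain Map standing in for the driver's AdminRow, and the class and method names are not driver API.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastRpcLookupSketch {

  // Address candidates, in the order the patch probes them.
  private static final List<String> ADDRESS_COLUMNS =
      Arrays.asList("native_address", "native_transport_address", "rpc_address");

  // Port candidates, in the order the patch probes them.
  private static final List<String> PORT_COLUMNS =
      Arrays.asList("native_port", "native_transport_port", "rpc_port");

  // Returns the first non-null address column, or null (the driver logs a warning in that case).
  static String firstAddress(Map<String, Object> row) {
    for (String column : ADDRESS_COLUMNS) {
      Object value = row.get(column);
      if (value != null) {
        return (String) value;
      }
    }
    return null;
  }

  // Returns the first usable (non-null, non-zero) port, applying the DSE SSL override.
  static int firstPort(Map<String, Object> row, boolean sslEnabled, int controlPort) {
    Integer port = null;
    for (String column : PORT_COLUMNS) {
      port = (Integer) row.get(column);
      // When SSL is configured, native_transport_port_ssl replaces native_transport_port (DSE only).
      if ("native_transport_port".equals(column) && sslEnabled) {
        port = (Integer) row.get(column + "_ssl");
      }
      if (port != null && port != 0) {
        return port;
      }
    }
    // No usable port column: fall back to the control connection's port, or 0 if unknown.
    return controlPort == -1 ? 0 : controlPort;
  }

  public static void main(String[] args) {
    Map<String, Object> dseRow = new HashMap<>();
    dseRow.put("native_transport_address", "127.0.0.3");
    dseRow.put("native_transport_port", 9042);
    dseRow.put("native_transport_port_ssl", 9043);

    System.out.println(firstAddress(dseRow));            // 127.0.0.3
    System.out.println(firstPort(dseRow, false, 9042));  // 9042
    System.out.println(firstPort(dseRow, true, 9042));   // 9043
  }
}

Run against a DSE-style row, the sketch prints 127.0.0.3, 9042, and 9043, matching the expectations in the new unit tests further down.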
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java
index 87008b05c..f3dc988cf 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java
@@ -34,6 +34,7 @@ import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
 import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
 import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
 import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
+import com.datastax.oss.driver.shaded.guava.common.collect.Iterators;
 import com.datastax.oss.protocol.internal.ProtocolConstants;
 import com.datastax.oss.protocol.internal.response.Error;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -69,6 +70,10 @@ public class DefaultTopologyMonitor implements TopologyMonitor {
   // Assume topology queries never need paging
   private static final int INFINITE_PAGE_SIZE = -1;
 
+  // A few system.peers columns which get special handling below
+  private static final String NATIVE_PORT = "native_port";
+  private static final String NATIVE_TRANSPORT_PORT = "native_transport_port";
+
   private final String logPrefix;
   private final InternalDriverContext context;
   private final ControlConnection controlConnection;
@@ -494,28 +499,65 @@ public class DefaultTopologyMonitor implements TopologyMonitor {
   @Nullable
   protected InetSocketAddress getBroadcastRpcAddress(
       @NonNull AdminRow row, @NonNull EndPoint localEndPoint) {
-    // in system.peers or system.local
-    InetAddress broadcastRpcInetAddress = row.getInetAddress("rpc_address");
+
+    InetAddress broadcastRpcInetAddress = null;
+    Iterator<String> addrCandidates =
+        Iterators.forArray(
+            // in system.peers_v2 (Cassandra >= 4.0)
+            "native_address",
+            // DSE 6.8 introduced native_transport_address and native_transport_port for the
+            // listen address.
+            "native_transport_address",
+            // in system.peers or system.local
+            "rpc_address");
+
+    while (broadcastRpcInetAddress == null && addrCandidates.hasNext())
+      broadcastRpcInetAddress = row.getInetAddress(addrCandidates.next());
+    // This could only happen if system tables are corrupted, but handle gracefully
     if (broadcastRpcInetAddress == null) {
-      // in system.peers_v2 (Cassandra >= 4.0)
-      broadcastRpcInetAddress = row.getInetAddress("native_address");
-      if (broadcastRpcInetAddress == null) {
-        // This could only happen if system tables are corrupted, but handle gracefully
-        return null;
+      LOG.warn(
+          "[{}] Unable to determine broadcast RPC IP address, returning null. "
+              + "This is likely due to a misconfiguration or invalid system tables. "
+              + "Please validate the contents of system.local and/or {}.",
+          logPrefix,
+          getPeerTableName());
+      return null;
+    }
+
+    Integer broadcastRpcPort = null;
+    Iterator<String> portCandidates =
+        Iterators.forArray(
+            // in system.peers_v2 (Cassandra >= 4.0)
+            NATIVE_PORT,
+            // DSE 6.8 introduced native_transport_address and native_transport_port for the
+            // listen address.
+            NATIVE_TRANSPORT_PORT,
+            // system.local for Cassandra >= 4.0
+            "rpc_port");
+
+    while ((broadcastRpcPort == null || broadcastRpcPort == 0) && portCandidates.hasNext()) {
+
+      String colName = portCandidates.next();
+      broadcastRpcPort = row.getInteger(colName);
+      // Support override for SSL port (if enabled) in DSE
+      if (NATIVE_TRANSPORT_PORT.equals(colName) && context.getSslEngineFactory().isPresent()) {
+
+        String sslColName = colName + "_ssl";
+        broadcastRpcPort = row.getInteger(sslColName);
       }
     }
-    // system.local for Cassandra >= 4.0
-    Integer broadcastRpcPort = row.getInteger("rpc_port");
+    // use the default port if no port information was found in the row;
+    // note that in rare situations, the default port might not be known, in which case we
+    // report zero, as advertised in the javadocs of Node and NodeInfo.
     if (broadcastRpcPort == null || broadcastRpcPort == 0) {
-      // system.peers_v2
-      broadcastRpcPort = row.getInteger("native_port");
-      if (broadcastRpcPort == null || broadcastRpcPort == 0) {
-        // use the default port if no port information was found in the row;
-        // note that in rare situations, the default port might not be known, in which case we
-        // report zero, as advertised in the javadocs of Node and NodeInfo.
-        broadcastRpcPort = port == -1 ? 0 : port;
-      }
+
+      LOG.warn(
+          "[{}] Unable to determine broadcast RPC port. "
+              + "Trying to fall back to port used by the control connection.",
+          logPrefix);
+      broadcastRpcPort = port == -1 ? 0 : port;
     }
+
     InetSocketAddress broadcastRpcAddress =
         new InetSocketAddress(broadcastRpcInetAddress, broadcastRpcPort);
     if (row.contains("peer") && broadcastRpcAddress.equals(localEndPoint.resolve())) {
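For context, the native_transport_port_ssl override in the hunk above only takes effect when context.getSslEngineFactory() returns a non-empty Optional. Below is a hedged sketch of one common way a 4.x application ends up in that state; the contact point, datacenter name, and use of the default SSLContext are hypothetical illustration, not part of this patch.

import com.datastax.oss.driver.api.core.CqlSession;
import java.net.InetSocketAddress;
import javax.net.ssl.SSLContext;

public class SslSessionExample {
  public static void main(String[] args) throws Exception {
    // Configuring SSL on the session is what makes context.getSslEngineFactory() non-empty,
    // which in turn activates the native_transport_port_ssl override in getBroadcastRpcAddress().
    try (CqlSession session =
        CqlSession.builder()
            .addContactPoint(new InetSocketAddress("dse-host.example.com", 9042)) // hypothetical host
            .withLocalDatacenter("dc1") // hypothetical datacenter
            .withSslContext(SSLContext.getDefault())
            .build()) {
      System.out.println(session.getMetadata().getNodes());
    }
  }
}

With SSL configured this way, a DSE 6.8 peer advertising native_transport_port 9042 and native_transport_port_ssl 9043 resolves to port 9043, which is exactly what the new tests below assert.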
diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java
index cc275eb16..dd40f2335 100644
--- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java
+++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java
@@ -38,6 +38,7 @@ import com.datastax.oss.driver.api.core.addresstranslation.AddressTranslator;
 import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
 import com.datastax.oss.driver.api.core.config.DriverConfig;
 import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
+import com.datastax.oss.driver.api.core.ssl.SslEngineFactory;
 import com.datastax.oss.driver.internal.core.addresstranslation.PassThroughAddressTranslator;
 import com.datastax.oss.driver.internal.core.adminrequest.AdminResult;
 import com.datastax.oss.driver.internal.core.adminrequest.AdminRow;
@@ -50,9 +51,11 @@ import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
 import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
 import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
 import com.datastax.oss.driver.shaded.guava.common.collect.Iterators;
+import com.datastax.oss.driver.shaded.guava.common.collect.Maps;
 import com.datastax.oss.protocol.internal.Message;
 import com.datastax.oss.protocol.internal.ProtocolConstants;
 import com.datastax.oss.protocol.internal.response.Error;
+import com.google.common.collect.Streams;
 import com.tngtech.java.junit.dataprovider.DataProvider;
 import com.tngtech.java.junit.dataprovider.DataProviderRunner;
 import com.tngtech.java.junit.dataprovider.UseDataProvider;
@@ -95,6 +98,8 @@ public class DefaultTopologyMonitorTest {
   @Mock private Appender<ILoggingEvent> appender;
   @Captor private ArgumentCaptor<ILoggingEvent> loggingEventCaptor;
 
+  @Mock private SslEngineFactory sslEngineFactory;
+
   private DefaultNode node1;
   private DefaultNode node2;
@@ -414,18 +419,6 @@
             + "This is likely a gossip or snitch issue, this node will be ignored.");
   }
 
-  @DataProvider
-  public static Object[][] columnsToCheckV1() {
-    return new Object[][] {{"rpc_address"}, {"host_id"}, {"data_center"}, {"rack"}, {"tokens"}};
-  }
-
-  @DataProvider
-  public static Object[][] columnsToCheckV2() {
-    return new Object[][] {
-      {"native_address"}, {"native_port"}, {"host_id"}, {"data_center"}, {"rack"}, {"tokens"}
-    };
-  }
-
   @Test
   public void should_stop_executing_queries_once_closed() {
     // Given
@@ -443,9 +436,9 @@
   public void should_warn_when_control_host_found_in_system_peers() {
     // Given
     AdminRow local = mockLocalRow(1, node1.getHostId());
-    AdminRow peer3 = mockPeersRow(3, UUID.randomUUID());
-    AdminRow peer2 = mockPeersRow(2, node2.getHostId());
     AdminRow peer1 = mockPeersRow(1, node2.getHostId()); // invalid
+    AdminRow peer2 = mockPeersRow(2, node2.getHostId());
+    AdminRow peer3 = mockPeersRow(3, UUID.randomUUID());
     topologyMonitor.stubQueries(
         new StubbedQuery("SELECT * FROM system.local", mockResult(local)),
         new StubbedQuery("SELECT * FROM system.peers_v2", Collections.emptyMap(), null, true),
@@ -462,7 +455,7 @@
             .hasSize(3)
             .extractingResultOf("getEndPoint")
             .containsOnlyOnce(node1.getEndPoint()));
-    assertLog(
+    assertLogContains(
         Level.WARN,
         "[null] Control node /127.0.0.1:9042 has an entry for itself in system.peers: "
             + "this entry will be ignored. This is likely due to a misconfiguration; "
@@ -492,7 +485,7 @@
             .hasSize(3)
             .extractingResultOf("getEndPoint")
             .containsOnlyOnce(node1.getEndPoint()));
-    assertLog(
+    assertLogContains(
        Level.WARN,
        "[null] Control node /127.0.0.1:9042 has an entry for itself in system.peers_v2: "
            + "this entry will be ignored. This is likely due to a misconfiguration; "
@@ -500,6 +493,116 @@
             + "all nodes in your cluster.");
   }
 
+  // Confirm the base case of extracting peer info from peers_v2, no SSL involved
+  @Test
+  public void should_get_peer_address_info_peers_v2() {
+    // Given
+    AdminRow local = mockLocalRow(1, node1.getHostId());
+    AdminRow peer2 = mockPeersV2Row(3, node2.getHostId());
+    AdminRow peer1 = mockPeersV2Row(2, node1.getHostId());
+    topologyMonitor.isSchemaV2 = true;
+    topologyMonitor.stubQueries(
+        new StubbedQuery("SELECT * FROM system.local", mockResult(local)),
+        new StubbedQuery("SELECT * FROM system.peers_v2", mockResult(peer2, peer1)));
+    when(context.getSslEngineFactory()).thenReturn(Optional.empty());
+
+    // When
+    CompletionStage<Iterable<NodeInfo>> futureInfos = topologyMonitor.refreshNodeList();
+
+    // Then
+    assertThatStage(futureInfos)
+        .isSuccess(
+            infos -> {
+              Iterator<NodeInfo> iterator = infos.iterator();
+              // First NodeInfo is for local, skip past that
+              iterator.next();
+              NodeInfo peer2nodeInfo = iterator.next();
+              assertThat(peer2nodeInfo.getEndPoint().resolve())
+                  .isEqualTo(new InetSocketAddress("127.0.0.3", 9042));
+              NodeInfo peer1nodeInfo = iterator.next();
+              assertThat(peer1nodeInfo.getEndPoint().resolve())
+                  .isEqualTo(new InetSocketAddress("127.0.0.2", 9042));
+            });
+  }
+
+  // Confirm the base case of extracting peer info from DSE peers table, no SSL involved
+  @Test
+  public void should_get_peer_address_info_peers_dse() {
+    // Given
+    AdminRow local = mockLocalRow(1, node1.getHostId());
+    AdminRow peer2 = mockPeersRowDse(3, node2.getHostId());
+    AdminRow peer1 = mockPeersRowDse(2, node1.getHostId());
+    topologyMonitor.isSchemaV2 = true;
+    topologyMonitor.stubQueries(
+        new StubbedQuery("SELECT * FROM system.local", mockResult(local)),
+        new StubbedQuery("SELECT * FROM system.peers_v2", Maps.newHashMap(), null, true),
+        new StubbedQuery("SELECT * FROM system.peers", mockResult(peer2, peer1)));
+    when(context.getSslEngineFactory()).thenReturn(Optional.empty());
+
+    // When
+    CompletionStage<Iterable<NodeInfo>> futureInfos = topologyMonitor.refreshNodeList();
+
+    // Then
+    assertThatStage(futureInfos)
+        .isSuccess(
+            infos -> {
+              Iterator<NodeInfo> iterator = infos.iterator();
+              // First NodeInfo is for local, skip past that
+              iterator.next();
+              NodeInfo peer2nodeInfo = iterator.next();
+              assertThat(peer2nodeInfo.getEndPoint().resolve())
+                  .isEqualTo(new InetSocketAddress("127.0.0.3", 9042));
+              NodeInfo peer1nodeInfo = iterator.next();
+              assertThat(peer1nodeInfo.getEndPoint().resolve())
+                  .isEqualTo(new InetSocketAddress("127.0.0.2", 9042));
+            });
+  }
+
+  // Confirm the base case of extracting peer info from DSE peers table, this time with SSL
+  @Test
+  public void should_get_peer_address_info_peers_dse_with_ssl() {
+    // Given
+    AdminRow local = mockLocalRow(1, node1.getHostId());
+    AdminRow peer2 = mockPeersRowDseWithSsl(3, node2.getHostId());
+    AdminRow peer1 = mockPeersRowDseWithSsl(2, node1.getHostId());
+    topologyMonitor.isSchemaV2 = true;
+    topologyMonitor.stubQueries(
+        new StubbedQuery("SELECT * FROM system.local", mockResult(local)),
+        new StubbedQuery("SELECT * FROM system.peers_v2", Maps.newHashMap(), null, true),
+        new StubbedQuery("SELECT * FROM system.peers", mockResult(peer2, peer1)));
+    when(context.getSslEngineFactory()).thenReturn(Optional.of(sslEngineFactory));
+
+    // When
+    CompletionStage<Iterable<NodeInfo>> futureInfos = topologyMonitor.refreshNodeList();
+
+    // Then
+    assertThatStage(futureInfos)
+        .isSuccess(
+            infos -> {
+              Iterator<NodeInfo> iterator = infos.iterator();
+              // First NodeInfo is for local, skip past that
+              iterator.next();
+              NodeInfo peer2nodeInfo = iterator.next();
+              assertThat(peer2nodeInfo.getEndPoint().resolve())
+                  .isEqualTo(new InetSocketAddress("127.0.0.3", 9043));
+              NodeInfo peer1nodeInfo = iterator.next();
+              assertThat(peer1nodeInfo.getEndPoint().resolve())
+                  .isEqualTo(new InetSocketAddress("127.0.0.2", 9043));
+            });
+  }
+
+  @DataProvider
+  public static Object[][] columnsToCheckV1() {
+    return new Object[][] {{"rpc_address"}, {"host_id"}, {"data_center"}, {"rack"}, {"tokens"}};
+  }
+
+  @DataProvider
+  public static Object[][] columnsToCheckV2() {
+    return new Object[][] {
+      {"native_address"}, {"native_port"}, {"host_id"}, {"data_center"}, {"rack"}, {"tokens"}
+    };
+  }
+
   /** Mocks the query execution logic. */
   private static class TestTopologyMonitor extends DefaultTopologyMonitor {
@@ -641,6 +744,43 @@
     }
   }
 
+  // Mock row for DSE ~6.8
+  private AdminRow mockPeersRowDse(int i, UUID hostId) {
+    try {
+      AdminRow row = mock(AdminRow.class);
+      when(row.contains("peer")).thenReturn(true);
+      when(row.isNull("data_center")).thenReturn(false);
+      when(row.getString("data_center")).thenReturn("dc" + i);
+      when(row.getString("dse_version")).thenReturn("6.8.30");
+      when(row.contains("graph")).thenReturn(true);
+      when(row.isNull("host_id")).thenReturn(hostId == null);
+      when(row.getUuid("host_id")).thenReturn(hostId);
+      when(row.getInetAddress("peer")).thenReturn(InetAddress.getByName("127.0.0." + i));
+      when(row.isNull("rack")).thenReturn(false);
+      when(row.getString("rack")).thenReturn("rack" + i);
+      when(row.isNull("native_transport_address")).thenReturn(false);
+      when(row.getInetAddress("native_transport_address"))
+          .thenReturn(InetAddress.getByName("127.0.0." + i));
+      when(row.isNull("native_transport_port")).thenReturn(false);
+      when(row.getInteger("native_transport_port")).thenReturn(9042);
+      when(row.isNull("tokens")).thenReturn(false);
+      when(row.getSetOfString("tokens")).thenReturn(ImmutableSet.of("token" + i));
+      when(row.isNull("rpc_address")).thenReturn(false);
+
+      return row;
+    } catch (UnknownHostException e) {
+      fail("unexpected", e);
+      return null;
+    }
+  }
+
+  private AdminRow mockPeersRowDseWithSsl(int i, UUID hostId) {
+    AdminRow row = mockPeersRowDse(i, hostId);
+    when(row.isNull("native_transport_port_ssl")).thenReturn(false);
+    when(row.getInteger("native_transport_port_ssl")).thenReturn(9043);
+    return row;
+  }
+
   private AdminResult mockResult(AdminRow... rows) {
     AdminResult result = mock(AdminResult.class);
     when(result.iterator()).thenReturn(Iterators.forArray(rows));
@@ -654,4 +794,12 @@
     assertThat(logs).hasSize(1);
     assertThat(logs.iterator().next().getFormattedMessage()).contains(message);
   }
+
+  private void assertLogContains(Level level, String message) {
+    verify(appender, atLeast(1)).doAppend(loggingEventCaptor.capture());
+    Iterable<ILoggingEvent> logs =
+        filter(loggingEventCaptor.getAllValues()).with("level", level).get();
+    assertThat(
+        Streams.stream(logs).map(ILoggingEvent::getFormattedMessage).anyMatch(message::contains));
+  }
 }
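A note on the new assertLogContains helper above: unlike assertLog, which expects exactly one captured event at the given level and checks that single message, it is satisfied when any captured event contains the expected fragment. Below is a reduced, standalone sketch of that containment check, with the boolean assertion completed via AssertJ's isTrue() so a mismatch actually fails; it assumes AssertJ on the classpath, and the class and method names are illustrative only.

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
import java.util.List;

public class LogContainsSketch {

  // Passes if any captured message contains the fragment; fails otherwise.
  static void assertAnyContains(List<String> messages, String fragment) {
    assertThat(messages.stream().anyMatch(m -> m.contains(fragment))).isTrue();
  }

  public static void main(String[] args) {
    List<String> captured =
        Arrays.asList(
            "[null] Unable to determine broadcast RPC port. "
                + "Trying to fall back to port used by the control connection.",
            "[null] Unable to determine broadcast RPC IP address, returning null.");
    assertAnyContains(captured, "Unable to determine broadcast RPC port");
  }
}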
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]