This is an automated email from the ASF dual-hosted git repository.

absurdfarce pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/cassandra-java-driver.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 7e2c6579a CASSANDRA-19352: Support native_transport_(address|port) + 
native_transport_port_ssl for DSE 6.8 (4.x edition)
7e2c6579a is described below

commit 7e2c6579af564be6d1b161ec4159ecf517c190b4
Author: Bret McGuire <[email protected]>
AuthorDate: Tue Feb 6 15:18:59 2024 -0600

    CASSANDRA-19352: Support native_transport_(address|port) + 
native_transport_port_ssl for DSE 6.8 (4.x edition)
    
    patch by absurdfarce; reviewed by absurdfarce and adutra for CASSANDRA-19352
---
 .../core/metadata/DefaultTopologyMonitor.java      |  76 +++++++--
 .../core/metadata/DefaultTopologyMonitorTest.java  | 180 +++++++++++++++++++--
 2 files changed, 223 insertions(+), 33 deletions(-)

diff --git 
a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java
 
b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java
index 87008b05c..f3dc988cf 100644
--- 
a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java
+++ 
b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java
@@ -34,6 +34,7 @@ import 
com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
 import 
com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
 import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
 import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
+import com.datastax.oss.driver.shaded.guava.common.collect.Iterators;
 import com.datastax.oss.protocol.internal.ProtocolConstants;
 import com.datastax.oss.protocol.internal.response.Error;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -69,6 +70,10 @@ public class DefaultTopologyMonitor implements 
TopologyMonitor {
   // Assume topology queries never need paging
   private static final int INFINITE_PAGE_SIZE = -1;
 
+  // A few system.peers columns which get special handling below
+  private static final String NATIVE_PORT = "native_port";
+  private static final String NATIVE_TRANSPORT_PORT = "native_transport_port";
+
   private final String logPrefix;
   private final InternalDriverContext context;
   private final ControlConnection controlConnection;
@@ -494,28 +499,65 @@ public class DefaultTopologyMonitor implements 
TopologyMonitor {
   @Nullable
   protected InetSocketAddress getBroadcastRpcAddress(
       @NonNull AdminRow row, @NonNull EndPoint localEndPoint) {
-    // in system.peers or system.local
-    InetAddress broadcastRpcInetAddress = row.getInetAddress("rpc_address");
+
+    InetAddress broadcastRpcInetAddress = null;
+    Iterator<String> addrCandidates =
+        Iterators.forArray(
+            // in system.peers_v2 (Cassandra >= 4.0)
+            "native_address",
+            // DSE 6.8 introduced native_transport_address and 
native_transport_port for the
+            // listen address.
+            "native_transport_address",
+            // in system.peers or system.local
+            "rpc_address");
+
+    while (broadcastRpcInetAddress == null && addrCandidates.hasNext())
+      broadcastRpcInetAddress = row.getInetAddress(addrCandidates.next());
+    // This could only happen if system tables are corrupted, but handle 
gracefully
     if (broadcastRpcInetAddress == null) {
-      // in system.peers_v2 (Cassandra >= 4.0)
-      broadcastRpcInetAddress = row.getInetAddress("native_address");
-      if (broadcastRpcInetAddress == null) {
-        // This could only happen if system tables are corrupted, but handle 
gracefully
-        return null;
+      LOG.warn(
+          "[{}] Unable to determine broadcast RPC IP address, returning null.  
"
+              + "This is likely due to a misconfiguration or invalid system 
tables.  "
+              + "Please validate the contents of system.local and/or {}.",
+          logPrefix,
+          getPeerTableName());
+      return null;
+    }
+
+    Integer broadcastRpcPort = null;
+    Iterator<String> portCandidates =
+        Iterators.forArray(
+            // in system.peers_v2 (Cassandra >= 4.0)
+            NATIVE_PORT,
+            // DSE 6.8 introduced native_transport_address and 
native_transport_port for the
+            // listen address.
+            NATIVE_TRANSPORT_PORT,
+            // system.local for Cassandra >= 4.0
+            "rpc_port");
+
+    while ((broadcastRpcPort == null || broadcastRpcPort == 0) && 
portCandidates.hasNext()) {
+
+      String colName = portCandidates.next();
+      broadcastRpcPort = row.getInteger(colName);
+      // Support override for SSL port (if enabled) in DSE
+      if (NATIVE_TRANSPORT_PORT.equals(colName) && 
context.getSslEngineFactory().isPresent()) {
+
+        String sslColName = colName + "_ssl";
+        broadcastRpcPort = row.getInteger(sslColName);
       }
     }
-    // system.local for Cassandra >= 4.0
-    Integer broadcastRpcPort = row.getInteger("rpc_port");
+    // use the default port if no port information was found in the row;
+    // note that in rare situations, the default port might not be known, in 
which case we
+    // report zero, as advertised in the javadocs of Node and NodeInfo.
     if (broadcastRpcPort == null || broadcastRpcPort == 0) {
-      // system.peers_v2
-      broadcastRpcPort = row.getInteger("native_port");
-      if (broadcastRpcPort == null || broadcastRpcPort == 0) {
-        // use the default port if no port information was found in the row;
-        // note that in rare situations, the default port might not be known, 
in which case we
-        // report zero, as advertised in the javadocs of Node and NodeInfo.
-        broadcastRpcPort = port == -1 ? 0 : port;
-      }
+
+      LOG.warn(
+          "[{}] Unable to determine broadcast RPC port.  "
+              + "Trying to fall back to port used by the control connection.",
+          logPrefix);
+      broadcastRpcPort = port == -1 ? 0 : port;
     }
+
     InetSocketAddress broadcastRpcAddress =
         new InetSocketAddress(broadcastRpcInetAddress, broadcastRpcPort);
     if (row.contains("peer") && 
broadcastRpcAddress.equals(localEndPoint.resolve())) {
diff --git 
a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java
 
b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java
index cc275eb16..dd40f2335 100644
--- 
a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java
+++ 
b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java
@@ -38,6 +38,7 @@ import 
com.datastax.oss.driver.api.core.addresstranslation.AddressTranslator;
 import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
 import com.datastax.oss.driver.api.core.config.DriverConfig;
 import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
+import com.datastax.oss.driver.api.core.ssl.SslEngineFactory;
 import 
com.datastax.oss.driver.internal.core.addresstranslation.PassThroughAddressTranslator;
 import com.datastax.oss.driver.internal.core.adminrequest.AdminResult;
 import com.datastax.oss.driver.internal.core.adminrequest.AdminRow;
@@ -50,9 +51,11 @@ import 
com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
 import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
 import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
 import com.datastax.oss.driver.shaded.guava.common.collect.Iterators;
+import com.datastax.oss.driver.shaded.guava.common.collect.Maps;
 import com.datastax.oss.protocol.internal.Message;
 import com.datastax.oss.protocol.internal.ProtocolConstants;
 import com.datastax.oss.protocol.internal.response.Error;
+import com.google.common.collect.Streams;
 import com.tngtech.java.junit.dataprovider.DataProvider;
 import com.tngtech.java.junit.dataprovider.DataProviderRunner;
 import com.tngtech.java.junit.dataprovider.UseDataProvider;
@@ -95,6 +98,8 @@ public class DefaultTopologyMonitorTest {
   @Mock private Appender<ILoggingEvent> appender;
   @Captor private ArgumentCaptor<ILoggingEvent> loggingEventCaptor;
 
+  @Mock private SslEngineFactory sslEngineFactory;
+
   private DefaultNode node1;
   private DefaultNode node2;
 
@@ -414,18 +419,6 @@ public class DefaultTopologyMonitorTest {
             + "This is likely a gossip or snitch issue, this node will be 
ignored.");
   }
 
-  @DataProvider
-  public static Object[][] columnsToCheckV1() {
-    return new Object[][] {{"rpc_address"}, {"host_id"}, {"data_center"}, 
{"rack"}, {"tokens"}};
-  }
-
-  @DataProvider
-  public static Object[][] columnsToCheckV2() {
-    return new Object[][] {
-      {"native_address"}, {"native_port"}, {"host_id"}, {"data_center"}, 
{"rack"}, {"tokens"}
-    };
-  }
-
   @Test
   public void should_stop_executing_queries_once_closed() {
     // Given
@@ -443,9 +436,9 @@ public class DefaultTopologyMonitorTest {
   public void should_warn_when_control_host_found_in_system_peers() {
     // Given
     AdminRow local = mockLocalRow(1, node1.getHostId());
-    AdminRow peer3 = mockPeersRow(3, UUID.randomUUID());
-    AdminRow peer2 = mockPeersRow(2, node2.getHostId());
     AdminRow peer1 = mockPeersRow(1, node2.getHostId()); // invalid
+    AdminRow peer2 = mockPeersRow(2, node2.getHostId());
+    AdminRow peer3 = mockPeersRow(3, UUID.randomUUID());
     topologyMonitor.stubQueries(
         new StubbedQuery("SELECT * FROM system.local", mockResult(local)),
         new StubbedQuery("SELECT * FROM system.peers_v2", 
Collections.emptyMap(), null, true),
@@ -462,7 +455,7 @@ public class DefaultTopologyMonitorTest {
                     .hasSize(3)
                     .extractingResultOf("getEndPoint")
                     .containsOnlyOnce(node1.getEndPoint()));
-    assertLog(
+    assertLogContains(
         Level.WARN,
         "[null] Control node /127.0.0.1:9042 has an entry for itself in 
system.peers: "
             + "this entry will be ignored. This is likely due to a 
misconfiguration; "
@@ -492,7 +485,7 @@ public class DefaultTopologyMonitorTest {
                     .hasSize(3)
                     .extractingResultOf("getEndPoint")
                     .containsOnlyOnce(node1.getEndPoint()));
-    assertLog(
+    assertLogContains(
         Level.WARN,
         "[null] Control node /127.0.0.1:9042 has an entry for itself in 
system.peers_v2: "
             + "this entry will be ignored. This is likely due to a 
misconfiguration; "
@@ -500,6 +493,116 @@ public class DefaultTopologyMonitorTest {
             + "all nodes in your cluster.");
   }
 
+  // Confirm the base case of extracting peer info from peers_v2, no SSL 
involved
+  @Test
+  public void should_get_peer_address_info_peers_v2() {
+    // Given
+    AdminRow local = mockLocalRow(1, node1.getHostId());
+    AdminRow peer2 = mockPeersV2Row(3, node2.getHostId());
+    AdminRow peer1 = mockPeersV2Row(2, node1.getHostId());
+    topologyMonitor.isSchemaV2 = true;
+    topologyMonitor.stubQueries(
+        new StubbedQuery("SELECT * FROM system.local", mockResult(local)),
+        new StubbedQuery("SELECT * FROM system.peers_v2", mockResult(peer2, 
peer1)));
+    when(context.getSslEngineFactory()).thenReturn(Optional.empty());
+
+    // When
+    CompletionStage<Iterable<NodeInfo>> futureInfos = 
topologyMonitor.refreshNodeList();
+
+    // Then
+    assertThatStage(futureInfos)
+        .isSuccess(
+            infos -> {
+              Iterator<NodeInfo> iterator = infos.iterator();
+              // First NodeInfo is for local, skip past that
+              iterator.next();
+              NodeInfo peer2nodeInfo = iterator.next();
+              assertThat(peer2nodeInfo.getEndPoint().resolve())
+                  .isEqualTo(new InetSocketAddress("127.0.0.3", 9042));
+              NodeInfo peer1nodeInfo = iterator.next();
+              assertThat(peer1nodeInfo.getEndPoint().resolve())
+                  .isEqualTo(new InetSocketAddress("127.0.0.2", 9042));
+            });
+  }
+
+  // Confirm the base case of extracting peer info from DSE peers table, no 
SSL involved
+  @Test
+  public void should_get_peer_address_info_peers_dse() {
+    // Given
+    AdminRow local = mockLocalRow(1, node1.getHostId());
+    AdminRow peer2 = mockPeersRowDse(3, node2.getHostId());
+    AdminRow peer1 = mockPeersRowDse(2, node1.getHostId());
+    topologyMonitor.isSchemaV2 = true;
+    topologyMonitor.stubQueries(
+        new StubbedQuery("SELECT * FROM system.local", mockResult(local)),
+        new StubbedQuery("SELECT * FROM system.peers_v2", Maps.newHashMap(), 
null, true),
+        new StubbedQuery("SELECT * FROM system.peers", mockResult(peer2, 
peer1)));
+    when(context.getSslEngineFactory()).thenReturn(Optional.empty());
+
+    // When
+    CompletionStage<Iterable<NodeInfo>> futureInfos = 
topologyMonitor.refreshNodeList();
+
+    // Then
+    assertThatStage(futureInfos)
+        .isSuccess(
+            infos -> {
+              Iterator<NodeInfo> iterator = infos.iterator();
+              // First NodeInfo is for local, skip past that
+              iterator.next();
+              NodeInfo peer2nodeInfo = iterator.next();
+              assertThat(peer2nodeInfo.getEndPoint().resolve())
+                  .isEqualTo(new InetSocketAddress("127.0.0.3", 9042));
+              NodeInfo peer1nodeInfo = iterator.next();
+              assertThat(peer1nodeInfo.getEndPoint().resolve())
+                  .isEqualTo(new InetSocketAddress("127.0.0.2", 9042));
+            });
+  }
+
+  // Confirm the base case of extracting peer info from DSE peers table, this 
time with SSL
+  @Test
+  public void should_get_peer_address_info_peers_dse_with_ssl() {
+    // Given
+    AdminRow local = mockLocalRow(1, node1.getHostId());
+    AdminRow peer2 = mockPeersRowDseWithSsl(3, node2.getHostId());
+    AdminRow peer1 = mockPeersRowDseWithSsl(2, node1.getHostId());
+    topologyMonitor.isSchemaV2 = true;
+    topologyMonitor.stubQueries(
+        new StubbedQuery("SELECT * FROM system.local", mockResult(local)),
+        new StubbedQuery("SELECT * FROM system.peers_v2", Maps.newHashMap(), 
null, true),
+        new StubbedQuery("SELECT * FROM system.peers", mockResult(peer2, 
peer1)));
+    
when(context.getSslEngineFactory()).thenReturn(Optional.of(sslEngineFactory));
+
+    // When
+    CompletionStage<Iterable<NodeInfo>> futureInfos = 
topologyMonitor.refreshNodeList();
+
+    // Then
+    assertThatStage(futureInfos)
+        .isSuccess(
+            infos -> {
+              Iterator<NodeInfo> iterator = infos.iterator();
+              // First NodeInfo is for local, skip past that
+              iterator.next();
+              NodeInfo peer2nodeInfo = iterator.next();
+              assertThat(peer2nodeInfo.getEndPoint().resolve())
+                  .isEqualTo(new InetSocketAddress("127.0.0.3", 9043));
+              NodeInfo peer1nodeInfo = iterator.next();
+              assertThat(peer1nodeInfo.getEndPoint().resolve())
+                  .isEqualTo(new InetSocketAddress("127.0.0.2", 9043));
+            });
+  }
+
+  @DataProvider
+  public static Object[][] columnsToCheckV1() {
+    return new Object[][] {{"rpc_address"}, {"host_id"}, {"data_center"}, 
{"rack"}, {"tokens"}};
+  }
+
+  @DataProvider
+  public static Object[][] columnsToCheckV2() {
+    return new Object[][] {
+      {"native_address"}, {"native_port"}, {"host_id"}, {"data_center"}, 
{"rack"}, {"tokens"}
+    };
+  }
+
   /** Mocks the query execution logic. */
   private static class TestTopologyMonitor extends DefaultTopologyMonitor {
 
@@ -641,6 +744,43 @@ public class DefaultTopologyMonitorTest {
     }
   }
 
+  // Mock row for DSE ~6.8
+  private AdminRow mockPeersRowDse(int i, UUID hostId) {
+    try {
+      AdminRow row = mock(AdminRow.class);
+      when(row.contains("peer")).thenReturn(true);
+      when(row.isNull("data_center")).thenReturn(false);
+      when(row.getString("data_center")).thenReturn("dc" + i);
+      when(row.getString("dse_version")).thenReturn("6.8.30");
+      when(row.contains("graph")).thenReturn(true);
+      when(row.isNull("host_id")).thenReturn(hostId == null);
+      when(row.getUuid("host_id")).thenReturn(hostId);
+      
when(row.getInetAddress("peer")).thenReturn(InetAddress.getByName("127.0.0." + 
i));
+      when(row.isNull("rack")).thenReturn(false);
+      when(row.getString("rack")).thenReturn("rack" + i);
+      when(row.isNull("native_transport_address")).thenReturn(false);
+      when(row.getInetAddress("native_transport_address"))
+          .thenReturn(InetAddress.getByName("127.0.0." + i));
+      when(row.isNull("native_transport_port")).thenReturn(false);
+      when(row.getInteger("native_transport_port")).thenReturn(9042);
+      when(row.isNull("tokens")).thenReturn(false);
+      when(row.getSetOfString("tokens")).thenReturn(ImmutableSet.of("token" + 
i));
+      when(row.isNull("rpc_address")).thenReturn(false);
+
+      return row;
+    } catch (UnknownHostException e) {
+      fail("unexpected", e);
+      return null;
+    }
+  }
+
+  private AdminRow mockPeersRowDseWithSsl(int i, UUID hostId) {
+    AdminRow row = mockPeersRowDse(i, hostId);
+    when(row.isNull("native_transport_port_ssl")).thenReturn(false);
+    when(row.getInteger("native_transport_port_ssl")).thenReturn(9043);
+    return row;
+  }
+
   private AdminResult mockResult(AdminRow... rows) {
     AdminResult result = mock(AdminResult.class);
     when(result.iterator()).thenReturn(Iterators.forArray(rows));
@@ -654,4 +794,12 @@ public class DefaultTopologyMonitorTest {
     assertThat(logs).hasSize(1);
     assertThat(logs.iterator().next().getFormattedMessage()).contains(message);
   }
+
+  private void assertLogContains(Level level, String message) {
+    verify(appender, atLeast(1)).doAppend(loggingEventCaptor.capture());
+    Iterable<ILoggingEvent> logs =
+        filter(loggingEventCaptor.getAllValues()).with("level", level).get();
+    assertThat(
+        
Streams.stream(logs).map(ILoggingEvent::getFormattedMessage).anyMatch(message::contains));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to