This is an automated email from the ASF dual-hosted git repository.

brandonwilliams pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new b791644fda Allow internal address to change with reconnecting snitches
b791644fda is described below

commit b791644fda91b343a679bb0c2c1e33e594524636
Author: Brandon Williams <[email protected]>
AuthorDate: Thu May 4 05:46:11 2023 -0500

    Allow internal address to change with reconnecting snitches
    
    Patch by brandonwilliams; reviewed by bereng for CASSANDRA-16718
---
 CHANGES.txt                                             |  1 +
 src/java/org/apache/cassandra/gms/EndpointState.java    |  5 +++++
 src/java/org/apache/cassandra/gms/Gossiper.java         | 17 +++++++++++++++++
 .../cassandra/locator/ReconnectableSnitchHelper.java    |  8 ++++++--
 src/java/org/apache/cassandra/net/MessagingService.java |  7 +++++++
 .../cassandra/net/OutboundConnectionSettings.java       |  4 ++--
 6 files changed, 38 insertions(+), 4 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index f6c5cfc757..ff745f3092 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.10
+ * Allow internal address to change with reconnecting snitches 
(CASSANDRA-16718)
  * Fix quoting in toCqlString methods of UDTs and aggregates (CASSANDRA-17918)
  * NPE when deserializing malformed collections from client (CASSANDRA-18505)
  * Improve 'Not enough space for compaction' logging messages (CASSANDRA-18260)
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java 
b/src/java/org/apache/cassandra/gms/EndpointState.java
index b8d56263e7..fe29c4bbcc 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -86,6 +86,11 @@ public class EndpointState
         return applicationState.get().get(key);
     }
 
+    public void removeApplicationState(ApplicationState key)
+    {
+        applicationState.get().remove(key);
+    }
+
     public boolean containsApplicationState(ApplicationState key)
     {
         return applicationState.get().containsKey(key);
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java 
b/src/java/org/apache/cassandra/gms/Gossiper.java
index cf505bb46c..ed5788e6ba 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -1154,6 +1154,23 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         return 
UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
     }
 
+    public InetAddressAndPort getInternalAddressAndPort(InetAddressAndPort 
endpoint)
+    {
+        try
+        {
+            String internal = getApplicationState(endpoint, 
ApplicationState.INTERNAL_ADDRESS_AND_PORT);
+            if (internal == null)
+                internal = getApplicationState(endpoint, 
ApplicationState.INTERNAL_IP);
+            if (internal == null)
+                return endpoint;
+            return InetAddressAndPort.getByName(internal);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
     /**
      * The value for the provided application state for the provided endpoint 
as currently known by this Gossip instance.
      *
diff --git 
a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java 
b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
index dea8c76f4e..7bb38cb491 100644
--- a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
+++ b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
@@ -91,7 +91,7 @@ public class ReconnectableSnitchHelper implements 
IEndpointStateChangeSubscriber
             VersionedValue address = 
epState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT);
             if (address == null)
             {
-                address = 
epState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT);
+                address = 
epState.getApplicationState(ApplicationState.INTERNAL_IP);
             }
             if (address != null)
             {
@@ -129,7 +129,11 @@ public class ReconnectableSnitchHelper implements 
IEndpointStateChangeSubscriber
 
     public void onDead(InetAddressAndPort endpoint, EndpointState state)
     {
-        // do nothing.
+        if (preferLocal && 
state.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT) != null)
+            
state.removeApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT);
+        if (preferLocal && 
state.getApplicationState(ApplicationState.INTERNAL_IP) != null)
+            state.removeApplicationState(ApplicationState.INTERNAL_IP);
+        MessagingService.instance().closeOutboundNow(endpoint);
     }
 
     public void onRemove(InetAddressAndPort endpoint)
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index 71f2231eb2..cd124ad719 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -370,6 +370,13 @@ public final class MessagingService extends 
MessagingServiceMBeanImpl
                 .addListener(future -> channelManagers.remove(to, pool));
     }
 
+    public void closeOutboundNow(InetAddressAndPort to)
+    {
+        OutboundConnections pool = channelManagers.get(to);
+        if (pool != null)
+            closeOutboundNow(pool);
+    }
+
     /**
      * Only to be invoked once we believe the connections will never be used 
again.
      */
diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java 
b/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
index 92a1f52bc3..d9038ee487 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.FBUtilities;
@@ -452,7 +452,7 @@ public class OutboundConnectionSettings
     {
         InetAddressAndPort connectTo = this.connectTo;
         if (connectTo == null)
-            connectTo = SystemKeyspace.getPreferredIP(to);
+            connectTo = Gossiper.instance.getInternalAddressAndPort(to);
         if (FBUtilities.getBroadcastAddressAndPort().equals(connectTo))
             return FBUtilities.getLocalAddressAndPort();
         return connectTo;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to