This is an automated email from the ASF dual-hosted git repository.
brandonwilliams pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
new b791644fda Allow internal address to change with reconnecting snitches
b791644fda is described below
commit b791644fda91b343a679bb0c2c1e33e594524636
Author: Brandon Williams <[email protected]>
AuthorDate: Thu May 4 05:46:11 2023 -0500
Allow internal address to change with reconnecting snitches
Patch by brandonwilliams; reviewed by bereng for CASSANDRA-16718
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/gms/EndpointState.java | 5 +++++
src/java/org/apache/cassandra/gms/Gossiper.java | 17 +++++++++++++++++
.../cassandra/locator/ReconnectableSnitchHelper.java | 8 ++++++--
src/java/org/apache/cassandra/net/MessagingService.java | 7 +++++++
.../cassandra/net/OutboundConnectionSettings.java | 4 ++--
6 files changed, 38 insertions(+), 4 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index f6c5cfc757..ff745f3092 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0.10
+ * Allow internal address to change with reconnecting snitches
(CASSANDRA-16718)
* Fix quoting in toCqlString methods of UDTs and aggregates (CASSANDRA-17918)
* NPE when deserializing malformed collections from client (CASSANDRA-18505)
* Improve 'Not enough space for compaction' logging messages (CASSANDRA-18260)
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java
b/src/java/org/apache/cassandra/gms/EndpointState.java
index b8d56263e7..fe29c4bbcc 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -86,6 +86,11 @@ public class EndpointState
return applicationState.get().get(key);
}
+ public void removeApplicationState(ApplicationState key)
+ {
+ applicationState.get().remove(key);
+ }
+
public boolean containsApplicationState(ApplicationState key)
{
return applicationState.get().containsKey(key);
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java
b/src/java/org/apache/cassandra/gms/Gossiper.java
index cf505bb46c..ed5788e6ba 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -1154,6 +1154,23 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
return
UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
}
+ public InetAddressAndPort getInternalAddressAndPort(InetAddressAndPort
endpoint)
+ {
+ try
+ {
+ String internal = getApplicationState(endpoint,
ApplicationState.INTERNAL_ADDRESS_AND_PORT);
+ if (internal == null)
+ internal = getApplicationState(endpoint,
ApplicationState.INTERNAL_IP);
+ if (internal == null)
+ return endpoint;
+ return InetAddressAndPort.getByName(internal);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
/**
* The value for the provided application state for the provided endpoint
as currently known by this Gossip instance.
*
diff --git
a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
index dea8c76f4e..7bb38cb491 100644
--- a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
+++ b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
@@ -91,7 +91,7 @@ public class ReconnectableSnitchHelper implements
IEndpointStateChangeSubscriber
VersionedValue address =
epState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT);
if (address == null)
{
- address =
epState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT);
+ address =
epState.getApplicationState(ApplicationState.INTERNAL_IP);
}
if (address != null)
{
@@ -129,7 +129,11 @@ public class ReconnectableSnitchHelper implements
IEndpointStateChangeSubscriber
public void onDead(InetAddressAndPort endpoint, EndpointState state)
{
- // do nothing.
+ if (preferLocal &&
state.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT) != null)
+
state.removeApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT);
+ if (preferLocal &&
state.getApplicationState(ApplicationState.INTERNAL_IP) != null)
+ state.removeApplicationState(ApplicationState.INTERNAL_IP);
+ MessagingService.instance().closeOutboundNow(endpoint);
}
public void onRemove(InetAddressAndPort endpoint)
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java
b/src/java/org/apache/cassandra/net/MessagingService.java
index 71f2231eb2..cd124ad719 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -370,6 +370,13 @@ public final class MessagingService extends
MessagingServiceMBeanImpl
.addListener(future -> channelManagers.remove(to, pool));
}
+ public void closeOutboundNow(InetAddressAndPort to)
+ {
+ OutboundConnections pool = channelManagers.get(to);
+ if (pool != null)
+ closeOutboundNow(pool);
+ }
+
/**
* Only to be invoked once we believe the connections will never be used
again.
*/
diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
b/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
index 92a1f52bc3..d9038ee487 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.FBUtilities;
@@ -452,7 +452,7 @@ public class OutboundConnectionSettings
{
InetAddressAndPort connectTo = this.connectTo;
if (connectTo == null)
- connectTo = SystemKeyspace.getPreferredIP(to);
+ connectTo = Gossiper.instance.getInternalAddressAndPort(to);
if (FBUtilities.getBroadcastAddressAndPort().equals(connectTo))
return FBUtilities.getLocalAddressAndPort();
return connectTo;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]