This is an automated email from the ASF dual-hosted git repository.
rustyrazorblade pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6fbf7d04 CASSANALYTICS-175: Exclude IP address from RingInstance equality so node replacement does not fail bulk write jobs
6fbf7d04 is described below
commit 6fbf7d04d0788497b458b14ffc8282c9514c20d1
Author: Jon Haddad <[email protected]>
AuthorDate: Tue Jun 9 19:21:10 2026 -0700
CASSANALYTICS-175: Exclude IP address from RingInstance equality so node replacement does not fail bulk write jobs
A node that is replaced and comes back with a different IP address (e.g. a
pod replacement in Kubernetes) is still the same logical instance, but
RingInstance equality included the IP address, so CassandraTopologyMonitor
reported a topology change and failed S3 bulk write jobs. The same equality
check is also used by RecordWriter task validation and by per-instance
consistency accounting, so this change applies to those paths as well.
Instance identity is now defined by clusterId, token, fqdn, rack, port and
datacenter.
Patch by Jon Haddad; reviewed by Shailaja Koppu, Yifan Cai for CASSANALYTICS-175
---
CHANGES.txt | 1 +
.../cassandra/spark/bulkwriter/RingInstance.java | 15 +++++----
.../spark/bulkwriter/RingInstanceTest.java | 15 +++++++++
.../spark/bulkwriter/TokenRangeMappingUtils.java | 38 ++++++++++++++++++++++
.../cloudstorage/CassandraTopologyMonitorTest.java | 23 +++++++++++++
5 files changed, 86 insertions(+), 6 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 58d4cf96..e1c21b0d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.5.0
-----
+ * Exclude IP address from RingInstance equality so node replacement does not fail bulk write jobs (CASSANALYTICS-175)
* Regenerate bloom filters for CQLSSTableWriter (CASSANALYTICS-167)
* Avoid Spark 4 partitioning warnings during bulk reads (CASSANALYTICS-171)
* Spark 4.0 Support (CASSANALYTICS-34)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
index ea8231a0..5e5ed60e 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
@@ -123,9 +123,12 @@ public class RingInstance implements CassandraInstance, Serializable
}
/**
- * Custom equality that compares the token, fully qualified domain name, the port, the datacenter and the clusterId
+ * Custom equality that compares the token, fully qualified domain name, the rack, the port, the datacenter
+ * and the clusterId
*
- * Note that node state, status, are not part of the calculation.
+ * Note that node state, status and IP address are not part of the calculation. The IP address is excluded
+ * because a node can come back with a different IP address (e.g. a pod replacement in Kubernetes) while
+ * remaining the same logical instance.
*
* @param other the other instance
* @return true if both instances are equal, false otherwise
@@ -147,22 +150,22 @@ public class RingInstance implements CassandraInstance, Serializable
&& Objects.equals(ringEntry.token(), that.ringEntry.token())
&& Objects.equals(ringEntry.fqdn(), that.ringEntry.fqdn())
&& Objects.equals(ringEntry.rack(), that.ringEntry.rack())
- && Objects.equals(ringEntry.address(), that.ringEntry.address())
&& ringEntry.port() == that.ringEntry.port()
&& Objects.equals(ringEntry.datacenter(), that.ringEntry.datacenter());
}
/**
- * Custom hashCode that compares the token, fully qualified domain name, the port, and the datacenter
+ * Custom hashCode that hashes the token, fully qualified domain name, the rack, the port, the datacenter
+ * and the clusterId
*
- * Note that node state and status are not part of the calculation.
+ * Note that node state, status and IP address are not part of the calculation.
*
* @return The hashcode of this instance based on the important fields
*/
@Override
public int hashCode()
{
- return Objects.hash(clusterId, ringEntry.token(), ringEntry.fqdn(), ringEntry.rack(), ringEntry.port(), ringEntry.datacenter(), ringEntry.address());
+ return Objects.hash(clusterId, ringEntry.token(), ringEntry.fqdn(), ringEntry.rack(), ringEntry.port(), ringEntry.datacenter());
}
@Override
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
index 97362ec3..b73cc272 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
@@ -99,6 +99,21 @@ public class RingInstanceTest
assertThat(instance1.hashCode()).isEqualTo(instance2.hashCode());
}
+ @Test
+ public void testEqualsAndHashcodeIgnoreIpAddress()
+ {
+ // The same logical instance can come back with a different IP address,
+ // e.g. when Kubernetes replaces a pod; it should compare equal
+ RingInstance instance1 = new RingInstance(mockRingEntryBuilder()
+ .address("127.0.0.1")
+ .build());
+ RingInstance instance2 = new RingInstance(mockRingEntryBuilder()
+ .address("127.0.0.2")
+ .build());
+ assertThat(instance1).isEqualTo(instance2);
+ assertThat(instance1.hashCode()).isEqualTo(instance2.hashCode());
+ }
+
@Test
public void testEqualsAndHashcodeConsidersClusterId()
{
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
index 64ee81b7..4864144e 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
@@ -85,6 +85,44 @@ public final class TokenRangeMappingUtils
new HashSet<>(instances));
}
+ /**
+ * Builds the same topology as {@link #buildTokenRangeMapping(int, ImmutableMap, int)}, but every instance
+ * has a different IP address, simulating nodes coming back with new IPs, e.g. pod replacement in Kubernetes
+ */
+ public static TokenRangeMapping<RingInstance> buildTokenRangeMappingWithChangedIpAddresses(int initialToken,
+ ImmutableMap<String, Integer> rfByDC,
+ int instancesPerDC)
+ {
+ List<RingInstance> instances = getInstances(initialToken, rfByDC, instancesPerDC)
+ .stream()
+ .map(TokenRangeMappingUtils::withChangedIpAddress)
+ .collect(Collectors.toList());
+ ReplicationFactor replicationFactor = getReplicationFactor(rfByDC);
+ Multimap<RingInstance, Range<BigInteger>> tokenRanges = setupTokenRangeMap(Partitioner.Murmur3Partitioner, replicationFactor, instances);
+ return new TokenRangeMapping<>(Partitioner.Murmur3Partitioner,
+ tokenRanges,
+ new HashSet<>(instances));
+ }
+
+ private static RingInstance withChangedIpAddress(RingInstance instance)
+ {
+ RingEntry entry = instance.ringEntry();
+ RingEntry newEntry = new RingEntry.Builder()
+ .datacenter(entry.datacenter())
+ .port(entry.port())
+ .address(entry.address().replace("127.", "10."))
+ .status(entry.status())
+ .state(entry.state())
+ .token(entry.token())
+ .fqdn(entry.fqdn())
+ .rack(entry.rack())
+ .owns(entry.owns())
+ .load(entry.load())
+ .hostId(entry.hostId())
+ .build();
+ return new RingInstance(newEntry);
+ }
+
public static TokenRangeMapping<RingInstance> buildTokenRangeMapping(int initialToken,
ImmutableMap<String, Integer> rfByDC,
int instancesPerDC,
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraTopologyMonitorTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraTopologyMonitorTest.java
index 0af57dcf..4ed3b67b 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraTopologyMonitorTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraTopologyMonitorTest.java
@@ -59,8 +59,31 @@ class CassandraTopologyMonitorTest
assertThat(noChange.get()).isFalse();
}
+ // In environments where nodes are not bound to fixed IP addresses, a node that goes down can
+ // come back with a new IP while remaining the same logical instance — same hostname, tokens and
+ // data. This is routine in Kubernetes: a rescheduled pod keeps its identity but gets a new IP.
+ // The write is still correct, so the monitor must not report a topology change and cancel the
+ // job when instances differ only by IP address.
+ @Test
+ void testIpAddressChangeIsNotTopologyChange()
+ {
+ ClusterInfo mockClusterInfo = mock(ClusterInfo.class);
+ when(mockClusterInfo.getTokenRangeMapping(false))
+ .thenReturn(buildTopology(10))
+ .thenReturn(buildTopologyWithChangedIpAddresses(10)); // same instances, new IP addresses
+ AtomicBoolean noChange = new AtomicBoolean(true);
+ CassandraTopologyMonitor monitor = new CassandraTopologyMonitor(mockClusterInfo, event -> noChange.set(false));
+ monitor.checkTopologyOnDemand();
+ assertThat(noChange.get()).isTrue();
+ }
+
private TokenRangeMapping<RingInstance> buildTopology(int instancesCount)
{
return TokenRangeMappingUtils.buildTokenRangeMapping(0, ImmutableMap.of("DC1", 3), instancesCount);
}
+
+ private TokenRangeMapping<RingInstance> buildTopologyWithChangedIpAddresses(int instancesCount)
+ {
+ return TokenRangeMappingUtils.buildTokenRangeMappingWithChangedIpAddresses(0, ImmutableMap.of("DC1", 3), instancesCount);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]