This is an automated email from the ASF dual-hosted git repository.

rustyrazorblade pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6fbf7d04 CASSANALYTICS-175: Exclude IP address from RingInstance equality so node replacement does not fail bulk write jobs
6fbf7d04 is described below

commit 6fbf7d04d0788497b458b14ffc8282c9514c20d1
Author: Jon Haddad <[email protected]>
AuthorDate: Tue Jun 9 19:21:10 2026 -0700

    CASSANALYTICS-175: Exclude IP address from RingInstance equality so node replacement does not fail bulk write jobs
    
    A node that is replaced and comes back with a different IP address (e.g. a
    pod replacement in Kubernetes) is still the same logical instance, but
    RingInstance equality included the IP address, so CassandraTopologyMonitor
    reported a topology change and failed S3 bulk write jobs. The same equality
    check is also used by RecordWriter task validation and by per-instance
    consistency accounting.
    
    Instance identity is now determined by clusterId, token, fqdn, rack, port and
    datacenter.
    
    Patch by Jon Haddad; reviewed by Shailaja Koppu, Yifan Cai for CASSANALYTICS-175
---
 CHANGES.txt                                        |  1 +
 .../cassandra/spark/bulkwriter/RingInstance.java   | 15 +++++----
 .../spark/bulkwriter/RingInstanceTest.java         | 15 +++++++++
 .../spark/bulkwriter/TokenRangeMappingUtils.java   | 38 ++++++++++++++++++++++
 .../cloudstorage/CassandraTopologyMonitorTest.java | 23 +++++++++++++
 5 files changed, 86 insertions(+), 6 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 58d4cf96..e1c21b0d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.5.0
 -----
+ * Exclude IP address from RingInstance equality so node replacement does not fail bulk write jobs (CASSANALYTICS-175)
  * Regenerate bloom filters for CQLSSTableWriter (CASSANALYTICS-167)
  * Avoid Spark 4 partitioning warnings during bulk reads (CASSANALYTICS-171)
  * Spark 4.0 Support (CASSANALYTICS-34)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
index ea8231a0..5e5ed60e 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
@@ -123,9 +123,12 @@ public class RingInstance implements CassandraInstance, Serializable
     }
 
     /**
-     * Custom equality that compares the token, fully qualified domain name, 
the port, the datacenter and the clusterId
+     * Custom equality that compares the token, fully qualified domain name, 
the rack, the port, the datacenter
+     * and the clusterId
      *
-     * Note that node state, status,  are not part of the calculation.
+     * Note that node state, status and IP address are not part of the 
calculation. The IP address is excluded
+     * because a node can come back with a different IP address (e.g. a pod 
replacement in Kubernetes) while
+     * remaining the same logical instance.
      *
      * @param other the other instance
      * @return true if both instances are equal, false otherwise
@@ -147,22 +150,22 @@ public class RingInstance implements CassandraInstance, Serializable
                && Objects.equals(ringEntry.token(), that.ringEntry.token())
                && Objects.equals(ringEntry.fqdn(), that.ringEntry.fqdn())
                && Objects.equals(ringEntry.rack(), that.ringEntry.rack())
-               && Objects.equals(ringEntry.address(), that.ringEntry.address())
                && ringEntry.port() == that.ringEntry.port()
                && Objects.equals(ringEntry.datacenter(), 
that.ringEntry.datacenter());
     }
 
     /**
-     * Custom hashCode that compares the token, fully qualified domain name, 
the port, and the datacenter
+     * Custom hashCode that hashes the token, fully qualified domain name, the 
rack, the port, the datacenter
+     * and the clusterId
      *
-     * Note that node state and status are not part of the calculation.
+     * Note that node state, status and IP address are not part of the 
calculation.
      *
      * @return The hashcode of this instance based on the important fields
      */
     @Override
     public int hashCode()
     {
-        return Objects.hash(clusterId, ringEntry.token(), ringEntry.fqdn(), 
ringEntry.rack(), ringEntry.port(), ringEntry.datacenter(), 
ringEntry.address());
+        return Objects.hash(clusterId, ringEntry.token(), ringEntry.fqdn(), 
ringEntry.rack(), ringEntry.port(), ringEntry.datacenter());
     }
 
     @Override
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
index 97362ec3..b73cc272 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
@@ -99,6 +99,21 @@ public class RingInstanceTest
         assertThat(instance1.hashCode()).isEqualTo(instance2.hashCode());
     }
 
+    @Test
+    public void testEqualsAndHashcodeIgnoreIpAddress()
+    {
+        // The same logical instance can come back with a different IP address,
+        // e.g. when Kubernetes replaces a pod; it should compare equal
+        RingInstance instance1 = new RingInstance(mockRingEntryBuilder()
+                                                  .address("127.0.0.1")
+                                                  .build());
+        RingInstance instance2 = new RingInstance(mockRingEntryBuilder()
+                                                  .address("127.0.0.2")
+                                                  .build());
+        assertThat(instance1).isEqualTo(instance2);
+        assertThat(instance1.hashCode()).isEqualTo(instance2.hashCode());
+    }
+
     @Test
     public void testEqualsAndHashcodeConsidersClusterId()
     {
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
index 64ee81b7..4864144e 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
@@ -85,6 +85,44 @@ public final class TokenRangeMappingUtils
                                        new HashSet<>(instances));
     }
 
+    /**
+     * Builds the same topology as {@link #buildTokenRangeMapping(int, 
ImmutableMap, int)}, but every instance
+     * has a different IP address, simulating nodes coming back with new IPs, 
e.g. pod replacement in Kubernetes
+     */
+    public static TokenRangeMapping<RingInstance> 
buildTokenRangeMappingWithChangedIpAddresses(int initialToken,
+                                                                               
                ImmutableMap<String, Integer> rfByDC,
+                                                                               
                int instancesPerDC)
+    {
+        List<RingInstance> instances = getInstances(initialToken, rfByDC, 
instancesPerDC)
+                                       .stream()
+                                       
.map(TokenRangeMappingUtils::withChangedIpAddress)
+                                       .collect(Collectors.toList());
+        ReplicationFactor replicationFactor = getReplicationFactor(rfByDC);
+        Multimap<RingInstance, Range<BigInteger>> tokenRanges = 
setupTokenRangeMap(Partitioner.Murmur3Partitioner, replicationFactor, 
instances);
+        return new TokenRangeMapping<>(Partitioner.Murmur3Partitioner,
+                                       tokenRanges,
+                                       new HashSet<>(instances));
+    }
+
+    private static RingInstance withChangedIpAddress(RingInstance instance)
+    {
+        RingEntry entry = instance.ringEntry();
+        RingEntry newEntry = new RingEntry.Builder()
+                             .datacenter(entry.datacenter())
+                             .port(entry.port())
+                             .address(entry.address().replace("127.", "10."))
+                             .status(entry.status())
+                             .state(entry.state())
+                             .token(entry.token())
+                             .fqdn(entry.fqdn())
+                             .rack(entry.rack())
+                             .owns(entry.owns())
+                             .load(entry.load())
+                             .hostId(entry.hostId())
+                             .build();
+        return new RingInstance(newEntry);
+    }
+
     public static TokenRangeMapping<RingInstance> buildTokenRangeMapping(int 
initialToken,
                                                                          
ImmutableMap<String, Integer> rfByDC,
                                                                          int 
instancesPerDC,
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraTopologyMonitorTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraTopologyMonitorTest.java
index 0af57dcf..4ed3b67b 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraTopologyMonitorTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CassandraTopologyMonitorTest.java
@@ -59,8 +59,31 @@ class CassandraTopologyMonitorTest
         assertThat(noChange.get()).isFalse();
     }
 
+    // In environments where nodes are not bound to fixed IP addresses, a node 
that goes down can
+    // come back with a new IP while remaining the same logical instance — 
same hostname, tokens and
+    // data. This is routine in Kubernetes: a rescheduled pod keeps its 
identity but gets a new IP.
+    // The write is still correct, so the monitor must not report a topology 
change and cancel the
+    // job when instances differ only by IP address.
+    @Test
+    void testIpAddressChangeIsNotTopologyChange()
+    {
+        ClusterInfo mockClusterInfo = mock(ClusterInfo.class);
+        when(mockClusterInfo.getTokenRangeMapping(false))
+        .thenReturn(buildTopology(10))
+        .thenReturn(buildTopologyWithChangedIpAddresses(10)); // same 
instances, new IP addresses
+        AtomicBoolean noChange = new AtomicBoolean(true);
+        CassandraTopologyMonitor monitor = new 
CassandraTopologyMonitor(mockClusterInfo, event -> noChange.set(false));
+        monitor.checkTopologyOnDemand();
+        assertThat(noChange.get()).isTrue();
+    }
+
     private TokenRangeMapping<RingInstance> buildTopology(int instancesCount)
     {
         return TokenRangeMappingUtils.buildTokenRangeMapping(0, 
ImmutableMap.of("DC1", 3), instancesCount);
     }
+
+    private TokenRangeMapping<RingInstance> 
buildTopologyWithChangedIpAddresses(int instancesCount)
+    {
+        return 
TokenRangeMappingUtils.buildTokenRangeMappingWithChangedIpAddresses(0, 
ImmutableMap.of("DC1", 3), instancesCount);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to