This is an automated email from the ASF dual-hosted git repository.

jkonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f69e6a9e CASSSIDECAR-406: Fix ClusterTopologyMonitor delay parsing 
error due to missing time unit suffix (#316)
f69e6a9e is described below

commit f69e6a9ef0ca355667075713f48ac74cedcfd8af
Author: Jyothsna konisa <[email protected]>
AuthorDate: Wed Feb 11 13:29:25 2026 -0800

    CASSSIDECAR-406: Fix ClusterTopologyMonitor delay parsing error due to 
missing time unit suffix (#316)
    
    Patch by Jyothsna Konisa; Reviewed by Yifan Cai, Bernardo Botella, Josh 
McKenzie and Francisco Guerrero for CASSSIDECAR-406
---
 CHANGES.txt                                        |   1 +
 conf/sidecar.yaml                                  |   7 +
 server/build.gradle                                |   1 +
 .../sidecar/config/SidecarConfiguration.java       |   5 +
 .../config/yaml/SidecarConfigurationImpl.java      |  29 ++
 .../sidecar/tasks/ClusterTopologyMonitor.java      |  25 +-
 .../sidecar/tasks/ClusterTopologyMonitorTest.java  | 495 +++++++++++++++++++++
 7 files changed, 558 insertions(+), 5 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 0a423300..674c6ccf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.3.0
 -----
+ * Fix ClusterTopologyMonitor delay parsing error due to missing time unit 
suffix (CASSSIDECAR-406)
  * Adding storage_port to instance configuration and metadata (CASSSIDECAR-399)
  * Sidecar endpoint to support on-demand repair operation (CASSSIDECAR-268)
  * Database access during auth flow is blocking event-loop thread 
(CASSSIDECAR-369)
diff --git a/conf/sidecar.yaml b/conf/sidecar.yaml
index 1ca7b0f2..62e52185 100644
--- a/conf/sidecar.yaml
+++ b/conf/sidecar.yaml
@@ -344,6 +344,13 @@ healthcheck:
   initial_delay: 0ms
   execute_interval: 30s
 
+# Cluster Topology Monitor settings
+# Monitors Cassandra cluster topology changes and publishes events when 
detected
+cluster_topology_monitor:
+  enabled: true
+  initial_delay: 0s
+  execute_interval: 1000ms  # Interval between topology refresh cycles
+
 # Sidecar Peer Health Monitor settings
 # Enables a periodic task checking for the health of adjacent Sidecar peers in 
the token ring
 sidecar_peer_health:
diff --git a/server/build.gradle b/server/build.gradle
index 97cac99a..f0a5edc6 100644
--- a/server/build.gradle
+++ b/server/build.gradle
@@ -215,6 +215,7 @@ test {
     systemProperty "vertx.logger-delegate-factory-class-name", 
"io.vertx.core.logging.SLF4JLogDelegateFactory"
     // There is no native lib (JNR) for getting time for testing
     systemProperty "com.datastax.driver.USE_NATIVE_CLOCK", "false"
+    systemProperty "project.root", rootProject.rootDir.absolutePath
 
     // ordinarily we don't need integration tests
     // see the integrationTest task
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
index c2355a91..82f6cda3 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
@@ -70,6 +70,11 @@ public interface SidecarConfiguration
      */
     PeriodicTaskConfiguration healthCheckConfiguration();
 
+    /**
+     * @return the configuration for the cluster topology monitor
+     */
+    PeriodicTaskConfiguration clusterTopologyMonitorConfiguration();
+
     /**
      * @return configuration needed for metrics capture
      */
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
index 392c12e0..ef364cf1 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
@@ -93,6 +93,9 @@ public class SidecarConfigurationImpl implements 
SidecarConfiguration
     @JsonProperty("healthcheck")
     protected final PeriodicTaskConfiguration healthCheckConfiguration;
 
+    @JsonProperty("cluster_topology_monitor")
+    protected final PeriodicTaskConfiguration 
clusterTopologyMonitorConfiguration;
+
     @JsonProperty("metrics")
     protected final MetricsConfiguration metricsConfiguration;
 
@@ -132,6 +135,7 @@ public class SidecarConfigurationImpl implements 
SidecarConfiguration
         sslConfiguration = builder.sslConfiguration;
         accessControlConfiguration = builder.accessControlConfiguration;
         healthCheckConfiguration = builder.healthCheckConfiguration;
+        clusterTopologyMonitorConfiguration = 
builder.clusterTopologyMonitorConfiguration;
         sidecarPeerHealthConfiguration = 
builder.sidecarPeerHealthConfiguration;
         sidecarClientConfiguration = builder.sidecarClientConfiguration;
         metricsConfiguration = builder.metricsConfiguration;
@@ -221,6 +225,16 @@ public class SidecarConfigurationImpl implements 
SidecarConfiguration
         return healthCheckConfiguration;
     }
 
+    /**
+     * @return the configuration for the cluster topology monitor
+     */
+    @Override
+    @JsonProperty("cluster_topology_monitor")
+    public PeriodicTaskConfiguration clusterTopologyMonitorConfiguration()
+    {
+        return clusterTopologyMonitorConfiguration;
+    }
+
     /**
      * @return the configuration for the down detector service
      */
@@ -415,6 +429,10 @@ public class SidecarConfigurationImpl implements 
SidecarConfiguration
         = new PeriodicTaskConfigurationImpl(true,
                                             MillisecondBoundConfiguration.ZERO,
                                             
MillisecondBoundConfiguration.parse("30s"));
+        private PeriodicTaskConfiguration clusterTopologyMonitorConfiguration
+        = new PeriodicTaskConfigurationImpl(true,
+                                            MillisecondBoundConfiguration.ZERO,
+                                            
MillisecondBoundConfiguration.parse("1000ms"));
         private SidecarPeerHealthConfiguration sidecarPeerHealthConfiguration 
= new SidecarPeerHealthConfigurationImpl();
         private SidecarClientConfiguration sidecarClientConfiguration = new 
SidecarClientConfigurationImpl();
         private MetricsConfiguration metricsConfiguration = new 
MetricsConfigurationImpl();
@@ -503,6 +521,17 @@ public class SidecarConfigurationImpl implements 
SidecarConfiguration
             return update(b -> b.healthCheckConfiguration = 
healthCheckConfiguration);
         }
 
+        /**
+         * Sets the {@code clusterTopologyMonitorConfiguration} and returns a 
reference to this Builder enabling method chaining.
+         *
+         * @param clusterTopologyMonitorConfiguration the {@code 
clusterTopologyMonitorConfiguration} to set
+         * @return a reference to this Builder
+         */
+        public Builder 
clusterTopologyMonitorConfiguration(PeriodicTaskConfiguration 
clusterTopologyMonitorConfiguration)
+        {
+            return update(b -> b.clusterTopologyMonitorConfiguration = 
clusterTopologyMonitorConfiguration);
+        }
+
         /**
          * Sets the {@code healthCheckConfiguration} and returns a reference 
to this Builder enabling method chaining.
          *
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitor.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitor.java
index 4b77155e..8b9677a6 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitor.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitor.java
@@ -37,7 +37,8 @@ import 
org.apache.cassandra.sidecar.codecs.DcLocalTopologyChangeEventCodec;
 
 import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
 import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
-import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.PeriodicTaskConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
 import org.apache.cassandra.sidecar.coordination.TokenRingProvider;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -65,6 +66,7 @@ public class ClusterTopologyMonitor implements PeriodicTask
     }
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterTopologyMonitor.class);
+    private final PeriodicTaskConfiguration configuration;
     private final Vertx vertx;
     private final TokenRingProvider tokenRingProvider;
 
@@ -137,9 +139,12 @@ public class ClusterTopologyMonitor implements PeriodicTask
      * Creates a cluster topology monitor with periodic task scheduling.
      */
     @Inject
-    public ClusterTopologyMonitor(Vertx vertx, TokenRingProvider 
tokenRingProvider, PeriodicTaskExecutor periodicTaskExecutor)
+    public ClusterTopologyMonitor(Vertx vertx,
+                                  TokenRingProvider tokenRingProvider,
+                                  PeriodicTaskExecutor periodicTaskExecutor,
+                                  SidecarConfiguration configuration)
     {
-        this(vertx, tokenRingProvider);
+        this(vertx, tokenRingProvider, configuration);
         LOGGER.info("Starting Cluster Topology Monitor");
         periodicTaskExecutor.schedule(this);
         LOGGER.info("Cluster Topology Monitor started");
@@ -148,8 +153,9 @@ public class ClusterTopologyMonitor implements PeriodicTask
     /**
      * Creates a cluster topology monitor without scheduling.
      */
-    public ClusterTopologyMonitor(Vertx vertx, TokenRingProvider 
tokenRingProvider)
+    public ClusterTopologyMonitor(Vertx vertx, TokenRingProvider 
tokenRingProvider, SidecarConfiguration configuration)
     {
+        this.configuration = 
configuration.clusterTopologyMonitorConfiguration();
         this.vertx = vertx;
         this.tokenRingProvider = tokenRingProvider;
         try
@@ -165,13 +171,22 @@ public class ClusterTopologyMonitor implements 
PeriodicTask
         }
     }
 
+    /**
+     * Returns the initial delay before the first execution.
+     */
+    @Override
+    public DurationSpec initialDelay()
+    {
+        return configuration.initialDelay();
+    }
+
     /**
      * Returns the delay between topology refresh cycles.
      */
     @Override
     public DurationSpec delay()
     {
-        return MillisecondBoundConfiguration.parse("1000");
+        return configuration.executeInterval();
     }
 
     /**
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitorTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitorTest.java
new file mode 100644
index 00000000..5ac9be8f
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitorTest.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.junit5.Checkpoint;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
+import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.PeriodicTaskConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
+import org.apache.cassandra.sidecar.coordination.TokenRingProvider;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the {@link ClusterTopologyMonitor}
+ */
+@ExtendWith(VertxExtension.class)
+class ClusterTopologyMonitorTest
+{
+    SidecarConfiguration mockConfiguration;
+    PeriodicTaskConfiguration mockClusterTopologyMonitorConfiguration;
+    TokenRingProvider mockTokenRingProvider;
+    ClusterTopologyMonitor monitor;
+    Vertx vertx;
+
+    @BeforeEach
+    void setup(Vertx vertx)
+    {
+        this.vertx = vertx;
+        mockConfiguration = mock(SidecarConfiguration.class);
+        mockClusterTopologyMonitorConfiguration = 
mock(PeriodicTaskConfiguration.class);
+        mockTokenRingProvider = mock(TokenRingProvider.class);
+
+        
when(mockConfiguration.clusterTopologyMonitorConfiguration()).thenReturn(mockClusterTopologyMonitorConfiguration);
+        
when(mockClusterTopologyMonitorConfiguration.initialDelay()).thenReturn(MillisecondBoundConfiguration.parse("5s"));
+        
when(mockClusterTopologyMonitorConfiguration.executeInterval()).thenReturn(MillisecondBoundConfiguration.parse("500s"));
+
+        monitor = new ClusterTopologyMonitor(vertx, mockTokenRingProvider, 
mockConfiguration);
+    }
+
+    @AfterEach
+    void tearDown(VertxTestContext context)
+    {
+        vertx.close(context.succeedingThenComplete());
+    }
+
+    @Test
+    void testConfiguration()
+    {
+        
assertThat(monitor.initialDelay()).isEqualTo(MillisecondBoundConfiguration.parse("5s"));
+        
assertThat(monitor.delay()).isEqualTo(MillisecondBoundConfiguration.parse("500s"));
+
+        
when(mockClusterTopologyMonitorConfiguration.initialDelay()).thenReturn(MillisecondBoundConfiguration.parse("0s"));
+        
when(mockClusterTopologyMonitorConfiguration.executeInterval()).thenReturn(MillisecondBoundConfiguration.parse("2000s"));
+        ClusterTopologyMonitor customMonitor = new 
ClusterTopologyMonitor(vertx, mockTokenRingProvider, mockConfiguration);
+
+        
assertThat(customMonitor.initialDelay()).isEqualTo(MillisecondBoundConfiguration.parse("0s"));
+        
assertThat(customMonitor.delay()).isEqualTo(MillisecondBoundConfiguration.parse("2000s"));
+    }
+
+    @Test
+    void testConfigurationFromYaml() throws IOException
+    {
+        // Get project root from system property set by Gradle
+        Path projectRoot = Paths.get(System.getProperty("project.root"));
+        Path configPath = projectRoot.resolve("conf/sidecar.yaml");
+        assertThat(configPath).exists();
+
+        SidecarConfiguration config = 
SidecarConfigurationImpl.readYamlConfiguration(configPath);
+
+        PeriodicTaskConfiguration clusterTopologyConfig = 
config.clusterTopologyMonitorConfiguration();
+
+        assertThat(clusterTopologyConfig).isNotNull();
+        assertThat(clusterTopologyConfig.enabled()).isTrue();
+        
assertThat(clusterTopologyConfig.initialDelay()).isEqualTo(MillisecondBoundConfiguration.parse("0s"));
+        
assertThat(clusterTopologyConfig.executeInterval()).isEqualTo(MillisecondBoundConfiguration.parse("1000ms"));
+    }
+
+    @Test
+    void testInitialBootstrapPublishesEvent(VertxTestContext context)
+    {
+        Map<String, List<TokenRange>> topology = createTopology("instance1", 
0, 100);
+        setupMocksForSingleDc("dc1", topology);
+
+        Checkpoint checkpoint = context.checkpoint();
+
+        
vertx.eventBus().consumer(ClusterTopologyMonitor.ClusterTopologyEventType.ON_DC_TOPOLOGY_CHANGE.address(),
 message -> {
+            ClusterTopologyMonitor.DcLocalTopologyChangeEvent event =
+                (ClusterTopologyMonitor.DcLocalTopologyChangeEvent) 
message.body();
+
+            context.verify(() -> {
+                assertThat(event.dc).isEqualTo("dc1");
+                assertThat(event.prev).isNull();
+                assertThat(event.curr).isNotNull().hasSize(1);
+                assertThat(event.curr).containsKey("instance1");
+                assertThat(event.curr.get("instance1")).hasSize(1);
+            });
+            checkpoint.flag();
+        });
+
+        monitor.execute(Promise.promise());
+    }
+
+    @Test
+    void testNoTopologyChangeDoesNotPublishEvent(VertxTestContext context)
+    {
+        Map<String, List<TokenRange>> topology = createTopology("instance1", 
0, 100);
+        setupMocksForSingleDc("dc1", topology);
+
+        List<ClusterTopologyMonitor.DcLocalTopologyChangeEvent> events = new 
ArrayList<>();
+
+        
vertx.eventBus().consumer(ClusterTopologyMonitor.ClusterTopologyEventType.ON_DC_TOPOLOGY_CHANGE.address(),
 message -> {
+            events.add((ClusterTopologyMonitor.DcLocalTopologyChangeEvent) 
message.body());
+        });
+
+        // First execution - should publish bootstrap event
+        monitor.execute(Promise.promise());
+
+        // Wait a bit, then execute again with same topology - should not 
publish
+        vertx.setTimer(200, id1 -> {
+            monitor.execute(Promise.promise());
+
+            // Wait again to ensure no second event
+            vertx.setTimer(200, id2 -> {
+                context.verify(() -> {
+                    assertThat(events).hasSize(1); // Only bootstrap event
+                    assertThat(events.get(0).prev).isNull();
+                });
+                context.completeNow();
+            });
+        });
+    }
+
+    @Test
+    void testTopologyChangeWhenInstanceAdded(VertxTestContext context)
+    {
+        Map<String, List<TokenRange>> initial = createTopology("instance1", 0, 
100);
+        Map<String, List<TokenRange>> withNewInstance = new HashMap<>(initial);
+        withNewInstance.put("instance2", List.of(new TokenRange(100, 200)));
+
+        when(mockTokenRingProvider.dcs()).thenReturn(Set.of("dc1"));
+        when(mockTokenRingProvider.getPrimaryTokenRanges("dc1"))
+            .thenReturn(initial)
+            .thenReturn(withNewInstance);
+
+        List<ClusterTopologyMonitor.DcLocalTopologyChangeEvent> events = new 
ArrayList<>();
+
+        
vertx.eventBus().consumer(ClusterTopologyMonitor.ClusterTopologyEventType.ON_DC_TOPOLOGY_CHANGE.address(),
 message -> {
+            events.add((ClusterTopologyMonitor.DcLocalTopologyChangeEvent) 
message.body());
+        });
+
+        // Bootstrap
+        monitor.execute(Promise.promise());
+
+        vertx.setTimer(200, id1 -> {
+            // Add new instance
+            monitor.execute(Promise.promise());
+
+            vertx.setTimer(200, id2 -> {
+                context.verify(() -> {
+                    assertThat(events).hasSize(2);
+
+                    // First event - bootstrap
+                    assertThat(events.get(0).dc).isEqualTo("dc1");
+                    assertThat(events.get(0).prev).isNull();
+                    assertThat(events.get(0).curr).hasSize(1);
+
+                    // Second event - instance added
+                    assertThat(events.get(1).dc).isEqualTo("dc1");
+                    assertThat(events.get(1).prev).isNotNull().hasSize(1);
+                    assertThat(events.get(1).curr).hasSize(2);
+                    assertThat(events.get(1).curr).containsKeys("instance1", 
"instance2");
+                });
+                context.completeNow();
+            });
+        });
+    }
+
+    @Test
+    void testTopologyChangeWhenInstanceRemoved(VertxTestContext context)
+    {
+        Map<String, List<TokenRange>> initial = createTopology("instance1", 0, 
100);
+        initial.put("instance2", List.of(new TokenRange(100, 200)));
+
+        Map<String, List<TokenRange>> afterRemoval = 
createTopology("instance1", 0, 100);
+
+        when(mockTokenRingProvider.dcs()).thenReturn(Set.of("dc1"));
+        when(mockTokenRingProvider.getPrimaryTokenRanges("dc1"))
+            .thenReturn(initial)
+            .thenReturn(afterRemoval);
+
+        List<ClusterTopologyMonitor.DcLocalTopologyChangeEvent> events = new 
ArrayList<>();
+
+        
vertx.eventBus().consumer(ClusterTopologyMonitor.ClusterTopologyEventType.ON_DC_TOPOLOGY_CHANGE.address(),
 message -> {
+            events.add((ClusterTopologyMonitor.DcLocalTopologyChangeEvent) 
message.body());
+        });
+
+        monitor.execute(Promise.promise());
+
+        vertx.setTimer(200, id1 -> {
+            monitor.execute(Promise.promise());
+
+            vertx.setTimer(200, id2 -> {
+                context.verify(() -> {
+                    assertThat(events).hasSize(2);
+                    assertThat(events.get(1).prev).hasSize(2);
+                    assertThat(events.get(1).curr).hasSize(1);
+                    assertThat(events.get(1).curr).containsKey("instance1");
+                    
assertThat(events.get(1).curr).doesNotContainKey("instance2");
+                });
+                context.completeNow();
+            });
+        });
+    }
+
+    @Test
+    void testTopologyChangeWhenTokenRangesChange(VertxTestContext context)
+    {
+        Map<String, List<TokenRange>> initial = createTopology("instance1", 0, 
100);
+        Map<String, List<TokenRange>> updated = createTopology("instance1", 0, 
150);
+
+        when(mockTokenRingProvider.dcs()).thenReturn(Set.of("dc1"));
+        when(mockTokenRingProvider.getPrimaryTokenRanges("dc1"))
+            .thenReturn(initial)
+            .thenReturn(updated);
+
+        List<ClusterTopologyMonitor.DcLocalTopologyChangeEvent> events = new 
ArrayList<>();
+
+        
vertx.eventBus().consumer(ClusterTopologyMonitor.ClusterTopologyEventType.ON_DC_TOPOLOGY_CHANGE.address(),
 message -> {
+            events.add((ClusterTopologyMonitor.DcLocalTopologyChangeEvent) 
message.body());
+        });
+
+        monitor.execute(Promise.promise());
+
+        vertx.setTimer(200, id1 -> {
+            monitor.execute(Promise.promise());
+
+            vertx.setTimer(200, id2 -> {
+                context.verify(() -> {
+                    assertThat(events).hasSize(2);
+                    // Verify token range changed
+                    assertThat(events.get(1).prev).isNotNull();
+                    assertThat(events.get(1).curr).isNotNull();
+                    
assertThat(events.get(1).prev).isNotEqualTo(events.get(1).curr);
+                });
+                context.completeNow();
+            });
+        });
+    }
+
+    @Test
+    void testMultipleDatacenters(VertxTestContext context)
+    {
+        when(mockTokenRingProvider.dcs()).thenReturn(Set.of("dc1", "dc2"));
+        
when(mockTokenRingProvider.getPrimaryTokenRanges("dc1")).thenReturn(createTopology("instance1",
 0, 100));
+        
when(mockTokenRingProvider.getPrimaryTokenRanges("dc2")).thenReturn(createTopology("instance2",
 0, 100));
+
+        List<ClusterTopologyMonitor.DcLocalTopologyChangeEvent> events = new 
ArrayList<>();
+        Checkpoint checkpoint = context.checkpoint(2); // Expect 2 events
+
+        
vertx.eventBus().consumer(ClusterTopologyMonitor.ClusterTopologyEventType.ON_DC_TOPOLOGY_CHANGE.address(),
 message -> {
+            ClusterTopologyMonitor.DcLocalTopologyChangeEvent event =
+                (ClusterTopologyMonitor.DcLocalTopologyChangeEvent) 
message.body();
+            events.add(event);
+            checkpoint.flag();
+        });
+
+        monitor.execute(Promise.promise());
+
+        // Give some time for events to be processed, then verify
+        vertx.setTimer(500, id -> {
+            context.verify(() -> {
+                assertThat(events).hasSize(2);
+                assertThat(events).anyMatch(e -> "dc1".equals(e.dc));
+                assertThat(events).anyMatch(e -> "dc2".equals(e.dc));
+            });
+        });
+    }
+
+    @Test
+    void testMultipleSequentialChanges(VertxTestContext context)
+    {
+        Map<String, List<TokenRange>> topology1 = createTopology("instance1", 
0, 100);
+
+        Map<String, List<TokenRange>> topology2 = new HashMap<>(topology1);
+        topology2.put("instance2", List.of(new TokenRange(100, 200)));
+
+        Map<String, List<TokenRange>> topology3 = new HashMap<>(topology2);
+        topology3.put("instance3", List.of(new TokenRange(200, 300)));
+
+        when(mockTokenRingProvider.dcs()).thenReturn(Set.of("dc1"));
+        when(mockTokenRingProvider.getPrimaryTokenRanges("dc1"))
+            .thenReturn(topology1)
+            .thenReturn(topology2)
+            .thenReturn(topology3);
+
+        List<ClusterTopologyMonitor.DcLocalTopologyChangeEvent> events = new 
ArrayList<>();
+
+        
vertx.eventBus().consumer(ClusterTopologyMonitor.ClusterTopologyEventType.ON_DC_TOPOLOGY_CHANGE.address(),
 message -> {
+            events.add((ClusterTopologyMonitor.DcLocalTopologyChangeEvent) 
message.body());
+        });
+
+        monitor.execute(Promise.promise());
+
+        vertx.setTimer(200, id1 -> {
+            monitor.execute(Promise.promise());
+
+            vertx.setTimer(200, id2 -> {
+                monitor.execute(Promise.promise());
+
+                vertx.setTimer(200, id3 -> {
+                    context.verify(() -> {
+                        assertThat(events).hasSize(3);
+                        assertThat(events.get(0).curr).hasSize(1);
+                        assertThat(events.get(1).curr).hasSize(2);
+                        assertThat(events.get(2).curr).hasSize(3);
+                    });
+                    context.completeNow();
+                });
+            });
+        });
+    }
+
+    @Test
+    void testExecuteHandlesExceptionGracefully(VertxTestContext context)
+    {
+        when(mockTokenRingProvider.dcs()).thenThrow(new RuntimeException("Test 
exception"));
+
+        Promise<Void> promise = Promise.promise();
+        monitor.execute(promise);
+
+        promise.future().onComplete(ar -> {
+            context.verify(() -> assertThat(ar.succeeded()).isTrue());
+            context.completeNow();
+        });
+    }
+
+    @Test
+    void testChangeInInstanceTopologyDetectsChanges()
+    {
+        Map<String, List<TokenRange>> prev = createTopology("instance1", 0, 
100);
+        Map<String, List<TokenRange>> curr;
+
+        // Adding instance
+        curr = createTopology("instance1", 0, 100);
+        curr.put("instance2", List.of(new TokenRange(100, 200)));
+        assertThat(ClusterTopologyMonitor.changeInInstanceTopology(prev, 
curr)).isTrue();
+
+        // Removing instance
+        prev = createTopology("instance1", 0, 100);
+        prev.put("instance2", List.of(new TokenRange(100, 200)));
+        curr = createTopology("instance1", 0, 100);
+        assertThat(ClusterTopologyMonitor.changeInInstanceTopology(prev, 
curr)).isTrue();
+
+        // Token range change
+        prev = createTopology("instance1", 0, 100);
+        curr = createTopology("instance1", 0, 150);
+        assertThat(ClusterTopologyMonitor.changeInInstanceTopology(prev, 
curr)).isTrue();
+
+        // No change
+        prev = createTopology("instance1", 0, 100);
+        curr = createTopology("instance1", 0, 100);
+        assertThat(ClusterTopologyMonitor.changeInInstanceTopology(prev, 
curr)).isFalse();
+
+        // Bootstrap (prev is null)
+        assertThat(ClusterTopologyMonitor.changeInInstanceTopology(null, 
curr)).isTrue();
+    }
+
+    @Test
+    void testChangeInInstanceTopologyDetectsTokenRangeChanges()
+    {
+        List<TokenRange> prev, curr;
+
+        // Number of ranges changed
+        prev = List.of(new TokenRange(0, 100));
+        curr = List.of(new TokenRange(0, 50), new TokenRange(50, 100));
+        
assertThat(ClusterTopologyMonitor.changeInInstanceTopology("instance1", prev, 
curr)).isTrue();
+
+        // Lower endpoint changed
+        prev = List.of(new TokenRange(0, 100));
+        curr = List.of(new TokenRange(10, 100));
+        
assertThat(ClusterTopologyMonitor.changeInInstanceTopology("instance1", prev, 
curr)).isTrue();
+
+        // Upper endpoint changed
+        prev = List.of(new TokenRange(0, 100));
+        curr = List.of(new TokenRange(0, 150));
+        
assertThat(ClusterTopologyMonitor.changeInInstanceTopology("instance1", prev, 
curr)).isTrue();
+
+        // No change
+        prev = List.of(new TokenRange(0, 100), new TokenRange(200, 300));
+        curr = List.of(new TokenRange(0, 100), new TokenRange(200, 300));
+        
assertThat(ClusterTopologyMonitor.changeInInstanceTopology("instance1", prev, 
curr)).isFalse();
+
+        // Order doesn't matter
+        prev = List.of(new TokenRange(200, 300), new TokenRange(0, 100));
+        curr = List.of(new TokenRange(0, 100), new TokenRange(200, 300));
+        
assertThat(ClusterTopologyMonitor.changeInInstanceTopology("instance1", prev, 
curr)).isFalse();
+    }
+
+    @Test
+    void testEmptyTopology(VertxTestContext context)
+    {
+        when(mockTokenRingProvider.dcs()).thenReturn(Set.of("dc1"));
+        
when(mockTokenRingProvider.getPrimaryTokenRanges("dc1")).thenReturn(new 
HashMap<>());
+
+        List<ClusterTopologyMonitor.DcLocalTopologyChangeEvent> events = new 
ArrayList<>();
+
+        
vertx.eventBus().consumer(ClusterTopologyMonitor.ClusterTopologyEventType.ON_DC_TOPOLOGY_CHANGE.address(),
 message -> {
+            events.add((ClusterTopologyMonitor.DcLocalTopologyChangeEvent) 
message.body());
+        });
+
+        monitor.execute(Promise.promise());
+
+        vertx.setTimer(200, id -> {
+            context.verify(() -> {
+                // Should still publish bootstrap event even with empty 
topology
+                assertThat(events).hasSize(1);
+                assertThat(events.get(0).curr).isEmpty();
+            });
+            context.completeNow();
+        });
+    }
+
+    @Test
+    void testUpdateMethodBehavior()
+    {
+        String dc = "dc1";
+        Map<String, List<TokenRange>> topology1 = createTopology("instance1", 
0, 100);
+        Map<String, List<TokenRange>> topology2 = createTopology("instance1", 
0, 150);
+
+        // First update (bootstrap) should succeed
+        boolean result = monitor.update(dc, null, topology1);
+        assertThat(result).isTrue();
+
+        // Trying to update with same prev should succeed
+        result = monitor.update(dc, topology1, topology2);
+        assertThat(result).isTrue();
+
+        // Trying to update with wrong prev should fail (concurrent 
modification)
+        Map<String, List<TokenRange>> topology3 = createTopology("instance1", 
0, 200);
+        result = monitor.update(dc, topology1, topology3);
+        assertThat(result).isFalse();
+    }
+
+    // Helper methods
+
+    private Map<String, List<TokenRange>> createTopology(String instanceId, 
long start, long end)
+    {
+        Map<String, List<TokenRange>> topology = new HashMap<>();
+        topology.put(instanceId, List.of(new TokenRange(start, end)));
+        return topology;
+    }
+
+    private void setupMocksForSingleDc(String dc, Map<String, 
List<TokenRange>> topology)
+    {
+        when(mockTokenRingProvider.dcs()).thenReturn(Set.of(dc));
+        
when(mockTokenRingProvider.getPrimaryTokenRanges(dc)).thenReturn(topology);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to