This is an automated email from the ASF dual-hosted git repository.
jkonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new f69e6a9e CASSSIDECAR-406: Fix ClusterTopologyMonitor delay parsing
error due to missing time unit suffix (#316)
f69e6a9e is described below
commit f69e6a9ef0ca355667075713f48ac74cedcfd8af
Author: Jyothsna konisa <[email protected]>
AuthorDate: Wed Feb 11 13:29:25 2026 -0800
CASSSIDECAR-406: Fix ClusterTopologyMonitor delay parsing error due to
missing time unit suffix (#316)
Patch by Jyothsna Konisa; Reviewed by Yifan Cai, Bernardo Botella, Josh
McKenzie and Francisco Guerrero for CASSSIDECAR-406
---
CHANGES.txt | 1 +
conf/sidecar.yaml | 7 +
server/build.gradle | 1 +
.../sidecar/config/SidecarConfiguration.java | 5 +
.../config/yaml/SidecarConfigurationImpl.java | 29 ++
.../sidecar/tasks/ClusterTopologyMonitor.java | 25 +-
.../sidecar/tasks/ClusterTopologyMonitorTest.java | 495 +++++++++++++++++++++
7 files changed, 558 insertions(+), 5 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 0a423300..674c6ccf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.3.0
-----
+ * Fix ClusterTopologyMonitor delay parsing error due to missing time unit
suffix (CASSSIDECAR-406)
* Adding storage_port to instance configuration and metadata (CASSSIDECAR-399)
* Sidecar endpoint to support on-demand repair operation (CASSSIDECAR-268)
* Database access during auth flow is blocking event-loop thread
(CASSSIDECAR-369)
diff --git a/conf/sidecar.yaml b/conf/sidecar.yaml
index 1ca7b0f2..62e52185 100644
--- a/conf/sidecar.yaml
+++ b/conf/sidecar.yaml
@@ -344,6 +344,13 @@ healthcheck:
initial_delay: 0ms
execute_interval: 30s
+# Cluster Topology Monitor settings
+# Monitors Cassandra cluster topology changes and publishes events when
detected
+cluster_topology_monitor:
+ enabled: true
+ initial_delay: 0s
+ execute_interval: 1000ms # Interval between topology refresh cycles
+
# Sidecar Peer Health Monitor settings
# Enables a periodic task checking for the health of adjacent Sidecar peers in
the token ring
sidecar_peer_health:
diff --git a/server/build.gradle b/server/build.gradle
index 97cac99a..f0a5edc6 100644
--- a/server/build.gradle
+++ b/server/build.gradle
@@ -215,6 +215,7 @@ test {
systemProperty "vertx.logger-delegate-factory-class-name",
"io.vertx.core.logging.SLF4JLogDelegateFactory"
// There is no native lib (JNR) for getting time for testing
systemProperty "com.datastax.driver.USE_NATIVE_CLOCK", "false"
+ systemProperty "project.root", rootProject.rootDir.absolutePath
// ordinarily we don't need integration tests
// see the integrationTest task
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
b/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
index c2355a91..82f6cda3 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
@@ -70,6 +70,11 @@ public interface SidecarConfiguration
*/
PeriodicTaskConfiguration healthCheckConfiguration();
+ /**
+ * @return the configuration for the cluster topology monitor
+ */
+ PeriodicTaskConfiguration clusterTopologyMonitorConfiguration();
+
/**
* @return configuration needed for metrics capture
*/
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
index 392c12e0..ef364cf1 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
@@ -93,6 +93,9 @@ public class SidecarConfigurationImpl implements
SidecarConfiguration
@JsonProperty("healthcheck")
protected final PeriodicTaskConfiguration healthCheckConfiguration;
+ @JsonProperty("cluster_topology_monitor")
+ protected final PeriodicTaskConfiguration
clusterTopologyMonitorConfiguration;
+
@JsonProperty("metrics")
protected final MetricsConfiguration metricsConfiguration;
@@ -132,6 +135,7 @@ public class SidecarConfigurationImpl implements
SidecarConfiguration
sslConfiguration = builder.sslConfiguration;
accessControlConfiguration = builder.accessControlConfiguration;
healthCheckConfiguration = builder.healthCheckConfiguration;
+ clusterTopologyMonitorConfiguration =
builder.clusterTopologyMonitorConfiguration;
sidecarPeerHealthConfiguration =
builder.sidecarPeerHealthConfiguration;
sidecarClientConfiguration = builder.sidecarClientConfiguration;
metricsConfiguration = builder.metricsConfiguration;
@@ -221,6 +225,16 @@ public class SidecarConfigurationImpl implements
SidecarConfiguration
return healthCheckConfiguration;
}
+ /**
+ * @return the configuration for the cluster topology monitor
+ */
+ @Override
+ @JsonProperty("cluster_topology_monitor")
+ public PeriodicTaskConfiguration clusterTopologyMonitorConfiguration()
+ {
+ return clusterTopologyMonitorConfiguration;
+ }
+
/**
* @return the configuration for the down detector service
*/
@@ -415,6 +429,10 @@ public class SidecarConfigurationImpl implements
SidecarConfiguration
= new PeriodicTaskConfigurationImpl(true,
MillisecondBoundConfiguration.ZERO,
MillisecondBoundConfiguration.parse("30s"));
+ private PeriodicTaskConfiguration clusterTopologyMonitorConfiguration
+ = new PeriodicTaskConfigurationImpl(true,
+ MillisecondBoundConfiguration.ZERO,
+
MillisecondBoundConfiguration.parse("1000ms"));
private SidecarPeerHealthConfiguration sidecarPeerHealthConfiguration
= new SidecarPeerHealthConfigurationImpl();
private SidecarClientConfiguration sidecarClientConfiguration = new
SidecarClientConfigurationImpl();
private MetricsConfiguration metricsConfiguration = new
MetricsConfigurationImpl();
@@ -503,6 +521,17 @@ public class SidecarConfigurationImpl implements
SidecarConfiguration
return update(b -> b.healthCheckConfiguration =
healthCheckConfiguration);
}
+ /**
+ * Sets the {@code clusterTopologyMonitorConfiguration} and returns a
reference to this Builder enabling method chaining.
+ *
+ * @param clusterTopologyMonitorConfiguration the {@code
clusterTopologyMonitorConfiguration} to set
+ * @return a reference to this Builder
+ */
+ public Builder
clusterTopologyMonitorConfiguration(PeriodicTaskConfiguration
clusterTopologyMonitorConfiguration)
+ {
+ return update(b -> b.clusterTopologyMonitorConfiguration =
clusterTopologyMonitorConfiguration);
+ }
+
/**
* Sets the {@code healthCheckConfiguration} and returns a reference
to this Builder enabling method chaining.
*
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitor.java
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitor.java
index 4b77155e..8b9677a6 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitor.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitor.java
@@ -37,7 +37,8 @@ import
org.apache.cassandra.sidecar.codecs.DcLocalTopologyChangeEventCodec;
import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
-import
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.PeriodicTaskConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
import org.apache.cassandra.sidecar.coordination.TokenRingProvider;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -65,6 +66,7 @@ public class ClusterTopologyMonitor implements PeriodicTask
}
private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterTopologyMonitor.class);
+ private final PeriodicTaskConfiguration configuration;
private final Vertx vertx;
private final TokenRingProvider tokenRingProvider;
@@ -137,9 +139,12 @@ public class ClusterTopologyMonitor implements PeriodicTask
* Creates a cluster topology monitor with periodic task scheduling.
*/
@Inject
- public ClusterTopologyMonitor(Vertx vertx, TokenRingProvider
tokenRingProvider, PeriodicTaskExecutor periodicTaskExecutor)
+ public ClusterTopologyMonitor(Vertx vertx,
+ TokenRingProvider tokenRingProvider,
+ PeriodicTaskExecutor periodicTaskExecutor,
+ SidecarConfiguration configuration)
{
- this(vertx, tokenRingProvider);
+ this(vertx, tokenRingProvider, configuration);
LOGGER.info("Starting Cluster Topology Monitor");
periodicTaskExecutor.schedule(this);
LOGGER.info("Cluster Topology Monitor started");
@@ -148,8 +153,9 @@ public class ClusterTopologyMonitor implements PeriodicTask
/**
* Creates a cluster topology monitor without scheduling.
*/
- public ClusterTopologyMonitor(Vertx vertx, TokenRingProvider
tokenRingProvider)
+ public ClusterTopologyMonitor(Vertx vertx, TokenRingProvider
tokenRingProvider, SidecarConfiguration configuration)
{
+ this.configuration =
configuration.clusterTopologyMonitorConfiguration();
this.vertx = vertx;
this.tokenRingProvider = tokenRingProvider;
try
@@ -165,13 +171,22 @@ public class ClusterTopologyMonitor implements
PeriodicTask
}
}
+ /**
+ * Returns the initial delay before the first execution.
+ */
+ @Override
+ public DurationSpec initialDelay()
+ {
+ return configuration.initialDelay();
+ }
+
/**
* Returns the delay between topology refresh cycles.
*/
@Override
public DurationSpec delay()
{
- return MillisecondBoundConfiguration.parse("1000");
+ return configuration.executeInterval();
}
/**
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitorTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitorTest.java
new file mode 100644
index 00000000..5ac9be8f
--- /dev/null
+++
b/server/src/test/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitorTest.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.junit5.Checkpoint;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
+import
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.PeriodicTaskConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
+import org.apache.cassandra.sidecar.coordination.TokenRingProvider;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the {@link ClusterTopologyMonitor}
+ */
+@ExtendWith(VertxExtension.class)
+class ClusterTopologyMonitorTest
+{
+ SidecarConfiguration mockConfiguration;
+ PeriodicTaskConfiguration mockClusterTopologyMonitorConfiguration;
+ TokenRingProvider mockTokenRingProvider;
+ ClusterTopologyMonitor monitor;
+ Vertx vertx;
+
+ @BeforeEach
+ void setup(Vertx vertx)
+ {
+ this.vertx = vertx;
+ mockConfiguration = mock(SidecarConfiguration.class);
+ mockClusterTopologyMonitorConfiguration =
mock(PeriodicTaskConfiguration.class);
+ mockTokenRingProvider = mock(TokenRingProvider.class);
+
+
when(mockConfiguration.clusterTopologyMonitorConfiguration()).thenReturn(mockClusterTopologyMonitorConfiguration);
+
when(mockClusterTopologyMonitorConfiguration.initialDelay()).thenReturn(MillisecondBoundConfiguration.parse("5s"));
+
when(mockClusterTopologyMonitorConfiguration.executeInterval()).thenReturn(MillisecondBoundConfiguration.parse("500s"));
+
+ monitor = new ClusterTopologyMonitor(vertx, mockTokenRingProvider,
mockConfiguration);
+ }
+
+ @AfterEach
+ void tearDown(VertxTestContext context)
+ {
+ vertx.close(context.succeedingThenComplete());
+ }
+
+ @Test
+ void testConfiguration()
+ {
+
assertThat(monitor.initialDelay()).isEqualTo(MillisecondBoundConfiguration.parse("5s"));
+
assertThat(monitor.delay()).isEqualTo(MillisecondBoundConfiguration.parse("500s"));
+
+
when(mockClusterTopologyMonitorConfiguration.initialDelay()).thenReturn(MillisecondBoundConfiguration.parse("0s"));
+
when(mockClusterTopologyMonitorConfiguration.executeInterval()).thenReturn(MillisecondBoundConfiguration.parse("2000s"));
+ ClusterTopologyMonitor customMonitor = new
ClusterTopologyMonitor(vertx, mockTokenRingProvider, mockConfiguration);
+
+
assertThat(customMonitor.initialDelay()).isEqualTo(MillisecondBoundConfiguration.parse("0s"));
+
assertThat(customMonitor.delay()).isEqualTo(MillisecondBoundConfiguration.parse("2000s"));
+ }
+
+ @Test
+ void testConfigurationFromYaml() throws IOException
+ {
+ // Get project root from system property set by Gradle
+ Path projectRoot = Paths.get(System.getProperty("project.root"));
+ Path configPath = projectRoot.resolve("conf/sidecar.yaml");
+ assertThat(configPath).exists();
+
+ SidecarConfiguration config =
SidecarConfigurationImpl.readYamlConfiguration(configPath);
+
+ PeriodicTaskConfiguration clusterTopologyConfig =
config.clusterTopologyMonitorConfiguration();
+
+ assertThat(clusterTopologyConfig).isNotNull();
+ assertThat(clusterTopologyConfig.enabled()).isTrue();
+
assertThat(clusterTopologyConfig.initialDelay()).isEqualTo(MillisecondBoundConfiguration.parse("0s"));
+
assertThat(clusterTopologyConfig.executeInterval()).isEqualTo(MillisecondBoundConfiguration.parse("1000ms"));
+ }
+
+ @Test
+ void testInitialBootstrapPublishesEvent(VertxTestContext context)
+ {
+ Map<String, List<TokenRange>> topology = createTopology("instance1",
0, 100);
+ setupMocksForSingleDc("dc1", topology);
+
+ Checkpoint checkpoint = context.checkpoint();
+
+
vertx.eventBus().consumer(ClusterTopologyMonitor.ClusterTopologyEventType.ON_DC_TOPOLOGY_CHANGE.address(),
message -> {
+ ClusterTopologyMonitor.DcLocalTopologyChangeEvent event =
+ (ClusterTopologyMonitor.DcLocalTopologyChangeEvent)
message.body();
+
+ context.verify(() -> {
+ assertThat(event.dc).isEqualTo("dc1");
+ assertThat(event.prev).isNull();
+ assertThat(event.curr).isNotNull().hasSize(1);
+ assertThat(event.curr).containsKey("instance1");
+ assertThat(event.curr.get("instance1")).hasSize(1);
+ });
+ checkpoint.flag();
+ });
+
+ monitor.execute(Promise.promise());
+ }
+
+ @Test
+ void testNoTopologyChangeDoesNotPublishEvent(VertxTestContext context)
+ {
+ Map<String, List<TokenRange>> topology = createTopology("instance1",
0, 100);
+ setupMocksForSingleDc("dc1", topology);
+
+ List<ClusterTopologyMonitor.DcLocalTopologyChangeEvent> events = new
ArrayList<>();
+
+
vertx.eventBus().consumer(ClusterTopologyMonitor.ClusterTopologyEventType.ON_DC_TOPOLOGY_CHANGE.address(),
message -> {
+ events.add((ClusterTopologyMonitor.DcLocalTopologyChangeEvent)
message.body());
+ });
+
+ // First execution - should publish bootstrap event
+ monitor.execute(Promise.promise());
+
+ // Wait a bit, then execute again with same topology - should not
publish
+ vertx.setTimer(200, id1 -> {
+ monitor.execute(Promise.promise());
+
+ // Wait again to ensure no second event
+ vertx.setTimer(200, id2 -> {
+ context.verify(() -> {
+ assertThat(events).hasSize(1); // Only bootstrap event
+ assertThat(events.get(0).prev).isNull();
+ });
+ context.completeNow();
+ });
+ });
+ }
+
+ @Test
+ void testTopologyChangeWhenInstanceAdded(VertxTestContext context)
+ {
+ Map<String, List<TokenRange>> initial = createTopology("instance1", 0,
100);
+ Map<String, List<TokenRange>> withNewInstance = new HashMap<>(initial);
+ withNewInstance.put("instance2", List.of(new TokenRange(100, 200)));
+
+ when(mockTokenRingProvider.dcs()).thenReturn(Set.of("dc1"));
+ when(mockTokenRingProvider.getPrimaryTokenRanges("dc1"))
+ .thenReturn(initial)
+ .thenReturn(withNewInstance);
+
+ List<ClusterTopologyMonitor.DcLocalTopologyChangeEvent> events = new
ArrayList<>();
+
+
vertx.eventBus().consumer(ClusterTopologyMonitor.ClusterTopologyEventType.ON_DC_TOPOLOGY_CHANGE.address(),
message -> {
+ events.add((ClusterTopologyMonitor.DcLocalTopologyChangeEvent)
message.body());
+ });
+
+ // Bootstrap
+ monitor.execute(Promise.promise());
+
+ vertx.setTimer(200, id1 -> {
+ // Add new instance
+ monitor.execute(Promise.promise());
+
+ vertx.setTimer(200, id2 -> {
+ context.verify(() -> {
+ assertThat(events).hasSize(2);
+
+ // First event - bootstrap
+ assertThat(events.get(0).dc).isEqualTo("dc1");
+ assertThat(events.get(0).prev).isNull();
+ assertThat(events.get(0).curr).hasSize(1);
+
+ // Second event - instance added
+ assertThat(events.get(1).dc).isEqualTo("dc1");
+ assertThat(events.get(1).prev).isNotNull().hasSize(1);
+ assertThat(events.get(1).curr).hasSize(2);
+ assertThat(events.get(1).curr).containsKeys("instance1",
"instance2");
+ });
+ context.completeNow();
+ });
+ });
+ }
+
+ @Test
+ void testTopologyChangeWhenInstanceRemoved(VertxTestContext context)
+ {
+ Map<String, List<TokenRange>> initial = createTopology("instance1", 0,
100);
+ initial.put("instance2", List.of(new TokenRange(100, 200)));
+
+ Map<String, List<TokenRange>> afterRemoval =
createTopology("instance1", 0, 100);
+
+ when(mockTokenRingProvider.dcs()).thenReturn(Set.of("dc1"));
+ when(mockTokenRingProvider.getPrimaryTokenRanges("dc1"))
+ .thenReturn(initial)
+ .thenReturn(afterRemoval);
+
+ List<ClusterTopologyMonitor.DcLocalTopologyChangeEvent> events = new
ArrayList<>();
+
+
vertx.eventBus().consumer(ClusterTopologyMonitor.ClusterTopologyEventType.ON_DC_TOPOLOGY_CHANGE.address(),
message -> {
+ events.add((ClusterTopologyMonitor.DcLocalTopologyChangeEvent)
message.body());
+ });
+
+ monitor.execute(Promise.promise());
+
+ vertx.setTimer(200, id1 -> {
+ monitor.execute(Promise.promise());
+
+ vertx.setTimer(200, id2 -> {
+ context.verify(() -> {
+ assertThat(events).hasSize(2);
+ assertThat(events.get(1).prev).hasSize(2);
+ assertThat(events.get(1).curr).hasSize(1);
+ assertThat(events.get(1).curr).containsKey("instance1");
+
assertThat(events.get(1).curr).doesNotContainKey("instance2");
+ });
+ context.completeNow();
+ });
+ });
+ }
+
+ @Test
+ void testTopologyChangeWhenTokenRangesChange(VertxTestContext context)
+ {
+ Map<String, List<TokenRange>> initial = createTopology("instance1", 0,
100);
+ Map<String, List<TokenRange>> updated = createTopology("instance1", 0,
150);
+
+ when(mockTokenRingProvider.dcs()).thenReturn(Set.of("dc1"));
+ when(mockTokenRingProvider.getPrimaryTokenRanges("dc1"))
+ .thenReturn(initial)
+ .thenReturn(updated);
+
+ List<ClusterTopologyMonitor.DcLocalTopologyChangeEvent> events = new
ArrayList<>();
+
+
vertx.eventBus().consumer(ClusterTopologyMonitor.ClusterTopologyEventType.ON_DC_TOPOLOGY_CHANGE.address(),
message -> {
+ events.add((ClusterTopologyMonitor.DcLocalTopologyChangeEvent)
message.body());
+ });
+
+ monitor.execute(Promise.promise());
+
+ vertx.setTimer(200, id1 -> {
+ monitor.execute(Promise.promise());
+
+ vertx.setTimer(200, id2 -> {
+ context.verify(() -> {
+ assertThat(events).hasSize(2);
+ // Verify token range changed
+ assertThat(events.get(1).prev).isNotNull();
+ assertThat(events.get(1).curr).isNotNull();
+
assertThat(events.get(1).prev).isNotEqualTo(events.get(1).curr);
+ });
+ context.completeNow();
+ });
+ });
+ }
+
+ @Test
+ void testMultipleDatacenters(VertxTestContext context)
+ {
+ when(mockTokenRingProvider.dcs()).thenReturn(Set.of("dc1", "dc2"));
+
when(mockTokenRingProvider.getPrimaryTokenRanges("dc1")).thenReturn(createTopology("instance1",
0, 100));
+
when(mockTokenRingProvider.getPrimaryTokenRanges("dc2")).thenReturn(createTopology("instance2",
0, 100));
+
+ List<ClusterTopologyMonitor.DcLocalTopologyChangeEvent> events = new
ArrayList<>();
+ Checkpoint checkpoint = context.checkpoint(2); // Expect 2 events
+
+
vertx.eventBus().consumer(ClusterTopologyMonitor.ClusterTopologyEventType.ON_DC_TOPOLOGY_CHANGE.address(),
message -> {
+ ClusterTopologyMonitor.DcLocalTopologyChangeEvent event =
+ (ClusterTopologyMonitor.DcLocalTopologyChangeEvent)
message.body();
+ events.add(event);
+ checkpoint.flag();
+ });
+
+ monitor.execute(Promise.promise());
+
+ // Give some time for events to be processed, then verify
+ vertx.setTimer(500, id -> {
+ context.verify(() -> {
+ assertThat(events).hasSize(2);
+ assertThat(events).anyMatch(e -> "dc1".equals(e.dc));
+ assertThat(events).anyMatch(e -> "dc2".equals(e.dc));
+ });
+ });
+ }
+
+ @Test
+ void testMultipleSequentialChanges(VertxTestContext context)
+ {
+ Map<String, List<TokenRange>> topology1 = createTopology("instance1",
0, 100);
+
+ Map<String, List<TokenRange>> topology2 = new HashMap<>(topology1);
+ topology2.put("instance2", List.of(new TokenRange(100, 200)));
+
+ Map<String, List<TokenRange>> topology3 = new HashMap<>(topology2);
+ topology3.put("instance3", List.of(new TokenRange(200, 300)));
+
+ when(mockTokenRingProvider.dcs()).thenReturn(Set.of("dc1"));
+ when(mockTokenRingProvider.getPrimaryTokenRanges("dc1"))
+ .thenReturn(topology1)
+ .thenReturn(topology2)
+ .thenReturn(topology3);
+
+ List<ClusterTopologyMonitor.DcLocalTopologyChangeEvent> events = new
ArrayList<>();
+
+
vertx.eventBus().consumer(ClusterTopologyMonitor.ClusterTopologyEventType.ON_DC_TOPOLOGY_CHANGE.address(),
message -> {
+ events.add((ClusterTopologyMonitor.DcLocalTopologyChangeEvent)
message.body());
+ });
+
+ monitor.execute(Promise.promise());
+
+ vertx.setTimer(200, id1 -> {
+ monitor.execute(Promise.promise());
+
+ vertx.setTimer(200, id2 -> {
+ monitor.execute(Promise.promise());
+
+ vertx.setTimer(200, id3 -> {
+ context.verify(() -> {
+ assertThat(events).hasSize(3);
+ assertThat(events.get(0).curr).hasSize(1);
+ assertThat(events.get(1).curr).hasSize(2);
+ assertThat(events.get(2).curr).hasSize(3);
+ });
+ context.completeNow();
+ });
+ });
+ });
+ }
+
+ @Test
+ void testExecuteHandlesExceptionGracefully(VertxTestContext context)
+ {
+ when(mockTokenRingProvider.dcs()).thenThrow(new RuntimeException("Test
exception"));
+
+ Promise<Void> promise = Promise.promise();
+ monitor.execute(promise);
+
+ promise.future().onComplete(ar -> {
+ context.verify(() -> assertThat(ar.succeeded()).isTrue());
+ context.completeNow();
+ });
+ }
+
+ @Test
+ void testChangeInInstanceTopologyDetectsChanges()
+ {
+ Map<String, List<TokenRange>> prev = createTopology("instance1", 0,
100);
+ Map<String, List<TokenRange>> curr;
+
+ // Adding instance
+ curr = createTopology("instance1", 0, 100);
+ curr.put("instance2", List.of(new TokenRange(100, 200)));
+ assertThat(ClusterTopologyMonitor.changeInInstanceTopology(prev,
curr)).isTrue();
+
+ // Removing instance
+ prev = createTopology("instance1", 0, 100);
+ prev.put("instance2", List.of(new TokenRange(100, 200)));
+ curr = createTopology("instance1", 0, 100);
+ assertThat(ClusterTopologyMonitor.changeInInstanceTopology(prev,
curr)).isTrue();
+
+ // Token range change
+ prev = createTopology("instance1", 0, 100);
+ curr = createTopology("instance1", 0, 150);
+ assertThat(ClusterTopologyMonitor.changeInInstanceTopology(prev,
curr)).isTrue();
+
+ // No change
+ prev = createTopology("instance1", 0, 100);
+ curr = createTopology("instance1", 0, 100);
+ assertThat(ClusterTopologyMonitor.changeInInstanceTopology(prev,
curr)).isFalse();
+
+ // Bootstrap (prev is null)
+ assertThat(ClusterTopologyMonitor.changeInInstanceTopology(null,
curr)).isTrue();
+ }
+
+ @Test
+ void testChangeInInstanceTopologyDetectsTokenRangeChanges()
+ {
+ List<TokenRange> prev, curr;
+
+ // Number of ranges changed
+ prev = List.of(new TokenRange(0, 100));
+ curr = List.of(new TokenRange(0, 50), new TokenRange(50, 100));
+
assertThat(ClusterTopologyMonitor.changeInInstanceTopology("instance1", prev,
curr)).isTrue();
+
+ // Lower endpoint changed
+ prev = List.of(new TokenRange(0, 100));
+ curr = List.of(new TokenRange(10, 100));
+
assertThat(ClusterTopologyMonitor.changeInInstanceTopology("instance1", prev,
curr)).isTrue();
+
+ // Upper endpoint changed
+ prev = List.of(new TokenRange(0, 100));
+ curr = List.of(new TokenRange(0, 150));
+
assertThat(ClusterTopologyMonitor.changeInInstanceTopology("instance1", prev,
curr)).isTrue();
+
+ // No change
+ prev = List.of(new TokenRange(0, 100), new TokenRange(200, 300));
+ curr = List.of(new TokenRange(0, 100), new TokenRange(200, 300));
+
assertThat(ClusterTopologyMonitor.changeInInstanceTopology("instance1", prev,
curr)).isFalse();
+
+ // Order doesn't matter
+ prev = List.of(new TokenRange(200, 300), new TokenRange(0, 100));
+ curr = List.of(new TokenRange(0, 100), new TokenRange(200, 300));
+
assertThat(ClusterTopologyMonitor.changeInInstanceTopology("instance1", prev,
curr)).isFalse();
+ }
+
+ @Test
+ void testEmptyTopology(VertxTestContext context)
+ {
+ when(mockTokenRingProvider.dcs()).thenReturn(Set.of("dc1"));
+
when(mockTokenRingProvider.getPrimaryTokenRanges("dc1")).thenReturn(new
HashMap<>());
+
+ List<ClusterTopologyMonitor.DcLocalTopologyChangeEvent> events = new
ArrayList<>();
+
+
vertx.eventBus().consumer(ClusterTopologyMonitor.ClusterTopologyEventType.ON_DC_TOPOLOGY_CHANGE.address(),
message -> {
+ events.add((ClusterTopologyMonitor.DcLocalTopologyChangeEvent)
message.body());
+ });
+
+ monitor.execute(Promise.promise());
+
+ vertx.setTimer(200, id -> {
+ context.verify(() -> {
+ // Should still publish bootstrap event even with empty
topology
+ assertThat(events).hasSize(1);
+ assertThat(events.get(0).curr).isEmpty();
+ });
+ context.completeNow();
+ });
+ }
+
+ @Test
+ void testUpdateMethodBehavior()
+ {
+ String dc = "dc1";
+ Map<String, List<TokenRange>> topology1 = createTopology("instance1",
0, 100);
+ Map<String, List<TokenRange>> topology2 = createTopology("instance1",
0, 150);
+
+ // First update (bootstrap) should succeed
+ boolean result = monitor.update(dc, null, topology1);
+ assertThat(result).isTrue();
+
+ // Trying to update with same prev should succeed
+ result = monitor.update(dc, topology1, topology2);
+ assertThat(result).isTrue();
+
+ // Trying to update with wrong prev should fail (concurrent
modification)
+ Map<String, List<TokenRange>> topology3 = createTopology("instance1",
0, 200);
+ result = monitor.update(dc, topology1, topology3);
+ assertThat(result).isFalse();
+ }
+
+ // Helper methods
+
+ private Map<String, List<TokenRange>> createTopology(String instanceId,
long start, long end)
+ {
+ Map<String, List<TokenRange>> topology = new HashMap<>();
+ topology.put(instanceId, List.of(new TokenRange(start, end)));
+ return topology;
+ }
+
+ private void setupMocksForSingleDc(String dc, Map<String,
List<TokenRange>> topology)
+ {
+ when(mockTokenRingProvider.dcs()).thenReturn(Set.of(dc));
+
when(mockTokenRingProvider.getPrimaryTokenRanges(dc)).thenReturn(topology);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]