This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a0d00297153 Allow dynamic toggling of adaptive routing metrics export
(#18286)
a0d00297153 is described below
commit a0d00297153b58d0fe79f1146bef51668abcbc3c
Author: Timothy Elgersma <[email protected]>
AuthorDate: Thu Jun 18 15:38:37 2026 -0400
Allow dynamic toggling of adaptive routing metrics export (#18286)
* Allow dynamic toggling of adaptive routing metrics export (#582)
Committed-By-Agent: claude
cc stripe-private-oss-forks/pinot-reviewers
r?
https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/581 added
metrics for adaptive routing stats.
We may want to toggle whether or not stats are exported as metrics without
needing to launch a rolling restart.
ServerRoutingStatsManager now implements PinotClusterConfigChangeListener.
The periodic export task is always scheduled when stats collection is enabled;
the enable.stats.metric.export flag is checked inside the task and can be
updated at runtime via the Helix cluster config (POST /cluster/configs) without
a broker restart.
We also allow updating the metrics export frequency at runtime.
[STREAMANALYTICS-4390](https://jira.corp.stripe.com/browse/STREAMANALYTICS-4390)
Deployed to [rad-canary
QA](https://amp.qa.corp.stripe.com/deploy/qa-deploy1.pdx.deploy.stripe.net%2Fdeploy_r2WUckQcQ467TfeJvyp4zw).
I then `ssh`ed into a rad-canary controller and ran:
```
curl -X POST localhost:9000/cluster/configs -H "Content-Type:
application/json" -d
'{"pinot.broker.adaptive.server.selector.enable.stats.metric.export": "false"}'
curl -X POST localhost:9000/cluster/configs -H "Content-Type:
application/json" -d
'{"pinot.broker.adaptive.server.selector.enable.stats.metric.export": "true"}'
curl -X POST localhost:9000/cluster/configs -H "Content-Type:
application/json" -d
'{"pinot.broker.adaptive.server.selector.enable.stats.metric.export": "false"}'
```
I confirmed that the metrics stopped and resumed as expected in
[grafana](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/explore?schemaVersion=1&panes=%7B%223yy%22:%7B%22datasource%22:%22zb219lV4k%22,%22queries%22:%5B%7B%22refId%22:%22B%22,%22expr%22:%22count%20by%20%28host%29%20%28%7B__name__%3D~%5C%22pinot_broker_adaptive_server_latency_ema%5C%22,%20pinot_cluster%3D%5C%22rad-canary%5C%22%7D%29%22,%22range%22:true,%22instant%22:true,%22datasource%22:%7B%22type%22:%22prometheu
[...]
<img width="1477" alt="Screenshot 2026-04-02 at 4 17 58 pm"
src="https://git.corp.stripe.com/user-attachments/assets/dca06b6d-8b6f-4797-a079-95a1f21d7de7"
/>
Stripe-Original-Repo: stripe-private-oss-forks/pinot
Stripe-Monotonic-Timestamp: v2/2026-04-07T21:12:57Z/0
Stripe-Original-PR:
https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/582
* mark the runtime-updatable export fields as volatile
* add validation for the runtime export-interval setting
---
.../broker/broker/helix/BaseBrokerStarter.java | 1 +
.../routing/stats/ServerRoutingStatsManager.java | 100 ++++++++++++++--
.../stats/ServerRoutingStatsManagerTest.java | 132 +++++++++++++++++++++
3 files changed, 222 insertions(+), 11 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index eb6428cb877..622479edf34 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -697,6 +697,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
System.currentTimeMillis() - startTimeMs, TimeUnit.MILLISECONDS);
_clusterConfigChangeHandler.registerClusterConfigChangeListener(ContinuousJfrStarter.INSTANCE);
+
_clusterConfigChangeHandler.registerClusterConfigChangeListener(_serverRoutingStatsManager);
NettyInspector.registerMetrics(_brokerMetrics);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
index fe85d62f761..8fbf078d5d7 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
@@ -19,14 +19,17 @@
package org.apache.pinot.core.transport.server.routing.stats;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
@@ -34,6 +37,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.query.QueryExecutionContext.QueryType;
import org.apache.pinot.spi.query.QueryThreadContext;
@@ -43,12 +47,18 @@ import org.slf4j.LoggerFactory;
/**
+ * {@code ServerRoutingStatsManager} manages the query routing stats for each
server and used by the Adaptive
+ * Server Selection feature (when enabled). The stats are maintained at the
broker and are updated when a query is
+ * submitted to a server and when a server responds after processing a query.
*
- * {@code ServerRoutingStatsManager} manages the query routing stats for each
server and used by the Adaptive
- * Server Selection feature (when enabled). The stats are maintained at the
broker and are updated when a query is
- * submitted to a server and when a server responds after processing a query.
+ * <p>Thread safety: {@code onChange} is invoked on the Helix config-change
callback thread.
+ * {@code exportStatsAsMetrics} runs on the single-threaded {@code
_periodicTaskExecutor}.
+ * {@code _enableStatsMetricExport} and {@code _statsMetricExportIntervalMs}
are {@code volatile} so
+ * writes from the Helix callback thread are immediately visible to the export
thread. All other config
+ * fields ({@code _alpha}, {@code _autoDecayWindowMs}, etc.) are written once
during {@code init()}
+ * before any executor threads start, so no additional synchronization is
needed for those.
*/
-public class ServerRoutingStatsManager {
+public class ServerRoutingStatsManager implements
PinotClusterConfigChangeListener {
private static final Logger LOGGER =
LoggerFactory.getLogger(ServerRoutingStatsManager.class);
private final PinotConfiguration _config;
@@ -69,7 +79,9 @@ public class ServerRoutingStatsManager {
private double _avgInitializationVal;
private int _hybridScoreExponent;
private int _hybridScoreQueueFloor;
- private boolean _enableStatsMetricExport;
+ private volatile boolean _enableStatsMetricExport;
+ private volatile long _statsMetricExportIntervalMs;
+ private volatile ScheduledFuture<?> _metricExportFuture;
public ServerRoutingStatsManager(PinotConfiguration pinotConfig,
BrokerMetrics brokerMetrics) {
_config = pinotConfig;
@@ -112,12 +124,57 @@ public class ServerRoutingStatsManager {
_enableStatsMetricExport =
_config.getProperty(AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT,
AdaptiveServerSelector.DEFAULT_ENABLE_STATS_METRIC_EXPORT);
- if (_enableStatsMetricExport) {
- long intervalMs =
_config.getProperty(AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
- AdaptiveServerSelector.DEFAULT_STATS_METRIC_EXPORT_INTERVAL_MS);
- _periodicTaskExecutor.scheduleAtFixedRate(this::exportStatsAsMetrics,
intervalMs, intervalMs,
- TimeUnit.MILLISECONDS);
- LOGGER.info("Adaptive server routing stats metric export enabled with
interval {}ms.", intervalMs);
+ _statsMetricExportIntervalMs =
_config.getProperty(AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
+ AdaptiveServerSelector.DEFAULT_STATS_METRIC_EXPORT_INTERVAL_MS);
+ _metricExportFuture =
_periodicTaskExecutor.scheduleAtFixedRate(this::exportStatsAsMetrics,
+ _statsMetricExportIntervalMs, _statsMetricExportIntervalMs,
TimeUnit.MILLISECONDS);
+ LOGGER.info("Adaptive server routing stats metric export scheduled with
interval {}ms (enabled={}).",
+ _statsMetricExportIntervalMs, _enableStatsMetricExport);
+ }
+
+ @Override
+ public void onChange(Set<String> changedConfigs, Map<String, String>
clusterConfigs) {
+ if
(changedConfigs.contains(AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT))
{
+ String value =
clusterConfigs.get(AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT);
+ if (value != null) {
+ _enableStatsMetricExport = Boolean.parseBoolean(value);
+ } else {
+ // Key was removed from cluster config — fall back to the static
broker config value.
+ _enableStatsMetricExport =
_config.getProperty(AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT,
+ AdaptiveServerSelector.DEFAULT_ENABLE_STATS_METRIC_EXPORT);
+ }
+ LOGGER.info("Updated enableStatsMetricExport to {} from cluster
config.", _enableStatsMetricExport);
+ if (!_enableStatsMetricExport) {
+ removeAllServerStatsGauges();
+ }
+ }
+ if
(changedConfigs.contains(AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS))
{
+ String value =
clusterConfigs.get(AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS);
+ long newIntervalMs;
+ if (value != null) {
+ try {
+ newIntervalMs = Long.parseLong(value);
+ } catch (NumberFormatException e) {
+ LOGGER.warn("Invalid value '{}' for config '{}'; ignoring interval
change", value,
+
AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS);
+ return;
+ }
+ if (newIntervalMs <= 0) {
+ LOGGER.warn("Non-positive value {} for config '{}'; ignoring
interval change", newIntervalMs,
+
AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS);
+ return;
+ }
+ } else {
+ newIntervalMs =
_config.getProperty(AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
+ AdaptiveServerSelector.DEFAULT_STATS_METRIC_EXPORT_INTERVAL_MS);
+ }
+ if (newIntervalMs != _statsMetricExportIntervalMs) {
+ _statsMetricExportIntervalMs = newIntervalMs;
+ _metricExportFuture.cancel(false);
+ _metricExportFuture =
_periodicTaskExecutor.scheduleAtFixedRate(this::exportStatsAsMetrics,
+ newIntervalMs, newIntervalMs, TimeUnit.MILLISECONDS);
+ LOGGER.info("Rescheduled adaptive server routing stats metric export
with new interval {}ms.", newIntervalMs);
+ }
}
}
@@ -125,6 +182,11 @@ public class ServerRoutingStatsManager {
return _isEnabled;
}
+ @VisibleForTesting
+ public long getStatsMetricExportIntervalMs() {
+ return _statsMetricExportIntervalMs;
+ }
+
public void shutDown() {
// As the stats are not persistent, shutdown need not wait for task
termination.
if (!_isEnabled) {
@@ -441,12 +503,28 @@ public class ServerRoutingStatsManager {
}
}
+ private void removeAllServerStatsGauges() {
+ if (_serverQueryStatsMap == null) {
+ return;
+ }
+ for (String serverInstanceId : _serverQueryStatsMap.keySet()) {
+ String serverTag = "server." + serverInstanceId;
+
_brokerMetrics.removeGauge(BrokerGauge.ADAPTIVE_SERVER_NUM_IN_FLIGHT_REQUESTS.getGaugeName()
+ "." + serverTag);
+
_brokerMetrics.removeGauge(BrokerGauge.ADAPTIVE_SERVER_LATENCY_EMA.getGaugeName()
+ "." + serverTag);
+
_brokerMetrics.removeGauge(BrokerGauge.ADAPTIVE_SERVER_HYBRID_SCORE.getGaugeName()
+ "." + serverTag);
+ }
+ LOGGER.info("Removed adaptive server routing stats gauges for {}
servers.", _serverQueryStatsMap.size());
+ }
+
private void recordQueueSizeMetrics() {
int queueSize = getQueueSize();
_brokerMetrics.setValueOfGlobalGauge(BrokerGauge.ROUTING_STATS_MANAGER_QUEUE_SIZE,
queueSize);
}
private void exportStatsAsMetrics() {
+ if (!_enableStatsMetricExport) {
+ return;
+ }
try {
exportStatsForMap(_serverQueryStatsMap, "server.",
BrokerGauge.ADAPTIVE_SERVER_NUM_IN_FLIGHT_REQUESTS,
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
index 17b536091be..79405be3341 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -456,6 +457,137 @@ public class ServerRoutingStatsManagerTest {
assertTrue(fastScore < slowScore, "Idle servers should be ranked by
latency");
}
+ @Test
+ public void testStatsMetricExportDynamicToggle() throws InterruptedException
{
+ Map<String, Object> properties = new HashMap<>();
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION,
true);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_EWMA_ALPHA,
1.0);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AUTODECAY_WINDOW_MS,
-1);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_WARMUP_DURATION_MS,
0);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AVG_INITIALIZATION_VAL,
0.0);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_EXPONENT,
3);
+ // Start with metric export disabled.
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT,
false);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
50L);
+ ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new
PinotConfiguration(properties),
+ _brokerMetrics);
+ manager.init();
+
+ int requestId = 0;
+ manager.recordStatsForQuerySubmission(requestId++, "dynamicToggleServer");
+ waitForStatsUpdate(manager, requestId);
+ manager.recordStatsUponResponseArrival(requestId++, "dynamicToggleServer",
100);
+ waitForStatsUpdate(manager, requestId);
+
+ String numInFlightKey =
BrokerGauge.ADAPTIVE_SERVER_NUM_IN_FLIGHT_REQUESTS.getGaugeName() + "."
+ + "server.dynamicToggleServer";
+
+ // Confirm no metrics while disabled.
+ Thread.sleep(200);
+ assertNull(_brokerMetrics.getGaugeValue(numInFlightKey));
+
+ // Enable via cluster config change and verify metrics are now exported.
+ manager.onChange(
+
Set.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT),
+
Map.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT,
"true"));
+
+ TestUtils.waitForCondition(aVoid ->
_brokerMetrics.getGaugeValue(numInFlightKey) != null,
+ 50L, 5000, "Timed out waiting for metrics after dynamic enable");
+
+ // Disable again and verify the gauges are removed from the registry.
+ manager.onChange(
+
Set.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT),
+
Map.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT,
"false"));
+
+ assertNull(_brokerMetrics.getGaugeValue(numInFlightKey));
+ }
+
+ @Test
+ public void testStatsMetricExportIntervalDynamicUpdate() throws
InterruptedException {
+ Map<String, Object> properties = new HashMap<>();
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION,
true);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_EWMA_ALPHA,
1.0);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AUTODECAY_WINDOW_MS,
-1);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_WARMUP_DURATION_MS,
0);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AVG_INITIALIZATION_VAL,
0.0);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_EXPONENT,
3);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT,
true);
+ // Start with a very long interval so the task won't fire during the test.
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
100000L);
+ ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new
PinotConfiguration(properties),
+ _brokerMetrics);
+ manager.init();
+
+ int requestId = 0;
+ manager.recordStatsForQuerySubmission(requestId++, "intervalUpdateServer");
+ waitForStatsUpdate(manager, requestId);
+ manager.recordStatsUponResponseArrival(requestId++,
"intervalUpdateServer", 100);
+ waitForStatsUpdate(manager, requestId);
+
+ String numInFlightKey =
BrokerGauge.ADAPTIVE_SERVER_NUM_IN_FLIGHT_REQUESTS.getGaugeName()
+ + ".server.intervalUpdateServer";
+
+ // No export yet — interval is too long.
+ Thread.sleep(200);
+ assertNull(_brokerMetrics.getGaugeValue(numInFlightKey));
+
+ // Shorten the interval via cluster config change and verify metrics are
now exported.
+ manager.onChange(
+
Set.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS),
+
Map.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
"50"));
+
+ TestUtils.waitForCondition(aVoid ->
_brokerMetrics.getGaugeValue(numInFlightKey) != null,
+ 50L, 5000, "Timed out waiting for metrics after interval update");
+ }
+
+ @Test
+ public void testStatsMetricExportIntervalDynamicUpdateIgnoresBadValues() {
+ Map<String, Object> properties = new HashMap<>();
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION,
true);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_EWMA_ALPHA,
1.0);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AUTODECAY_WINDOW_MS,
-1);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_WARMUP_DURATION_MS,
0);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AVG_INITIALIZATION_VAL,
0.0);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_EXPONENT,
3);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_METRIC_EXPORT,
true);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
100000L);
+ ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new
PinotConfiguration(properties),
+ _brokerMetrics);
+ manager.init();
+
+ long intervalBefore = manager.getStatsMetricExportIntervalMs();
+
+ // Non-numeric value: must be silently ignored without throwing.
+ manager.onChange(
+
Set.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS),
+
Map.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
"abc"));
+ assertEquals(manager.getStatsMetricExportIntervalMs(), intervalBefore,
+ "Interval must not change on non-numeric config value");
+
+ // Zero: must be silently ignored.
+ manager.onChange(
+
Set.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS),
+
Map.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
"0"));
+ assertEquals(manager.getStatsMetricExportIntervalMs(), intervalBefore,
+ "Interval must not change on zero config value");
+
+ // Negative value: must be silently ignored.
+ manager.onChange(
+
Set.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS),
+
Map.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS,
"-1"));
+ assertEquals(manager.getStatsMetricExportIntervalMs(), intervalBefore,
+ "Interval must not change on negative config value");
+
+ // Key removed from cluster config — must fall back to the static broker
config value (100000L).
+ manager.onChange(
+
Set.of(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_STATS_METRIC_EXPORT_INTERVAL_MS),
+ Collections.emptyMap());
+ assertEquals(manager.getStatsMetricExportIntervalMs(), 100000L,
+ "Interval must revert to static config when cluster key is removed");
+
+ manager.shutDown();
+ }
+
@Test
public void testMseAndSseStatsIsolation() {
Map<String, Object> properties = new HashMap<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]