This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2cf1cdc5914 Add configurable queue size floor to adaptive routing hybrid score (#18288)
2cf1cdc5914 is described below
commit 2cf1cdc59149e9ddabdcd5b57818fcf736f635db
Author: Timothy Elgersma <[email protected]>
AuthorDate: Tue May 12 14:49:37 2026 -0400
Add configurable queue size floor to adaptive routing hybrid score (#18288)
---
.../adaptiveserverselector/HybridSelector.java | 4 +-
.../routing/stats/ServerRoutingStatsEntry.java | 10 +++-
.../routing/stats/ServerRoutingStatsManager.java | 7 ++-
.../stats/ServerRoutingStatsManagerTest.java | 67 ++++++++++++++++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 3 +
5 files changed, 86 insertions(+), 5 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/HybridSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/HybridSelector.java
index 68b41b2b013..f7383ceb43b 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/HybridSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/HybridSelector.java
@@ -37,8 +37,10 @@ import
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsMa
*
https://www.usenix.org/system/files/conference/nsdi15/nsdi15-paper-suresh.pdf
*
* The Hybrid score for each server is calculated as follows. The server with the lowest Hybrid score is picked.
- * HybridScore = Math.pow(A+B, N) * C
+ * HybridScore = Math.pow(F+A+B, N) * C
* N -> Configurable exponent with default value of 3.
+ * F -> Configurable queue size floor with default value of 0. Setting F=1 matches the original paper formulation and
+ * prevents the score from collapsing to 0 when all servers are idle, ensuring latency is still used for routing.
*/
public class HybridSelector implements AdaptiveServerSelector {
private final ServerRoutingStatsManager _serverRoutingStatsManager;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsEntry.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsEntry.java
index 7aa9a99b959..970ccd1f1e0 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsEntry.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsEntry.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.transport.server.routing.stats;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pinot.common.utils.ExponentialMovingAverage;
@@ -44,8 +45,11 @@ public class ServerRoutingStatsEntry {
// Hybrid score exponent.
private final int _hybridScoreExponent;
+ // Hybrid score queue size floor (added to A+B before exponentiation to avoid score collapsing to 0 when idle).
+ private final int _hybridScoreQueueFloor;
+
public ServerRoutingStatsEntry(String serverInstanceId, double alphaEMA,
long autoDecayWindowMsEMA,
- long warmupDurationMsEMA, double avgInitializationValEMA, int
scoreExponent,
+ long warmupDurationMsEMA, double avgInitializationValEMA, int
scoreExponent, int queueFloor,
ScheduledExecutorService periodicTaskExecutor) {
_serverInstanceId = serverInstanceId;
_serverLock = new ReentrantReadWriteLock();
@@ -58,6 +62,8 @@ public class ServerRoutingStatsEntry {
periodicTaskExecutor);
_hybridScoreExponent = scoreExponent;
+ Preconditions.checkArgument(queueFloor >= 0, "queueFloor must be
non-negative, got: %s", queueFloor);
+ _hybridScoreQueueFloor = queueFloor;
}
@JsonIgnore
@@ -84,7 +90,7 @@ public class ServerRoutingStatsEntry {
@JsonProperty("hybridScore")
public double computeHybridScore() {
- double estimatedQSize = _numInFlightRequests +
_inFlighRequestsEMA.getAverage();
+ double estimatedQSize = _hybridScoreQueueFloor + _numInFlightRequests +
_inFlighRequestsEMA.getAverage();
return Math.pow(estimatedQSize, _hybridScoreExponent) *
_latencyMsEMA.getAverage();
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
index 252f15c73ce..6c1bea37f14 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
@@ -64,6 +64,7 @@ public class ServerRoutingStatsManager {
private long _warmupDurationMs;
private double _avgInitializationVal;
private int _hybridScoreExponent;
+ private int _hybridScoreQueueFloor;
private boolean _enableStatsMetricExport;
public ServerRoutingStatsManager(PinotConfiguration pinotConfig,
BrokerMetrics brokerMetrics) {
@@ -91,6 +92,8 @@ public class ServerRoutingStatsManager {
AdaptiveServerSelector.DEFAULT_AVG_INITIALIZATION_VAL);
_hybridScoreExponent =
_config.getProperty(AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_EXPONENT,
AdaptiveServerSelector.DEFAULT_HYBRID_SCORE_EXPONENT);
+ _hybridScoreQueueFloor =
_config.getProperty(AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_QUEUE_FLOOR,
+ AdaptiveServerSelector.DEFAULT_HYBRID_SCORE_QUEUE_FLOOR);
int threadPoolSize =
_config.getProperty(AdaptiveServerSelector.CONFIG_OF_STATS_MANAGER_THREADPOOL_SIZE,
AdaptiveServerSelector.DEFAULT_STATS_MANAGER_THREADPOOL_SIZE);
@@ -167,7 +170,7 @@ public class ServerRoutingStatsManager {
private void updateStatsAfterQuerySubmission(String serverInstanceId) {
ServerRoutingStatsEntry stats =
_serverQueryStatsMap.computeIfAbsent(serverInstanceId,
k -> new ServerRoutingStatsEntry(serverInstanceId, _alpha,
_autoDecayWindowMs, _warmupDurationMs,
- _avgInitializationVal, _hybridScoreExponent,
_periodicTaskExecutor));
+ _avgInitializationVal, _hybridScoreExponent,
_hybridScoreQueueFloor, _periodicTaskExecutor));
try {
stats.getServerWriteLock().lock();
@@ -197,7 +200,7 @@ public class ServerRoutingStatsManager {
private void updateStatsUponResponseArrival(String serverInstanceId, long
latencyMs) {
ServerRoutingStatsEntry stats =
_serverQueryStatsMap.computeIfAbsent(serverInstanceId,
k -> new ServerRoutingStatsEntry(serverInstanceId, _alpha,
_autoDecayWindowMs, _warmupDurationMs,
- _avgInitializationVal, _hybridScoreExponent,
_periodicTaskExecutor));
+ _avgInitializationVal, _hybridScoreExponent,
_hybridScoreQueueFloor, _periodicTaskExecutor));
try {
stats.getServerWriteLock().lock();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
index f188604ef24..988236d557f 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
@@ -388,6 +388,73 @@ public class ServerRoutingStatsManagerTest {
assertNull(_brokerMetrics.getGaugeValue(numInFlightKey));
}
+ @Test
+ public void testHybridScoreWithQueueFloor() {
+ // With floor=1 the formula is Math.pow(1+A+B, N)*C instead of Math.pow(A+B, N)*C.
+ // This prevents the score from collapsing to 0 when all servers are idle so that latency
+ // still drives routing decisions in low-traffic conditions.
+ Map<String, Object> properties = new HashMap<>();
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION,
true);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_EWMA_ALPHA,
1.0);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AUTODECAY_WINDOW_MS,
-1);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_WARMUP_DURATION_MS,
0);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AVG_INITIALIZATION_VAL,
0.0);
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_EXPONENT,
3);
+
+ // floor=0 (default): after 1 submit + 1 response at latency=10,
+ // numInFlight=0, inFlightEMA=1, latencyEMA=10 -> (0+1)^3 * 10 = 10.
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_QUEUE_FLOOR,
0);
+ ServerRoutingStatsManager managerNoFloor =
+ new ServerRoutingStatsManager(new PinotConfiguration(properties),
_brokerMetrics);
+ managerNoFloor.init();
+ int requestId = 0;
+ managerNoFloor.recordStatsForQuerySubmission(requestId++, "floorServer");
+ waitForStatsUpdate(managerNoFloor, requestId);
+ managerNoFloor.recordStatsUponResponseArrival(requestId++, "floorServer",
10);
+ waitForStatsUpdate(managerNoFloor, requestId);
+ assertEquals(managerNoFloor.fetchHybridScoreForServer("floorServer"),
10.0);
+
+ // floor=1: same sequence -> (1+0+1)^3 * 10 = 80.
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_QUEUE_FLOOR,
1);
+ ServerRoutingStatsManager managerWithFloor =
+ new ServerRoutingStatsManager(new PinotConfiguration(properties),
_brokerMetrics);
+ managerWithFloor.init();
+ requestId = 0;
+ managerWithFloor.recordStatsForQuerySubmission(requestId++, "floorServer");
+ waitForStatsUpdate(managerWithFloor, requestId);
+ managerWithFloor.recordStatsUponResponseArrival(requestId++,
"floorServer", 10);
+ waitForStatsUpdate(managerWithFloor, requestId);
+ assertEquals(managerWithFloor.fetchHybridScoreForServer("floorServer"),
80.0);
+
+ // Multiple idle servers (numInFlight=0 after queries complete) should be ranked by latency.
+ // In this config (autodecay disabled, alpha=1) inFlightEMA freezes at 1 after the first
+ // submission, so with floor=1: score = (1+0+1)^3 * latency = 8 * latency.
+ // The server with lower observed latency gets a lower score and is therefore preferred.
+ // When autodecay is enabled, inFlightEMA eventually decays to 0 during idle periods; at that
+ // point floor=0 collapses all scores to (0+0+0)^3 * latency = 0, destroying latency ordering,
+ // while floor=1 keeps score = 1^3 * latency = latency and preserves the ranking.
+
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_QUEUE_FLOOR,
1);
+ ServerRoutingStatsManager managerMultiServer =
+ new ServerRoutingStatsManager(new PinotConfiguration(properties),
_brokerMetrics);
+ managerMultiServer.init();
+ requestId = 0;
+
+ managerMultiServer.recordStatsForQuerySubmission(requestId++,
"fastServer");
+ waitForStatsUpdate(managerMultiServer, requestId);
+ managerMultiServer.recordStatsUponResponseArrival(requestId++,
"fastServer", 5);
+ waitForStatsUpdate(managerMultiServer, requestId);
+
+ managerMultiServer.recordStatsForQuerySubmission(requestId++,
"slowServer");
+ waitForStatsUpdate(managerMultiServer, requestId);
+ managerMultiServer.recordStatsUponResponseArrival(requestId++,
"slowServer", 20);
+ waitForStatsUpdate(managerMultiServer, requestId);
+
+ // Both servers are now idle (numInFlight=0). The fast server should rank better (lower score).
+ double fastScore =
managerMultiServer.fetchHybridScoreForServer("fastServer");
+ double slowScore =
managerMultiServer.fetchHybridScoreForServer("slowServer");
+ assertTrue(fastScore < slowScore, "Idle servers should be ranked by
latency");
+ }
+
private void assertStatsNullForInstance(ServerRoutingStatsManager manager,
String instanceId) {
Integer numInFlightReq =
manager.fetchNumInFlightRequestsForServer(instanceId);
assertNull(numInFlightReq);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index fbe1fe743f3..768d46d2144 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1134,6 +1134,9 @@ public class CommonConstants {
// Parameters related to Hybrid score.
public static final String CONFIG_OF_HYBRID_SCORE_EXPONENT =
CONFIG_PREFIX + ".hybrid.score.exponent";
public static final int DEFAULT_HYBRID_SCORE_EXPONENT = 3;
+ public static final String CONFIG_OF_HYBRID_SCORE_QUEUE_FLOOR =
+ CONFIG_PREFIX + ".hybrid.score.queue.size.floor";
+ public static final int DEFAULT_HYBRID_SCORE_QUEUE_FLOOR = 0;
// Threadpool size of ServerRoutingStatsManager. This controls the
number of threads available to update routing
// stats for servers upon query submission and response arrival.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]