yashmayya commented on code in PR #18553:
URL: https://github.com/apache/pinot/pull/18553#discussion_r3290158601
##########
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java:
##########
@@ -305,10 +305,12 @@ protected MultiStageBrokerRequestHandler createMultiStageBrokerRequestHandler(
QueryQuotaManager queryQuotaManager, TableCache tableCache,
MultiStageQueryThrottler multiStageQueryThrottler, FailureDetector failureDetector,
ThreadAccountant threadAccountant, MultiClusterRoutingContext multiClusterRoutingContext,
- WorkerManager workerManager, WorkerManager multiClusterWorkerManager) {
+ WorkerManager workerManager, WorkerManager multiClusterWorkerManager,
+ ServerRoutingStatsManager serverRoutingStatsManager) {
Review Comment:
**MAJOR — backwards-incompatible signature change to a `protected` factory
method.**
`createMultiStageBrokerRequestHandler` is `protected`, which makes it a
de-facto extension point for downstream broker starters (e.g., StarTree's
commercial fork, custom plugin broker subclasses that wire instrumentation).
Adding `ServerRoutingStatsManager` as a required parameter breaks any
subclass that overrode the prior 13-arg signature. A subclass annotated with
`@Override` fails to compile. One without the annotation still compiles, but
its method becomes a never-called overload, so after upgrade its customization
is silently skipped at the new call site (line 531).
The constructor on `MultiStageBrokerRequestHandler` itself preserved the old
13-arg form (line 154, delegating with `null`) for exactly this kind of
backward compatibility, but this factory method did not.
Suggest: keep the 13-arg factory method as the one the call site at line 531
invokes, have its default implementation delegate to the new 14-arg overload
with `_serverRoutingStatsManager`, and let subclassers migrate to the 14-arg
overload at their own pace. Pointing the call site directly at the 14-arg form
would bypass existing 13-arg overrides all over again.
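Here is a minimal sketch of that shape. It uses hypothetical stand-ins (`BaseStarter`, `Handler`, `StatsManager`, and a single `String` in place of the real 13/14-arg parameter list), so it is not the actual Pinot code. The point is that the call site keeps invoking the legacy-signature method, so both an old override and a migrated override get called:

```java
// Hypothetical, heavily simplified stand-ins: the real factory takes many Pinot-specific
// arguments, collapsed here into a single String placeholder.
public class FactoryCompatSketch {
  static final class StatsManager {
  }

  static final class Handler {
    final StatsManager _statsManager;

    Handler(StatsManager statsManager) {
      _statsManager = statsManager;
    }
  }

  static class BaseStarter {
    protected final StatsManager _serverRoutingStatsManager = new StatsManager();

    // Legacy signature: still the one the call site invokes, so existing overrides keep firing.
    protected Handler createHandler(String workerManager) {
      return createHandler(workerManager, _serverRoutingStatsManager);
    }

    // New overload: subclasses can migrate to this one at their own pace.
    protected Handler createHandler(String workerManager, StatsManager statsManager) {
      return new Handler(statsManager);
    }

    // Stand-in for the call site: it calls the legacy-signature method, not the new one.
    Handler start() {
      return createHandler("workerManager");
    }
  }

  // A downstream subclass written against the old signature.
  static class LegacyStarter extends BaseStarter {
    boolean _legacyOverrideInvoked;

    @Override
    protected Handler createHandler(String workerManager) {
      _legacyOverrideInvoked = true;
      return new Handler(null);
    }
  }

  // A downstream subclass that has migrated to the new signature.
  static class MigratedStarter extends BaseStarter {
    boolean _newOverrideInvoked;

    @Override
    protected Handler createHandler(String workerManager, StatsManager statsManager) {
      _newOverrideInvoked = true;
      return super.createHandler(workerManager, statsManager);
    }
  }

  public static void main(String[] args) {
    LegacyStarter legacy = new LegacyStarter();
    legacy.start();
    MigratedStarter migrated = new MigratedStarter();
    Handler handler = migrated.start();
    System.out.println(legacy._legacyOverrideInvoked + " " + migrated._newOverrideInvoked + " "
        + (handler._statsManager == migrated._serverRoutingStatsManager));
  }
}
```

The migrated subclass still receives the broker's stats manager because the base 13-arg method passes `_serverRoutingStatsManager` through.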
##########
pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java:
##########
@@ -149,6 +153,18 @@ public long getCompletedTaskCount() {
return tpe.getCompletedTaskCount();
}
+ private ConcurrentHashMap<String, ServerRoutingStatsEntry> getStatsMap(QueryType queryType) {
+ return queryType == QueryType.MSE ? _mseServerQueryStatsMap : _serverQueryStatsMap;
Review Comment:
**MAJOR — two silent fall-throughs in the new dispatch.**
1. `QueryExecutionContext.QueryType` has three values: `SSE`, `MSE`, `TSE`.
This ternary lumps `TSE` into the SSE bucket. Today `TimeSeriesRequestHandler`
doesn't call `recordStats*`, so this is dormant — but the moment anyone wires
TSE adaptive routing, TSE traffic will silently poison SSE stats, which is the
exact engine cross-talk the PR exists to prevent.
2. The no-arg `getStatsMap()` below defaults to the SSE map when
`QueryThreadContext.getIfAvailable()` is `null`. Any caller invoked from a
non-query thread (debug REST handlers, periodic tasks, future SPI consumers)
silently reads/writes the SSE map with no warning.
Suggest: switch exhaustively on the enum and either throw or log on an
unknown/unset query type, or add a third TSE-map slot up front. At minimum,
document on the method that the caller is expected to run with a
`QueryThreadContext` set.
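A sketch of the exhaustive dispatch, assuming the "throw on TSE/unset" option. `QueryType` mirrors the three values of `QueryExecutionContext.QueryType`, and `Entry` is a hypothetical stand-in for `ServerRoutingStatsEntry`:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; not the actual ServerRoutingStatsManager code.
public class StatsMapDispatchSketch {
  enum QueryType { SSE, MSE, TSE }

  static final class Entry {
  }

  private final ConcurrentHashMap<String, Entry> _serverQueryStatsMap = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, Entry> _mseServerQueryStatsMap = new ConcurrentHashMap<>();

  // Caller is expected to run with a QueryThreadContext set; a null query type means it did not.
  ConcurrentHashMap<String, Entry> getStatsMap(QueryType queryType) {
    if (queryType == null) {
      // Fail loudly instead of silently defaulting to the SSE map.
      throw new IllegalStateException("No query type available; is a QueryThreadContext set?");
    }
    switch (queryType) {
      case SSE:
        return _serverQueryStatsMap;
      case MSE:
        return _mseServerQueryStatsMap;
      case TSE:
        // No TSE map yet: refuse rather than mixing TSE traffic into the SSE stats.
        throw new UnsupportedOperationException("Adaptive routing stats are not tracked for TSE");
      default:
        throw new IllegalArgumentException("Unhandled query type: " + queryType);
    }
  }
}
```

The explicit `default` branch also trips if a fourth engine is ever added to the enum. Swapping the `TSE` branch for a third map is the other option mentioned above.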
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -183,6 +214,11 @@ public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan d
cancel(requestId);
throw e;
} finally {
+ if (statsManager != null) {
+ for (QueryServerInstance server : incrementedServers) {
+ statsManager.recordStatsUponResponseArrival(requestId, server.getInstanceId(), -1);
Review Comment:
**MAJOR — latency EMA for MSE entries will be stuck at the init value
forever, and the new MSE gauges will export that stuck value.**
Passing `latency = -1` is handled safely by `updateStatsUponResponseArrival`
(it skips `updateLatency`), but the consequences go beyond "no latency yet":
1. `_latencyMsEMA._lastUpdatedTimeMs` stays at `0` for every MSE entry, so
the EMA's auto-decay scheduled task never fires.
2. `_latencyMsEMA.getAverage()` returns `_avgInitializationVal` (default
`1.0`) for the lifetime of the broker.
3. `ADAPTIVE_SERVER_MSE_LATENCY_EMA` therefore exports a flat constant —
operators wiring dashboards will assume the gauge is broken and stop trusting
it.
4. `ADAPTIVE_SERVER_MSE_HYBRID_SCORE` is `inflight^exp * latencyEma`, which
collapses to `inflight^exp * initVal`. If `_avgInitializationVal == 0.0`, the
MSE hybrid score is identically `0` for all servers.
This isn't just a v0 inert-data concern: `BalancedInstanceSelector` and
`ReplicaGroupInstanceSelector` (the common MSE leaf selectors via
`BaseInstanceSelector._priorityPoolInstanceSelector`) DO invoke
`_adaptiveServerSelector` — and via the new `getStatsMap()`, those reads now
hit the MSE map, where these values are stuck. With `HYBRID` or `LATENCY`
selector configured, MSE routing decisions will be derived from stuck data.
Until the TODO above lands, suggest one of the following: gate
`ADAPTIVE_SERVER_MSE_LATENCY_EMA` / `ADAPTIVE_SERVER_MSE_HYBRID_SCORE` from
being exported (so dashboards don't show misleading flat lines), or fall back
to the SSE entry's latency EMA when the MSE entry has never received a real
latency sample, or document loudly in the metric Javadoc that these gauges are
inert until follow-up.
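A sketch of the "fall back to the SSE latency EMA" option. `LatencyEma` is a hypothetical, simplified stand-in for the EMA behind `_latencyMsEMA`: it has no decay task, and `lastUpdatedTimeMs == 0` is treated as "no real sample yet", which matches the stuck state described above. The hybrid score follows the `inflight^exp * latencyEma` shape quoted in point 4:

```java
// Hypothetical sketch; names and the EMA update rule are illustrative, not Pinot's.
public class MseLatencyFallbackSketch {
  static final class LatencyEma {
    private final double _alpha;
    private double _average;
    private long _lastUpdatedTimeMs;  // 0 means "never received a real latency sample"

    LatencyEma(double alpha, double initVal) {
      _alpha = alpha;
      _average = initVal;
    }

    void update(double sampleMs, long nowMs) {
      _average = _alpha * sampleMs + (1 - _alpha) * _average;
      _lastUpdatedTimeMs = nowMs;
    }

    boolean hasRealSample() {
      return _lastUpdatedTimeMs > 0;
    }

    double getAverage() {
      return _average;
    }
  }

  // MSE EMA once it has a real sample; otherwise the same server's SSE EMA; otherwise the init value.
  static double latencyForMseScoring(LatencyEma mseEma, LatencyEma sseEma) {
    if (mseEma.hasRealSample()) {
      return mseEma.getAverage();
    }
    return sseEma.hasRealSample() ? sseEma.getAverage() : mseEma.getAverage();
  }

  // Hybrid score in the shape described above: inflight^exp * latencyEma.
  static double hybridScore(double inFlight, double exponent, double latencyEma) {
    return Math.pow(inFlight, exponent) * latencyEma;
  }
}
```

With `alpha = 0.5` and an init value of `1.0`, a single SSE sample of 50 ms gives an SSE average of 25.5, and MSE scoring uses that value until the MSE entry receives its own sample. `hybridScore(n, exp, 0.0)` is `0.0` for any finite `n`, which is the degenerate case noted in point 4.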
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -167,10 +168,40 @@ public void start() {
public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan, long timeoutMs,
Map<String, String> queryOptions)
throws Exception {
+ return submitAndReduce(context, dispatchableSubPlan, timeoutMs, queryOptions, null);
+ }
+
+ /// Same as {@link #submitAndReduce(RequestContext, DispatchableSubPlan, long, Map)} but records per-server
+ /// in-flight request statistics into {@code statsManager} for use by the adaptive query router.
+ /// When {@code statsManager} is non-null:
+ /// <ul>
+ /// <li>Each leaf server is registered as having one more in-flight request via
+ /// {@link ServerRoutingStatsManager#recordStatsForQuerySubmission} after the fan-out begins.</li>
+ /// <li>After the full fan-out completes (or fails), each server is decremented via
+ /// {@link ServerRoutingStatsManager#recordStatsUponResponseArrival} with {@code latency = -1}
+ /// (no latency is recorded at this stage).</li>
+ /// </ul>
+ /// TODO: Replace the coarse end-of-fanout decrement with per-sender arrival once per-sender EOS
+ /// interception is in place, and record real leaf-stage latency at that point.
+ public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan, long timeoutMs,
+ Map<String, String> queryOptions, @Nullable ServerRoutingStatsManager statsManager)
+ throws Exception {
long requestId = context.getRequestId();
Set<QueryServerInstance> servers = new HashSet<>();
+ // Tracks servers where recordStatsForQuerySubmission was actually called, so the finally block only
+ // decrements servers that were incremented — guarding against a partial failure in submit().
+ Set<QueryServerInstance> incrementedServers = new HashSet<>();
try {
submit(requestId, dispatchableSubPlan, timeoutMs, servers, queryOptions);
+ // The SSE engine increments before `submit`, but here we increment after because `submit` populates
+ // the list of servers. Getting the list of servers before calling `submit` would expose
+ // implementation details of `submit`.
+ if (statsManager != null) {
+ for (QueryServerInstance server : servers) {
+ statsManager.recordStatsForQuerySubmission(requestId, server.getInstanceId());
Review Comment:
**MAJOR — increment/decrement can be reordered on the stats-manager
executor, causing a transient negative inflight count and a wrong EMA sample.**
`recordStatsForQuerySubmission` (here) and `recordStatsUponResponseArrival`
(line 219 in the `finally`) both enqueue tasks on
`ServerRoutingStatsManager._executorService`, which is a
`newFixedThreadPool(2)` by default. The FIFO submission queue does NOT
guarantee execution order across the two worker threads — for any single server
X, the decrement task can acquire the per-server write lock before the
increment task. When that happens:
- `_numInFlightRequests` momentarily becomes `-1`, then `0` after the
increment lands.
- The transient `-1` is read by any concurrent
`fetchNumInFlightRequestsForServer` because `_numInFlightRequests` is
`volatile`.
- The EMA is sampled only inside
`updateNumInFlightRequestsForQuerySubmission`
(`ServerRoutingStatsEntry.java:99`), so it gets sampled at `0` instead of the
expected `+1`.
This race is pre-existing for SSE (`AsyncQueryResponse` uses the same
executor), but in MSE the increment→decrement window is only the `runReducer`
duration (often milliseconds) rather than a full server-side round trip, which
makes reordering meaningfully more likely.
Options: serialize per server (a work channel keyed by `serverInstanceId`),
single-thread the executor, or apply both the in-flight increment and the
decrement synchronously on the calling thread while submitting only the EMA
update asynchronously. Applying just the decrement synchronously would not
help, because it could still land before the asynchronous increment.
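A sketch of the per-server serialization option. `PerServerSerialExecutor` is a hypothetical helper, not an existing Pinot class. It chains each server's tasks with `CompletableFuture` on the existing shared pool, so tasks for the same server run in submission order while different servers still proceed in parallel:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical helper: per-key FIFO execution on a shared pool via CompletableFuture chaining.
public class PerServerSerialExecutor {
  private final Executor _pool;
  // Tail of each server's pending task chain; an entry is dropped once its chain drains.
  private final ConcurrentHashMap<String, CompletableFuture<Void>> _tails = new ConcurrentHashMap<>();

  public PerServerSerialExecutor(Executor pool) {
    _pool = pool;
  }

  public CompletableFuture<Void> submit(String serverInstanceId, Runnable task) {
    CompletableFuture<Void> next = _tails.compute(serverInstanceId, (id, tail) -> {
      CompletableFuture<Void> prev = (tail == null) ? CompletableFuture.completedFuture(null) : tail;
      // handle(...) absorbs a failure of the previous task so one bad task cannot stall the chain.
      return prev.handle((ignored, error) -> (Void) null).thenRunAsync(task, _pool);
    });
    // Remove the entry when this task finishes, unless a newer task has already been chained on.
    next.whenComplete((ignored, error) -> _tails.remove(serverInstanceId, next));
    return next;
  }
}
```

If the submission path and the `finally` decrement both go through `submit(serverInstanceId, ...)`, the decrement for a server can no longer overtake its increment, even with a two-thread pool. The trade-off is one map entry per server with queued work, plus a `CompletableFuture` per task.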
##########
pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java:
##########
@@ -52,6 +54,7 @@ public class ServerRoutingStatsManager {
private final BrokerMetrics _brokerMetrics;
private volatile boolean _isEnabled;
private ConcurrentHashMap<String, ServerRoutingStatsEntry> _serverQueryStatsMap;
+ private ConcurrentHashMap<String, ServerRoutingStatsEntry> _mseServerQueryStatsMap;
Review Comment:
**MAJOR — debug surface silently omits the new MSE map.**
The new `_mseServerQueryStatsMap` is not exposed by any of the public read
APIs of this class:
- `getServerRoutingStats()` (line 236) still returns only
`_serverQueryStatsMap`, so the `/debug/serverRoutingStats` REST endpoint
(`PinotBrokerDebug.java:280`) returns SSE-only data after this PR.
- `getServerRoutingStatsStr()` (line 243) still iterates only
`_serverQueryStatsMap`.
After this PR, the only programmatic view into MSE adaptive-routing state is
the Prometheus gauges. Operators triaging routing decisions via the debug REST
endpoint will be silently blind for MSE.
Suggest either growing the response payload to `Map<QueryType, Map<String,
ServerRoutingStatsEntry>>` or adding a parallel debug endpoint.
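A sketch of the first option, a debug read API keyed by engine. `QueryType` and `Entry` are hypothetical stand-ins for `QueryExecutionContext.QueryType` and `ServerRoutingStatsEntry`. The method name `getServerRoutingStatsByQueryType` is illustrative only:

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; not the actual ServerRoutingStatsManager API.
public class RoutingStatsDebugSketch {
  enum QueryType { SSE, MSE, TSE }

  static final class Entry {
    volatile int _numInFlightRequests;
  }

  private final ConcurrentHashMap<String, Entry> _serverQueryStatsMap = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, Entry> _mseServerQueryStatsMap = new ConcurrentHashMap<>();

  // Stand-ins for the existing write paths, so the sketch can be exercised on its own.
  Entry getOrCreateSseEntry(String serverInstanceId) {
    return _serverQueryStatsMap.computeIfAbsent(serverInstanceId, id -> new Entry());
  }

  Entry getOrCreateMseEntry(String serverInstanceId) {
    return _mseServerQueryStatsMap.computeIfAbsent(serverInstanceId, id -> new Entry());
  }

  // Debug view keyed by engine, so a REST handler can report SSE and MSE state side by side.
  // Each inner map is a read-only live view, not a snapshot.
  Map<QueryType, Map<String, Entry>> getServerRoutingStatsByQueryType() {
    Map<QueryType, Map<String, Entry>> stats = new EnumMap<>(QueryType.class);
    stats.put(QueryType.SSE, Collections.unmodifiableMap(_serverQueryStatsMap));
    stats.put(QueryType.MSE, Collections.unmodifiableMap(_mseServerQueryStatsMap));
    return stats;
  }
}
```

`getServerRoutingStatsStr()` could iterate the same `EnumMap` and emit one section per engine.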
##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java:
##########
@@ -221,4 +224,34 @@ public void testQueryDispatcherThrowsWhenDeadlinePreExpiredAndAsyncResponseNotPo
_queryDispatcher.submit(REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 0L, new HashSet<>(), Map.of());
}
}
+
+ @Test
+ public void testStatsManagerRecordsSubmissionAndArrivalForDispatchedServers()
+ throws Exception {
+ ServerRoutingStatsManager statsManager = Mockito.mock(ServerRoutingStatsManager.class);
+ String sql = "SELECT * FROM a";
+ long requestId = REQUEST_ID_GEN.getAndIncrement();
+ RequestContext context = new DefaultRequestContext();
+ context.setRequestId(requestId);
+ DispatchableSubPlan plan = _queryEnvironment.planQuery(sql);
+
+ Set<String> expectedInstanceIds = new HashSet<>();
+ for (DispatchablePlanFragment fragment : plan.getQueryStagesWithoutRoot()) {
+ for (QueryServerInstance server : fragment.getServerInstanceToWorkerIdMap().keySet()) {
+ expectedInstanceIds.add(server.getInstanceId());
+ }
+ }
+ Assert.assertFalse(expectedInstanceIds.isEmpty());
+
+ try (QueryThreadContext ignore = QueryThreadContext.openForMseTest()) {
+ _queryDispatcher.submitAndReduce(context, plan, 10_000L, Map.of(), statsManager);
+ } catch (NullPointerException e) {
+ // expected: reduce phase fails with mocked MailboxService
+ }
+
+ for (String instanceId : expectedInstanceIds) {
+ Mockito.verify(statsManager).recordStatsForQuerySubmission(requestId, instanceId);
+ Mockito.verify(statsManager).recordStatsUponResponseArrival(requestId, instanceId, -1L);
Review Comment:
**MAJOR — this test does not actually verify the contract its production
code claims to defend.**
`Mockito.verify(statsManager).recordStatsForQuerySubmission(...)` only
checks that the method was called exactly once per server with those arguments
(Mockito's default `times(1)`). It does not catch any of the bugs this PR's
production code comments are explicitly guarding against:
1. **Ordering**: there is no `InOrder` verification, so a refactor that issues
the decrement before the increment still passes this test. So does one that
breaks the `incrementedServers` bookkeeping (e.g., decrementing `servers`
instead), because on this path both sets are identical.
2. **Partial-failure path**: the inline comment in `submitAndReduce` says
"guarding against a partial failure in submit()". But the only error path
exercised here is `runReducer` throwing NPE due to the mocked `MailboxService`
— i.e., the path where `submit()` succeeded. The actual
partial-`submit()`-failure case isn't covered.
3. **Legacy `submitAndReduce(... null)` overload**: no test asserts the
null-`statsManager` overload doesn't throw or change behavior.
4. **Real-executor behavior**: the manager is a Mockito mock, so the async
executor and per-server lock ordering aren't exercised at all (see related
comment about the executor race on `QueryDispatcher.java:201`).
Suggested additions:
- A test where `submit()` throws after partial dispatch and asserts
`incrementedServers` is empty / `recordStatsUponResponseArrival` is NOT called
for any server.
- A test exercising the legacy 4-arg `submitAndReduce` overload.
- An end-to-end test using a real `ServerRoutingStatsManager` (not a mock)
that asserts `numInFlight == 0` for every server after `submitAndReduce`
returns successfully — which is what the inline comments actually claim.
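To make the first two suggestions concrete, here is a hypothetical, simplified model of the bookkeeping in `submitAndReduce`, not the real `QueryDispatcher`. Servers are plain strings, `submit`/reduce are injected, and `StatsRecorder` mirrors the two `ServerRoutingStatsManager` calls the PR makes. `CountingRecorder` plays the role of a real manager by keeping in-flight counts, minus the executor:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model: increment after submit() returns, decrement only incremented servers in finally.
public class SubmitAndReduceBookkeepingSketch {
  interface StatsRecorder {
    void recordStatsForQuerySubmission(long requestId, String serverInstanceId);

    void recordStatsUponResponseArrival(long requestId, String serverInstanceId, long latency);
  }

  interface Submitter {
    // Adds each dispatched server to serversOut; may throw part-way through.
    void submit(Set<String> serversOut) throws Exception;
  }

  static void submitAndReduce(long requestId, Submitter submitter, Runnable reducer, StatsRecorder stats)
      throws Exception {
    Set<String> servers = new HashSet<>();
    Set<String> incrementedServers = new HashSet<>();
    try {
      submitter.submit(servers);
      if (stats != null) {
        for (String server : servers) {
          stats.recordStatsForQuerySubmission(requestId, server);
          incrementedServers.add(server);
        }
      }
      reducer.run();
    } finally {
      if (stats != null) {
        for (String server : incrementedServers) {
          stats.recordStatsUponResponseArrival(requestId, server, -1);
        }
      }
    }
  }

  // Recording fake that tracks per-server in-flight counts synchronously.
  static final class CountingRecorder implements StatsRecorder {
    final Map<String, Integer> _inFlight = new HashMap<>();
    int _arrivals;

    @Override
    public void recordStatsForQuerySubmission(long requestId, String serverInstanceId) {
      _inFlight.merge(serverInstanceId, 1, Integer::sum);
    }

    @Override
    public void recordStatsUponResponseArrival(long requestId, String serverInstanceId, long latency) {
      _inFlight.merge(serverInstanceId, -1, Integer::sum);
      _arrivals++;
    }
  }
}
```

Against this model, a partial `submit()` failure must leave zero arrivals recorded. A successful run and a reducer failure must both return every server to zero in flight, and a `null` recorder must be a no-op. Those are the same assertions the suggested tests would make against the real dispatcher and a real `ServerRoutingStatsManager`.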
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.