gortiz commented on PR #18649:
URL: https://github.com/apache/pinot/pull/18649#issuecomment-4624170208
This is a really useful feature — having progress while a query runs is
something users ask for constantly. A few thoughts on how it interacts with
#18458 (SubmitWithStream bidi stats), which I think is worth addressing before
merge since the two PRs modify some of the same infrastructure.
**Merge conflict in `OpChainSchedulerService`**
Both PRs add fields and lifecycle logic to `OpChainSchedulerService`.
Concretely:
This PR adds:
- `_executionContextCache` (Guava `Cache<Long, QueryExecutionContext>`,
time-based eviction)
- `_completedProgressStatsCache` (Guava `Cache<Long, QueryProgressStats>`,
time-based eviction)
- `trackExecutionContext()` / `retainCompletedProgressStatsIfFinished()` in
the `FutureCallback`
#18458 adds:
- `_executionContextByRequest` (`ConcurrentMap<Long,
QueryExecutionContext>`, ref-counted)
- `_activeOpChainsByRequest` (`ConcurrentMap<Long, AtomicInteger>`,
reference counter)
- `decrementActiveOpChains()` in the same `FutureCallback`
- `OpChainCompletionListener` — a per-request callback that fires on
op-chain completion with the full `MultiStageQueryStats` payload
The `_executionContextByRequest` in #18458 is a strictly better version of
`_executionContextCache` here: it keeps the context alive for exactly as long
as op-chains are running (ref-counted) rather than until a timer fires. A Guava
time-based cache can either evict a live context prematurely (returning null
for a running query) or retain a completed context longer than needed. The
ref-counted map avoids both failure modes.
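To make the lifetime difference concrete, here is a minimal sketch of a ref-counted per-request registry. The class and method names are hypothetical and this is not the code in #18458. It only illustrates the idea: the context is created when a request's first op-chain starts, and it is removed atomically inside `ConcurrentHashMap.computeIfPresent` when the last one completes, so there is no timer that can evict it early or keep it too long.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical illustration of a ref-counted per-request context map (not the #18458 code).
final class RefCountedContextRegistry<C> {
  // One entry per live request: the shared context plus the number of running op-chains.
  private static final class Entry<T> {
    final T context;
    int activeOpChains; // only mutated inside compute/computeIfPresent, which lock the key

    Entry(T context) {
      this.context = context;
    }
  }

  private final ConcurrentMap<Long, Entry<C>> _byRequest = new ConcurrentHashMap<>();

  /** Called when an op-chain for requestId starts; creates the context on first use. */
  C acquire(long requestId, Supplier<C> factory) {
    Entry<C> entry = _byRequest.compute(requestId, (id, existing) -> {
      Entry<C> e = (existing != null) ? existing : new Entry<>(factory.get());
      e.activeOpChains++;
      return e;
    });
    return entry.context;
  }

  /** Called when an op-chain completes; returning null removes the entry for the last one. */
  void release(long requestId) {
    _byRequest.computeIfPresent(requestId, (id, e) -> --e.activeOpChains == 0 ? null : e);
  }

  /** Returns the context while at least one op-chain is running, otherwise null. */
  C get(long requestId) {
    Entry<C> e = _byRequest.get(requestId);
    return e == null ? null : e.context;
  }
}
```

In this shape, `acquire` would be called where an op-chain is submitted and `release` from the `FutureCallback`, in both `onSuccess` and `onFailure`, so that failed op-chains do not leak entries.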
I'd suggest deferring this PR until after #18458 merges, then replacing
`_executionContextCache` with `_executionContextByRequest` and building the
progress lifecycle on top of `OpChainCompletionListener` — which brings me to
the next point.
**`OpChainCompletionListener` enables better MSE progress**
The current MSE progress model counts op-chains as work units. That means
progress only moves when an op-chain *finishes*. In a typical pipeline, leaf
scan op-chains finish early while join and aggregation op-chains run for the
full query duration:
```
time ───────────────────────────────────────────────────────────────►
leaf-0: ████████░░░░░░░░░░░░░░░░░░░ finishes at ~30% of wall-clock
leaf-1: ███████████░░░░░░░░░░░░░░░░ finishes at ~40%
join-0: ░░░░░░████████████████████░ runs for nearly the whole query
join-1: ░░░░░░████████████████████░ runs for nearly the whole query
agg-0:  ░░░░░░░░░░░░░░░░░░█████████ runs near the end
```
With op-chain counting, progress reaches 2/5 = 40% once both leaves finish (at
~40% of wall-clock), stays there until the joins finish near the end, then jumps
to 5/5 = 100% in rapid succession. The bar sits still for most of the query
duration.
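The plateau can be checked with a few lines of arithmetic. This hypothetical helper counts finished op-chains at a given point in wall-clock time, using the finish times from the diagram (~30% and ~40% for the leaves) plus two assumptions: the joins finish at 96% (the diagram only says "nearly the whole query") and the aggregation at 100%.

```java
// Hypothetical illustration of op-chain-count progress for the five op-chains in the diagram.
// Finish times are fractions of wall-clock; 0.96 for the joins is an assumed value.
final class OpChainCountProgress {
  static final double[] FINISH_TIMES = {0.30, 0.40, 0.96, 0.96, 1.00};

  /** Fraction of op-chains that have finished by time t (0.0 to 1.0 of wall-clock). */
  static double progressAt(double t) {
    int finished = 0;
    for (double finish : FINISH_TIMES) {
      if (finish <= t) {
        finished++;
      }
    }
    return (double) finished / FINISH_TIMES.length;
  }
}
```

Sampling it at t = 0.5, 0.7 and 0.9 returns 0.4 each time; the value only reaches 1.0 at t = 1.0.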
`OpChainCompletionListener` (from #18458) fires with the actual
`MultiStageQueryStats` — including rows scanned, CPU time, rows emitted. This
opens up a much better model:
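```java
// Shorthand sketch: ctx, stats, opChainId and isLeafStage(...) stand in for the listener plumbing.
// At query start: use the leaf segment count as the denominator (exact, known upfront).
ctx.addTotalWorkUnits(totalLeafSegments);

// In OpChainCompletionListener (fires once per op-chain, with its stats).
// The numerator must be in the same unit as the denominator: segments, not rows.
if (isLeafStage(opChainId)) {
  long segmentsProcessed = stats.get(LeafOperator.StatKey.NUM_SEGMENTS_PROCESSED);
  ctx.addProcessedWorkUnits(segmentsProcessed);
}
// Non-leaf op-chains don't contribute: their output is bounded by what the leaves produce.
```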
This makes progress follow the leaf segments as they are processed, which is
both more accurate and more informative. It also naturally fixes the
double-counting issue: `addTotalSegmentsToProcess` (in
`ServerQueryExecutorV1Impl`) calls `addTotalWorkUnits` on the same context on
which `QueryServer.submitInternal` has already called
`addTotalWorkUnits(opChainCount)`, so segments and op-chains end up summed into
a single denominator.
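To illustrate the unit argument with hypothetical numbers, the sketch below models the two counters as a standalone class. The method names mirror the `addTotalWorkUnits` / `addProcessedWorkUnits` calls in the snippet above, but the class itself is invented for illustration. Registering 100 leaf segments and reporting them as they complete reaches exactly 100%. Registering 5 op-chains on top of the same 100 segments means that finishing every segment still reads 100/105 ≈ 95.2%, and the two kinds of unit are weighted against each other arbitrarily.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the work-unit counters on a query context; not Pinot code.
final class WorkUnitProgress {
  private final AtomicLong _totalWorkUnits = new AtomicLong();
  private final AtomicLong _processedWorkUnits = new AtomicLong();

  void addTotalWorkUnits(long units) {
    _totalWorkUnits.addAndGet(units);
  }

  void addProcessedWorkUnits(long units) {
    _processedWorkUnits.addAndGet(units);
  }

  /** Progress clamped to [0, 1]; 0 while no total has been registered. */
  double fraction() {
    long total = _totalWorkUnits.get();
    if (total == 0) {
      return 0.0;
    }
    return Math.min(1.0, (double) _processedWorkUnits.get() / total);
  }
}
```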
**Rows-per-second as the primary signal (optional)**
A related idea worth considering: rather than a percentage (which requires a
reliable denominator), expose `rowsPerSecond` alongside `rowsProcessed`. This
is useful even when the total is unknown:
```
Scanning... 42.3M rows | 1.2M rows/s | ~35s remaining
```
This is the model that both Trino and ClickHouse have converged on:
- **Trino CLI** (`StatusPrinter.java`) computes rows/s and bytes/s from each
polling response and displays them at every tick:
`0:13 [6.45M rows, 560MB] [473K rows/s, 41.1MB/s] [=========>> ] 20%`
The REST API does not have dedicated throughput fields — rates are derived
client-side from `processedRows / elapsedTimeMillis`. No server changes were
needed to add this.
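- **ClickHouse** reports progress counters (`read_rows`, `read_bytes`,
`elapsed_ns`) while the query executes: as `Progress` packets over the native
protocol, which `clickhouse-client` uses, or as `X-ClickHouse-Progress`
headers over HTTP when `send_progress_in_http_headers` is enabled.
`clickhouse-client` computes and displays:
`Progress: 5.3M rows, 2.4GB (234K rows/s., 234MB/s.)`
The server provides the raw counters; the rate is computed at the display
layer.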
Both approaches show that rows/s is valuable even without a perfect
denominator. When a percentage is available (Trino has `progressPercentage`,
ClickHouse has `total_rows_to_read`), it appears alongside the throughput; when
not, the throughput alone is shown.
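On the display side, a Pinot CLI/UI could follow the same pattern and simply drop the ETA when no total is known. This is a sketch only: the class name, the `K/M/B` suffixes and the "negative means unknown" convention are hypothetical choices, not an existing Pinot or Trino API; the layout copies the example status line earlier in this comment.

```java
import java.util.Locale;

// Hypothetical status-line formatter for a progress display; not an existing Pinot API.
final class ProgressLine {
  /** Formats a count with a K/M/B suffix, e.g. 42_300_000 -> "42.3M". */
  static String humanCount(double value) {
    if (value >= 1e9) return String.format(Locale.ROOT, "%.1fB", value / 1e9);
    if (value >= 1e6) return String.format(Locale.ROOT, "%.1fM", value / 1e6);
    if (value >= 1e3) return String.format(Locale.ROOT, "%.1fK", value / 1e3);
    return String.format(Locale.ROOT, "%.0f", value);
  }

  /** A negative remainingSeconds means "total unknown": the ETA part is omitted. */
  static String format(long rowsProcessed, double rowsPerSecond, long remainingSeconds) {
    StringBuilder sb = new StringBuilder("Scanning... ")
        .append(humanCount(rowsProcessed)).append(" rows | ")
        .append(humanCount(rowsPerSecond)).append(" rows/s");
    if (remainingSeconds >= 0) {
      sb.append(" | ~").append(remainingSeconds).append("s remaining");
    }
    return sb.toString();
  }
}
```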
For Pinot, the simplest path is the Trino approach: add `rowsProcessed` and
`elapsedMs` to `QueryProgressStats`, then compute `rowsPerSecond` in the CLI/UI
from successive responses. Beyond those two counters, V1 needs no server-side
rate computation. When a total in the same unit is known (total rows to scan,
not a `totalWorkUnits` counted in segments or op-chains), ETA follows from
`(totalRows - rowsProcessed) / rowsPerSecond`. When it isn't, rows/s alone
tells the user whether the query is making progress and at what speed, which is
arguably more actionable than a percentage built on a plan-cardinality estimate
that may be off by an order of magnitude.
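The client-side arithmetic could look like the following sketch, assuming each progress response carries cumulative `rowsProcessed` and `elapsedMs` as proposed above. The class and method names are hypothetical. It covers both the Trino-style cumulative average (rows processed divided by elapsed time) and a windowed rate between two successive responses, and it returns no ETA unless a total in rows is known.

```java
// Hypothetical client-side helper; assumes cumulative rowsProcessed/elapsedMs in each response.
final class ThroughputEstimator {
  /** Average rate since query start: rowsProcessed / elapsed. */
  static double averageRowsPerSecond(long rowsProcessed, long elapsedMs) {
    return elapsedMs <= 0 ? 0.0 : rowsProcessed * 1000.0 / elapsedMs;
  }

  /** Rate between two successive responses; reacts faster to throughput changes. */
  static double windowRowsPerSecond(long prevRows, long prevElapsedMs, long rows, long elapsedMs) {
    long dtMs = elapsedMs - prevElapsedMs;
    return dtMs <= 0 ? 0.0 : (rows - prevRows) * 1000.0 / dtMs;
  }

  /** ETA in whole seconds, or -1 when the total is unknown or no progress is being made. */
  static long etaSeconds(long totalRows, long rowsProcessed, double rowsPerSecond) {
    if (totalRows <= 0 || rowsPerSecond <= 0) {
      return -1;
    }
    long remaining = Math.max(0, totalRows - rowsProcessed);
    return (long) Math.ceil(remaining / rowsPerSecond);
  }
}
```

The windowed rate responds quickly when throughput shifts (for example, when leaf scans finish and the query moves into joins), while the cumulative average is steadier; a UI might show the windowed value and fall back to the average on the first poll.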