xiangfu0 opened a new pull request, #18724:
URL: https://github.com/apache/pinot/pull/18724
## Description
Adds `"first"` and `"last"` as accepted `aggregationType` values for
`MergeRollupTask` (and `RealtimeToOfflineSegmentsTask`), e.g.:
```json
"MergeRollupTask": {
  "1day.mergeType": "rollup",
  "1day.bucketTimePeriod": "1d",
  "1day.roundBucketTimePeriod": "1d",
  "gaugeMetric.aggregationType": "last"
}
```
The rollup keeps the metric value with the earliest (`first`) or latest (`last`)
event time within each rollup group. This is useful for gauge-style metrics,
where summing is meaningless and the first or last reading per bucket should
be kept. The aliases map to the existing `FIRSTWITHTIME`/`LASTWITHTIME`
`AggregationFunctionType` constants (no new enum values; the SQL surface is
untouched), and `"firstWithTime"`/`"lastWithTime"` are also accepted.
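The alias handling can be pictured as a small normalization step before an enum lookup. This sketch is hypothetical: `AggType` stands in for Pinot's `AggregationFunctionType` (only a few constants are shown), `parse` is not Pinot's actual parsing method, and case-insensitive matching is an assumption.

```java
import java.util.Locale;

// Hypothetical stand-in for AggregationFunctionType (subset of constants only).
enum AggType { SUM, MIN, MAX, FIRSTWITHTIME, LASTWITHTIME }

public class AggTypeAliases {
  // Hypothetical parser: normalizes case, resolves the first/last aliases,
  // and falls back to an enum-name lookup for everything else.
  static AggType parse(String value) {
    String normalized = value.trim().toLowerCase(Locale.ROOT);
    switch (normalized) {
      case "first":
      case "firstwithtime":
        return AggType.FIRSTWITHTIME;
      case "last":
      case "lastwithtime":
        return AggType.LASTWITHTIME;
      default:
        // Enum.valueOf throws IllegalArgumentException for unknown names.
        return AggType.valueOf(normalized.toUpperCase(Locale.ROOT));
    }
  }

  public static void main(String[] args) {
    System.out.println(parse("last"));          // LASTWITHTIME
    System.out.println(parse("firstWithTime")); // FIRSTWITHTIME
    System.out.println(parse("sum"));           // SUM
  }
}
```

Because both spellings resolve to the same constant, downstream code only ever sees `FIRSTWITHTIME`/`LASTWITHTIME`.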
## Design
A naive first/last on rollup would be non-deterministic: rows within a
rollup group are ordered by an unstable quicksort, and the time column is
rounded in place during the map phase, which destroys the original event-time
ordering before the reduce phase.
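To see why in-place rounding loses the order, consider two events on the same day rounded to a `1d` bucket. This minimal sketch assumes round-down-to-bucket-start rounding on epoch millis; `roundDown` is a hypothetical helper, not Pinot's `EpochTimeHandler` code.

```java
public class RoundingCollapse {
  static final long DAY_MS = 86_400_000L;
  static final long HOUR_MS = 3_600_000L;

  // Hypothetical helper: round an epoch-millis timestamp down to its bucket start.
  static long roundDown(long epochMs, long bucketMs) {
    return epochMs - Math.floorMod(epochMs, bucketMs);
  }

  public static void main(String[] args) {
    long dayStart = 1_699_920_000_000L;     // 2023-11-14T00:00:00Z
    long morning = dayStart + 1 * HOUR_MS;  // 01:00
    long evening = dayStart + 20 * HOUR_MS; // 20:00
    // Before rounding, the two events are clearly ordered.
    System.out.println(Long.compare(morning, evening)); // -1
    // After rounding to a 1d bucket they compare as equal.
    System.out.println(Long.compare(roundDown(morning, DAY_MS), roundDown(evening, DAY_MS))); // 0
  }
}
```

Once both rows carry the same rounded time, any comparator on (group key, time) treats them as ties, so their relative order is left to the sort algorithm.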
Instead, when an order-sensitive aggregation is configured:
- `SegmentMapper`/`EpochTimeHandler` preserve the original (pre-rounding)
epoch-millis time in a hidden column (`$originalTimeMs$`) appended as the
**last sort field** of the intermediate generic row files.
- `RollupReducer` compares group keys on all sort fields except the hidden
one, so rows within each group arrive sorted by original time; the first
aggregator then simply keeps the accumulated value, and the last aggregator
takes each new value. Null values are skipped, consistent with the other
aggregators, so the result is the first/last **non-null** value.
- The hidden column is stripped before output segments are built.
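A minimal in-memory sketch of the reduce-side idea, assuming a single string group key, one nullable `Double` metric, and epoch-millis times. `Row`, `Output`, and `rollup` are hypothetical names. The sketch uses Java's `List.sort` with a composite comparator rather than Pinot's generic-row-file sorter, and it is not the actual `RollupReducer` implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class FirstLastRollupSketch {
  // Intermediate row produced by a (hypothetical) map phase.
  static final class Row {
    final String key;          // group-by dimension value
    final long roundedTimeMs;  // time column after rounding
    final long originalTimeMs; // stands in for the hidden $originalTimeMs$ column
    final Double value;        // metric value; null models a null metric

    Row(String key, long roundedTimeMs, long originalTimeMs, Double value) {
      this.key = key;
      this.roundedTimeMs = roundedTimeMs;
      this.originalTimeMs = originalTimeMs;
      this.value = value;
    }
  }

  // Output row: the hidden original-time column is not carried over.
  static final class Output {
    final String key;
    final long timeMs;
    final Double value;

    Output(String key, long timeMs, Double value) {
      this.key = key;
      this.timeMs = timeMs;
      this.value = value;
    }
  }

  // last == false computes "first"; last == true computes "last".
  static List<Output> rollup(List<Row> rows, boolean last) {
    List<Row> sorted = new ArrayList<>(rows);
    // The original time is the LAST sort field, so it only orders rows within a group.
    sorted.sort(Comparator.comparing((Row r) -> r.key)
        .thenComparingLong(r -> r.roundedTimeMs)
        .thenComparingLong(r -> r.originalTimeMs));

    List<Output> out = new ArrayList<>();
    int i = 0;
    while (i < sorted.size()) {
      Row head = sorted.get(i);
      Double acc = null;
      int j = i;
      // Group boundaries compare every sort field except the original time.
      while (j < sorted.size()
          && sorted.get(j).key.equals(head.key)
          && sorted.get(j).roundedTimeMs == head.roundedTimeMs) {
        Double v = sorted.get(j).value;
        // Skip nulls; "first" keeps the earliest non-null, "last" keeps overwriting.
        if (v != null && (acc == null || last)) {
          acc = v;
        }
        j++;
      }
      out.add(new Output(head.key, head.roundedTimeMs, acc));
      i = j;
    }
    return out;
  }
}
```

Because `originalTimeMs` only breaks ties inside a group, leaving it out of the group-boundary check (and out of `Output`) mirrors stripping the hidden column before the segment build.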
### Semantics note
The ordering is based on the time column values of the input segments of
each rollup task. Within a single rollup pass the ordering is exact
(pre-rounding time). Across multiple passes (multi-level merges, or re-merging
with late-arriving data), the ordering is based on the already-rounded time
from the earlier pass, so it is only approximate, at the granularity of the
previous pass's round bucket.
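Here is a worked example of the multi-pass approximation for `last`. The times are illustrative and not taken from the PR. Pass 1 rolls up events at 10:05 (`"a"`) and 10:50 (`"b"`) into a 1h bucket and stores `"b"` at 10:00. Pass 2 then re-merges that row with a late event at 10:20 (`"c"`). The `last` and `roundDown` helpers are hypothetical.

```java
public class MultiPassLastExample {
  static final long HOUR_MS = 3_600_000L;
  static final long MINUTE_MS = 60_000L;

  static long roundDown(long epochMs, long bucketMs) {
    return epochMs - Math.floorMod(epochMs, bucketMs);
  }

  // "last" within one group: the value with the largest available time.
  static String last(long[] timesMs, String[] values) {
    int best = 0;
    for (int i = 1; i < timesMs.length; i++) {
      if (timesMs[i] >= timesMs[best]) {
        best = i;
      }
    }
    return values[best];
  }

  // Returns {pass-1 result, pass-2 result}.
  static String[] run() {
    long ten = 10 * HOUR_MS;
    // Pass 1: exact ordering on pre-rounding times 10:05 and 10:50.
    String pass1 = last(new long[] {ten + 5 * MINUTE_MS, ten + 50 * MINUTE_MS},
        new String[] {"a", "b"});
    long pass1Time = roundDown(ten + 50 * MINUTE_MS, HOUR_MS); // stored as 10:00
    // Pass 2: the merged row only carries its rounded 10:00 time.
    String pass2 = last(new long[] {pass1Time, ten + 20 * MINUTE_MS},
        new String[] {pass1, "c"});
    return new String[] {pass1, pass2};
  }

  public static void main(String[] args) {
    String[] r = run();
    System.out.println(r[0] + " " + r[1]); // b c
  }
}
```

Pass 2 returns `"c"` even though `"b"` was the true latest reading. It can only compare against `"b"`'s rounded 10:00 timestamp, so the misordering is confined to events that fell within the previous pass's 1h bucket.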
## Validation
- `MergeRollupTaskGenerator.validateTaskConfigs` now validates
`*.aggregationType` values: the type must parse, and first/last additionally
requires the table to have a time column and the aggregated column to be a
METRIC column in the schema. Invalid aggregation type values now fail at table
config validation instead of only at task runtime.
- Same metric-column check added to
`RealtimeToOfflineSegmentsTaskGenerator`, and `FIRSTWITHTIME`/`LASTWITHTIME`
added to its value-aggregator allowlist.
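The validation rules can be sketched as follows. Everything here is hypothetical: `FieldType` mirrors the idea of a schema field type, the accepted-name sets are an illustrative subset rather than Pinot's full list, and the error messages are invented.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

public class AggregationTypeValidationSketch {
  // Hypothetical schema field types.
  enum FieldType { DIMENSION, METRIC, DATE_TIME }

  // Illustrative subsets only; not Pinot's full list of accepted names.
  static final Set<String> ORDER_INSENSITIVE = new HashSet<>(Arrays.asList("sum", "min", "max"));
  static final Set<String> ORDER_SENSITIVE =
      new HashSet<>(Arrays.asList("first", "last", "firstwithtime", "lastwithtime"));

  // timeColumn is null when the table has no time column.
  static void validate(String column, String aggregationType, String timeColumn,
      Map<String, FieldType> schemaFieldTypes) {
    String type = aggregationType.trim().toLowerCase(Locale.ROOT);
    // Rule 1: the type must parse.
    if (!ORDER_INSENSITIVE.contains(type) && !ORDER_SENSITIVE.contains(type)) {
      throw new IllegalArgumentException(
          "Unsupported aggregationType '" + aggregationType + "' for column: " + column);
    }
    if (ORDER_SENSITIVE.contains(type)) {
      // Rule 2: first/last needs a time column to order by.
      if (timeColumn == null) {
        throw new IllegalArgumentException("first/last aggregation requires a time column");
      }
      // Rule 3: the aggregated column must be a METRIC in the schema.
      if (schemaFieldTypes.get(column) != FieldType.METRIC) {
        throw new IllegalArgumentException("Column " + column + " must be a METRIC column");
      }
    }
  }
}
```

Running these checks at config time means a bad `aggregationType` is rejected when the table config is saved, before any minion picks up the task.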
## Upgrade note
If `first`/`last` is configured while old minions are still running, those
minions fail the task loudly with `IllegalArgumentException` rather than
silently producing wrong results. Configure the new aggregation types only
after the minions have been upgraded.
## Testing
- `ReducerTest`: ordering correctness with shuffled input, null handling for
both first and last, and failure when the time column is missing.
- `BaseSegmentProcessorFrameworkTest#testRollupWithFirstLastAggregation`:
end-to-end map/reduce/segment-build with time rounding, partitioning, and
out-of-order input.
- `MergeRollupFirstLastTaskExecutorTest`: minion executor end-to-end with
the `"first"`/`"last"` aliases in task config, verifying time-order (not
input-order) semantics across segments.
- `MergeTaskUtilsTest` / `MergeRollupTaskGeneratorTest`: alias parsing and
config validation.