avantgardnerio opened a new issue, #23089: URL: https://github.com/apache/datafusion/issues/23089
## What is the problem the feature is trying to solve?

Several downstream consumers want to know, at runtime, the lex-min / lex-max of a partition's output along its declared output ordering. Today there is no way to ask an `ExecutionPlan` for this. The need shows up in at least three places:

1. **Parallelizing single-partition window functions (RANGE frames, no PARTITION BY).** To fan a single sorted stream into N parallel output partitions without a partition key, a planner needs the global value range so it can compute interior split points. Statistics-based estimates from the catalog are too loose; we need what the sort actually produced. This is the use case that motivated the work; see the existing draft PR [#23026](https://github.com/apache/datafusion/pull/23026) for the full picture.
2. **Range-elimination rules.** A future optimizer could prune downstream filters or skip irrelevant data when it can prove that a partition's range doesn't intersect a predicate's range. This needs observed extrema from the upstream operator.
3. **Dynamic-range repartitioning, complementing `Partitioning::Range`.** [#22207](https://github.com/apache/datafusion/pull/22207) landed `Partitioning::Range { ordering, split_points }` for the *declarative* case: split points known at plan time, e.g. from a `TableProvider` declaring its output partitioning. A natural follow-up is a *dynamic* variant where split points are discovered at execute time from an upstream's actual data range. Runtime partition extrema are the missing primitive: the dynamic variant routes rows by a runtime-computed boundary set, and downstream operators learn the per-bucket boundaries through the same trait method.

Today the only way to get this is to drain the operator into a separate aggregate pass, which defeats the point.

## Describe the solution you'd like

A small addition to `ExecutionPlan`:

```rust
use datafusion_common::{Result, ScalarValue};

pub enum ExtremaKind {
    /// `min` / `max` literally bound the partition's data.
    /// `SortExec` is the canonical implementer.
    Observed,
    /// `min` / `max` describe the partition's primary range.
    /// The partition deliberately carries rows outside that range
    /// (a "halo"); a downstream operator is contracted to filter
    /// back to the range. No operator in this branch returns
    /// `Expanded`; a future dynamic-range repartition (e.g. a
    /// `Partitioning::DynamicRange` routed through `RepartitionExec`)
    /// would.
    Expanded,
}

pub struct PartitionExtrema {
    pub kind: ExtremaKind,
    /// Lex-min endpoint, one value per expression of the output ordering.
    pub min: Vec<ScalarValue>,
    /// Lex-max endpoint, one value per expression of the output ordering.
    pub max: Vec<ScalarValue>,
    pub row_count: usize,
}

// New trait method on ExecutionPlan, default Ok(None):
fn runtime_partition_extrema(
    &self,
    partition: usize,
) -> Result<Option<PartitionExtrema>>;
```

Initial overrides:

- `SortExec` populates a per-partition slot inside the sort code path; the override returns the slot's contents once execution has folded any sorted chunk into it. Reading before the upstream has been driven far enough returns `Ok(None)` rather than panicking.
- `BoundedWindowAggExec` passes through to its input (BWAG extends its input's equivalence properties, preserving the leading sort exprs along the same column indices in its output).

The implementation lives at `coralogix/arrow-datafusion@brent/partition-extrema` ([compare view](https://github.com/coralogix/arrow-datafusion/compare/apache:main...coralogix:arrow-datafusion:brent/partition-extrema)): three commits, +500 / -10 LoC, branched off `apache/main`. Seven unit tests cover observer behavior and the caller contract; `cargo clippy --all-features --all-targets -- -D warnings` and `cargo fmt` are clean.

## Design points worth debating

1. **One trait method, two semantics, encoded by `ExtremaKind`.** The downstream consumer of "Expanded" extrema and the upstream consumer of "Observed" extrema want exactly the same shape: `Vec<ScalarValue>` per endpoint, matched against the output ordering, populated once the partition is ready to be read.
Splitting into two trait methods would force every passthrough operator to implement both with identical bodies. The contract is documented at each implementer's call site. Alternative: a pure enum with arm-specific data, rejected because passthroughs that don't care about the variant would still need to `match`. See the type-level rustdoc in the branch for the full rationale.
2. **Caller contract: silent `Ok(None)` if read too early, not a panic.** Reading before the operator's slot is populated returns `Ok(None)`. The progress contract is that callers drive `execute(partition)` past the point where the implementing operator can know its extrema. Tradeoff: a silent `Ok(None)` is easier to misuse than a panic would be. The branch's rustdoc spells this out as a documented invariant rather than a runtime gate.
3. **Passthrough policy.** Operators whose output ordering matches the input's leading-key ordering MAY forward via `self.input.runtime_partition_extrema(...)`, preserving `kind` unchanged. The trait does not enforce this; it is decided per operator. `BoundedWindowAggExec` is wired in this branch; `ProjectionExec` (conditional on the leading sort column surviving) and `SortPreservingMergeExec` (an N→1 reducer, not a passthrough) are follow-ups.

## Describe alternatives you've considered

- **Statistics-shaped runtime stats.** The first version of the spike used `Statistics`/`ColumnStatistics`. It doesn't fit: sort keys are arbitrary expressions, not just columns, and the lex-extreme row's trailing-key values aren't natural extrema of those keys.
- **Async signaling on the trait method.** Overengineered for our case: every implementer that has an answer has it at a known point in execution (post-buffer for `SortExec`). The caller contract handles "not yet" via `Ok(None)`.
- **Side-channel between operators** (instead of a uniform trait). Threading boundary state through specific operator pairs is fragile and doesn't generalize to other range-aware optimizations (use case 2 above).
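To make the caller contract and the passthrough policy concrete, here is a minimal std-only sketch. It is not the branch's code: `PlanNode`, `SortNode`, `WindowNode`, `FilterNode` and `sort_partition` are hypothetical stand-ins, `i64` replaces `ScalarValue`, and `String` replaces `DataFusionError`. It shows a sort-like operator filling a per-partition slot, reads returning `Ok(None)` before that happens, and an order-preserving operator forwarding the answer with `kind` unchanged.

```rust
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExtremaKind {
    Observed,
    Expanded,
}

/// Simplified stand-in for `PartitionExtrema`: `i64` replaces `ScalarValue`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PartitionExtrema {
    pub kind: ExtremaKind,
    pub min: Vec<i64>,
    pub max: Vec<i64>,
    pub row_count: usize,
}

/// Hypothetical stand-in for `ExecutionPlan`, reduced to the new method.
pub trait PlanNode {
    /// Default: no runtime extrema are known.
    fn runtime_partition_extrema(&self, _partition: usize) -> Result<Option<PartitionExtrema>, String> {
        Ok(None)
    }
}

/// Hypothetical sort operator holding one extrema slot per partition.
pub struct SortNode {
    slots: Vec<Mutex<Option<PartitionExtrema>>>,
}

impl SortNode {
    pub fn new(partitions: usize) -> Self {
        Self { slots: (0..partitions).map(|_| Mutex::new(None)).collect() }
    }

    /// Sorts one partition's rows (each row is its sort-key values) lexicographically
    /// and records the first and last rows as the observed lex-min / lex-max.
    pub fn sort_partition(&self, partition: usize, mut rows: Vec<Vec<i64>>) -> Vec<Vec<i64>> {
        rows.sort();
        if let (Some(first), Some(last)) = (rows.first(), rows.last()) {
            *self.slots[partition].lock().unwrap() = Some(PartitionExtrema {
                kind: ExtremaKind::Observed,
                min: first.clone(),
                max: last.clone(),
                row_count: rows.len(),
            });
        }
        rows
    }
}

impl PlanNode for SortNode {
    fn runtime_partition_extrema(&self, partition: usize) -> Result<Option<PartitionExtrema>, String> {
        let slot = self
            .slots
            .get(partition)
            .ok_or_else(|| format!("partition {partition} out of range"))?;
        // Still `None` until this partition has been sorted: the "read too early" case.
        Ok(slot.lock().unwrap().clone())
    }
}

/// Hypothetical order-preserving operator that forwards its input's extrema.
pub struct WindowNode {
    pub input: Arc<dyn PlanNode>,
}

impl PlanNode for WindowNode {
    fn runtime_partition_extrema(&self, partition: usize) -> Result<Option<PartitionExtrema>, String> {
        // Passthrough: `kind` is forwarded unchanged.
        self.input.runtime_partition_extrema(partition)
    }
}

/// Hypothetical operator that keeps the trait default.
pub struct FilterNode;
impl PlanNode for FilterNode {}
```

With rows `[3, 1]`, `[1, 9]` and `[3, 0]`, the observed `min` is `[1, 9]` and `max` is `[3, 1]`. Note that `9` is not the minimum of the second key, which is the mismatch with per-column `ColumnStatistics` described in the alternatives above.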
## Coexistence with existing work

`Partitioning::Range` / `RangePartitioning { ordering, split_points }` from [#22207](https://github.com/apache/datafusion/pull/22207), [#22607](https://github.com/apache/datafusion/pull/22607), [#22777](https://github.com/apache/datafusion/pull/22777) (@gene-bordegaray, Datadog) covers the declarative case: split points known at plan time. `PartitionExtrema` doesn't change that path. What it enables is a future *dynamic* sibling, where the boundary set is discovered at runtime, without inventing a parallel runtime-stats facility. @stuhood's [comment on #22395](https://github.com/apache/datafusion/issues/22395#issuecomment-4618822694) about overlapping output partitioning is the design point a dynamic-range variant could explore.

## Additional context

This issue is the discussion home for the API. If reviewers agree on the shape, the branch can be cleaned into a single PR (pure addition, zero behavior change, default `Ok(None)` so nothing currently in tree calls or implements the method). Window-function parallelization and a dynamic-range repartitioning variant land as separate follow-ups on top.
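As an illustration of how a follow-up might consume `Observed` extrema, the sketch below derives interior split points for a dynamic-range variant from a partition's lex-min / lex-max and routes keys into buckets. Everything here is hypothetical: `interior_split_points` and `route` are illustrative helpers, and the sketch assumes a single `i64` leading sort key, evenly spaced boundaries, and a convention where bucket `i` holds keys in `[split[i-1], split[i])`. None of this is taken from the `Partitioning::Range` implementation in #22207. Only the leading key is needed because, under a lexicographic ordering, the leading value of the lex-min (lex-max) is the minimum (maximum) of that key.

```rust
/// Hypothetical helper: `n - 1` evenly spaced interior split points between
/// the leading-key values of an observed lex-min and lex-max.
fn interior_split_points(min: i64, max: i64, n: usize) -> Vec<i64> {
    assert!(n >= 1, "need at least one output partition");
    assert!(min <= max, "lex-min must not exceed lex-max");
    // Widen to i128 so `max - min` cannot overflow i64.
    let span = max as i128 - min as i128;
    (1..n)
        .map(|i| (min as i128 + span * i as i128 / n as i128) as i64)
        .collect()
}

/// Hypothetical router: returns bucket `i` such that
/// `split_points[i - 1] <= key < split_points[i]` (assumed convention).
fn route(key: i64, split_points: &[i64]) -> usize {
    // Counts the split points <= key; requires `split_points` to be sorted.
    split_points.partition_point(|&s| s <= key)
}
```

Even spacing is only the simplest choice. With a narrow or skewed range it can produce repeated split points and empty buckets; for example, `interior_split_points(0, 1, 4)` yields `[0, 0, 0]`.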
