gabotechs opened a new issue, #20184:
URL: https://github.com/apache/datafusion/issues/20184
### Is your feature request related to a problem or challenge?
I've found myself relying heavily on the `ExecutionPlan::partition_statistics()`
method, and I'm running into some challenges with the current API that I think
would require breaking changes to address properly.
The challenge boils down to one thing: the current API forces
implementations to always recompute their children's stats instead of accepting
them as an argument.
This has two main consequences:
## 1. Unnecessary recomputation of statistics
Imagine that I want to recurse through the plan and perform operations based on
the stats of different nodes. I would start from the bottom and compute stats
for the leaf node:
```
┌──────────┐
│   Exec   │
└─────▲────┘
      │
┌─────┴────┐
│   Exec   │
└─────▲────┘
      │
┌─────┴────┐  .───────────.
│   Exec   │ (compute stats)
└──────────┘  `───────────'
```
Now I move to the next node and compute its stats:
```
┌──────────┐
│   Exec   │
└─────▲────┘
      │
┌─────┴────┐  .───────────.
│   Exec   │ (compute stats)
└─────▲────┘  `───────────'
      │
┌─────┴────┐  .───────────.  .───────────.
│   Exec   │ (compute stats)(compute stats)
└──────────┘  `───────────'  `───────────'
```
The stats for the leaf node have now been computed twice, once unnecessarily.
Finally, I compute the stats for the head node:
```
┌──────────┐  .───────────.
│   Exec   │ (compute stats)
└─────▲────┘  `───────────'
      │
┌─────┴────┐  .───────────.  .───────────.
│   Exec   │ (compute stats)(compute stats)
└─────▲────┘  `───────────'  `───────────'
      │
┌─────┴────┐  .───────────.  .───────────.  .───────────.
│   Exec   │ (compute stats)(compute stats)(compute stats)
└──────────┘  `───────────'  `───────────'  `───────────'
```
Note how the total number of stats computations grows quadratically with the
depth of the plan instead of linearly with its number of nodes (1 + 2 + 3 = 6
computations for the three nodes above), and for arbitrarily complex plans this
can waste a significant amount of CPU cycles.
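The cost pattern in the diagrams can be reproduced with a small, self-contained Rust sketch. It does not use DataFusion: `Node`, `stats` and `bottom_up_cost` are hypothetical stand-ins in which asking a node for its stats always recomputes its child's stats first, mimicking how `partition_statistics()` behaves today. It counts how many per-node computations a leaf-first visit of a linear chain performs.

```rust
use std::cell::Cell;

/// Hypothetical stand-in for a plan node under the current API: asking a
/// node for its stats always recomputes its child's stats first.
struct Node {
    child: Option<Box<Node>>,
}

impl Node {
    /// Builds a linear chain of `depth` nodes (at least one), leaf at the bottom.
    fn chain(depth: usize) -> Node {
        let mut node = Node { child: None };
        for _ in 1..depth {
            node = Node { child: Some(Box::new(node)) };
        }
        node
    }

    /// Mimics `partition_statistics()`: recurse into the child, then do this
    /// node's own work. `counter` records every per-node computation.
    fn stats(&self, counter: &Cell<usize>) {
        if let Some(child) = &self.child {
            child.stats(counter);
        }
        counter.set(counter.get() + 1);
    }
}

/// Asks every node for its stats, leaf first, and returns how many per-node
/// computations were performed in total.
fn bottom_up_cost(depth: usize) -> usize {
    let root = Node::chain(depth);
    // Collect nodes from root to leaf so they can be visited leaf-first.
    let mut nodes = Vec::new();
    let mut current = Some(&root);
    while let Some(node) = current {
        nodes.push(node);
        current = node.child.as_deref();
    }
    let counter = Cell::new(0);
    for node in nodes.iter().rev() {
        node.stats(&counter);
    }
    counter.get()
}

fn main() {
    for depth in [3, 10, 100] {
        println!("{depth} nodes -> {} stats computations", bottom_up_cost(depth));
    }
}
```

With three nodes this performs 6 computations (1 + 2 + 3) and with ten nodes 55, whereas handing each node its children's already-computed stats would need only one computation per node.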
## 2. Inability to provide my own statistics
At the current stage of the project, there are still nodes that do not have
a perfect implementation of their statistics
(https://github.com/apache/datafusion/issues/15873), so I would like to be able
to fall back in those cases to a conservative estimate that I could still
propagate through the plan.
For example, imagine that a node returns a correct estimate for the number of
rows, but does not return a number of distinct values (NDV), which is reported
as `Absent`:
```
      ┌──────────┐
      │   Exec   │
      └─────▲────┘
            │
      ┌─────┴────┐
      │   Exec   │
      └─────▲────┘
            │
 .──────────┴────────.
( 10 rows, absent NDV )
 `──────────▲────────'
            │
      ┌─────┴────┐
      │   Exec   │
      └──────────┘
```
In this case, I would like to intercept that "absent NDV" and override it
with a sane default, like "10 NDV":
```
      ┌──────────┐
      │   Exec   │
      └─────▲────┘
            │
      ┌─────┴────┐
      │   Exec   │
      └─────▲────┘
            │
 .....................  .───────────────────.
. 10 rows, absent NDV .(   10 rows, 10 NDV   )
 ...........▲.........  `───────────────────'
            │
      ┌─────┴────┐
      │   Exec   │
      └──────────┘
```
I have no way of telling the `Exec` above to use my "10 rows, 10 NDV" stats
when computing its estimates: no matter what, it will recompute the stats from
the node below, yielding once again the "10 rows, absent NDV" values that I
wanted to override in the first place.
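Here is a minimal Rust sketch of the override described in this section, and of why it only pays off if the parent consumes the overridden stats. `Precision`, `Stats`, `fill_absent_ndv` and `group_by_estimate` are hypothetical simplifications, not DataFusion types or functions (the enum only mirrors the variant names of DataFusion's `Precision`), and the GROUP BY estimate of roughly one output row per distinct value, capped at the input row count, is an illustrative assumption.

```rust
/// Simplified stand-in for DataFusion's `Precision<usize>`; the variant names
/// mirror the real type, but this enum is only for illustration.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

impl Precision {
    fn value(self) -> Option<usize> {
        match self {
            Precision::Exact(v) | Precision::Inexact(v) => Some(v),
            Precision::Absent => None,
        }
    }
}

/// Hypothetical, trimmed-down stats for a plan with a single column.
#[derive(Debug, Clone, PartialEq)]
struct Stats {
    num_rows: Precision,
    distinct_count: Precision,
}

/// Override hook: replace an absent NDV with a conservative default. The
/// result is marked `Inexact` because it is a guess, not a measurement.
fn fill_absent_ndv(mut stats: Stats, default_ndv: usize) -> Stats {
    if stats.distinct_count == Precision::Absent {
        stats.distinct_count = Precision::Inexact(default_ndv);
    }
    stats
}

/// Hypothetical parent node (a GROUP BY on the column) that estimates its
/// output from the stats it is *given*: roughly one row per distinct value,
/// never more than the input row count.
fn group_by_estimate(input: &Stats) -> Stats {
    let rows = match (input.num_rows.value(), input.distinct_count.value()) {
        (Some(rows), Some(ndv)) => Precision::Inexact(ndv.min(rows)),
        _ => Precision::Absent,
    };
    Stats { num_rows: rows, distinct_count: rows }
}

fn main() {
    let child = Stats {
        num_rows: Precision::Exact(10),
        distinct_count: Precision::Absent,
    };
    // Fed the raw child stats, the parent cannot estimate anything.
    println!("{:?}", group_by_estimate(&child));
    // Fed the overridden "10 rows, 10 NDV", it produces an estimate.
    println!("{:?}", group_by_estimate(&fill_absent_ndv(child, 10)));
}
```

The override itself is trivial; the missing piece is that today's API gives no way to hand the overridden `Stats` to the parent, because the parent re-derives its input stats from its child.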
### Describe the solution you'd like
Extend the `ExecutionPlan::partition_statistics()` method from:
```rust
fn partition_statistics(
&self,
partition: Option<usize>,
) -> Result<Statistics>;
```
to
```rust
fn partition_statistics(
&self,
partition: Option<usize>,
child_stats: Vec<Statistics>,
) -> Result<Statistics>;
```
I'm not sure how acceptable this breaking change would be, so non-breaking
suggestions are also welcome.
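To make the proposal concrete, here is a sketch of how a caller could drive a `child_stats`-style API bottom-up. `PlanNode`, `Scan`, `Filter`, `compute_stats`, the `String` error type and the 50% filter selectivity are all hypothetical; the trait only mirrors the shape of the proposed signature and is not DataFusion's `ExecutionPlan`.

```rust
use std::sync::Arc;

/// Hypothetical, trimmed-down statistics: just a row count.
#[derive(Debug, Clone, PartialEq)]
struct Statistics {
    num_rows: usize,
}

/// Mock of the proposed API shape, not the real `ExecutionPlan` trait:
/// a node receives its children's stats instead of recomputing them.
trait PlanNode {
    fn children(&self) -> Vec<&Arc<dyn PlanNode>>;
    fn partition_statistics(
        &self,
        partition: Option<usize>,
        child_stats: Vec<Statistics>,
    ) -> Result<Statistics, String>;
}

struct Scan {
    rows: usize,
}

struct Filter {
    input: Arc<dyn PlanNode>,
}

impl PlanNode for Scan {
    fn children(&self) -> Vec<&Arc<dyn PlanNode>> {
        vec![]
    }
    fn partition_statistics(&self, _: Option<usize>, _: Vec<Statistics>) -> Result<Statistics, String> {
        Ok(Statistics { num_rows: self.rows })
    }
}

impl PlanNode for Filter {
    fn children(&self) -> Vec<&Arc<dyn PlanNode>> {
        vec![&self.input]
    }
    fn partition_statistics(&self, _: Option<usize>, child_stats: Vec<Statistics>) -> Result<Statistics, String> {
        // Hypothetical 50% selectivity applied to the stats it was handed.
        let input = child_stats.first().ok_or("Filter expects one child")?;
        Ok(Statistics { num_rows: input.num_rows / 2 })
    }
}

/// Post-order walk: children first, then the node, so each node's stats are
/// computed exactly once. `override_hook` may adjust a node's stats before
/// they are handed to its parent; `calls` counts stats computations.
fn compute_stats(
    node: &Arc<dyn PlanNode>,
    override_hook: &dyn Fn(Statistics) -> Statistics,
    calls: &mut usize,
) -> Result<Statistics, String> {
    let child_stats = node
        .children()
        .into_iter()
        .map(|child| compute_stats(child, override_hook, calls))
        .collect::<Result<Vec<_>, _>>()?;
    *calls += 1;
    let stats = node.partition_statistics(None, child_stats)?;
    Ok(override_hook(stats))
}

fn main() -> Result<(), String> {
    let scan: Arc<dyn PlanNode> = Arc::new(Scan { rows: 100 });
    let filter: Arc<dyn PlanNode> = Arc::new(Filter { input: scan });
    let root: Arc<dyn PlanNode> = Arc::new(Filter { input: filter });
    let mut calls = 0;
    let stats = compute_stats(&root, &|s: Statistics| s, &mut calls)?;
    println!("{stats:?} after {calls} calls");
    Ok(())
}
```

Each node is asked for its stats exactly once (3 calls for the three-node plan built in `main`), and `override_hook` is the natural place to inject a conservative default, such as a fallback NDV, before a parent consumes it.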
### Describe alternatives you've considered
Use a custom wrapper node that returns whatever statistics I give it:
```rust
use std::any::Any;
use std::fmt::Formatter;
use std::sync::Arc;

use datafusion::common::{not_impl_err, plan_err, Result, Statistics};
use datafusion::config::ConfigOptions;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::execution_plan::CardinalityEffect;
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
};
use delegate::delegate;

/// Wraps a node and reports fixed, caller-provided statistics instead of
/// computing them; everything else is delegated to the wrapped node.
#[derive(Debug)]
struct StatisticsWrapper {
    stats: Statistics,
    inner: Arc<dyn ExecutionPlan>,
}

impl DisplayAs for StatisticsWrapper {
    delegate! {
        to self.inner {
            fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result;
        }
    }
}

impl ExecutionPlan for StatisticsWrapper {
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        // Only whole-plan (partition = None) statistics are supported.
        if partition.is_some() {
            return plan_err!("StatisticsWrapper not prepared for partition-specific stats");
        }
        Ok(self.stats.clone())
    }

    delegate! {
        to self.inner {
            fn name(&self) -> &str;
            fn as_any(&self) -> &dyn Any;
            fn properties(&self) -> &PlanProperties;
            fn maintains_input_order(&self) -> Vec<bool>;
            fn benefits_from_input_partitioning(&self) -> Vec<bool>;
            fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
            fn repartitioned(&self, _target_partitions: usize, _config: &ConfigOptions) -> Result<Option<Arc<dyn ExecutionPlan>>>;
            fn execute(&self, partition: usize, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream>;
            fn supports_limit_pushdown(&self) -> bool;
            fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>;
            fn fetch(&self) -> Option<usize>;
            fn cardinality_effect(&self) -> CardinalityEffect;
        }
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("with_new_children not implemented")
    }
}
```
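And then compute each node's statistics by replacing its children with wrappers
that carry the already-computed child stats:
```rust
// One `StatisticsWrapper` per child, each reporting its precomputed stats.
let statistics_wrapped_children: Vec<Arc<dyn ExecutionPlan>> = children_stats
    .iter()
    .zip(node.children())
    .map(|(stats, child)| {
        Arc::new(StatisticsWrapper {
            inner: Arc::clone(child),
            stats: stats.clone(),
        }) as Arc<dyn ExecutionPlan>
    })
    .collect();
// The rebuilt node now reads its children's stats from the wrappers.
let stats = Arc::clone(node)
    .with_new_children(statistics_wrapped_children)?
    .partition_statistics(None)?;
```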
### Additional context
Mainly to be used in
https://github.com/datafusion-contrib/datafusion-distributed/pull/311