2010YOUY01 commented on PR #21815:
URL: https://github.com/apache/datafusion/pull/21815#issuecomment-4697341372
Thank you, the implementation is neat! I believe it solves the recomputation
issue.
I have one idea to improve the API:
Currently, the caching logic is explicitly implemented inside each
operator’s statistics computation. We could decouple cache management from the
operator-level statistics propagation, so that the implementation is easier to
evolve.
The idea would look like this:
### Stateless API inside `ExecutionPlan`
The API inside `ExecutionPlan` should only take input statistics and compute
output statistics. Cache operations should be managed externally.
For example, the operator-level API could look like:
```rust
fn statistics_from_inputs(
    &self,
    input_stats: &[Arc<Statistics>],
    partition: Option<usize>,
) -> Result<Arc<Statistics>>;
```
For a unary operator such as `FilterExec`, the implementation would only
describe the local propagation logic:
```rust
fn statistics_from_inputs(
    &self,
    input_stats: &[Arc<Statistics>],
    partition: Option<usize>,
) -> Result<Arc<Statistics>> {
    let input_stats = input_stats[0].as_ref();
    let stats = Self::statistics_helper(
        &self.input.schema(),
        input_stats,
        self.predicate(),
        self.default_selectivity,
    )?;
    Ok(Arc::new(stats.project(self.projection.as_ref())))
}
```
This keeps the operator implementation focused on the question:

> Given the input statistics, how do I derive the output statistics?

It does not need to know whether the input statistics came from a cache,
from recursive computation, or from an external statistics provider.
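To make the stateless shape concrete, here is a minimal self-contained sketch. It does not use DataFusion types: `ToyStatistics`, `ToyPlan`, and `ToyFilter` are hypothetical stand-ins, and the fixed-percentage selectivity is an assumption made only for illustration.

```rust
use std::sync::Arc;

/// Toy stand-in for `Statistics` (hypothetical): only a row-count estimate.
#[derive(Debug, Clone, PartialEq)]
struct ToyStatistics {
    num_rows: Option<usize>,
}

/// Hypothetical trait mirroring the proposed stateless API.
trait ToyPlan {
    fn statistics_from_inputs(
        &self,
        input_stats: &[Arc<ToyStatistics>],
        partition: Option<usize>,
    ) -> Result<Arc<ToyStatistics>, String>;
}

/// Toy filter that is assumed to keep a fixed percentage of its input rows.
struct ToyFilter {
    selectivity_percent: usize,
}

impl ToyPlan for ToyFilter {
    fn statistics_from_inputs(
        &self,
        input_stats: &[Arc<ToyStatistics>],
        _partition: Option<usize>,
    ) -> Result<Arc<ToyStatistics>, String> {
        // Only local logic: no cache lookup and no recursion into children.
        let input = input_stats
            .first()
            .ok_or_else(|| "filter expects exactly one input".to_string())?;
        // An unknown input row count stays unknown.
        let num_rows = input.num_rows.map(|n| n * self.selectivity_percent / 100);
        Ok(Arc::new(ToyStatistics { num_rows }))
    }
}
```

Because the method is a pure function of its inputs, it can be unit-tested by handing it hand-built input statistics, with no plan tree or cache in sight.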
### External `StatisticsContext` layer for cache operations
The cache lookup, recursive child computation, and cache insertion can be
handled by an external `StatisticsContext`:
```rust
impl StatisticsContext {
    pub fn compute_statistics(
        &self,
        plan: &Arc<dyn ExecutionPlan>,
        partition: Option<usize>,
    ) -> Result<Arc<Statistics>> {
        if let Some(cached) = self.cache.get(plan, partition) {
            return Ok(cached);
        }
        let input_stats = plan
            .children()
            .iter()
            .map(|child| self.compute_statistics(child, partition))
            .collect::<Result<Vec<_>>>()?;
        let stats = plan.statistics_from_inputs(&input_stats, partition)?;
        self.cache.insert(plan, partition, Arc::clone(&stats));
        Ok(stats)
    }
}
```
With this structure, each `ExecutionPlan` only implements local statistics
propagation, while `StatisticsContext` owns traversal and caching.
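As a rough end-to-end illustration of this split, the sketch below wires a memoizing context to two toy operators. Everything here is hypothetical (`RowStats`, `PlanNode`, `Scan`, `Limit`, `ToyStatisticsContext`), and keying the cache by the node's pointer address plus the partition is an assumption of this sketch, not something the PR prescribes.

```rust
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::sync::Arc;

/// Toy stand-in for `Statistics`: only a row-count estimate.
#[derive(Debug, Clone, PartialEq)]
struct RowStats {
    num_rows: usize,
}

/// Hypothetical plan node that exposes only local propagation.
trait PlanNode {
    fn children(&self) -> Vec<Arc<dyn PlanNode>>;
    fn statistics_from_inputs(
        &self,
        inputs: &[Arc<RowStats>],
        partition: Option<usize>,
    ) -> Arc<RowStats>;
}

/// Leaf with a fixed row count; `calls` counts how often it is evaluated.
struct Scan {
    rows: usize,
    calls: Cell<usize>,
}

impl PlanNode for Scan {
    fn children(&self) -> Vec<Arc<dyn PlanNode>> {
        Vec::new()
    }
    fn statistics_from_inputs(&self, _: &[Arc<RowStats>], _: Option<usize>) -> Arc<RowStats> {
        self.calls.set(self.calls.get() + 1);
        Arc::new(RowStats { num_rows: self.rows })
    }
}

/// Unary node that passes through at most `limit` rows.
struct Limit {
    input: Arc<dyn PlanNode>,
    limit: usize,
}

impl PlanNode for Limit {
    fn children(&self) -> Vec<Arc<dyn PlanNode>> {
        vec![Arc::clone(&self.input)]
    }
    fn statistics_from_inputs(&self, inputs: &[Arc<RowStats>], _: Option<usize>) -> Arc<RowStats> {
        Arc::new(RowStats { num_rows: inputs[0].num_rows.min(self.limit) })
    }
}

/// Owns traversal and caching; operators never touch the cache.
#[derive(Default)]
struct ToyStatisticsContext {
    cache: RefCell<HashMap<(usize, Option<usize>), Arc<RowStats>>>,
}

impl ToyStatisticsContext {
    fn compute_statistics(&self, plan: &Arc<dyn PlanNode>, partition: Option<usize>) -> Arc<RowStats> {
        // Assumption: the node's address identifies it while the plan is alive.
        let key = (Arc::as_ptr(plan) as *const () as usize, partition);
        let cached = self.cache.borrow().get(&key).cloned();
        if let Some(hit) = cached {
            return hit;
        }
        // No cache borrow is held while recursing into the children.
        let inputs: Vec<Arc<RowStats>> = plan
            .children()
            .iter()
            .map(|child| self.compute_statistics(child, partition))
            .collect();
        let stats = plan.statistics_from_inputs(&inputs, partition);
        self.cache.borrow_mut().insert(key, Arc::clone(&stats));
        stats
    }
}
```

An address-based key is only valid while the plan nodes are kept alive, since a freed address can be reused; a real implementation would likely need a more robust node identity.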
### Reasons
We are still uncertain about the long-term implementation details of
statistics propagation, so it is likely that this layer will need frequent
changes.
One specific example is cached per-partition statistics. In the future, the
cache may need to become more fine-grained. For example, suppose `partition[1]`
already has cached `range` and `ndv`, and a new request asks for `range`,
`ndv`, and `histogram`. Ideally, we should only compute the missing `histogram`
on demand, instead of recomputing the whole statistics object.
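One way such a finer-grained cache might look is sketched below. The names (`StatKind`, `PartitionStatsCache`, `get_or_compute`) are hypothetical, the cached values are plain `String` placeholders rather than real statistics, and the `computed` log exists only to show which kinds were actually evaluated.

```rust
use std::collections::HashMap;

/// Hypothetical kinds of statistics that can be requested independently.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum StatKind {
    Range,
    Ndv,
    Histogram,
}

/// Toy cache keyed by (partition, kind) instead of a whole statistics object.
#[derive(Default)]
struct PartitionStatsCache {
    entries: HashMap<(usize, StatKind), String>,
    /// Log of kinds that had to be computed, in order (for illustration only).
    computed: Vec<(usize, StatKind)>,
}

impl PartitionStatsCache {
    /// Returns the requested kinds, calling `compute` only for missing ones.
    fn get_or_compute(
        &mut self,
        partition: usize,
        requested: &[StatKind],
        compute: impl Fn(usize, StatKind) -> String,
    ) -> Vec<String> {
        let mut out = Vec::with_capacity(requested.len());
        for &kind in requested {
            let key = (partition, kind);
            if !self.entries.contains_key(&key) {
                let value = compute(partition, kind);
                self.entries.insert(key, value);
                self.computed.push(key);
            }
            out.push(self.entries[&key].clone());
        }
        out
    }
}
```

In the scenario above, a first request for `Range` and `Ndv` on partition 1 fills two entries, and a later request that adds `Histogram` triggers exactly one more computation.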
Requirements like this would demand significant changes to the caching layer.
If cache operations are embedded directly inside every `ExecutionPlan`, then
each change to the cache model may require touching many operators.
By moving cache management into an external `StatisticsContext`, we can
evolve the caching strategy independently, without propagating large API
changes into every `ExecutionPlan`.
In short, I think the current implementation solves the recomputation issue,
but separating local statistics propagation from cache management may give us a
cleaner long-term API.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]