gabotechs opened a new issue, #20184:
URL: https://github.com/apache/datafusion/issues/20184

   ### Is your feature request related to a problem or challenge?
   
   I have been relying heavily on the `ExecutionPlan::partition_statistics()` 
method, and I am running into limitations in the current API that I think can 
only be properly addressed with breaking changes.
   
   The challenge boils down to one thing: the current API forces 
implementations to always recompute their children's stats instead of accepting 
them as an argument.
   
   This has two main consequences:
   
   ## 1. Unnecessary re-computation of statistics
   
   Imagine that I want to recurse the plan, and do operations based on the 
stats of different nodes. I would start from the bottom, and I would compute 
stats for the leaf node:
   
   ```
   ┌──────────┐                
   │   Exec   │                
   └─────▲────┘                
         │                     
   ┌─────┴────┐                
   │   Exec   │                
   └─────▲────┘                
         │                     
   ┌─────┴────┐  .───────────. 
   │   Exec   │ (compute stats)
   └──────────┘  `───────────' 
   ```
   
   Now I go to the next node, and compute its stats:
   
   ```
   ┌──────────┐                               
   │   Exec   │                               
   └─────▲────┘                               
         │                                    
   ┌─────┴────┐  .───────────.                
   │   Exec   │ (compute stats)               
   └─────▲────┘  `───────────'                
         │                                    
   ┌─────┴────┐  .───────────.  .───────────. 
   │   Exec   │ (compute stats)(compute stats)
   └──────────┘  `───────────'  `───────────' 
   ```
   
   The stats for the leaf node were computed twice, once unnecessarily.
   
   Now I want to finish and compute the head node:
   
   ```
   ┌──────────┐  .───────────.                               
   │   Exec   │ (compute stats)                              
   └─────▲────┘  `───────────'                               
         │                                                   
   ┌─────┴────┐  .───────────.  .───────────.                
   │   Exec   │ (compute stats)(compute stats)               
   └─────▲────┘  `───────────'  `───────────'                
         │                                                   
   ┌─────┴────┐  .───────────.  .───────────.  .───────────. 
   │   Exec   │ (compute stats)(compute stats)(compute stats)
   └──────────┘  `───────────'  `───────────'  `───────────' 
   ```
   
   Note how the total number of stats computations grows much faster than the 
number of nodes: for a linear chain of n nodes like the one above it is 
1 + 2 + ... + n, which is quadratic in n. For arbitrarily complex plans, this 
becomes an unnecessary waste of CPU cycles.
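
To make the cost concrete, here is a minimal, self-contained sketch that counts stats computations for a linear chain. It does not use DataFusion: `Node`, `row_estimate`, `chain` and `count_bottom_up` are hypothetical names, and the only thing borrowed from the real API is its shape, where a node cannot receive its child's stats and has to ask for them again.

```rust
use std::cell::Cell;

/// Toy stand-in for a plan node (hypothetical, not DataFusion's trait).
struct Node<'a> {
    child: Option<Box<Node<'a>>>,
    /// Shared counter: how many times any node computed its stats.
    calls: &'a Cell<usize>,
}

impl<'a> Node<'a> {
    /// Same shape as the current API: child stats cannot be passed in,
    /// so every call recurses all the way down to the leaf.
    fn row_estimate(&self) -> usize {
        self.calls.set(self.calls.get() + 1);
        match &self.child {
            Some(child) => child.row_estimate(),
            None => 10, // the leaf "knows" its row count
        }
    }
}

/// Builds a linear chain of `n` nodes (at least one) sharing one counter.
fn chain<'a>(n: usize, calls: &'a Cell<usize>) -> Node<'a> {
    let mut node = Node { child: None, calls };
    for _ in 1..n {
        node = Node { child: Some(Box::new(node)), calls };
    }
    node
}

/// Asks every node for its stats, leaf first, and returns the total
/// number of stats computations that this triggered.
fn count_bottom_up(n: usize) -> usize {
    let calls = Cell::new(0);
    let root = chain(n, &calls);

    // Collect the nodes from root to leaf, then visit them in reverse.
    let mut nodes = Vec::new();
    let mut cur = Some(&root);
    while let Some(node) = cur {
        nodes.push(node);
        cur = node.child.as_deref();
    }
    for node in nodes.iter().rev() {
        node.row_estimate();
    }
    calls.get()
}

fn main() {
    // Three nodes, as in the diagrams: 1 + 2 + 3 = 6 computations.
    println!("{}", count_bottom_up(3));
}
```

In this toy model a chain of n nodes costs n(n + 1)/2 computations, whereas a walk that handed each node its child's result would need only n.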
   
   ## 2. Inability to provide my own statistics
   
   At the current stage of the project, there are still nodes that do not have 
a complete implementation of their statistics 
(https://github.com/apache/datafusion/issues/15873), so in those cases I would 
like to be able to fall back to a conservative estimate that I could still 
propagate through the plan.
   
   For example, imagine that a node returns a correct estimate of the number of 
rows, but does not return a number of distinct values (NDV is reported as Absent):
   
   ```
         ┌──────────┐     
         │   Exec   │     
         └─────▲────┘     
               │          
         ┌─────┴────┐     
         │   Exec   │     
         └─────▲────┘     
               │          
    .──────────┴────────. 
   ( 10 rows, absent NDV )
    `──────────▲────────' 
               │          
         ┌─────┴────┐     
         │   Exec   │     
         └──────────┘     
   ```
   
   In this case, I would like to intercept that "absent NDV" and override it 
with a sane default, like "10 NDV":
   
   ```
         ┌──────────┐                            
         │   Exec   │                            
         └─────▲────┘                            
               │                                 
         ┌─────┴────┐                            
         │   Exec   │                            
         └─────▲────┘                            
               │                                 
    .....................  .───────────────────. 
   . 10 rows, absent NDV .(   10 rows, 10 NDV   )
    ...........▲.........  `───────────────────' 
               │                                 
         ┌─────┴────┐                            
         │   Exec   │                            
         └──────────┘                            
   ```
   
   I have no way of telling the `Exec` above that I want it to use my "10 rows, 
10 NDV" stats to compute its own estimates: no matter what, it will 
re-compute the stats from the node below, yielding again the "10 rows, absent 
NDV" values that I wanted to override in the first place.
   
   ### Describe the solution you'd like
   
   Extend the `ExecutionPlan::partition_statistics()` method from:
   ```rust
       fn partition_statistics(
           &self,
           partition: Option<usize>,
       ) -> Result<Statistics>;
   ```
   to
   ```rust
       fn partition_statistics(
           &self,
           partition: Option<usize>,
           child_stats: Vec<Statistics>,
       ) -> Result<Statistics>;
   ```
   
   I am not sure how acceptable this breaking change would be, so non-breaking 
suggestions are also appreciated.
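
As an illustration of what the extra argument enables, below is a minimal sketch of a single bottom-up pass in which the caller can patch a node's stats before its parent sees them. It is not DataFusion code: `Stats`, `Plan`, `Scan`, `PassThrough`, `stats_with_override` and `fill_absent_ndv` are hypothetical names, and `Option<usize>` stands in for an absent-or-present NDV.

```rust
/// Toy statistics: a row count plus an NDV that may be absent (`None`).
#[derive(Clone, Debug, PartialEq)]
struct Stats {
    rows: usize,
    ndv: Option<usize>,
}

/// Hypothetical trait with the proposed shape: child stats are an argument.
trait Plan {
    fn children(&self) -> Vec<&dyn Plan>;
    fn stats(&self, child_stats: Vec<Stats>) -> Stats;
}

/// Leaf that knows its row count but not its NDV.
struct Scan;

impl Plan for Scan {
    fn children(&self) -> Vec<&dyn Plan> {
        vec![]
    }
    fn stats(&self, _child_stats: Vec<Stats>) -> Stats {
        Stats { rows: 10, ndv: None }
    }
}

/// Node with exactly one child that forwards the stats it is given.
struct PassThrough(Box<dyn Plan>);

impl Plan for PassThrough {
    fn children(&self) -> Vec<&dyn Plan> {
        vec![&*self.0]
    }
    fn stats(&self, mut child_stats: Vec<Stats>) -> Stats {
        // Assumes the caller passed one entry per child (here: one).
        child_stats.remove(0)
    }
}

/// Bottom-up pass: each node's stats are computed exactly once, and
/// `patch` may override them before they are handed to the parent.
fn stats_with_override(plan: &dyn Plan, patch: &dyn Fn(Stats) -> Stats) -> Stats {
    let child_stats: Vec<Stats> = plan
        .children()
        .into_iter()
        .map(|child| stats_with_override(child, patch))
        .collect();
    patch(plan.stats(child_stats))
}

/// Conservative fallback: when NDV is absent, assume every row is distinct.
fn fill_absent_ndv(mut stats: Stats) -> Stats {
    stats.ndv = stats.ndv.or(Some(stats.rows));
    stats
}

fn main() {
    let plan = PassThrough(Box::new(PassThrough(Box::new(Scan))));
    let stats = stats_with_override(&plan, &fill_absent_ndv);
    println!("{stats:?}");
}
```

With this shape, the override applied to the leaf ("10 rows, absent NDV" becoming "10 rows, 10 NDV") is what the nodes above actually receive, and no node is asked for its stats more than once.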
   
   ### Describe alternatives you've considered
   
   Have a custom wrapper that will return the statistics that I want:
   ```rust
   #[derive(Debug)]
   struct StatisticsWrapper {
       stats: Statistics,
       inner: Arc<dyn ExecutionPlan>,
   }
   
   impl DisplayAs for StatisticsWrapper {
       delegate! {
           to self.inner {
               fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result;
           }
       }
   }
   
   impl ExecutionPlan for StatisticsWrapper {
       fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
           if partition.is_some() {
               return plan_err!("StatisticsWrapper not prepared for partition-specific stats");
           }
           Ok(self.stats.clone())
       }
   
       delegate! {
           to self.inner {
               fn name(&self) -> &str;
               fn as_any(&self) -> &dyn Any;
               fn properties(&self) -> &PlanProperties;
               fn maintains_input_order(&self) -> Vec<bool>;
               fn benefits_from_input_partitioning(&self) -> Vec<bool>;
               fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
               fn repartitioned(&self, _target_partitions: usize, _config: &ConfigOptions) -> Result<Option<Arc<dyn ExecutionPlan>>>;
               fn execute(&self, partition: usize, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream>;
               fn supports_limit_pushdown(&self) -> bool;
               fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>;
               fn fetch(&self) -> Option<usize>;
               fn cardinality_effect(&self) -> CardinalityEffect;
           }
       }
   
       fn with_new_children(
           self: Arc<Self>,
           _: Vec<Arc<dyn ExecutionPlan>>,
       ) -> Result<Arc<dyn ExecutionPlan>> {
           not_impl_err!("with_new_children not implemented")
       }
   }
   ```
   
   And then compute statistics for nodes like this:
   
   ```rust
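       // Wrap each child so that it reports the stats we computed (or overrode).
       let statistics_wrapped_children: Vec<Arc<dyn ExecutionPlan>> = children_stats
           .iter()
           .zip(node.children())
           .map(|(stats, child)| StatisticsWrapper {
               inner: Arc::clone(child),
               stats: stats.clone(),
           })
           .map(|v| Arc::new(v) as Arc<dyn ExecutionPlan>)
           .collect();
   
       // Rebuild the node on top of the wrappers and ask it for its stats.
       let stats = Arc::clone(node)
           .with_new_children(statistics_wrapped_children)?
           .partition_statistics(None)?;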
   ```
   
   ### Additional context
   
   Mainly to be used in 
https://github.com/datafusion-contrib/datafusion-distributed/pull/311

