alamb commented on code in PR #7793:
URL: https://github.com/apache/arrow-datafusion/pull/7793#discussion_r1361006687
##########
datafusion/physical-plan/src/aggregates/mod.rs:
##########
@@ -1005,28 +1005,50 @@ impl ExecutionPlan for AggregateExec {
Some(self.metrics.clone_inner())
}
- fn statistics(&self) -> Statistics {
+ fn statistics(&self) -> Result<Statistics> {
// TODO stats: group expressions:
// - once expressions will be able to compute their own stats, use it
here
// - case where we group by on a column for which with have the
`distinct` stat
// TODO stats: aggr expression:
// - aggregations somtimes also preserve invariants such as min, max...
+ let column_statistics = Statistics::unknown_column(&self.schema());
match self.mode {
AggregateMode::Final | AggregateMode::FinalPartitioned
if self.group_by.expr.is_empty() =>
{
- Statistics {
- num_rows: Some(1),
- is_exact: true,
- ..Default::default()
- }
+ Ok(Statistics {
+ num_rows: Precision::Exact(1),
+ column_statistics,
+ total_byte_size: Precision::Absent,
+ })
+ }
+ _ => {
+ // When the input row count is 0 or 1, we can adopt that
statistic keeping its reliability.
Review Comment:
This change seems related to (and may conflict with)
https://github.com/apache/arrow-datafusion/pull/7832 from @mustafasrepo
##########
datafusion/physical-plan/src/lib.rs:
##########
@@ -17,39 +17,80 @@
//! Traits for physical query plan, supporting parallel execution for
partitioned relations.
-mod topk;
-mod visitor;
-pub use self::metrics::Metric;
-use self::metrics::MetricsSet;
-use self::{
- coalesce_partitions::CoalescePartitionsExec,
display::DisplayableExecutionPlan,
-};
-pub use datafusion_common::{internal_err, ColumnStatistics, Statistics};
-use datafusion_common::{plan_err, Result};
-use datafusion_physical_expr::PhysicalSortExpr;
-pub use topk::TopK;
-pub use visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};
+use std::any::Any;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use crate::coalesce_partitions::CoalescePartitionsExec;
+use crate::display::DisplayableExecutionPlan;
+use crate::metrics::MetricsSet;
+use crate::repartition::RepartitionExec;
+use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
-
+use datafusion_common::tree_node::Transformed;
use datafusion_common::utils::DataPtr;
-pub use datafusion_expr::Accumulator;
-pub use datafusion_expr::ColumnarValue;
+use datafusion_common::{plan_err, DataFusionError, Result};
+use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::OrderingEquivalenceProperties;
-pub use display::{DefaultDisplay, DisplayAs, DisplayFormatType,
VerboseDisplay};
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::{
+ EquivalenceProperties, PhysicalSortExpr, PhysicalSortRequirement,
+};
+
use futures::stream::TryStreamExt;
-use std::fmt::Debug;
use tokio::task::JoinSet;
-use datafusion_common::tree_node::Transformed;
-use datafusion_common::DataFusionError;
-use std::any::Any;
-use std::sync::Arc;
+// Interfaces:
Review Comment:
I am not sure what this comment refers to
##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -200,56 +200,42 @@ impl ExecutionPlan for FilterExec {
/// The output statistics of a filtering operation can be estimated if the
/// predicate's selectivity value can be determined for the incoming data.
- fn statistics(&self) -> Statistics {
+ fn statistics(&self) -> Result<Statistics> {
let predicate = self.predicate();
- if !check_support(predicate, &self.schema()) {
- return Statistics::default();
+ let schema = self.schema();
+ if !check_support(predicate, &schema) {
+ return Ok(Statistics::new_unknown(&schema));
}
+ let input_stats = self.input.statistics()?;
- let input_stats = self.input.statistics();
- let input_column_stats = match input_stats.column_statistics {
- Some(stats) => stats,
- None => self
- .schema()
- .fields
- .iter()
- .map(|field| {
-
ColumnStatistics::new_with_unbounded_column(field.data_type())
- })
- .collect::<Vec<_>>(),
- };
-
- let starter_ctx =
- AnalysisContext::from_statistics(&self.input.schema(),
&input_column_stats);
-
- let analysis_ctx = match analyze(predicate, starter_ctx) {
- Ok(ctx) => ctx,
- Err(_) => return Statistics::default(),
- };
+ let num_rows = input_stats.num_rows;
+ let total_byte_size = input_stats.total_byte_size;
+ let input_analysis_ctx = AnalysisContext::try_from_statistics(
+ &self.input.schema(),
+ &input_stats.column_statistics,
+ )?;
+ let analysis_ctx = analyze(predicate, input_analysis_ctx)?;
let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
Review Comment:
```suggestion
// Estimate (inexact) selectivity of predicate
let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
```
I think this would help explain why the output estimates are `Inexact`
##########
datafusion/core/src/datasource/statistics.rs:
##########
@@ -33,101 +36,95 @@ pub async fn get_statistics_with_limit(
limit: Option<usize>,
) -> Result<(Vec<PartitionedFile>, Statistics)> {
let mut result_files = vec![];
+ // These statistics can be calculated as long as at least one file has
them.
+ // If none of the files provide them, then they will become an absent
precision.
+ // The missing values will be counted as
+ // - zero for summations,
+ // - neutral element for extreme points.
+ let mut null_counts: Vec<Precision<usize>> =
+ vec![Precision::Absent; file_schema.fields().len()];
+ let mut max_values: Vec<Precision<ScalarValue>> =
+ vec![Precision::Absent; file_schema.fields().len()];
+ let mut min_values: Vec<Precision<ScalarValue>> =
+ vec![Precision::Absent; file_schema.fields().len()];
+ let mut num_rows: Precision<usize> = Precision::Absent;
+ let mut total_byte_size: Precision<usize> = Precision::Absent;
- let mut null_counts = vec![0; file_schema.fields().len()];
- let mut has_statistics = false;
- let (mut max_values, mut min_values) = create_max_min_accs(&file_schema);
-
- let mut is_exact = true;
-
- // The number of rows and the total byte size can be calculated as long as
- // at least one file has them. If none of the files provide them, then they
- // will be omitted from the statistics. The missing values will be counted
- // as zero.
- let mut num_rows = None;
- let mut total_byte_size = None;
-
- // fusing the stream allows us to call next safely even once it is finished
+ // Fusing the stream allows us to call next safely even once it is
finished.
let mut all_files = Box::pin(all_files.fuse());
- while let Some(res) = all_files.next().await {
- let (file, file_stats) = res?;
+
+ if let Some(first_file) = all_files.next().await {
+ let (file, file_stats) = first_file?;
result_files.push(file);
- is_exact &= file_stats.is_exact;
- num_rows = if let Some(num_rows) = num_rows {
- Some(num_rows + file_stats.num_rows.unwrap_or(0))
- } else {
- file_stats.num_rows
- };
- total_byte_size = if let Some(total_byte_size) = total_byte_size {
- Some(total_byte_size + file_stats.total_byte_size.unwrap_or(0))
- } else {
- file_stats.total_byte_size
- };
- if let Some(vec) = &file_stats.column_statistics {
- has_statistics = true;
- for (i, cs) in vec.iter().enumerate() {
- null_counts[i] += cs.null_count.unwrap_or(0);
-
- if let Some(max_value) = &mut max_values[i] {
- if let Some(file_max) = cs.max_value.clone() {
- match max_value.update_batch(&[file_max.to_array()]) {
- Ok(_) => {}
- Err(_) => {
- max_values[i] = None;
- }
- }
- } else {
- max_values[i] = None;
- }
- }
- if let Some(min_value) = &mut min_values[i] {
- if let Some(file_min) = cs.min_value.clone() {
- match min_value.update_batch(&[file_min.to_array()]) {
- Ok(_) => {}
- Err(_) => {
- min_values[i] = None;
- }
- }
- } else {
- min_values[i] = None;
- }
- }
- }
- }
+ // First file, we set them directly from the file statistics.
+ set_from_file_statistics(
+ &mut num_rows,
+ &mut total_byte_size,
+ &mut null_counts,
+ &mut max_values,
+ &mut min_values,
+ file_stats,
+ );
// If the number of rows exceeds the limit, we can stop processing
// files. This only applies when we know the number of rows. It also
// currently ignores tables that have no statistics regarding the
// number of rows.
- if num_rows.unwrap_or(usize::MIN) > limit.unwrap_or(usize::MAX) {
- break;
- }
- }
- // if we still have files in the stream, it means that the limit kicked
- // in and that the statistic could have been different if we processed
- // the files in a different order.
- if all_files.next().await.is_some() {
- is_exact = false;
- }
+ if num_rows.get_value().unwrap_or(&usize::MIN) <=
&limit.unwrap_or(usize::MAX) {
+ while let Some(current) = all_files.next().await {
+ let (file, file_stats) = current?;
+ result_files.push(file);
- let column_stats = if has_statistics {
- Some(get_col_stats(
- &file_schema,
- null_counts,
- &mut max_values,
- &mut min_values,
- ))
- } else {
- None
+ // Number of rows, total byte size and null counts are added
for each file.
Review Comment:
This logic now looks really nice
##########
datafusion/core/src/datasource/statistics.rs:
##########
@@ -33,101 +36,95 @@ pub async fn get_statistics_with_limit(
limit: Option<usize>,
) -> Result<(Vec<PartitionedFile>, Statistics)> {
let mut result_files = vec![];
+ // These statistics can be calculated as long as at least one file has
them.
Review Comment:
Another pattern, if we wanted to encapsulate this code / make it easier to
unit test, could be to put these fields in a "builder" style pattern. This way
is fine too; I just figured I would point it out:
```rust
let builder = FileStatisticsBuilder::new(file_schema);
...
if let Some(first_file) = all_files.next().await {
let (file, file_stats) = first_file?;
result_files.push(file);
// First file, we set them directly from the file statistics.
builder.add_file(
file_stats,
);
...
let mut statistics: Statistics = builder.build()
...
```
##########
datafusion/common/src/stats.rs:
##########
@@ -203,70 +204,94 @@ impl<T: fmt::Debug + Clone + PartialEq + Eq + PartialOrd>
Display for Precision<
/// Fields are optional and can be inexact because the sources
/// sometimes provide approximate estimates for performance reasons
/// and the transformations output are not always predictable.
-#[derive(Debug, Clone, Default, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Statistics {
/// The number of table rows
- pub num_rows: Option<usize>,
+ pub num_rows: Precision<usize>,
/// total bytes of the table rows
- pub total_byte_size: Option<usize>,
+ pub total_byte_size: Precision<usize>,
/// Statistics on a column level
- pub column_statistics: Option<Vec<ColumnStatistics>>,
- /// If true, any field that is `Some(..)` is the actual value in the data
provided by the operator (it is not
- /// an estimate). Any or all other fields might still be None, in which
case no information is known.
- /// if false, any field that is `Some(..)` may contain an inexact estimate
and may not be the actual value.
- pub is_exact: bool,
+ pub column_statistics: Vec<ColumnStatistics>,
Review Comment:
A private constructor might be nice, but maybe we can do that as a follow-on
PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]