This is an automated email from the ASF dual-hosted git repository.

liukun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 58fc80edda [minior fix]: adjust the projection statistics (#7428)
58fc80edda is described below

commit 58fc80eddaf93d4c6399e4ac4fc1234649b26e43
Author: Kun Liu <[email protected]>
AuthorDate: Thu Aug 31 17:04:46 2023 +0800

    [minior fix]: adjust the projection statistics (#7428)
    
    * adjust the projection statistics
    
    * update the sql test case
    
    * fix clippy
---
 datafusion/core/src/physical_plan/projection.rs | 102 ++++++++++++++++++++----
 datafusion/sqllogictest/test_files/joins.slt    |  68 +++++++++-------
 datafusion/sqllogictest/test_files/subquery.slt |  46 ++++++-----
 3 files changed, 149 insertions(+), 67 deletions(-)

diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index f7f8b0f452..12c89eee19 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -336,6 +336,7 @@ impl ExecutionPlan for ProjectionExec {
         stats_projection(
             self.input.statistics(),
             self.expr.iter().map(|(e, _)| Arc::clone(e)),
+            self.schema.clone(),
         )
     }
 }
@@ -395,9 +396,13 @@ fn get_field_metadata(
 fn stats_projection(
     stats: Statistics,
     exprs: impl Iterator<Item = Arc<dyn PhysicalExpr>>,
+    schema: SchemaRef,
 ) -> Statistics {
+    let inner_exprs = exprs.collect::<Vec<_>>();
     let column_statistics = stats.column_statistics.map(|input_col_stats| {
-        exprs
+        inner_exprs
+            .clone()
+            .into_iter()
             .map(|e| {
                 if let Some(col) = e.as_any().downcast_ref::<Column>() {
                     input_col_stats[col.index()].clone()
@@ -410,12 +415,35 @@ fn stats_projection(
             .collect()
     });
 
-    Statistics {
-        is_exact: stats.is_exact,
-        num_rows: stats.num_rows,
-        column_statistics,
-        // TODO stats: knowing the type of the new columns we can guess the output size
-        total_byte_size: None,
+    let primitive_row_size = inner_exprs
+        .into_iter()
+        .map(|e| match e.data_type(schema.as_ref()) {
+            Ok(data_type) => data_type.primitive_width(),
+            Err(_) => None,
+        })
+        .try_fold(0usize, |init, v| v.map(|value| init + value));
+
+    match (primitive_row_size, stats.num_rows) {
+        (Some(row_size), Some(row_count)) => {
+            Statistics {
+                is_exact: stats.is_exact,
+                num_rows: stats.num_rows,
+                column_statistics,
+                // Use the row_size * row_count as the total byte size
+                total_byte_size: Some(row_size * row_count),
+            }
+        }
+        _ => {
+            Statistics {
+                is_exact: stats.is_exact,
+                num_rows: stats.num_rows,
+                column_statistics,
+                // TODO stats: knowing the type of the new columns we can guess the output size
+                // If we can't get the exact statistics for the project
+                // Before we get the exact result, we just use the child status
+                total_byte_size: stats.total_byte_size,
+            }
+        }
     }
 }
 
@@ -479,12 +507,12 @@ impl RecordBatchStream for ProjectionStream {
 
 #[cfg(test)]
 mod tests {
-
     use super::*;
     use crate::physical_plan::common::collect;
     use crate::physical_plan::expressions::{self, col};
     use crate::test::{self};
     use crate::test_util;
+    use arrow_schema::DataType;
     use datafusion_common::ScalarValue;
     use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::binary;
@@ -591,9 +619,8 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_stats_projection_columns_only() {
-        let source = Statistics {
+    fn get_stats() -> Statistics {
+        Statistics {
             is_exact: true,
             num_rows: Some(5),
             total_byte_size: Some(23),
@@ -617,19 +644,31 @@ mod tests {
                     null_count: None,
                 },
             ]),
-        };
+        }
+    }
+
+    fn get_schema() -> Schema {
+        let field_0 = Field::new("col0", DataType::Int64, false);
+        let field_1 = Field::new("col1", DataType::Utf8, false);
+        let field_2 = Field::new("col2", DataType::Float32, false);
+        Schema::new(vec![field_0, field_1, field_2])
+    }
+    #[tokio::test]
+    async fn test_stats_projection_columns_only() {
+        let source = get_stats();
+        let schema = get_schema();
 
         let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
             Arc::new(expressions::Column::new("col1", 1)),
             Arc::new(expressions::Column::new("col0", 0)),
         ];
 
-        let result = stats_projection(source, exprs.into_iter());
+        let result = stats_projection(source, exprs.into_iter(), Arc::new(schema));
 
         let expected = Statistics {
             is_exact: true,
             num_rows: Some(5),
-            total_byte_size: None,
+            total_byte_size: Some(23),
             column_statistics: Some(vec![
                 ColumnStatistics {
                     distinct_count: Some(1),
@@ -648,4 +687,39 @@ mod tests {
 
         assert_eq!(result, expected);
     }
+
+    #[tokio::test]
+    async fn test_stats_projection_column_with_primitive_width_only() {
+        let source = get_stats();
+        let schema = get_schema();
+
+        let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
+            Arc::new(expressions::Column::new("col2", 2)),
+            Arc::new(expressions::Column::new("col0", 0)),
+        ];
+
+        let result = stats_projection(source, exprs.into_iter(), Arc::new(schema));
+
+        let expected = Statistics {
+            is_exact: true,
+            num_rows: Some(5),
+            total_byte_size: Some(60),
+            column_statistics: Some(vec![
+                ColumnStatistics {
+                    distinct_count: None,
+                    max_value: Some(ScalarValue::Float32(Some(1.1))),
+                    min_value: Some(ScalarValue::Float32(Some(0.1))),
+                    null_count: None,
+                },
+                ColumnStatistics {
+                    distinct_count: Some(5),
+                    max_value: Some(ScalarValue::Int64(Some(21))),
+                    min_value: Some(ScalarValue::Int64(Some(-4))),
+                    null_count: Some(0),
+                },
+            ]),
+        };
+
+        assert_eq!(result, expected);
+    }
 }
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 5bc36e4c6d..cd93e51951 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -1500,15 +1500,16 @@ Projection: join_t1.t1_id, join_t2.t2_id, 
join_t1.t1_name
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
 --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as 
t2_id]
-----CoalesceBatchesExec: target_batch_size=4096
-------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + 
UInt32(12)@2, join_t2.t2_id + UInt32(1)@1)]
---------CoalescePartitionsExec
+----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, 
join_t1.t1_id + UInt32(12)@4 as join_t1.t1_id + UInt32(12), t2_id@0 as t2_id, 
join_t2.t2_id + UInt32(1)@1 as join_t2.t2_id + UInt32(1)]
+------CoalesceBatchesExec: target_batch_size=4096
+--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id + 
UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)]
+----------CoalescePartitionsExec
+------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as 
join_t2.t2_id + UInt32(1)]
+--------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
 ----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, 
t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
 ------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 --------------MemoryExec: partitions=1, partition_sizes=[1]
---------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + 
UInt32(1)]
-----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-------------MemoryExec: partitions=1, partition_sizes=[1]
 
 statement ok
 set datafusion.optimizer.repartition_joins = true;
@@ -1527,18 +1528,19 @@ Projection: join_t1.t1_id, join_t2.t2_id, 
join_t1.t1_name
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
 --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as 
t2_id]
-----CoalesceBatchesExec: target_batch_size=4096
-------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t1.t1_id + 
UInt32(12)@2, join_t2.t2_id + UInt32(1)@1)]
---------CoalesceBatchesExec: target_batch_size=4096
-----------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(12)@2], 
2), input_partitions=2
-------------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, 
t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
---------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-----------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
-----------RepartitionExec: partitioning=Hash([join_t2.t2_id + UInt32(1)@1], 
2), input_partitions=2
-------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as 
join_t2.t2_id + UInt32(1)]
---------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-----------------MemoryExec: partitions=1, partition_sizes=[1]
+----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, 
join_t1.t1_id + UInt32(12)@4 as join_t1.t1_id + UInt32(12), t2_id@0 as t2_id, 
join_t2.t2_id + UInt32(1)@1 as join_t2.t2_id + UInt32(1)]
+------CoalesceBatchesExec: target_batch_size=4096
+--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id + 
UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)]
+----------CoalesceBatchesExec: target_batch_size=4096
+------------RepartitionExec: partitioning=Hash([join_t2.t2_id + UInt32(1)@1], 
2), input_partitions=2
+--------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as 
join_t2.t2_id + UInt32(1)]
+----------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+------------------MemoryExec: partitions=1, partition_sizes=[1]
+----------CoalesceBatchesExec: target_batch_size=4096
+------------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(12)@2], 
2), input_partitions=2
+--------------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, 
t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
+----------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+------------------MemoryExec: partitions=1, partition_sizes=[1]
 
 # Left side expr key inner join
 
@@ -1619,10 +1621,13 @@ Projection: join_t1.t1_id, join_t2.t2_id, 
join_t1.t1_name
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
 --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as 
t2_id]
-----CoalesceBatchesExec: target_batch_size=4096
-------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0, 
join_t2.t2_id - UInt32(11)@1)]
---------MemoryExec: partitions=1, partition_sizes=[1]
---------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id 
- UInt32(11)]
+----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, t2_id@0 as 
t2_id, join_t2.t2_id - UInt32(11)@1 as join_t2.t2_id - UInt32(11)]
+------CoalesceBatchesExec: target_batch_size=4096
+--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id - 
UInt32(11)@1, t1_id@0)]
+----------CoalescePartitionsExec
+------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as 
join_t2.t2_id - UInt32(11)]
+--------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
 ----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 ------------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -1644,15 +1649,16 @@ Projection: join_t1.t1_id, join_t2.t2_id, 
join_t1.t1_name
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
 --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as 
t2_id]
-----CoalesceBatchesExec: target_batch_size=4096
-------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, 
join_t2.t2_id - UInt32(11)@1)]
---------CoalesceBatchesExec: target_batch_size=4096
-----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
-------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
---------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
-----------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@1], 
2), input_partitions=2
-------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as 
join_t2.t2_id - UInt32(11)]
+----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, t2_id@0 as 
t2_id, join_t2.t2_id - UInt32(11)@1 as join_t2.t2_id - UInt32(11)]
+------CoalesceBatchesExec: target_batch_size=4096
+--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id - 
UInt32(11)@1, t1_id@0)]
+----------CoalesceBatchesExec: target_batch_size=4096
+------------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@1], 
2), input_partitions=2
+--------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as 
join_t2.t2_id - UInt32(11)]
+----------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+------------------MemoryExec: partitions=1, partition_sizes=[1]
+----------CoalesceBatchesExec: target_batch_size=4096
+------------RepartitionExec: partitioning=Hash([t1_id@0], 2), 
input_partitions=2
 --------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 ----------------MemoryExec: partitions=1, partition_sizes=[1]
 
diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt
index d88291b56f..fe074da1bb 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -177,17 +177,18 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS 
t2_sum
 ----------TableScan: t2 projection=[t2_id, t2_int]
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
---CoalesceBatchesExec: target_batch_size=8192
-----HashJoinExec: mode=Partitioned, join_type=Left, on=[(t1_id@0, t2_id@1)]
-------CoalesceBatchesExec: target_batch_size=8192
---------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
-----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as 
t2_id]
---------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], 
aggr=[SUM(t2.t2_int)]
-----------CoalesceBatchesExec: target_batch_size=8192
-------------RepartitionExec: partitioning=Hash([t2_id@0], 4), 
input_partitions=4
---------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], 
aggr=[SUM(t2.t2_int)]
-----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int)@0 as SUM(t2.t2_int), 
t2_id@1 as t2_id]
+----CoalesceBatchesExec: target_batch_size=8192
+------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
+--------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as 
t2_id]
+----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], 
aggr=[SUM(t2.t2_int)]
+------------CoalesceBatchesExec: target_batch_size=8192
+--------------RepartitionExec: partitioning=Hash([t2_id@0], 4), 
input_partitions=4
+----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], 
aggr=[SUM(t2.t2_int)]
+------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
+------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
 
 query II rowsort
 SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum 
from t1
@@ -211,17 +212,18 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int * 
Float64(1)) + Int64(1) AS t2
 ----------TableScan: t2 projection=[t2_id, t2_int]
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int * Float64(1)) + 
Int64(1)@1 as t2_sum]
---CoalesceBatchesExec: target_batch_size=8192
-----HashJoinExec: mode=Partitioned, join_type=Left, on=[(t1_id@0, t2_id@1)]
-------CoalesceBatchesExec: target_batch_size=8192
---------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
-----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-------ProjectionExec: expr=[SUM(t2.t2_int * Float64(1))@1 + 1 as SUM(t2.t2_int 
* Float64(1)) + Int64(1), t2_id@0 as t2_id]
---------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], 
aggr=[SUM(t2.t2_int * Float64(1))]
-----------CoalesceBatchesExec: target_batch_size=8192
-------------RepartitionExec: partitioning=Hash([t2_id@0], 4), 
input_partitions=4
---------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], 
aggr=[SUM(t2.t2_int * Float64(1))]
-----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int * Float64(1)) + 
Int64(1)@0 as SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@1 as t2_id]
+----CoalesceBatchesExec: target_batch_size=8192
+------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
+--------ProjectionExec: expr=[SUM(t2.t2_int * Float64(1))@1 + 1 as 
SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@0 as t2_id]
+----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], 
aggr=[SUM(t2.t2_int * Float64(1))]
+------------CoalesceBatchesExec: target_batch_size=8192
+--------------RepartitionExec: partitioning=Hash([t2_id@0], 4), 
input_partitions=4
+----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], 
aggr=[SUM(t2.t2_int * Float64(1))]
+------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
+------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
 
 query IR rowsort
 SELECT t1_id, (SELECT sum(t2_int * 1.0) + 1 FROM t2 WHERE t2.t2_id = t1.t1_id) 
as t2_sum from t1

Reply via email to