This is an automated email from the ASF dual-hosted git repository.
liukun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 58fc80edda [minor fix]: adjust the projection statistics (#7428)
58fc80edda is described below
commit 58fc80eddaf93d4c6399e4ac4fc1234649b26e43
Author: Kun Liu <[email protected]>
AuthorDate: Thu Aug 31 17:04:46 2023 +0800
[minor fix]: adjust the projection statistics (#7428)
* adjust the projection statistics
* update the SQL logic test expectations
* fix clippy warnings
---
datafusion/core/src/physical_plan/projection.rs | 102 ++++++++++++++++++++----
datafusion/sqllogictest/test_files/joins.slt | 68 +++++++++-------
datafusion/sqllogictest/test_files/subquery.slt | 46 ++++++-----
3 files changed, 149 insertions(+), 67 deletions(-)
diff --git a/datafusion/core/src/physical_plan/projection.rs
b/datafusion/core/src/physical_plan/projection.rs
index f7f8b0f452..12c89eee19 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -336,6 +336,7 @@ impl ExecutionPlan for ProjectionExec {
stats_projection(
self.input.statistics(),
self.expr.iter().map(|(e, _)| Arc::clone(e)),
+ self.input.schema(), // exprs reference input column indices, so resolve their types against the input schema, not self.schema (the output)
)
}
}
@@ -395,9 +396,13 @@ fn get_field_metadata(
fn stats_projection(
stats: Statistics,
exprs: impl Iterator<Item = Arc<dyn PhysicalExpr>>,
+ schema: SchemaRef,
) -> Statistics {
+ let inner_exprs = exprs.collect::<Vec<_>>();
let column_statistics = stats.column_statistics.map(|input_col_stats| {
- exprs
+ inner_exprs
+ .clone()
+ .into_iter()
.map(|e| {
if let Some(col) = e.as_any().downcast_ref::<Column>() {
input_col_stats[col.index()].clone()
@@ -410,12 +415,35 @@ fn stats_projection(
.collect()
});
- Statistics {
- is_exact: stats.is_exact,
- num_rows: stats.num_rows,
- column_statistics,
- // TODO stats: knowing the type of the new columns we can guess the
output size
- total_byte_size: None,
+ let primitive_row_size = inner_exprs
+ .into_iter()
+ .map(|e| match e.data_type(schema.as_ref()) {
+ Ok(data_type) => data_type.primitive_width(),
+ Err(_) => None,
+ })
+ .try_fold(0usize, |init, v| v.map(|value| init + value));
+
+ match (primitive_row_size, stats.num_rows) {
+ (Some(row_size), Some(row_count)) => {
+ Statistics {
+ is_exact: stats.is_exact,
+ num_rows: stats.num_rows,
+ column_statistics,
+ // Use the row_size * row_count as the total byte size
+ total_byte_size: Some(row_size * row_count),
+ }
+ }
+ _ => {
+ Statistics {
+ is_exact: stats.is_exact,
+ num_rows: stats.num_rows,
+ column_statistics,
+ // TODO stats: knowing the type of the new columns we can
guess the output size
+ // If we can't get the exact statistics for the project
+ // Before we get the exact result, we just use the child status
+ total_byte_size: stats.total_byte_size,
+ }
+ }
}
}
@@ -479,12 +507,12 @@ impl RecordBatchStream for ProjectionStream {
#[cfg(test)]
mod tests {
-
use super::*;
use crate::physical_plan::common::collect;
use crate::physical_plan::expressions::{self, col};
use crate::test::{self};
use crate::test_util;
+ use arrow_schema::DataType;
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::binary;
@@ -591,9 +619,8 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn test_stats_projection_columns_only() {
- let source = Statistics {
+ fn get_stats() -> Statistics {
+ Statistics {
is_exact: true,
num_rows: Some(5),
total_byte_size: Some(23),
@@ -617,19 +644,31 @@ mod tests {
null_count: None,
},
]),
- };
+ }
+ }
+
+ fn get_schema() -> Schema {
+ let field_0 = Field::new("col0", DataType::Int64, false);
+ let field_1 = Field::new("col1", DataType::Utf8, false);
+ let field_2 = Field::new("col2", DataType::Float32, false);
+ Schema::new(vec![field_0, field_1, field_2])
+ }
+ #[tokio::test]
+ async fn test_stats_projection_columns_only() {
+ let source = get_stats();
+ let schema = get_schema();
let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(expressions::Column::new("col1", 1)),
Arc::new(expressions::Column::new("col0", 0)),
];
- let result = stats_projection(source, exprs.into_iter());
+ let result = stats_projection(source, exprs.into_iter(),
Arc::new(schema));
let expected = Statistics {
is_exact: true,
num_rows: Some(5),
- total_byte_size: None,
+ total_byte_size: Some(23),
column_statistics: Some(vec![
ColumnStatistics {
distinct_count: Some(1),
@@ -648,4 +687,39 @@ mod tests {
assert_eq!(result, expected);
}
+
+ #[tokio::test]
+ async fn test_stats_projection_column_with_primitive_width_only() {
+ let source = get_stats();
+ let schema = get_schema();
+
+ let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
+ Arc::new(expressions::Column::new("col2", 2)),
+ Arc::new(expressions::Column::new("col0", 0)),
+ ];
+
+ let result = stats_projection(source, exprs.into_iter(),
Arc::new(schema));
+
+ let expected = Statistics {
+ is_exact: true,
+ num_rows: Some(5),
+ total_byte_size: Some(60),
+ column_statistics: Some(vec![
+ ColumnStatistics {
+ distinct_count: None,
+ max_value: Some(ScalarValue::Float32(Some(1.1))),
+ min_value: Some(ScalarValue::Float32(Some(0.1))),
+ null_count: None,
+ },
+ ColumnStatistics {
+ distinct_count: Some(5),
+ max_value: Some(ScalarValue::Int64(Some(21))),
+ min_value: Some(ScalarValue::Int64(Some(-4))),
+ null_count: Some(0),
+ },
+ ]),
+ };
+
+ assert_eq!(result, expected);
+ }
}
diff --git a/datafusion/sqllogictest/test_files/joins.slt
b/datafusion/sqllogictest/test_files/joins.slt
index 5bc36e4c6d..cd93e51951 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -1500,15 +1500,16 @@ Projection: join_t1.t1_id, join_t2.t2_id,
join_t1.t1_name
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as
t2_id]
-----CoalesceBatchesExec: target_batch_size=4096
-------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id +
UInt32(12)@2, join_t2.t2_id + UInt32(1)@1)]
---------CoalescePartitionsExec
+----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name,
join_t1.t1_id + UInt32(12)@4 as join_t1.t1_id + UInt32(12), t2_id@0 as t2_id,
join_t2.t2_id + UInt32(1)@1 as join_t2.t2_id + UInt32(1)]
+------CoalesceBatchesExec: target_batch_size=4096
+--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id +
UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)]
+----------CoalescePartitionsExec
+------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as
join_t2.t2_id + UInt32(1)]
+--------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name,
t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
---------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id +
UInt32(1)]
-----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-------------MemoryExec: partitions=1, partition_sizes=[1]
statement ok
set datafusion.optimizer.repartition_joins = true;
@@ -1527,18 +1528,19 @@ Projection: join_t1.t1_id, join_t2.t2_id,
join_t1.t1_name
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as
t2_id]
-----CoalesceBatchesExec: target_batch_size=4096
-------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t1.t1_id +
UInt32(12)@2, join_t2.t2_id + UInt32(1)@1)]
---------CoalesceBatchesExec: target_batch_size=4096
-----------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(12)@2],
2), input_partitions=2
-------------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name,
t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
---------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-----------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
-----------RepartitionExec: partitioning=Hash([join_t2.t2_id + UInt32(1)@1],
2), input_partitions=2
-------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as
join_t2.t2_id + UInt32(1)]
---------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-----------------MemoryExec: partitions=1, partition_sizes=[1]
+----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name,
join_t1.t1_id + UInt32(12)@4 as join_t1.t1_id + UInt32(12), t2_id@0 as t2_id,
join_t2.t2_id + UInt32(1)@1 as join_t2.t2_id + UInt32(1)]
+------CoalesceBatchesExec: target_batch_size=4096
+--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id +
UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)]
+----------CoalesceBatchesExec: target_batch_size=4096
+------------RepartitionExec: partitioning=Hash([join_t2.t2_id + UInt32(1)@1],
2), input_partitions=2
+--------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as
join_t2.t2_id + UInt32(1)]
+----------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+------------------MemoryExec: partitions=1, partition_sizes=[1]
+----------CoalesceBatchesExec: target_batch_size=4096
+------------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(12)@2],
2), input_partitions=2
+--------------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name,
t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
+----------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+------------------MemoryExec: partitions=1, partition_sizes=[1]
# Left side expr key inner join
@@ -1619,10 +1621,13 @@ Projection: join_t1.t1_id, join_t2.t2_id,
join_t1.t1_name
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as
t2_id]
-----CoalesceBatchesExec: target_batch_size=4096
-------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0,
join_t2.t2_id - UInt32(11)@1)]
---------MemoryExec: partitions=1, partition_sizes=[1]
---------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id
- UInt32(11)]
+----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, t2_id@0 as
t2_id, join_t2.t2_id - UInt32(11)@1 as join_t2.t2_id - UInt32(11)]
+------CoalesceBatchesExec: target_batch_size=4096
+--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id -
UInt32(11)@1, t1_id@0)]
+----------CoalescePartitionsExec
+------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as
join_t2.t2_id - UInt32(11)]
+--------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -1644,15 +1649,16 @@ Projection: join_t1.t1_id, join_t2.t2_id,
join_t1.t1_name
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as
t2_id]
-----CoalesceBatchesExec: target_batch_size=4096
-------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0,
join_t2.t2_id - UInt32(11)@1)]
---------CoalesceBatchesExec: target_batch_size=4096
-----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
-------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
---------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
-----------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@1],
2), input_partitions=2
-------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as
join_t2.t2_id - UInt32(11)]
+----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, t2_id@0 as
t2_id, join_t2.t2_id - UInt32(11)@1 as join_t2.t2_id - UInt32(11)]
+------CoalesceBatchesExec: target_batch_size=4096
+--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id -
UInt32(11)@1, t1_id@0)]
+----------CoalesceBatchesExec: target_batch_size=4096
+------------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@1],
2), input_partitions=2
+--------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as
join_t2.t2_id - UInt32(11)]
+----------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+------------------MemoryExec: partitions=1, partition_sizes=[1]
+----------CoalesceBatchesExec: target_batch_size=4096
+------------RepartitionExec: partitioning=Hash([t1_id@0], 2),
input_partitions=2
--------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
----------------MemoryExec: partitions=1, partition_sizes=[1]
diff --git a/datafusion/sqllogictest/test_files/subquery.slt
b/datafusion/sqllogictest/test_files/subquery.slt
index d88291b56f..fe074da1bb 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -177,17 +177,18 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS
t2_sum
----------TableScan: t2 projection=[t2_id, t2_int]
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
---CoalesceBatchesExec: target_batch_size=8192
-----HashJoinExec: mode=Partitioned, join_type=Left, on=[(t1_id@0, t2_id@1)]
-------CoalesceBatchesExec: target_batch_size=8192
---------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
-----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as
t2_id]
---------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id],
aggr=[SUM(t2.t2_int)]
-----------CoalesceBatchesExec: target_batch_size=8192
-------------RepartitionExec: partitioning=Hash([t2_id@0], 4),
input_partitions=4
---------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id],
aggr=[SUM(t2.t2_int)]
-----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int)@0 as SUM(t2.t2_int),
t2_id@1 as t2_id]
+----CoalesceBatchesExec: target_batch_size=8192
+------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
+--------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as
t2_id]
+----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id],
aggr=[SUM(t2.t2_int)]
+------------CoalesceBatchesExec: target_batch_size=8192
+--------------RepartitionExec: partitioning=Hash([t2_id@0], 4),
input_partitions=4
+----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id],
aggr=[SUM(t2.t2_int)]
+------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
+------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
query II rowsort
SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum
from t1
@@ -211,17 +212,18 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int *
Float64(1)) + Int64(1) AS t2
----------TableScan: t2 projection=[t2_id, t2_int]
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int * Float64(1)) +
Int64(1)@1 as t2_sum]
---CoalesceBatchesExec: target_batch_size=8192
-----HashJoinExec: mode=Partitioned, join_type=Left, on=[(t1_id@0, t2_id@1)]
-------CoalesceBatchesExec: target_batch_size=8192
---------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
-----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-------ProjectionExec: expr=[SUM(t2.t2_int * Float64(1))@1 + 1 as SUM(t2.t2_int
* Float64(1)) + Int64(1), t2_id@0 as t2_id]
---------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id],
aggr=[SUM(t2.t2_int * Float64(1))]
-----------CoalesceBatchesExec: target_batch_size=8192
-------------RepartitionExec: partitioning=Hash([t2_id@0], 4),
input_partitions=4
---------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id],
aggr=[SUM(t2.t2_int * Float64(1))]
-----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int * Float64(1)) +
Int64(1)@0 as SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@1 as t2_id]
+----CoalesceBatchesExec: target_batch_size=8192
+------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
+--------ProjectionExec: expr=[SUM(t2.t2_int * Float64(1))@1 + 1 as
SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@0 as t2_id]
+----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id],
aggr=[SUM(t2.t2_int * Float64(1))]
+------------CoalesceBatchesExec: target_batch_size=8192
+--------------RepartitionExec: partitioning=Hash([t2_id@0], 4),
input_partitions=4
+----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id],
aggr=[SUM(t2.t2_int * Float64(1))]
+------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
+------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
query IR rowsort
SELECT t1_id, (SELECT sum(t2_int * 1.0) + 1 FROM t2 WHERE t2.t2_id = t1.t1_id)
as t2_sum from t1