This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7acd8833cc Add small column on empty projection (#7833)
7acd8833cc is described below
commit 7acd8833cc5d03ba7643d4ae424553c7681ccce8
Author: Christoph Schulze <[email protected]>
AuthorDate: Wed Oct 18 13:06:51 2023 +0200
Add small column on empty projection (#7833)
* Find small column when projection is empty
* clippy
* fix comment
* fix avro.slt test
* use min_by
* clippy
---
datafusion/core/tests/sql/explain_analyze.rs | 2 +-
datafusion/optimizer/src/push_down_projection.rs | 201 ++++++++++++++++++++---
datafusion/sqllogictest/test_files/avro.slt | 4 +-
datafusion/sqllogictest/test_files/json.slt | 4 +-
4 files changed, 179 insertions(+), 32 deletions(-)
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index c328f46be7..7238369f83 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -788,7 +788,7 @@ async fn explain_logical_plan_only() {
"logical_plan",
"Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]\
\n SubqueryAlias: t\
- \n Projection: column1\
+ \n Projection: column2\
\n Values: (Utf8(\"a\"), Int64(1), Int64(100)), (Utf8(\"a\"),
Int64(2), Int64(150))"
]];
assert_eq!(expected, actual);
diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs
index 6db4bb9ba4..839f6b5bb8 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -23,6 +23,7 @@ use crate::merge_projection::merge_projection;
use crate::optimizer::ApplyOrder;
use crate::push_down_filter::replace_cols_by_name;
use crate::{OptimizerConfig, OptimizerRule};
+use arrow::datatypes::DataType;
use arrow::error::Result as ArrowResult;
use datafusion_common::ScalarValue::UInt8;
use datafusion_common::{
@@ -148,8 +149,10 @@ impl OptimizerRule for PushDownProjection {
{
let mut used_columns: HashSet<Column> = HashSet::new();
if projection_is_empty {
- used_columns
-
.insert(scan.projected_schema.fields()[0].qualified_column());
+ let field =
find_small_field(scan.projected_schema.fields()).ok_or(
+ DataFusionError::Internal("Scan with empty
schema".to_string()),
+ )?;
+ used_columns.insert(field.qualified_column());
push_down_scan(&used_columns, scan, true)?
} else {
for expr in projection.expr.iter() {
@@ -161,10 +164,13 @@ impl OptimizerRule for PushDownProjection {
}
}
LogicalPlan::Values(values) if projection_is_empty => {
- let first_col =
- Expr::Column(values.schema.fields()[0].qualified_column());
+ let field = find_small_field(values.schema.fields()).ok_or(
+ DataFusionError::Internal("Values with empty
schema".to_string()),
+ )?;
+ let column = Expr::Column(field.qualified_column());
+
LogicalPlan::Projection(Projection::try_new(
- vec![first_col],
+ vec![column],
Arc::new(child_plan.clone()),
)?)
}
@@ -423,7 +429,88 @@ pub fn collect_projection_expr(projection: &Projection) -> HashMap<String, Expr>
.collect::<HashMap<_, _>>()
}
-// Get the projection exprs from columns in the order of the schema
+/// Estimate the memory footprint of one value of `data_type`, measured in bits.
+///
+/// Fixed-width types report their exact width. Types with a variable size
+/// (`Utf8`, `Binary` and their large variants) are assigned a fixed estimate
+/// that is greater than the width of most primitive types.
+///
+/// Nested types are measured through their children: `List`, `LargeList` and
+/// `Map` report the size of their element type, `FixedSizeList` reports the
+/// element size multiplied by the list length, and `Struct`, `Union` and
+/// `RunEndEncoded` report the sum of their children.
+///
+/// `nesting` is incremented once for every type encountered during the
+/// traversal for which [`DataType::is_nested`] returns `true`, so it grows with
+/// both the nesting depth and the number of nested children.
+fn nested_size(data_type: &DataType, nesting: &mut usize) -> usize {
+    use DataType::*;
+
+    if data_type.is_nested() {
+        *nesting += 1;
+    }
+
+    match data_type {
+        Null => 0,
+        Boolean => 1,
+        // variable-size types get a fixed estimate larger than most primitives
+        Binary | Utf8 => 128,
+        LargeBinary | LargeUtf8 => 256,
+        // the width is given in bytes; a (malformed) negative width counts as 0
+        FixedSizeBinary(bytes) => {
+            usize::try_from(*bytes).unwrap_or(0).saturating_mul(8)
+        }
+        // primitive types; `Dictionary` reports no primitive width and falls
+        // back to a single byte
+        Int8
+        | Int16
+        | Int32
+        | Int64
+        | UInt8
+        | UInt16
+        | UInt32
+        | UInt64
+        | Float16
+        | Float32
+        | Float64
+        | Timestamp(_, _)
+        | Date32
+        | Date64
+        | Time32(_)
+        | Time64(_)
+        | Duration(_)
+        | Interval(_)
+        | Dictionary(_, _)
+        | Decimal128(_, _)
+        | Decimal256(_, _) => data_type.primitive_width().unwrap_or(1) * 8,
+        // nested types
+        List(f) | LargeList(f) => nested_size(f.data_type(), nesting),
+        FixedSizeList(f, len) => {
+            // `len` counts elements, not bytes: scale the element size by it
+            let element_size = nested_size(f.data_type(), nesting);
+            element_size.saturating_mul(usize::try_from(*len).unwrap_or(0))
+        }
+        Struct(fields) => fields
+            .iter()
+            .map(|f| nested_size(f.data_type(), nesting))
+            .sum(),
+        Union(fields, _) => fields
+            .iter()
+            .map(|(_, f)| nested_size(f.data_type(), nesting))
+            .sum(),
+        Map(field, _) => nested_size(field.data_type(), nesting),
+        RunEndEncoded(run_ends, values) => {
+            nested_size(run_ends.data_type(), nesting)
+                + nested_size(values.data_type(), nesting)
+        }
+    }
+}
+
+/// Find a field with a presumable small memory footprint based on its data
type's memory size
+/// and the level of nesting.
+fn find_small_field(fields: &[DFField]) -> Option<DFField> {
+ fields
+ .iter()
+ .map(|f| {
+ let nesting = &mut 0;
+ let size = nested_size(f.data_type(), nesting);
+ (*nesting, size)
+ })
+ .enumerate()
+ .min_by(|(_, (nesting_a, size_a)), (_, (nesting_b, size_b))| {
+ nesting_a.cmp(nesting_b).then(size_a.cmp(size_b))
+ })
+ .map(|(i, _)| fields[i].clone())
+}
+
+/// Get the projection exprs from columns in the order of the schema
fn get_expr(columns: &HashSet<Column>, schema: &DFSchemaRef) ->
Result<Vec<Expr>> {
let expr = schema
.fields()
@@ -489,23 +576,14 @@ fn push_down_scan(
.filter_map(ArrowResult::ok)
.collect();
- if projection.is_empty() {
- if has_projection && !schema.fields().is_empty() {
- // Ensure that we are reading at least one column from the table
in case the query
- // does not reference any columns directly such as "SELECT
COUNT(1) FROM table",
- // except when the table is empty (no column)
- projection.insert(0);
- } else {
- // for table scan without projection, we default to return all
columns
- projection = scan
- .source
- .schema()
- .fields()
- .iter()
- .enumerate()
- .map(|(i, _)| i)
- .collect::<BTreeSet<usize>>();
- }
+ if !has_projection && projection.is_empty() {
+ // for table scan without projection, we default to return all columns
+ projection = schema
+ .fields()
+ .iter()
+ .enumerate()
+ .map(|(i, _)| i)
+ .collect::<BTreeSet<usize>>();
}
// Building new projection from BTreeSet
@@ -562,7 +640,7 @@ mod tests {
use crate::optimizer::Optimizer;
use crate::test::*;
use crate::OptimizerContext;
- use arrow::datatypes::{DataType, Field, Schema};
+ use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion_common::DFSchema;
use datafusion_expr::builder::table_scan_with_filters;
use datafusion_expr::expr;
@@ -922,7 +1000,7 @@ mod tests {
.project(vec![lit(1_i64), lit(2_i64)])?
.build()?;
let expected = "Projection: Int64(1), Int64(2)\
- \n TableScan: test projection=[a]";
+ \n TableScan: test projection=[]";
assert_optimized_plan_eq(&plan, expected)
}
@@ -969,7 +1047,7 @@ mod tests {
let expected = "\
Projection: Int32(1) AS a\
- \n TableScan: test projection=[a]";
+ \n TableScan: test projection=[]";
assert_optimized_plan_eq(&plan, expected)
}
@@ -998,7 +1076,7 @@ mod tests {
let expected = "\
Projection: Int32(1) AS a\
- \n TableScan: test projection=[a], full_filters=[b = Int32(1)]";
+ \n TableScan: test projection=[], full_filters=[b = Int32(1)]";
assert_optimized_plan_eq(&plan, expected)
}
@@ -1154,4 +1232,73 @@ mod tests {
.unwrap_or(optimized_plan);
Ok(optimized_plan)
}
+
+    #[test]
+    fn test_nested_size() {
+        use DataType::*;
+
+        // measure `data_type` with a fresh nesting counter, returning
+        // `(size_in_bits, nesting)`
+        let measure = |data_type: &DataType| {
+            let mut nesting = 0;
+            let size = nested_size(data_type, &mut nesting);
+            (size, nesting)
+        };
+
+        // flat types never touch the nesting counter
+        assert_eq!(measure(&Null), (0, 0));
+        assert_eq!(measure(&Boolean), (1, 0));
+        assert_eq!(measure(&UInt8), (8, 0));
+        assert_eq!(measure(&Int64), (64, 0));
+        assert_eq!(measure(&Decimal256(5, 2)), (256, 0));
+
+        // a list reports its element size and adds one nesting level ...
+        let list_i64 = List(Arc::new(Field::new("AA", Int64, true)));
+        assert_eq!(measure(&list_i64), (64, 1));
+
+        // ... and every further list level adds another one
+        let list_list_i64 = List(Arc::new(Field::new("A", list_i64, true)));
+        assert_eq!(measure(&list_list_i64), (64, 2));
+    }
+
+    #[test]
+    fn test_find_small_field() {
+        use DataType::*;
+
+        let int32 = DFField::from(Field::new("a", Int32, false));
+        let bin = DFField::from(Field::new("b", Binary, false));
+        let list_i64 = DFField::from(Field::new(
+            "c",
+            List(Arc::new(Field::new("c_1", Int64, true))),
+            false,
+        ));
+        let time_s =
+            DFField::from(Field::new("d", Time32(TimeUnit::Second), false));
+
+        // (candidate fields, field expected to be picked)
+        let cases = [
+            // flat 32-bit column beats binary, nested and equally sized later fields
+            (
+                vec![int32.clone(), bin.clone(), list_i64.clone(), time_s.clone()],
+                int32.clone(),
+            ),
+            // fixed-width beats variable-width and nested
+            (
+                vec![bin.clone(), list_i64.clone(), time_s.clone()],
+                time_s.clone(),
+            ),
+            // equal nesting and size: the first field wins
+            (vec![time_s.clone(), int32.clone()], time_s.clone()),
+            // any flat column beats a nested one, even a variable-width one
+            (vec![bin.clone(), list_i64.clone()], bin.clone()),
+        ];
+
+        for (fields, expected) in cases {
+            assert_eq!(
+                find_small_field(&fields),
+                Some(expected),
+                "unexpected pick among {fields:?}"
+            );
+        }
+    }
}
diff --git a/datafusion/sqllogictest/test_files/avro.slt b/datafusion/sqllogictest/test_files/avro.slt
index 5cd268e8ef..bd2ba70666 100644
--- a/datafusion/sqllogictest/test_files/avro.slt
+++ b/datafusion/sqllogictest/test_files/avro.slt
@@ -253,10 +253,10 @@ EXPLAIN SELECT count(*) from alltypes_plain
----
logical_plan
Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
---TableScan: alltypes_plain projection=[id]
+--TableScan: alltypes_plain projection=[bool_col]
physical_plan
AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)]
--CoalescePartitionsExec
----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)]
------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
---------AvroExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/avro/alltypes_plain.avro]]}, projection=[id]
+--------AvroExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/avro/alltypes_plain.avro]]}, projection=[bool_col]
diff --git a/datafusion/sqllogictest/test_files/json.slt b/datafusion/sqllogictest/test_files/json.slt
index 69902f2982..f903e48063 100644
--- a/datafusion/sqllogictest/test_files/json.slt
+++ b/datafusion/sqllogictest/test_files/json.slt
@@ -50,13 +50,13 @@ EXPLAIN SELECT count(*) from json_test
----
logical_plan
Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
---TableScan: json_test projection=[a]
+--TableScan: json_test projection=[c]
physical_plan
AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)]
--CoalescePartitionsExec
----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)]
------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
---------JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/2.json]]}, projection=[a]
+--------JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/2.json]]}, projection=[c]
query error DataFusion error: Schema error: No field named mycol\.
SELECT mycol FROM single_nan