This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7acd8833cc Add small column on empty projection (#7833)
7acd8833cc is described below

commit 7acd8833cc5d03ba7643d4ae424553c7681ccce8
Author: Christoph Schulze <[email protected]>
AuthorDate: Wed Oct 18 13:06:51 2023 +0200

    Add small column on empty projection (#7833)
    
    * Find small column when projection is empty
    
    * clippy
    
    * fix comment
    
    * fix avro.slt test
    
    * use min_by
    
    * clippy
---
 datafusion/core/tests/sql/explain_analyze.rs     |   2 +-
 datafusion/optimizer/src/push_down_projection.rs | 201 ++++++++++++++++++++---
 datafusion/sqllogictest/test_files/avro.slt      |   4 +-
 datafusion/sqllogictest/test_files/json.slt      |   4 +-
 4 files changed, 179 insertions(+), 32 deletions(-)

diff --git a/datafusion/core/tests/sql/explain_analyze.rs 
b/datafusion/core/tests/sql/explain_analyze.rs
index c328f46be7..7238369f83 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -788,7 +788,7 @@ async fn explain_logical_plan_only() {
             "logical_plan",
             "Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]\
             \n  SubqueryAlias: t\
-            \n    Projection: column1\
+            \n    Projection: column2\
             \n      Values: (Utf8(\"a\"), Int64(1), Int64(100)), (Utf8(\"a\"), 
Int64(2), Int64(150))"
         ]];
     assert_eq!(expected, actual);
diff --git a/datafusion/optimizer/src/push_down_projection.rs 
b/datafusion/optimizer/src/push_down_projection.rs
index 6db4bb9ba4..839f6b5bb8 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -23,6 +23,7 @@ use crate::merge_projection::merge_projection;
 use crate::optimizer::ApplyOrder;
 use crate::push_down_filter::replace_cols_by_name;
 use crate::{OptimizerConfig, OptimizerRule};
+use arrow::datatypes::DataType;
 use arrow::error::Result as ArrowResult;
 use datafusion_common::ScalarValue::UInt8;
 use datafusion_common::{
@@ -148,8 +149,10 @@ impl OptimizerRule for PushDownProjection {
             {
                 let mut used_columns: HashSet<Column> = HashSet::new();
                 if projection_is_empty {
-                    used_columns
-                        
.insert(scan.projected_schema.fields()[0].qualified_column());
+                    let field = 
find_small_field(scan.projected_schema.fields()).ok_or(
+                        DataFusionError::Internal("Scan with empty 
schema".to_string()),
+                    )?;
+                    used_columns.insert(field.qualified_column());
                     push_down_scan(&used_columns, scan, true)?
                 } else {
                     for expr in projection.expr.iter() {
@@ -161,10 +164,13 @@ impl OptimizerRule for PushDownProjection {
                 }
             }
             LogicalPlan::Values(values) if projection_is_empty => {
-                let first_col =
-                    Expr::Column(values.schema.fields()[0].qualified_column());
+                let field = find_small_field(values.schema.fields()).ok_or(
+                    DataFusionError::Internal("Values with empty 
schema".to_string()),
+                )?;
+                let column = Expr::Column(field.qualified_column());
+
                 LogicalPlan::Projection(Projection::try_new(
-                    vec![first_col],
+                    vec![column],
                     Arc::new(child_plan.clone()),
                 )?)
             }
@@ -423,7 +429,88 @@ pub fn collect_projection_expr(projection: &Projection) -> 
HashMap<String, Expr>
         .collect::<HashMap<_, _>>()
 }
 
-// Get the projection exprs from columns in the order of the schema
+/// Accumulate the memory size of a data type measured in bits.
+///
+/// Types with a variable size are assigned a fixed size which is greater 
than most
+/// primitive types.
+///
+/// While traversing nested types, `nesting` is incremented on every level.
+fn nested_size(data_type: &DataType, nesting: &mut usize) -> usize {
+    use DataType::*;
+    if data_type.is_primitive() {
+        return data_type.primitive_width().unwrap_or(1) * 8;
+    }
+
+    if data_type.is_nested() {
+        *nesting += 1;
+    }
+
+    match data_type {
+        Null => 0,
+        Boolean => 1,
+        Binary | Utf8 => 128,
+        LargeBinary | LargeUtf8 => 256,
+        FixedSizeBinary(bytes) => (*bytes * 8) as usize,
+        // primitive types
+        Int8
+        | Int16
+        | Int32
+        | Int64
+        | UInt8
+        | UInt16
+        | UInt32
+        | UInt64
+        | Float16
+        | Float32
+        | Float64
+        | Timestamp(_, _)
+        | Date32
+        | Date64
+        | Time32(_)
+        | Time64(_)
+        | Duration(_)
+        | Interval(_)
+        | Dictionary(_, _)
+        | Decimal128(_, _)
+        | Decimal256(_, _) => data_type.primitive_width().unwrap_or(1) * 8,
+        // nested types
+        List(f) => nested_size(f.data_type(), nesting),
+        FixedSizeList(_, s) => (s * 8) as usize,
+        LargeList(f) => nested_size(f.data_type(), nesting),
+        Struct(fields) => fields
+            .iter()
+            .map(|f| nested_size(f.data_type(), nesting))
+            .sum(),
+        Union(fields, _) => fields
+            .iter()
+            .map(|(_, f)| nested_size(f.data_type(), nesting))
+            .sum(),
+        Map(field, _) => nested_size(field.data_type(), nesting),
+        RunEndEncoded(run_ends, values) => {
+            nested_size(run_ends.data_type(), nesting)
+                + nested_size(values.data_type(), nesting)
+        }
+    }
+}
+
+/// Find a field with a presumably small memory footprint based on its data 
type's memory size
+/// and the level of nesting.
+fn find_small_field(fields: &[DFField]) -> Option<DFField> {
+    fields
+        .iter()
+        .map(|f| {
+            let nesting = &mut 0;
+            let size = nested_size(f.data_type(), nesting);
+            (*nesting, size)
+        })
+        .enumerate()
+        .min_by(|(_, (nesting_a, size_a)), (_, (nesting_b, size_b))| {
+            nesting_a.cmp(nesting_b).then(size_a.cmp(size_b))
+        })
+        .map(|(i, _)| fields[i].clone())
+}
+
+/// Get the projection exprs from columns in the order of the schema
 fn get_expr(columns: &HashSet<Column>, schema: &DFSchemaRef) -> 
Result<Vec<Expr>> {
     let expr = schema
         .fields()
@@ -489,23 +576,14 @@ fn push_down_scan(
         .filter_map(ArrowResult::ok)
         .collect();
 
-    if projection.is_empty() {
-        if has_projection && !schema.fields().is_empty() {
-            // Ensure that we are reading at least one column from the table 
in case the query
-            // does not reference any columns directly such as "SELECT 
COUNT(1) FROM table",
-            // except when the table is empty (no column)
-            projection.insert(0);
-        } else {
-            // for table scan without projection, we default to return all 
columns
-            projection = scan
-                .source
-                .schema()
-                .fields()
-                .iter()
-                .enumerate()
-                .map(|(i, _)| i)
-                .collect::<BTreeSet<usize>>();
-        }
+    if !has_projection && projection.is_empty() {
+        // for table scan without projection, we default to return all columns
+        projection = schema
+            .fields()
+            .iter()
+            .enumerate()
+            .map(|(i, _)| i)
+            .collect::<BTreeSet<usize>>();
     }
 
     // Building new projection from BTreeSet
@@ -562,7 +640,7 @@ mod tests {
     use crate::optimizer::Optimizer;
     use crate::test::*;
     use crate::OptimizerContext;
-    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
     use datafusion_common::DFSchema;
     use datafusion_expr::builder::table_scan_with_filters;
     use datafusion_expr::expr;
@@ -922,7 +1000,7 @@ mod tests {
             .project(vec![lit(1_i64), lit(2_i64)])?
             .build()?;
         let expected = "Projection: Int64(1), Int64(2)\
-                      \n  TableScan: test projection=[a]";
+                      \n  TableScan: test projection=[]";
         assert_optimized_plan_eq(&plan, expected)
     }
 
@@ -969,7 +1047,7 @@ mod tests {
 
         let expected = "\
         Projection: Int32(1) AS a\
-        \n  TableScan: test projection=[a]";
+        \n  TableScan: test projection=[]";
 
         assert_optimized_plan_eq(&plan, expected)
     }
@@ -998,7 +1076,7 @@ mod tests {
 
         let expected = "\
         Projection: Int32(1) AS a\
-        \n  TableScan: test projection=[a], full_filters=[b = Int32(1)]";
+        \n  TableScan: test projection=[], full_filters=[b = Int32(1)]";
 
         assert_optimized_plan_eq(&plan, expected)
     }
@@ -1154,4 +1232,73 @@ mod tests {
             .unwrap_or(optimized_plan);
         Ok(optimized_plan)
     }
+
+    #[test]
+    fn test_nested_size() {
+        use DataType::*;
+        let nesting = &mut 0;
+        assert_eq!(nested_size(&Null, nesting), 0);
+        assert_eq!(*nesting, 0);
+        assert_eq!(nested_size(&Boolean, nesting), 1);
+        assert_eq!(*nesting, 0);
+        assert_eq!(nested_size(&UInt8, nesting), 8);
+        assert_eq!(*nesting, 0);
+        assert_eq!(nested_size(&Int64, nesting), 64);
+        assert_eq!(*nesting, 0);
+        assert_eq!(nested_size(&Decimal256(5, 2), nesting), 256);
+        assert_eq!(*nesting, 0);
+        assert_eq!(
+            nested_size(&List(Arc::new(Field::new("A", Int64, true))), 
nesting),
+            64
+        );
+        assert_eq!(*nesting, 1);
+        *nesting = 0;
+        assert_eq!(
+            nested_size(
+                &List(Arc::new(Field::new(
+                    "A",
+                    List(Arc::new(Field::new("AA", Int64, true))),
+                    true
+                ))),
+                nesting
+            ),
+            64
+        );
+        assert_eq!(*nesting, 2);
+    }
+
+    #[test]
+    fn test_find_small_field() {
+        use DataType::*;
+        let int32 = DFField::from(Field::new("a", Int32, false));
+        let bin = DFField::from(Field::new("b", Binary, false));
+        let list_i64 = DFField::from(Field::new(
+            "c",
+            List(Arc::new(Field::new("c_1", Int64, true))),
+            false,
+        ));
+        let time_s = DFField::from(Field::new("d", Time32(TimeUnit::Second), 
false));
+
+        assert_eq!(
+            find_small_field(&[
+                int32.clone(),
+                bin.clone(),
+                list_i64.clone(),
+                time_s.clone()
+            ]),
+            Some(int32.clone())
+        );
+        assert_eq!(
+            find_small_field(&[bin.clone(), list_i64.clone(), time_s.clone()]),
+            Some(time_s.clone())
+        );
+        assert_eq!(
+            find_small_field(&[time_s.clone(), int32.clone()]),
+            Some(time_s.clone())
+        );
+        assert_eq!(
+            find_small_field(&[bin.clone(), list_i64.clone()]),
+            Some(bin.clone())
+        );
+    }
 }
diff --git a/datafusion/sqllogictest/test_files/avro.slt 
b/datafusion/sqllogictest/test_files/avro.slt
index 5cd268e8ef..bd2ba70666 100644
--- a/datafusion/sqllogictest/test_files/avro.slt
+++ b/datafusion/sqllogictest/test_files/avro.slt
@@ -253,10 +253,10 @@ EXPLAIN SELECT count(*) from alltypes_plain
 ----
 logical_plan
 Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
---TableScan: alltypes_plain projection=[id]
+--TableScan: alltypes_plain projection=[bool_col]
 physical_plan
 AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)]
 --CoalescePartitionsExec
 ----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)]
 ------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
---------AvroExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/avro/alltypes_plain.avro]]}, projection=[id]
+--------AvroExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/avro/alltypes_plain.avro]]}, projection=[bool_col]
diff --git a/datafusion/sqllogictest/test_files/json.slt 
b/datafusion/sqllogictest/test_files/json.slt
index 69902f2982..f903e48063 100644
--- a/datafusion/sqllogictest/test_files/json.slt
+++ b/datafusion/sqllogictest/test_files/json.slt
@@ -50,13 +50,13 @@ EXPLAIN SELECT count(*) from json_test
 ----
 logical_plan
 Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
---TableScan: json_test projection=[a]
+--TableScan: json_test projection=[c]
 physical_plan
 AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)]
 --CoalescePartitionsExec
 ----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(*)]
 ------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
---------JsonExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/2.json]]}, projection=[a]
+--------JsonExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/2.json]]}, projection=[c]
 
 query error DataFusion error: Schema error: No field named mycol\.
 SELECT mycol FROM single_nan

Reply via email to