This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 923772997f Minor: Add more projection pushdown tests, clarify comments (#14963)
923772997f is described below

commit 923772997f735765433fea6a855cdff69fa7b774
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Mar 2 13:42:16 2025 -0500

    Minor: Add more projection pushdown tests, clarify comments (#14963)
    
    * Minor: Add more projection pushdown tests
    
    * Improve comment and indenting
---
 .../physical_optimizer/projection_pushdown.rs      | 69 ++++++++++++++++++----
 datafusion/datasource/src/file_scan_config.rs      | 40 ++++++-------
 2 files changed, 76 insertions(+), 33 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
index 77c32f5623..b0b5f73106 100644
--- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
@@ -31,7 +31,7 @@ use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::{Operator, ScalarUDF, ScalarUDFImpl, Signature, 
Volatility};
 use datafusion_physical_expr::expressions::{
-    binary, col, BinaryExpr, CaseExpr, CastExpr, Column, Literal, NegativeExpr,
+    binary, cast, col, BinaryExpr, CaseExpr, CastExpr, Column, Literal, 
NegativeExpr,
 };
 use datafusion_physical_expr::ScalarFunctionExpr;
 use datafusion_physical_expr::{
@@ -1383,20 +1383,15 @@ fn test_union_after_projection() -> Result<()> {
     Ok(())
 }
 
-#[test]
-fn test_partition_col_projection_pushdown() -> Result<()> {
+/// Returns a DataSourceExec that scans a file with (int_col, string_col)
+/// and has a partitioning column partition_col (Utf8)
+fn partitioned_data_source() -> Arc<DataSourceExec> {
     let file_schema = Arc::new(Schema::new(vec![
         Field::new("int_col", DataType::Int32, true),
         Field::new("string_col", DataType::Utf8, true),
     ]));
 
-    let partitioned_schema = Arc::new(Schema::new(vec![
-        Field::new("int_col", DataType::Int32, true),
-        Field::new("string_col", DataType::Utf8, true),
-        Field::new("partition_col", DataType::Utf8, true),
-    ]));
-
-    let source = FileScanConfig::new(
+    FileScanConfig::new(
         ObjectStoreUrl::parse("test:///").unwrap(),
         file_schema.clone(),
         Arc::new(CsvSource::default()),
@@ -1404,7 +1399,13 @@ fn test_partition_col_projection_pushdown() -> Result<()> {
     .with_file(PartitionedFile::new("x".to_string(), 100))
     .with_table_partition_cols(vec![Field::new("partition_col", 
DataType::Utf8, true)])
     .with_projection(Some(vec![0, 1, 2]))
-    .build();
+    .build()
+}
+
+#[test]
+fn test_partition_col_projection_pushdown() -> Result<()> {
+    let source = partitioned_data_source();
+    let partitioned_schema = source.schema();
 
     let projection = Arc::new(ProjectionExec::try_new(
         vec![
@@ -1427,8 +1428,50 @@ fn test_partition_col_projection_pushdown() -> Result<()> {
     let after_optimize =
         ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
 
-    let expected = ["ProjectionExec: expr=[string_col@1 as string_col, 
partition_col@2 as partition_col, int_col@0 as int_col]",
-        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[int_col, 
string_col, partition_col], file_type=csv, has_header=false"];
+    let expected = [
+        "ProjectionExec: expr=[string_col@1 as string_col, partition_col@2 as 
partition_col, int_col@0 as int_col]",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[int_col, 
string_col, partition_col], file_type=csv, has_header=false"
+    ];
+    assert_eq!(get_plan_string(&after_optimize), expected);
+
+    Ok(())
+}
+
+#[test]
+fn test_partition_col_projection_pushdown_expr() -> Result<()> {
+    let source = partitioned_data_source();
+    let partitioned_schema = source.schema();
+
+    let projection = Arc::new(ProjectionExec::try_new(
+        vec![
+            (
+                col("string_col", partitioned_schema.as_ref())?,
+                "string_col".to_string(),
+            ),
+            (
+                // CAST(partition_col, Utf8View)
+                cast(
+                    col("partition_col", partitioned_schema.as_ref())?,
+                    partitioned_schema.as_ref(),
+                    DataType::Utf8View,
+                )?,
+                "partition_col".to_string(),
+            ),
+            (
+                col("int_col", partitioned_schema.as_ref())?,
+                "int_col".to_string(),
+            ),
+        ],
+        source,
+    )?);
+
+    let after_optimize =
+        ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+
+    let expected = [
+        "ProjectionExec: expr=[string_col@1 as string_col, 
CAST(partition_col@2 AS Utf8View) as partition_col, int_col@0 as int_col]",
+        "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[int_col, 
string_col, partition_col], file_type=csv, has_header=false"
+    ];
     assert_eq!(get_plan_string(&after_optimize), expected);
 
     Ok(())
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index bee74e042f..ef951ea0cc 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -264,9 +264,9 @@ impl DataSource for FileScanConfig {
         &self,
         projection: &ProjectionExec,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-        // If there is any non-column or alias-carrier expression, Projection 
should not be removed.
         // This process can be moved into CsvExec, but it would be an overlap 
of their responsibility.
 
+        // Must be all column references, with no table partition columns 
(which can not be projected)
         let partitioned_columns_in_proj = projection.expr().iter().any(|(expr, 
_)| {
             expr.as_any()
                 .downcast_ref::<Column>()
@@ -274,25 +274,25 @@ impl DataSource for FileScanConfig {
                 .unwrap_or(false)
         });
 
-        Ok(
-            (all_alias_free_columns(projection.expr()) && 
!partitioned_columns_in_proj)
-                .then(|| {
-                    let file_scan = self.clone();
-                    let source = Arc::clone(&file_scan.file_source);
-                    let new_projections = new_projections_for_columns(
-                        projection,
-                        &file_scan
-                            .projection
-                            .clone()
-                            
.unwrap_or((0..self.file_schema.fields().len()).collect()),
-                    );
-                    file_scan
-                        // Assign projected statistics to source
-                        .with_projection(Some(new_projections))
-                        .with_source(source)
-                        .build() as _
-                }),
-        )
+        // If there is any non-column or alias-carrier expression, Projection 
should not be removed.
+        let no_aliases = all_alias_free_columns(projection.expr());
+
+        Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
+            let file_scan = self.clone();
+            let source = Arc::clone(&file_scan.file_source);
+            let new_projections = new_projections_for_columns(
+                projection,
+                &file_scan
+                    .projection
+                    .clone()
+                    .unwrap_or((0..self.file_schema.fields().len()).collect()),
+            );
+            file_scan
+                // Assign projected statistics to source
+                .with_projection(Some(new_projections))
+                .with_source(source)
+                .build() as _
+        }))
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to