This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 923772997f Minor: Add more projection pushdown tests, clarify comments
(#14963)
923772997f is described below
commit 923772997f735765433fea6a855cdff69fa7b774
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Mar 2 13:42:16 2025 -0500
Minor: Add more projection pushdown tests, clarify comments (#14963)
* Minor: Add more projection pushdown tests
* Improve comment and indenting
---
.../physical_optimizer/projection_pushdown.rs | 69 ++++++++++++++++++----
datafusion/datasource/src/file_scan_config.rs | 40 ++++++-------
2 files changed, 76 insertions(+), 33 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
index 77c32f5623..b0b5f73106 100644
--- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs
@@ -31,7 +31,7 @@ use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::{Operator, ScalarUDF, ScalarUDFImpl, Signature,
Volatility};
use datafusion_physical_expr::expressions::{
- binary, col, BinaryExpr, CaseExpr, CastExpr, Column, Literal, NegativeExpr,
+ binary, cast, col, BinaryExpr, CaseExpr, CastExpr, Column, Literal,
NegativeExpr,
};
use datafusion_physical_expr::ScalarFunctionExpr;
use datafusion_physical_expr::{
@@ -1383,20 +1383,15 @@ fn test_union_after_projection() -> Result<()> {
Ok(())
}
-#[test]
-fn test_partition_col_projection_pushdown() -> Result<()> {
+/// Returns a DataSourceExec that scans a file with (int_col, string_col)
+/// and has a partitioning column partition_col (Utf8)
+fn partitioned_data_source() -> Arc<DataSourceExec> {
let file_schema = Arc::new(Schema::new(vec![
Field::new("int_col", DataType::Int32, true),
Field::new("string_col", DataType::Utf8, true),
]));
- let partitioned_schema = Arc::new(Schema::new(vec![
- Field::new("int_col", DataType::Int32, true),
- Field::new("string_col", DataType::Utf8, true),
- Field::new("partition_col", DataType::Utf8, true),
- ]));
-
- let source = FileScanConfig::new(
+ FileScanConfig::new(
ObjectStoreUrl::parse("test:///").unwrap(),
file_schema.clone(),
Arc::new(CsvSource::default()),
@@ -1404,7 +1399,13 @@ fn test_partition_col_projection_pushdown() ->
Result<()> {
.with_file(PartitionedFile::new("x".to_string(), 100))
.with_table_partition_cols(vec![Field::new("partition_col",
DataType::Utf8, true)])
.with_projection(Some(vec![0, 1, 2]))
- .build();
+ .build()
+}
+
+#[test]
+fn test_partition_col_projection_pushdown() -> Result<()> {
+ let source = partitioned_data_source();
+ let partitioned_schema = source.schema();
let projection = Arc::new(ProjectionExec::try_new(
vec![
@@ -1427,8 +1428,50 @@ fn test_partition_col_projection_pushdown() ->
Result<()> {
let after_optimize =
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
- let expected = ["ProjectionExec: expr=[string_col@1 as string_col,
partition_col@2 as partition_col, int_col@0 as int_col]",
- " DataSourceExec: file_groups={1 group: [[x]]}, projection=[int_col,
string_col, partition_col], file_type=csv, has_header=false"];
+ let expected = [
+ "ProjectionExec: expr=[string_col@1 as string_col, partition_col@2 as
partition_col, int_col@0 as int_col]",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[int_col,
string_col, partition_col], file_type=csv, has_header=false"
+ ];
+ assert_eq!(get_plan_string(&after_optimize), expected);
+
+ Ok(())
+}
+
+#[test]
+fn test_partition_col_projection_pushdown_expr() -> Result<()> {
+ let source = partitioned_data_source();
+ let partitioned_schema = source.schema();
+
+ let projection = Arc::new(ProjectionExec::try_new(
+ vec![
+ (
+ col("string_col", partitioned_schema.as_ref())?,
+ "string_col".to_string(),
+ ),
+ (
+            // CAST(partition_col AS Utf8View)
+ cast(
+ col("partition_col", partitioned_schema.as_ref())?,
+ partitioned_schema.as_ref(),
+ DataType::Utf8View,
+ )?,
+ "partition_col".to_string(),
+ ),
+ (
+ col("int_col", partitioned_schema.as_ref())?,
+ "int_col".to_string(),
+ ),
+ ],
+ source,
+ )?);
+
+ let after_optimize =
+ ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+
+ let expected = [
+ "ProjectionExec: expr=[string_col@1 as string_col,
CAST(partition_col@2 AS Utf8View) as partition_col, int_col@0 as int_col]",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[int_col,
string_col, partition_col], file_type=csv, has_header=false"
+ ];
assert_eq!(get_plan_string(&after_optimize), expected);
Ok(())
diff --git a/datafusion/datasource/src/file_scan_config.rs
b/datafusion/datasource/src/file_scan_config.rs
index bee74e042f..ef951ea0cc 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -264,9 +264,9 @@ impl DataSource for FileScanConfig {
&self,
projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
- // If there is any non-column or alias-carrier expression, Projection
should not be removed.
// This process can be moved into CsvExec, but it would be an overlap
of their responsibility.
+        // Must be all column references, with no table partition columns
(which cannot be projected)
let partitioned_columns_in_proj = projection.expr().iter().any(|(expr,
_)| {
expr.as_any()
.downcast_ref::<Column>()
@@ -274,25 +274,25 @@ impl DataSource for FileScanConfig {
.unwrap_or(false)
});
- Ok(
- (all_alias_free_columns(projection.expr()) &&
!partitioned_columns_in_proj)
- .then(|| {
- let file_scan = self.clone();
- let source = Arc::clone(&file_scan.file_source);
- let new_projections = new_projections_for_columns(
- projection,
- &file_scan
- .projection
- .clone()
-
.unwrap_or((0..self.file_schema.fields().len()).collect()),
- );
- file_scan
- // Assign projected statistics to source
- .with_projection(Some(new_projections))
- .with_source(source)
- .build() as _
- }),
- )
+ // If there is any non-column or alias-carrier expression, Projection
should not be removed.
+ let no_aliases = all_alias_free_columns(projection.expr());
+
+ Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
+ let file_scan = self.clone();
+ let source = Arc::clone(&file_scan.file_source);
+ let new_projections = new_projections_for_columns(
+ projection,
+ &file_scan
+ .projection
+ .clone()
+ .unwrap_or((0..self.file_schema.fields().len()).collect()),
+ );
+ file_scan
+ // Assign projected statistics to source
+ .with_projection(Some(new_projections))
+ .with_source(source)
+ .build() as _
+ }))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]