This is an automated email from the ASF dual-hosted git repository.
jakevin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 253550c6c preserve scan projection order in push_down_projection (#5261)
253550c6c is described below
commit 253550c6c936f75f654dcdc9480025a9ef55d9fd
Author: Eduard Karacharov <[email protected]>
AuthorDate: Wed Feb 15 08:12:00 2023 +0300
preserve scan projection order in push_down_projection (#5261)
---
.../core/src/physical_plan/file_format/csv.rs | 55 ++++++++++++++++++++++
datafusion/optimizer/src/push_down_projection.rs | 42 ++++++++++++++++-
2 files changed, 95 insertions(+), 2 deletions(-)
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index ce535dc67..337a54f42 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -395,6 +395,61 @@ mod tests {
Ok(())
}
+ #[rstest(
+ file_compression_type,
+ case(FileCompressionType::UNCOMPRESSED),
+ case(FileCompressionType::GZIP),
+ case(FileCompressionType::BZIP2),
+ case(FileCompressionType::XZ)
+ )]
+ #[tokio::test]
+ async fn csv_exec_with_mixed_order_projection(
+ file_compression_type: FileCompressionType,
+ ) -> Result<()> {
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+ let file_schema = aggr_test_schema();
+ let path = format!("{}/csv", arrow_test_data());
+ let filename = "aggregate_test_100.csv";
+
+ let file_groups = partitioned_file_groups(
+ path.as_str(),
+ filename,
+ 1,
+ FileType::CSV,
+ file_compression_type.to_owned(),
+ )?;
+
+ let mut config = partitioned_csv_config(file_schema, file_groups)?;
+ config.projection = Some(vec![4, 0, 2]); // deliberately non-ascending column order
+
+ let csv = CsvExec::new(config, true, b',', file_compression_type.to_owned());
+ assert_eq!(13, csv.base_config.file_schema.fields().len());
+ assert_eq!(3, csv.projected_schema.fields().len());
+ assert_eq!(3, csv.schema().fields().len());
+
+ let mut stream = csv.execute(0, task_ctx)?;
+ let batch = stream.next().await.unwrap()?;
+ assert_eq!(3, batch.num_columns());
+ assert_eq!(100, batch.num_rows());
+
+ // columns must come back in projection order (c5, c1, c3); compare the first 5 rows
+ let expected = vec![
+ "+------------+----+-----+",
+ "| c5 | c1 | c3 |",
+ "+------------+----+-----+",
+ "| 2033001162 | c | 1 |",
+ "| 706441268 | d | -40 |",
+ "| 994303988 | b | 29 |",
+ "| 1171968280 | a | -85 |",
+ "| 1824882165 | b | -82 |",
+ "+------------+----+-----+",
+ ];
+
+ crate::assert_batches_eq!(expected, &[batch.slice(0, 5)]);
+ Ok(())
+ }
+
#[rstest(
file_compression_type,
case(FileCompressionType::UNCOMPRESSED),
diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs
index 85bdca173..b6ba3131c 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -523,13 +523,24 @@ fn push_down_scan(
}
}
+ // Building new projection from BTreeSet
+ // preserving source projection order if it exists
+ let projection = if let Some(original_projection) = &scan.projection {
+ original_projection
+ .clone()
+ .into_iter()
+ .filter(|idx| projection.contains(idx))
+ .collect::<Vec<_>>()
+ } else {
+ projection.into_iter().collect::<Vec<_>>()
+ };
+
// create the projected schema
let projected_fields: Vec<DFField> = projection
.iter()
.map(|i| DFField::from_qualified(&scan.table_name,
schema.fields()[*i].clone()))
.collect();
- let projection = projection.into_iter().collect::<Vec<_>>();
let projected_schema = projected_fields.to_dfschema_ref()?;
// return the table scan with projection
@@ -553,7 +564,7 @@ mod tests {
use datafusion_expr::expr::Cast;
use datafusion_expr::{
col, count, lit,
- logical_plan::{builder::LogicalPlanBuilder, JoinType},
+ logical_plan::{builder::LogicalPlanBuilder, table_scan, JoinType},
max, min, AggregateFunction, Expr,
};
use std::collections::HashMap;
@@ -642,6 +653,33 @@ mod tests {
Ok(())
}
+ #[test]
+ fn reorder_scan() -> Result<()> {
+ let schema = Schema::new(test_table_scan_fields());
+
+ let plan = table_scan(Some("test"), &schema, Some(vec![1, 0, 2]))?.build()?;
+ let expected = "TableScan: test projection=[b, a, c]"; // scan keeps its [1, 0, 2] order, not sorted
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn reorder_scan_projection() -> Result<()> {
+ let schema = Schema::new(test_table_scan_fields());
+
+ let plan = table_scan(Some("test"), &schema, Some(vec![1, 0, 2]))?
+ .project(vec![col("a"), col("b")])? // c is unused above the scan, so it is pruned
+ .build()?;
+ let expected = "Projection: test.a, test.b\
+ \n TableScan: test projection=[b, a]"; // pruned scan still keeps the original b-before-a order
+
+ assert_optimized_plan_eq(&plan, expected);
+
+ Ok(())
+ }
+
#[test]
fn reorder_projection() -> Result<()> {
let table_scan = test_table_scan()?;