This is an automated email from the ASF dual-hosted git repository.

jakevin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 253550c6c preserve scan projection order in push_down_projection 
(#5261)
253550c6c is described below

commit 253550c6c936f75f654dcdc9480025a9ef55d9fd
Author: Eduard Karacharov <[email protected]>
AuthorDate: Wed Feb 15 08:12:00 2023 +0300

    preserve scan projection order in push_down_projection (#5261)
---
 .../core/src/physical_plan/file_format/csv.rs      | 55 ++++++++++++++++++++++
 datafusion/optimizer/src/push_down_projection.rs   | 42 ++++++++++++++++-
 2 files changed, 95 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs 
b/datafusion/core/src/physical_plan/file_format/csv.rs
index ce535dc67..337a54f42 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -395,6 +395,61 @@ mod tests {
         Ok(())
     }
 
+    #[rstest(
+        file_compression_type,
+        case(FileCompressionType::UNCOMPRESSED),
+        case(FileCompressionType::GZIP),
+        case(FileCompressionType::BZIP2),
+        case(FileCompressionType::XZ)
+    )]
+    #[tokio::test]
+    async fn csv_exec_with_mixed_order_projection(
+        file_compression_type: FileCompressionType,
+    ) -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+        let file_schema = aggr_test_schema();
+        let path = format!("{}/csv", arrow_test_data());
+        let filename = "aggregate_test_100.csv";
+
+        let file_groups = partitioned_file_groups(
+            path.as_str(),
+            filename,
+            1,
+            FileType::CSV,
+            file_compression_type.to_owned(),
+        )?;
+
+        let mut config = partitioned_csv_config(file_schema, file_groups)?;
+        config.projection = Some(vec![4, 0, 2]);
+
+        let csv = CsvExec::new(config, true, b',', 
file_compression_type.to_owned());
+        assert_eq!(13, csv.base_config.file_schema.fields().len());
+        assert_eq!(3, csv.projected_schema.fields().len());
+        assert_eq!(3, csv.schema().fields().len());
+
+        let mut stream = csv.execute(0, task_ctx)?;
+        let batch = stream.next().await.unwrap()?;
+        assert_eq!(3, batch.num_columns());
+        assert_eq!(100, batch.num_rows());
+
+        // slice of the first 5 lines
+        let expected = vec![
+            "+------------+----+-----+",
+            "| c5         | c1 | c3  |",
+            "+------------+----+-----+",
+            "| 2033001162 | c  | 1   |",
+            "| 706441268  | d  | -40 |",
+            "| 994303988  | b  | 29  |",
+            "| 1171968280 | a  | -85 |",
+            "| 1824882165 | b  | -82 |",
+            "+------------+----+-----+",
+        ];
+
+        crate::assert_batches_eq!(expected, &[batch.slice(0, 5)]);
+        Ok(())
+    }
+
     #[rstest(
         file_compression_type,
         case(FileCompressionType::UNCOMPRESSED),
diff --git a/datafusion/optimizer/src/push_down_projection.rs 
b/datafusion/optimizer/src/push_down_projection.rs
index 85bdca173..b6ba3131c 100644
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ b/datafusion/optimizer/src/push_down_projection.rs
@@ -523,13 +523,24 @@ fn push_down_scan(
         }
     }
 
+    // Building new projection from BTreeSet
+    // preserving source projection order if it exists
+    let projection = if let Some(original_projection) = &scan.projection {
+        original_projection
+            .clone()
+            .into_iter()
+            .filter(|idx| projection.contains(idx))
+            .collect::<Vec<_>>()
+    } else {
+        projection.into_iter().collect::<Vec<_>>()
+    };
+
     // create the projected schema
     let projected_fields: Vec<DFField> = projection
         .iter()
         .map(|i| DFField::from_qualified(&scan.table_name, 
schema.fields()[*i].clone()))
         .collect();
 
-    let projection = projection.into_iter().collect::<Vec<_>>();
     let projected_schema = projected_fields.to_dfschema_ref()?;
 
     // return the table scan with projection
@@ -553,7 +564,7 @@ mod tests {
     use datafusion_expr::expr::Cast;
     use datafusion_expr::{
         col, count, lit,
-        logical_plan::{builder::LogicalPlanBuilder, JoinType},
+        logical_plan::{builder::LogicalPlanBuilder, table_scan, JoinType},
         max, min, AggregateFunction, Expr,
     };
     use std::collections::HashMap;
@@ -642,6 +653,33 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn reorder_scan() -> Result<()> {
+        let schema = Schema::new(test_table_scan_fields());
+
+        let plan = table_scan(Some("test"), &schema, Some(vec![1, 0, 
2]))?.build()?;
+        let expected = "TableScan: test projection=[b, a, c]";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn reorder_scan_projection() -> Result<()> {
+        let schema = Schema::new(test_table_scan_fields());
+
+        let plan = table_scan(Some("test"), &schema, Some(vec![1, 0, 2]))?
+            .project(vec![col("a"), col("b")])?
+            .build()?;
+        let expected = "Projection: test.a, test.b\
+        \n  TableScan: test projection=[b, a]";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
     #[test]
     fn reorder_projection() -> Result<()> {
         let table_scan = test_table_scan()?;

Reply via email to