This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 35f786bb6 Fix predicate pushdown bugs: project columns within DatafusionArrowPredicate (#4005) (#4006) (#4021)
35f786bb6 is described below

commit 35f786bb6ce33cbd58db3e16a46958b58f7676f4
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Oct 31 11:40:10 2022 +1300

    Fix predicate pushdown bugs: project columns within DatafusionArrowPredicate (#4005) (#4006) (#4021)
    
    * Project columns within DatafusionArrowPredicate (#4005) (#4006)
    
    * Add test
    
    * Format
    
    * Fix merge blunder
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../core/src/physical_plan/file_format/parquet.rs  | 28 ++++++++++++++++++++++
 .../src/physical_plan/file_format/row_filter.rs    | 25 ++++++++++++++++---
 2 files changed, 50 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index f9ec72ab0..7573a263b 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -1676,6 +1676,34 @@ mod tests {
         assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
     }
 
+    #[tokio::test]
+    async fn multi_column_predicate_pushdown() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), 
None]));
+
+        let batch1 = create_batch(vec![("c1", c1.clone()), ("c2", 
c2.clone())]);
+
+        // Columns in different order to schema
+        let filter = col("c2").eq(lit(1_i64)).or(col("c1").eq(lit("bar")));
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1], None, None, 
Some(filter), true)
+            .await
+            .unwrap();
+
+        let expected = vec![
+            "+-----+----+",
+            "| c1  | c2 |",
+            "+-----+----+",
+            "| Foo | 1  |",
+            "| bar |    |",
+            "+-----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
     #[tokio::test]
     async fn evolved_schema_incompatible_types() {
         let c1: ArrayRef =
diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs b/datafusion/core/src/physical_plan/file_format/row_filter.rs
index 49ec6b5ca..54bf4bb8f 100644
--- a/datafusion/core/src/physical_plan/file_format/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs
@@ -67,7 +67,8 @@ use crate::physical_plan::metrics;
 #[derive(Debug)]
 pub(crate) struct DatafusionArrowPredicate {
     physical_expr: Arc<dyn PhysicalExpr>,
-    projection: ProjectionMask,
+    projection_mask: ProjectionMask,
+    projection: Vec<usize>,
     /// how many rows were filtered out by this predicate
     rows_filtered: metrics::Count,
     /// how long was spent evaluating this predicate
@@ -90,9 +91,22 @@ impl DatafusionArrowPredicate {
         let physical_expr =
             create_physical_expr(&candidate.expr, &df_schema, &schema, 
&props)?;
 
+        // ArrowPredicate::evaluate is passed columns in the order they appear 
in the file
+        // If the predicate has multiple columns, we therefore must project 
the columns based
+        // on the order they appear in the file
+        let projection = match candidate.projection.len() {
+            0 | 1 => vec![],
+            len => {
+                let mut projection: Vec<_> = (0..len).collect();
+                projection.sort_unstable_by_key(|x| candidate.projection[*x]);
+                projection
+            }
+        };
+
         Ok(Self {
             physical_expr,
-            projection: ProjectionMask::roots(
+            projection,
+            projection_mask: ProjectionMask::roots(
                 metadata.file_metadata().schema_descr(),
                 candidate.projection,
             ),
@@ -104,10 +118,15 @@ impl DatafusionArrowPredicate {
 
 impl ArrowPredicate for DatafusionArrowPredicate {
     fn projection(&self) -> &ProjectionMask {
-        &self.projection
+        &self.projection_mask
     }
 
     fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+        let batch = match self.projection.is_empty() {
+            true => batch,
+            false => batch.project(&self.projection)?,
+        };
+
         // scoped timer updates on drop
         let mut timer = self.time.timer();
         match self

Reply via email to