This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 35f786bb6 Fix predicate pushdown bugs: project columns within DatafusionArrowPredicate (#4005) (#4006) (#4021)
35f786bb6 is described below
commit 35f786bb6ce33cbd58db3e16a46958b58f7676f4
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Oct 31 11:40:10 2022 +1300
Fix predicate pushdown bugs: project columns within DatafusionArrowPredicate (#4005) (#4006) (#4021)
* Project columns within DatafusionArrowPredicate (#4005) (#4006)
* Add test
* Format
* Fix merge blunder
Co-authored-by: Andrew Lamb <[email protected]>
---
.../core/src/physical_plan/file_format/parquet.rs | 28 ++++++++++++++++++++++
.../src/physical_plan/file_format/row_filter.rs | 25 ++++++++++++++++---
2 files changed, 50 insertions(+), 3 deletions(-)
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index f9ec72ab0..7573a263b 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -1676,6 +1676,34 @@ mod tests {
assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
}
+    #[tokio::test]
+    async fn multi_column_predicate_pushdown() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let batch1 = create_batch(vec![("c1", c1), ("c2", c2)]);
+
+        // Reference c2 before c1 so the predicate's column order differs from file order
+        let filter = col("c2").eq(lit(1_i64)).or(col("c1").eq(lit("bar")));
+
+        // Write the batch to parquet, then read it back with `filter` applied
+        let read = round_trip_to_parquet(vec![batch1], None, None, Some(filter), true)
+            .await
+            .unwrap();
+
+        let expected = vec![
+            "+-----+----+",
+            "| c1  | c2 |",
+            "+-----+----+",
+            "| Foo | 1  |",
+            "| bar |    |",
+            "+-----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
#[tokio::test]
async fn evolved_schema_incompatible_types() {
let c1: ArrayRef =
diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs b/datafusion/core/src/physical_plan/file_format/row_filter.rs
index 49ec6b5ca..54bf4bb8f 100644
--- a/datafusion/core/src/physical_plan/file_format/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs
@@ -67,7 +67,8 @@ use crate::physical_plan::metrics;
#[derive(Debug)]
pub(crate) struct DatafusionArrowPredicate {
physical_expr: Arc<dyn PhysicalExpr>,
- projection: ProjectionMask,
+ projection_mask: ProjectionMask,
+ projection: Vec<usize>,
/// how many rows were filtered out by this predicate
rows_filtered: metrics::Count,
/// how long was spent evaluating this predicate
@@ -90,9 +91,22 @@ impl DatafusionArrowPredicate {
let physical_expr =
create_physical_expr(&candidate.expr, &df_schema, &schema,
&props)?;
+        // ArrowPredicate::evaluate receives columns in the order they appear in the file,
+        // but the predicate was planned against `candidate.projection` order. Output column
+        // `i` must therefore be the batch column at the rank of `candidate.projection[i]` in
+        // file order, e.g. [5, 9, 0] -> [1, 2, 0]. This is the inverse of an argsort of
+        // `candidate.projection`; the two agree for two columns but not in general.
+        let projection = match candidate.projection.as_slice() {
+            [] | [_] => vec![],
+            // rank = number of referenced columns that precede this one in the file; this is
+            // O(n^2) in the number of columns the predicate references
+            cols => cols.iter().map(|c| cols.iter().filter(|o| *o < c).count()).collect(),
+        };
+
Ok(Self {
physical_expr,
- projection: ProjectionMask::roots(
+ projection,
+ projection_mask: ProjectionMask::roots(
metadata.file_metadata().schema_descr(),
candidate.projection,
),
@@ -104,10 +118,15 @@ impl DatafusionArrowPredicate {
impl ArrowPredicate for DatafusionArrowPredicate {
fn projection(&self) -> &ProjectionMask {
- &self.projection
+ &self.projection_mask
}
fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+ let batch = match self.projection.is_empty() {
+ true => batch,
+ false => batch.project(&self.projection)?,
+ };
+
// scoped timer updates on drop
let mut timer = self.time.timer();
match self