This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b098893a34 Fix 2 bugs related to push down partition filters (#12902)
b098893a34 is described below

commit b098893a34f83f1a1df290168377d7622938b3f5
Author: Emil Ejbyfeldt <[email protected]>
AuthorDate: Thu Oct 17 19:22:18 2024 +0200

    Fix 2 bugs related to push down partition filters (#12902)
    
    * Report errors in partition filters
    
    This patch fixes 2 bugs: errors in partition filters are ignored, and
    partition filters are allowed to be pushed down for unpartitioned tables
    even though such filters are never evaluated.
    
    The first bug is fixed by reporting errors for partition filters and
    only evaluating the filters we allowed as partition filters in
    `supports_filters_pushdown`.
    
    The second bug is fixed by only allowing partition filters to be pushed
    down when we have partition columns.
    
    * Update datafusion/sqllogictest/test_files/errors.slt
---
 datafusion/core/src/dataframe/mod.rs               |  4 +-
 datafusion/core/src/datasource/listing/helpers.rs  | 36 +++++------
 datafusion/core/src/datasource/listing/table.rs    | 69 ++++++++++++----------
 datafusion/sqllogictest/test_files/arrow_files.slt |  5 ++
 datafusion/sqllogictest/test_files/errors.slt      |  4 ++
 5 files changed, 65 insertions(+), 53 deletions(-)

diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 67e2a4780d..8a0829cd5e 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -2987,9 +2987,7 @@ mod tests {
             JoinType::Inner,
             Some(Expr::Literal(ScalarValue::Null)),
         )?;
-        let expected_plan = "CrossJoin:\
-        \n  TableScan: a projection=[c1], full_filters=[Boolean(NULL)]\
-        \n  TableScan: b projection=[c1]";
+        let expected_plan = "EmptyRelation";
         assert_eq!(expected_plan, format!("{}", join.into_optimized_plan()?));
 
         // JOIN ON expression must be boolean type
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 72d7277d6a..47012f777a 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -24,6 +24,7 @@ use std::sync::Arc;
 use super::ListingTableUrl;
 use super::PartitionedFile;
 use crate::execution::context::SessionState;
+use datafusion_common::internal_err;
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::{BinaryExpr, Operator};
 
@@ -285,25 +286,20 @@ async fn prune_partitions(
     let props = ExecutionProps::new();
 
     // Applies `filter` to `batch` returning `None` on error
-    let do_filter = |filter| -> Option<ArrayRef> {
-        let expr = create_physical_expr(filter, &df_schema, &props).ok()?;
-        expr.evaluate(&batch)
-            .ok()?
-            .into_array(partitions.len())
-            .ok()
+    let do_filter = |filter| -> Result<ArrayRef> {
+        let expr = create_physical_expr(filter, &df_schema, &props)?;
+        expr.evaluate(&batch)?.into_array(partitions.len())
     };
 
-    //.Compute the conjunction of the filters, ignoring errors
+    //.Compute the conjunction of the filters
     let mask = filters
         .iter()
-        .fold(None, |acc, filter| match (acc, do_filter(filter)) {
-            (Some(a), Some(b)) => Some(and(&a, b.as_boolean()).unwrap_or(a)),
-            (None, Some(r)) => Some(r.as_boolean().clone()),
-            (r, None) => r,
-        });
+        .map(|f| do_filter(f).map(|a| a.as_boolean().clone()))
+        .reduce(|a, b| Ok(and(&a?, &b?)?));
 
     let mask = match mask {
-        Some(mask) => mask,
+        Some(Ok(mask)) => mask,
+        Some(Err(err)) => return Err(err),
         None => return Ok(partitions),
     };
 
@@ -401,8 +397,8 @@ fn evaluate_partition_prefix<'a>(
 
 /// Discover the partitions on the given path and prune out files
 /// that belong to irrelevant partitions using `filters` expressions.
-/// `filters` might contain expressions that can be resolved only at the
-/// file level (e.g. Parquet row group pruning).
+/// `filters` should only contain expressions that can be evaluated
+/// using only the partition columns.
 pub async fn pruned_partition_list<'a>(
     ctx: &'a SessionState,
     store: &'a dyn ObjectStore,
@@ -413,6 +409,12 @@ pub async fn pruned_partition_list<'a>(
 ) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
     // if no partition col => simply list all the files
     if partition_cols.is_empty() {
+        if !filters.is_empty() {
+            return internal_err!(
+                "Got partition filters for unpartitioned table {}",
+                table_path
+            );
+        }
         return Ok(Box::pin(
             table_path
                 .list_all_files(ctx, store, file_extension)
@@ -631,13 +633,11 @@ mod tests {
         ]);
         let filter1 = Expr::eq(col("part1"), lit("p1v2"));
         let filter2 = Expr::eq(col("part2"), lit("p2v1"));
-        // filter3 cannot be resolved at partition pruning
-        let filter3 = Expr::eq(col("part2"), col("other"));
         let pruned = pruned_partition_list(
             &state,
             store.as_ref(),
             &ListingTableUrl::parse("file:///tablepath/").unwrap(),
-            &[filter1, filter2, filter3],
+            &[filter1, filter2],
             ".parquet",
             &[
                 (String::from("part1"), DataType::Utf8),
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index a9c6aec175..1e9f06c20b 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -782,6 +782,16 @@ impl ListingTable {
     }
 }
 
+// Expressions can be used for partition pruning if they can be evaluated using
+// only the partition columns and the table has at least one partition column.
+fn can_be_evaluted_for_partition_pruning(
+    partition_column_names: &[&str],
+    expr: &Expr,
+) -> bool {
+    !partition_column_names.is_empty()
+        && expr_applicable_for_cols(partition_column_names, expr)
+}
+
 #[async_trait]
 impl TableProvider for ListingTable {
     fn as_any(&self) -> &dyn Any {
@@ -807,10 +817,28 @@ impl TableProvider for ListingTable {
         filters: &[Expr],
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        // extract types of partition columns
+        let table_partition_cols = self
+            .options
+            .table_partition_cols
+            .iter()
+            .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
+            .collect::<Result<Vec<_>>>()?;
+
+        let table_partition_col_names = table_partition_cols
+            .iter()
+            .map(|field| field.name().as_str())
+            .collect::<Vec<_>>();
+        // If the filters can be resolved using only partition cols, there is 
no need to
+        // pushdown it to TableScan, otherwise, `unhandled` pruning predicates 
will be generated
+        let (partition_filters, filters): (Vec<_>, Vec<_>) =
+            filters.iter().cloned().partition(|filter| {
+                
can_be_evaluted_for_partition_pruning(&table_partition_col_names, filter)
+            });
         // TODO (https://github.com/apache/datafusion/issues/11600) remove 
downcast_ref from here?
         let session_state = 
state.as_any().downcast_ref::<SessionState>().unwrap();
         let (mut partitioned_file_lists, statistics) = self
-            .list_files_for_scan(session_state, filters, limit)
+            .list_files_for_scan(session_state, &partition_filters, limit)
             .await?;
 
         // if no files need to be read, return an `EmptyExec`
@@ -846,28 +874,6 @@ impl TableProvider for ListingTable {
             None => {} // no ordering required
         };
 
-        // extract types of partition columns
-        let table_partition_cols = self
-            .options
-            .table_partition_cols
-            .iter()
-            .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
-            .collect::<Result<Vec<_>>>()?;
-
-        // If the filters can be resolved using only partition cols, there is 
no need to
-        // pushdown it to TableScan, otherwise, `unhandled` pruning predicates 
will be generated
-        let table_partition_col_names = table_partition_cols
-            .iter()
-            .map(|field| field.name().as_str())
-            .collect::<Vec<_>>();
-        let filters = filters
-            .iter()
-            .filter(|filter| {
-                !expr_applicable_for_cols(&table_partition_col_names, filter)
-            })
-            .cloned()
-            .collect::<Vec<_>>();
-
         let filters = conjunction(filters.to_vec())
             .map(|expr| -> Result<_> {
                 // NOTE: Use the table schema (NOT file schema) here because 
`expr` may contain references to partition columns.
@@ -908,18 +914,17 @@ impl TableProvider for ListingTable {
         &self,
         filters: &[&Expr],
     ) -> Result<Vec<TableProviderFilterPushDown>> {
+        let partition_column_names = self
+            .options
+            .table_partition_cols
+            .iter()
+            .map(|col| col.0.as_str())
+            .collect::<Vec<_>>();
         filters
             .iter()
             .map(|filter| {
-                if expr_applicable_for_cols(
-                    &self
-                        .options
-                        .table_partition_cols
-                        .iter()
-                        .map(|col| col.0.as_str())
-                        .collect::<Vec<_>>(),
-                    filter,
-                ) {
+                if 
can_be_evaluted_for_partition_pruning(&partition_column_names, filter)
+                {
                     // if filter can be handled by partition pruning, it is 
exact
                     return Ok(TableProviderFilterPushDown::Exact);
                 }
diff --git a/datafusion/sqllogictest/test_files/arrow_files.slt b/datafusion/sqllogictest/test_files/arrow_files.slt
index e66ba7477f..e73acc384c 100644
--- a/datafusion/sqllogictest/test_files/arrow_files.slt
+++ b/datafusion/sqllogictest/test_files/arrow_files.slt
@@ -118,3 +118,8 @@ EXPLAIN SELECT f0 FROM arrow_partitioned WHERE part = 456
 ----
 logical_plan TableScan: arrow_partitioned projection=[f0], 
full_filters=[arrow_partitioned.part = Int32(456)]
 physical_plan ArrowExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_arrow/part=456/data.arrow]]},
 projection=[f0]
+
+
+# Errors in partition filters should be reported
+query error Divide by zero error
+SELECT f0 FROM arrow_partitioned WHERE CASE WHEN true THEN 1 / 0 ELSE part END 
= 1;
diff --git a/datafusion/sqllogictest/test_files/errors.slt b/datafusion/sqllogictest/test_files/errors.slt
index ce09475253..da46a7e5e6 100644
--- a/datafusion/sqllogictest/test_files/errors.slt
+++ b/datafusion/sqllogictest/test_files/errors.slt
@@ -133,3 +133,7 @@ create table foo as values (1), ('foo');
 
 query error No function matches
 select 1 group by substr('');
+
+# Error in filter should be reported
+query error Divide by zero
+SELECT c2 from aggregate_test_100 where CASE WHEN true THEN 1 / 0 ELSE 0 END = 
1;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to