This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new c775e4d6ea push down filter to partition listing (#10693)
c775e4d6ea is described below

commit c775e4d6ea6dfe9c26a772b676552b9711004a3d
Author: QP Hou <[email protected]>
AuthorDate: Thu May 30 04:34:18 2024 -0700

    push down filter to partition listing (#10693)
---
 datafusion/core/src/datasource/listing/helpers.rs | 205 +++++++++++++++++++++-
 1 file changed, 202 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/helpers.rs 
b/datafusion/core/src/datasource/listing/helpers.rs
index 5b87090096..b531cf8369 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -17,11 +17,13 @@
 
 //! Helper functions for the table implementation
 
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use super::PartitionedFile;
 use crate::datasource::listing::ListingTableUrl;
 use crate::execution::context::SessionState;
+use crate::logical_expr::{BinaryExpr, Operator};
 use crate::{error::Result, scalar::ScalarValue};
 
 use arrow::{
@@ -169,9 +171,17 @@ async fn list_partitions(
     store: &dyn ObjectStore,
     table_path: &ListingTableUrl,
     max_depth: usize,
+    partition_prefix: Option<Path>,
 ) -> Result<Vec<Partition>> {
     let partition = Partition {
-        path: table_path.prefix().clone(),
+        path: match partition_prefix {
+            Some(prefix) => Path::from_iter(
+                Path::from(table_path.prefix().as_ref())
+                    .parts()
+                    .chain(Path::from(prefix.as_ref()).parts()),
+            ),
+            None => table_path.prefix().clone(),
+        },
         depth: 0,
         files: None,
     };
@@ -305,6 +315,80 @@ async fn prune_partitions(
     Ok(filtered)
 }
 
+#[derive(Debug)]
+enum PartitionValue {
+    Single(String),
+    Multi,
+}
+
+fn populate_partition_values<'a>(
+    partition_values: &mut HashMap<&'a str, PartitionValue>,
+    filter: &'a Expr,
+) {
+    if let Expr::BinaryExpr(BinaryExpr {
+        ref left,
+        op,
+        ref right,
+    }) = filter
+    {
+        match op {
+            Operator::Eq => match (left.as_ref(), right.as_ref()) {
+                (Expr::Column(Column { ref name, .. }), Expr::Literal(val))
+                | (Expr::Literal(val), Expr::Column(Column { ref name, .. })) 
=> {
+                    if partition_values
+                        .insert(name, PartitionValue::Single(val.to_string()))
+                        .is_some()
+                    {
+                        partition_values.insert(name, PartitionValue::Multi);
+                    }
+                }
+                _ => {}
+            },
+            Operator::And => {
+                populate_partition_values(partition_values, left);
+                populate_partition_values(partition_values, right);
+            }
+            _ => {}
+        }
+    }
+}
+
+fn evaluate_partition_prefix<'a>(
+    partition_cols: &'a [(String, DataType)],
+    filters: &'a [Expr],
+) -> Option<Path> {
+    let mut partition_values = HashMap::new();
+    for filter in filters {
+        populate_partition_values(&mut partition_values, filter);
+    }
+
+    if partition_values.is_empty() {
+        return None;
+    }
+
+    let mut parts = vec![];
+    for (p, _) in partition_cols {
+        match partition_values.get(p.as_str()) {
+            Some(PartitionValue::Single(val)) => {
+                // if a partition only has a single literal value, then it can 
be added to the
+                // prefix
+                parts.push(format!("{p}={val}"));
+            }
+            _ => {
+                // break on the first unconstrainted partition to create a 
common prefix
+                // for all covered partitions.
+                break;
+            }
+        }
+    }
+
+    if parts.is_empty() {
+        None
+    } else {
+        Some(Path::from_iter(parts))
+    }
+}
+
 /// Discover the partitions on the given path and prune out files
 /// that belong to irrelevant partitions using `filters` expressions.
 /// `filters` might contain expressions that can be resolved only at the
@@ -327,7 +411,10 @@ pub async fn pruned_partition_list<'a>(
         ));
     }
 
-    let partitions = list_partitions(store, table_path, 
partition_cols.len()).await?;
+    let partition_prefix = evaluate_partition_prefix(partition_cols, filters);
+    let partitions =
+        list_partitions(store, table_path, partition_cols.len(), 
partition_prefix)
+            .await?;
     debug!("Listed {} partitions", partitions.len());
 
     let pruned =
@@ -416,7 +503,9 @@ where
 mod tests {
     use std::ops::Not;
 
-    use crate::logical_expr::{case, col, lit};
+    use futures::StreamExt;
+
+    use crate::logical_expr::{case, col, lit, Expr};
     use crate::test::object_store::make_test_store_and_state;
 
     use super::*;
@@ -675,4 +764,114 @@ mod tests {
         // this helper function
         assert!(expr_applicable_for_cols(&[], &lit(true)));
     }
+
+    #[test]
+    fn test_evaluate_partition_prefix() {
+        let partitions = &[
+            ("a".to_string(), DataType::Utf8),
+            ("b".to_string(), DataType::Int16),
+            ("c".to_string(), DataType::Boolean),
+        ];
+
+        assert_eq!(
+            evaluate_partition_prefix(partitions, &[col("a").eq(lit("foo"))]),
+            Some(Path::from("a=foo")),
+        );
+
+        assert_eq!(
+            evaluate_partition_prefix(partitions, &[lit("foo").eq(col("a"))]),
+            Some(Path::from("a=foo")),
+        );
+
+        assert_eq!(
+            evaluate_partition_prefix(
+                partitions,
+                &[col("a").eq(lit("foo")).and((col("b").eq(lit("bar"))))],
+            ),
+            Some(Path::from("a=foo/b=bar")),
+        );
+
+        assert_eq!(
+            evaluate_partition_prefix(
+                partitions,
+                // list of filters should be evaluated as AND
+                &[col("a").eq(lit("foo")), col("b").eq(lit("bar")),],
+            ),
+            Some(Path::from("a=foo/b=bar")),
+        );
+
+        assert_eq!(
+            evaluate_partition_prefix(
+                partitions,
+                &[col("a")
+                    .eq(lit("foo"))
+                    .and(col("b").eq(lit("1")))
+                    .and(col("c").eq(lit("true")))],
+            ),
+            Some(Path::from("a=foo/b=1/c=true")),
+        );
+
+        // no prefix when filter is empty
+        assert_eq!(evaluate_partition_prefix(partitions, &[]), None);
+
+        // b=foo results in no prefix because a is not restricted
+        assert_eq!(
+            evaluate_partition_prefix(partitions, &[Expr::eq(col("b"), 
lit("foo"))]),
+            None,
+        );
+
+        // a=foo and c=baz only results in preifx a=foo because b is not 
restricted
+        assert_eq!(
+            evaluate_partition_prefix(
+                partitions,
+                &[col("a").eq(lit("foo")).and(col("c").eq(lit("baz")))],
+            ),
+            Some(Path::from("a=foo")),
+        );
+
+        // partition with multiple values results in no prefix
+        assert_eq!(
+            evaluate_partition_prefix(
+                partitions,
+                &[Expr::and(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
+            ),
+            None,
+        );
+
+        // no prefix because partition a is not restricted to a single literal
+        assert_eq!(
+            evaluate_partition_prefix(
+                partitions,
+                &[Expr::or(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
+            ),
+            None,
+        );
+        assert_eq!(
+            evaluate_partition_prefix(partitions, &[col("b").lt(lit(5))],),
+            None,
+        );
+    }
+
+    #[test]
+    fn test_evaluate_date_partition_prefix() {
+        let partitions = &[("a".to_string(), DataType::Date32)];
+        assert_eq!(
+            evaluate_partition_prefix(
+                partitions,
+                &[col("a").eq(Expr::Literal(ScalarValue::Date32(Some(3))))],
+            ),
+            Some(Path::from("a=1970-01-04")),
+        );
+
+        let partitions = &[("a".to_string(), DataType::Date64)];
+        assert_eq!(
+            evaluate_partition_prefix(
+                partitions,
+                &[col("a").eq(Expr::Literal(ScalarValue::Date64(Some(
+                    4 * 24 * 60 * 60 * 1000
+                )))),],
+            ),
+            Some(Path::from("a=1970-01-05")),
+        );
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to