This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new c775e4d6ea push down filter to partition listing (#10693)
c775e4d6ea is described below
commit c775e4d6ea6dfe9c26a772b676552b9711004a3d
Author: QP Hou <[email protected]>
AuthorDate: Thu May 30 04:34:18 2024 -0700
push down filter to partition listing (#10693)
---
datafusion/core/src/datasource/listing/helpers.rs | 205 +++++++++++++++++++++-
1 file changed, 202 insertions(+), 3 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 5b87090096..b531cf8369 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -17,11 +17,13 @@
//! Helper functions for the table implementation
+use std::collections::HashMap;
use std::sync::Arc;
use super::PartitionedFile;
use crate::datasource::listing::ListingTableUrl;
use crate::execution::context::SessionState;
+use crate::logical_expr::{BinaryExpr, Operator};
use crate::{error::Result, scalar::ScalarValue};
use arrow::{
@@ -169,9 +171,17 @@ async fn list_partitions(
store: &dyn ObjectStore,
table_path: &ListingTableUrl,
max_depth: usize,
+ partition_prefix: Option<Path>,
) -> Result<Vec<Partition>> {
let partition = Partition {
- path: table_path.prefix().clone(),
+ path: match partition_prefix {
+ Some(prefix) => Path::from_iter(
+ Path::from(table_path.prefix().as_ref())
+ .parts()
+ .chain(Path::from(prefix.as_ref()).parts()),
+ ),
+ None => table_path.prefix().clone(),
+ },
depth: 0,
files: None,
};
@@ -305,6 +315,80 @@ async fn prune_partitions(
Ok(filtered)
}
+/// What the equality filters collected so far say about one partition column.
+#[derive(Debug)]
+enum PartitionValue {
+ /// Exactly one `column = literal` equality has been seen; holds the
+ /// literal rendered with `to_string()`.
+ Single(String),
+ /// More than one equality has been seen for the column, so no single
+ /// value can be used for it.
+ Multi,
+}
+
+/// Collects `column = literal` (or `literal = column`) equalities found in
+/// `filter` into `partition_values`, keyed by column name.
+///
+/// The first equality seen for a column is recorded as
+/// [`PartitionValue::Single`] holding the literal's `to_string()` output; any
+/// later equality on the same column replaces the entry with
+/// [`PartitionValue::Multi`]. Both operands of an `AND` are visited
+/// recursively. Every other expression shape leaves the map untouched.
+fn populate_partition_values<'a>(
+    partition_values: &mut HashMap<&'a str, PartitionValue>,
+    filter: &'a Expr,
+) {
+    // Only binary expressions can contribute a constraint.
+    let (lhs, operator, rhs) = match filter {
+        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
+            (left.as_ref(), op, right.as_ref())
+        }
+        _ => return,
+    };
+
+    match operator {
+        Operator::And => {
+            populate_partition_values(partition_values, lhs);
+            populate_partition_values(partition_values, rhs);
+        }
+        Operator::Eq => {
+            // Accept the column on either side of the comparison.
+            let (name, literal) = match (lhs, rhs) {
+                (Expr::Column(Column { name, .. }), Expr::Literal(literal))
+                | (Expr::Literal(literal), Expr::Column(Column { name, .. })) => {
+                    (name, literal)
+                }
+                _ => return,
+            };
+
+            // A column that already has an entry is now constrained by more
+            // than one equality, so it is downgraded to `Multi`.
+            partition_values
+                .entry(name.as_str())
+                .and_modify(|seen| *seen = PartitionValue::Multi)
+                .or_insert_with(|| PartitionValue::Single(literal.to_string()));
+        }
+        _ => {}
+    }
+}
+
+/// Derives a common path prefix from `filters` for the leading partition
+/// columns that are pinned to a single literal value.
+///
+/// Columns are visited in `partition_cols` order and each contributes a
+/// `name=value` path segment. The walk stops at the first column that is not
+/// constrained to exactly one literal, so the result is a prefix shared by
+/// every partition the filters can match.
+///
+/// Returns `None` when no segment could be produced (no usable filters, or
+/// the first partition column is unconstrained).
+fn evaluate_partition_prefix<'a>(
+    partition_cols: &'a [(String, DataType)],
+    filters: &'a [Expr],
+) -> Option<Path> {
+    let mut constraints = HashMap::new();
+    filters
+        .iter()
+        .for_each(|expr| populate_partition_values(&mut constraints, expr));
+
+    // `map_while` ends at the first column without a single literal value;
+    // an empty constraint map therefore yields no segments at all.
+    let segments: Vec<String> = partition_cols
+        .iter()
+        .map_while(|(column, _)| match constraints.get(column.as_str()) {
+            Some(PartitionValue::Single(value)) => Some(format!("{column}={value}")),
+            _ => None,
+        })
+        .collect();
+
+    if segments.is_empty() {
+        return None;
+    }
+    Some(Path::from_iter(segments))
+}
+
/// Discover the partitions on the given path and prune out files
/// that belong to irrelevant partitions using `filters` expressions.
/// `filters` might contain expressions that can be resolved only at the
@@ -327,7 +411,10 @@ pub async fn pruned_partition_list<'a>(
));
}
- let partitions = list_partitions(store, table_path,
partition_cols.len()).await?;
+ let partition_prefix = evaluate_partition_prefix(partition_cols, filters);
+ let partitions =
+ list_partitions(store, table_path, partition_cols.len(),
partition_prefix)
+ .await?;
debug!("Listed {} partitions", partitions.len());
let pruned =
@@ -416,7 +503,9 @@ where
mod tests {
use std::ops::Not;
- use crate::logical_expr::{case, col, lit};
+ use futures::StreamExt;
+
+ use crate::logical_expr::{case, col, lit, Expr};
use crate::test::object_store::make_test_store_and_state;
use super::*;
@@ -675,4 +764,114 @@ mod tests {
// this helper function
assert!(expr_applicable_for_cols(&[], &lit(true)));
}
+
+    /// Table-driven check of the prefix derived from various filter shapes
+    /// over partition columns `a`, `b`, `c` (in that order).
+    #[test]
+    fn test_evaluate_partition_prefix() {
+        let partitions = &[
+            ("a".to_string(), DataType::Utf8),
+            ("b".to_string(), DataType::Int16),
+            ("c".to_string(), DataType::Boolean),
+        ];
+
+        let a_is_foo = || col("a").eq(lit("foo"));
+        let a_is_bar = || col("a").eq(lit("bar"));
+        let b_is_bar = || col("b").eq(lit("bar"));
+
+        // (filters, expected prefix)
+        let cases: Vec<(Vec<Expr>, Option<Path>)> = vec![
+            // column on the left of the equality
+            (vec![a_is_foo()], Some(Path::from("a=foo"))),
+            // column on the right of the equality
+            (vec![lit("foo").eq(col("a"))], Some(Path::from("a=foo"))),
+            // explicit AND of two leading columns
+            (
+                vec![a_is_foo().and(b_is_bar())],
+                Some(Path::from("a=foo/b=bar")),
+            ),
+            // list of filters should be evaluated as AND
+            (
+                vec![a_is_foo(), b_is_bar()],
+                Some(Path::from("a=foo/b=bar")),
+            ),
+            // all three columns constrained
+            (
+                vec![a_is_foo()
+                    .and(col("b").eq(lit("1")))
+                    .and(col("c").eq(lit("true")))],
+                Some(Path::from("a=foo/b=1/c=true")),
+            ),
+            // no prefix when filter is empty
+            (vec![], None),
+            // b=foo results in no prefix because a is not restricted
+            (vec![col("b").eq(lit("foo"))], None),
+            // a=foo and c=baz only results in prefix a=foo because b is not
+            // restricted
+            (
+                vec![a_is_foo().and(col("c").eq(lit("baz")))],
+                Some(Path::from("a=foo")),
+            ),
+            // partition with multiple values results in no prefix
+            (vec![a_is_foo().and(a_is_bar())], None),
+            // no prefix because partition a is not restricted to a single
+            // literal
+            (vec![a_is_foo().or(a_is_bar())], None),
+            // a non-equality comparison does not constrain the column
+            (vec![col("b").lt(lit(5))], None),
+        ];
+
+        for (filters, expected) in cases {
+            assert_eq!(evaluate_partition_prefix(partitions, &filters), expected);
+        }
+    }
+
+    /// Date literals must be rendered as `YYYY-MM-DD` in the prefix, for both
+    /// the day-based (`Date32`) and millisecond-based (`Date64`) scalars.
+    #[test]
+    fn test_evaluate_date_partition_prefix() {
+        const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000;
+
+        // Date32: 3 days after the epoch.
+        let date32_cols = [("a".to_string(), DataType::Date32)];
+        let date32_filter = col("a").eq(Expr::Literal(ScalarValue::Date32(Some(3))));
+        assert_eq!(
+            evaluate_partition_prefix(&date32_cols, &[date32_filter]),
+            Some(Path::from("a=1970-01-04")),
+        );
+
+        // Date64: 4 days after the epoch, expressed in milliseconds.
+        let date64_cols = [("a".to_string(), DataType::Date64)];
+        let date64_filter =
+            col("a").eq(Expr::Literal(ScalarValue::Date64(Some(4 * MILLIS_PER_DAY))));
+        assert_eq!(
+            evaluate_partition_prefix(&date64_cols, &[date64_filter]),
+            Some(Path::from("a=1970-01-05")),
+        );
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]