This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ad4b7b7cfd fix guarantees in allways_true of PruningPredicate (#8732)
ad4b7b7cfd is described below
commit ad4b7b7cfd4a2f93bbef3c2bff8a6ce65db24b53
Author: yi wang <[email protected]>
AuthorDate: Thu Jan 4 06:01:13 2024 +0800
fix guarantees in allways_true of PruningPredicate (#8732)
* fix: check guarantees in allways_true
* Add test for allways_true
* refine comment
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.../src/datasource/physical_plan/parquet/mod.rs | 78 +++++++++++++---------
datafusion/core/src/physical_optimizer/pruning.rs | 7 +-
2 files changed, 53 insertions(+), 32 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 76a6cc297b..9d81d8d083 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -1768,8 +1768,9 @@ mod tests {
);
}
- #[tokio::test]
- async fn parquet_exec_metrics() {
+ /// Returns a string array with contents:
+ /// "[Foo, null, bar, bar, bar, bar, zzz]"
+ fn string_batch() -> RecordBatch {
let c1: ArrayRef = Arc::new(StringArray::from(vec![
Some("Foo"),
None,
@@ -1781,9 +1782,15 @@ mod tests {
]));
// batch1: c1(string)
- let batch1 = create_batch(vec![("c1", c1.clone())]);
+ create_batch(vec![("c1", c1.clone())])
+ }
+
+ #[tokio::test]
+ async fn parquet_exec_metrics() {
+ // batch1: c1(string)
+ let batch1 = string_batch();
- // on
+ // c1 != 'bar'
let filter = col("c1").not_eq(lit("bar"));
// read/write them files:
@@ -1812,20 +1819,10 @@ mod tests {
#[tokio::test]
async fn parquet_exec_display() {
- let c1: ArrayRef = Arc::new(StringArray::from(vec![
- Some("Foo"),
- None,
- Some("bar"),
- Some("bar"),
- Some("bar"),
- Some("bar"),
- Some("zzz"),
- ]));
-
// batch1: c1(string)
- let batch1 = create_batch(vec![("c1", c1.clone())]);
+ let batch1 = string_batch();
- // on
+ // c1 != 'bar'
let filter = col("c1").not_eq(lit("bar"));
let rt = RoundTrip::new()
@@ -1854,21 +1851,15 @@ mod tests {
}
#[tokio::test]
- async fn parquet_exec_skip_empty_pruning() {
- let c1: ArrayRef = Arc::new(StringArray::from(vec![
- Some("Foo"),
- None,
- Some("bar"),
- Some("bar"),
- Some("bar"),
- Some("bar"),
- Some("zzz"),
- ]));
-
+ async fn parquet_exec_has_no_pruning_predicate_if_can_not_prune() {
// batch1: c1(string)
- let batch1 = create_batch(vec![("c1", c1.clone())]);
+ let batch1 = string_batch();
- // filter is too complicated for pruning
+ // filter is too complicated for pruning (PruningPredicate code does not
+ // handle case expressions), so the pruning predicate will always be
+ // "true"
+
+ // WHEN c1 != bar THEN true ELSE false END
let filter = when(col("c1").not_eq(lit("bar")), lit(true))
.otherwise(lit(false))
.unwrap();
@@ -1879,7 +1870,7 @@ mod tests {
.round_trip(vec![batch1])
.await;
- // Should not contain a pruning predicate
+ // Should not contain a pruning predicate (since nothing can be pruned)
let pruning_predicate = &rt.parquet_exec.pruning_predicate;
assert!(
pruning_predicate.is_none(),
@@ -1892,6 +1883,33 @@ mod tests {
assert_eq!(predicate.unwrap().to_string(), filter_phys.to_string());
}
+ #[tokio::test]
+ async fn parquet_exec_has_pruning_predicate_for_guarantees() {
+ // batch1: c1(string)
+ let batch1 = string_batch();
+
+ // part of the filter is too complicated for pruning (PruningPredicate code does not
+ // handle case expressions), but part (c1 = 'foo') can be used for bloom filtering, so
+ // should still have the pruning predicate.
+
+ // c1 = 'foo' AND (WHEN c1 != bar THEN true ELSE false END)
+ let filter = col("c1").eq(lit("foo")).and(
+ when(col("c1").not_eq(lit("bar")), lit(true))
+ .otherwise(lit(false))
+ .unwrap(),
+ );
+
+ let rt = RoundTrip::new()
+ .with_predicate(filter.clone())
+ .with_pushdown_predicate()
+ .round_trip(vec![batch1])
+ .await;
+
+ // Should have a pruning predicate
+ let pruning_predicate = &rt.parquet_exec.pruning_predicate;
+ assert!(pruning_predicate.is_some());
+ }
+
/// returns the sum of all the metrics with the specified name
/// the returned set.
///
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs
index fecbffdbb0..06cfc72824 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -295,9 +295,12 @@ impl PruningPredicate {
&self.predicate_expr
}
- /// Returns true if this pruning predicate is "always true" (aka will not prune anything)
+ /// Returns true if this pruning predicate can not prune anything.
+ ///
+ /// This happens if the predicate is a literal `true` and
+ /// literal_guarantees is empty.
pub fn allways_true(&self) -> bool {
- is_always_true(&self.predicate_expr)
+ is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
}
pub(crate) fn required_columns(&self) -> &RequiredColumns {