This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ab2823475d Support `columns_sorted` in row_filters (#20497)
ab2823475d is described below
commit ab2823475d0c79a749120ae354572ab85c043b78
Author: Konstantin Tarasov <[email protected]>
AuthorDate: Sun Mar 15 18:09:50 2026 -0400
Support `columns_sorted` in row_filters (#20497)
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
- Closes #3476.
## Rationale for this change
Improve predicate ordering for predicate pushdown.
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
## What changes are included in this PR?
Building on changes from #3477 and #7528
- Implement the `columns_sorted` function
- Change `should_enable_page_index` to use the index when predicate
reordering is enabled in the config
<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
## Are these changes tested?
Yes, unit tests
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
## Are there any user-facing changes?
No
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
---
benchmarks/README.md | 6 ++---
datafusion/datasource-parquet/src/row_filter.rs | 34 +++----------------------
2 files changed, 7 insertions(+), 33 deletions(-)
diff --git a/benchmarks/README.md b/benchmarks/README.md
index 7e9818aef2..3aa4f4bb86 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -879,13 +879,13 @@ The benchmark includes queries that:
The sorted dataset is automatically generated from the ClickBench partitioned
dataset. You can configure the memory used during the sorting process with the
`DATAFUSION_MEMORY_GB` environment variable. The default memory limit is 12GB.
```bash
-./bench.sh data data_sorted_clickbench
+./bench.sh data clickbench_sorted
```
To create the sorted dataset, for example with 16GB of memory, run:
```bash
-DATAFUSION_MEMORY_GB=16 ./bench.sh data data_sorted_clickbench
+DATAFUSION_MEMORY_GB=16 ./bench.sh data clickbench_sorted
```
This command will:
@@ -896,7 +896,7 @@ This command will:
#### Running the Benchmark
```bash
-./bench.sh run data_sorted_clickbench
+./bench.sh run clickbench_sorted
```
This runs queries against the pre-sorted dataset with the `--sorted-by
EventTime` flag, which informs DataFusion that the data is pre-sorted, allowing
it to optimize away redundant sort operations.
diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index 42bdbfb992..c57bbdaeba 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -50,14 +50,9 @@
//! 2. Determine whether each predicate can be evaluated as an `ArrowPredicate`.
//! 3. Determine, for each predicate, the total compressed size of all
//! columns required to evaluate the predicate.
-//! 4. Determine, for each predicate, whether all columns required to
-//! evaluate the expression are sorted.
-//! 5. Re-order the predicate by total size (from step 3).
-//! 6. Partition the predicates according to whether they are sorted (from step 4)
-//! 7. "Compile" each predicate `Expr` to a `DatafusionArrowPredicate`.
-//! 8. Build the `RowFilter` with the sorted predicates followed by
-//! the unsorted predicates. Within each partition, predicates are
-//! still be sorted by size.
+//! 4. Re-order predicates by total size (from step 3).
+//! 5. "Compile" each predicate `Expr` to a `DatafusionArrowPredicate`.
+//! 6. Build the `RowFilter` from the ordered predicates.
//!
//! List-aware predicates (for example, `array_has`, `array_has_all`, and
//! `array_has_any`) can be evaluated directly during Parquet decoding.
@@ -70,7 +65,6 @@
//! - `WHERE s['value'] > 5` — pushed down (accesses a primitive leaf)
//! - `WHERE s IS NOT NULL` — not pushed down (references the whole struct)
-use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::sync::Arc;
@@ -197,8 +191,6 @@ pub(crate) struct FilterCandidate {
/// the filter and to order the filters when `reorder_predicates` is true.
/// This is generated by summing the compressed size of all columns that the filter references.
required_bytes: usize,
- /// Can this filter use an index (e.g. a page index) to prune rows?
- can_use_index: bool,
/// Column indices into the parquet file schema required to evaluate this filter.
projection: LeafProjection,
/// The Arrow schema containing only the columns required by this filter,
@@ -273,12 +265,9 @@ impl FilterCandidateBuilder {
);
let required_bytes = size_of_columns(&leaf_indices, metadata)?;
- let can_use_index = columns_sorted(&leaf_indices, metadata)?;
-
Ok(Some(FilterCandidate {
expr: self.expr,
required_bytes,
- can_use_index,
projection: LeafProjection { leaf_indices },
filter_schema: projected_schema,
}))
@@ -817,16 +806,6 @@ fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> Result<usiz
Ok(total_size)
}
-/// For a given set of `Column`s required for predicate `Expr` determine whether
-/// all columns are sorted.
-///
-/// Sorted columns may be queried more efficiently in the presence of
-/// a PageIndex.
-fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<bool> {
- // TODO How do we know this?
- Ok(false)
-}
-
/// Build a [`RowFilter`] from the given predicate expression if possible.
///
/// # Arguments
@@ -881,12 +860,7 @@ pub fn build_row_filter(
}
if reorder_predicates {
- candidates.sort_unstable_by(|c1, c2| {
- match c1.can_use_index.cmp(&c2.can_use_index) {
- Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes),
- ord => ord,
- }
- });
+ candidates.sort_unstable_by_key(|c| c.required_bytes);
}
// To avoid double-counting metrics when multiple predicates are used:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]