This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 85f92ef6be Apply projection to `Statistics` in `FilterExec` (#13187)
85f92ef6be is described below
commit 85f92ef6be1d7364857a2c13fbb026d9e45406ea
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Nov 3 07:07:47 2024 -0500
Apply projection to `Statistics` in `FilterExec` (#13187)
* Apply projection to `Statistics` in `FilterExec`
* Use Statistics::project in HashJoin
---
datafusion/common/src/stats.rs | 20 ++++++++++
datafusion/physical-plan/src/filter.rs | 7 +++-
datafusion/physical-plan/src/joins/hash_join.rs | 13 +------
datafusion/sqllogictest/test_files/parquet.slt | 49 +++++++++++++++++++++++++
4 files changed, 77 insertions(+), 12 deletions(-)
diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs
index e669c674f7..1aa42705e7 100644
--- a/datafusion/common/src/stats.rs
+++ b/datafusion/common/src/stats.rs
@@ -258,6 +258,26 @@ impl Statistics {
self
}
+ /// Project the statistics to the given column indices.
+ ///
+ /// For example, if we had statistics for columns `{"a", "b", "c"}`,
+ /// projecting to `vec![2, 1]` would return statistics for columns `{"c",
+ /// "b"}`.
+ pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
+ let Some(projection) = projection else {
+ return self;
+ };
+
+ // todo: it would be nice to avoid cloning column statistics if
+ // possible (e.g. if the projection did not contain duplicates)
+ self.column_statistics = projection
+ .iter()
+ .map(|&i| self.column_statistics[i].clone())
+ .collect();
+
+ self
+ }
+
/// Calculates the statistics after `fetch` and `skip` operations apply.
/// Here, `self` denotes per-partition statistics. Use the `n_partitions`
/// parameter to compute global statistics in a multi-partition setting.
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 97d8159137..07898e8d22 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -371,7 +371,12 @@ impl ExecutionPlan for FilterExec {
/// The output statistics of a filtering operation can be estimated if the
/// predicate's selectivity value can be determined for the incoming data.
fn statistics(&self) -> Result<Statistics> {
- Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity)
+ let stats = Self::statistics_helper(
+ &self.input,
+ self.predicate(),
+ self.default_selectivity,
+ )?;
+ Ok(stats.project(self.projection.as_ref()))
}
fn cardinality_effect(&self) -> CardinalityEffect {
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 57d8a9ce7b..ae872e13a9 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -785,7 +785,7 @@ impl ExecutionPlan for HashJoinExec {
// TODO stats: it is not possible in general to know the output size of joins
// There are some special cases though, for example:
// - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
- let mut stats = estimate_join_statistics(
+ let stats = estimate_join_statistics(
Arc::clone(&self.left),
Arc::clone(&self.right),
self.on.clone(),
@@ -793,16 +793,7 @@ impl ExecutionPlan for HashJoinExec {
&self.join_schema,
)?;
// Project statistics if there is a projection
- if let Some(projection) = &self.projection {
- stats.column_statistics = stats
- .column_statistics
- .into_iter()
- .enumerate()
- .filter(|(i, _)| projection.contains(i))
- .map(|(_, s)| s)
- .collect();
- }
- Ok(stats)
+ Ok(stats.project(self.projection.as_ref()))
}
}
diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt
index ed963466fc..253ebb9ea0 100644
--- a/datafusion/sqllogictest/test_files/parquet.slt
+++ b/datafusion/sqllogictest/test_files/parquet.slt
@@ -549,3 +549,52 @@ FixedSizeBinary(16) 0166ce1d46129ad104fa4990c6057c91
statement ok
DROP TABLE test_non_utf8_binary;
+
+
+## Tests for https://github.com/apache/datafusion/issues/13186
+statement ok
+create table cpu (time timestamp, usage_idle float, usage_user float, cpu int);
+
+statement ok
+insert into cpu values ('1970-01-01 00:00:00', 1.0, 2.0, 3);
+
+# must put it into a parquet file to get statistics
+statement ok
+copy (select * from cpu) to 'test_files/scratch/parquet/cpu.parquet';
+
+# Run queries against parquet files
+statement ok
+create external table cpu_parquet
+stored as parquet
+location 'test_files/scratch/parquet/cpu.parquet';
+
+# Double filtering
+#
+# Expect 1 row for both queries
+query PI
+select time, rn
+from (
+ select time, row_number() OVER (ORDER BY usage_idle, time) as rn
+ from cpu
+ where cpu = 3
+) where rn > 0;
+----
+1970-01-01T00:00:00 1
+
+query PI
+select time, rn
+from (
+ select time, row_number() OVER (ORDER BY usage_idle, time) as rn
+ from cpu_parquet
+ where cpu = 3
+) where rn > 0;
+----
+1970-01-01T00:00:00 1
+
+
+# Clean up
+statement ok
+drop table cpu;
+
+statement ok
+drop table cpu_parquet;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]