This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 85f92ef6be Apply projection to `Statistics` in `FilterExec`  (#13187)
85f92ef6be is described below

commit 85f92ef6be1d7364857a2c13fbb026d9e45406ea
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Nov 3 07:07:47 2024 -0500

    Apply projection to `Statistics` in `FilterExec`  (#13187)
    
    * Apply projection to `Statistics` in `FilterExec`
    
    * Use Statistics::project in HashJoin
---
 datafusion/common/src/stats.rs                  | 20 ++++++++++
 datafusion/physical-plan/src/filter.rs          |  7 +++-
 datafusion/physical-plan/src/joins/hash_join.rs | 13 +------
 datafusion/sqllogictest/test_files/parquet.slt  | 49 +++++++++++++++++++++++++
 4 files changed, 77 insertions(+), 12 deletions(-)

diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs
index e669c674f7..1aa42705e7 100644
--- a/datafusion/common/src/stats.rs
+++ b/datafusion/common/src/stats.rs
@@ -258,6 +258,26 @@ impl Statistics {
         self
     }
 
+    /// Project the statistics to the given column indices.
+    ///
+    /// For example, if we had statistics for columns `{"a", "b", "c"}`,
+    /// projecting to `vec![2, 1]` would return statistics for columns `{"c",
+    /// "b"}`.
+    pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
+        let Some(projection) = projection else {
+            return self;
+        };
+
+        // todo: it would be nice to avoid cloning column statistics if
+        // possible (e.g. if the projection did not contain duplicates)
+        self.column_statistics = projection
+            .iter()
+            .map(|&i| self.column_statistics[i].clone())
+            .collect();
+
+        self
+    }
+
     /// Calculates the statistics after `fetch` and `skip` operations apply.
     /// Here, `self` denotes per-partition statistics. Use the `n_partitions`
     /// parameter to compute global statistics in a multi-partition setting.
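
For reference, a minimal usage sketch of the new `Statistics::project` (not part of this commit), assuming the `Statistics`, `ColumnStatistics`, and `Precision` types defined in this same `stats.rs` and re-exported from `datafusion_common`:

    #[test]
    fn project_reorders_column_statistics() {
        use datafusion_common::stats::Precision;
        use datafusion_common::{ColumnStatistics, Statistics};

        // Hypothetical statistics for three columns {"a", "b", "c"}.
        let stats = Statistics {
            num_rows: Precision::Exact(100),
            total_byte_size: Precision::Absent,
            column_statistics: vec![
                ColumnStatistics::new_unknown(), // "a"
                ColumnStatistics::new_unknown(), // "b"
                ColumnStatistics::new_unknown(), // "c"
            ],
        };

        // Projecting to [2, 1] keeps only the statistics for "c" and "b", in
        // that order; num_rows and total_byte_size are carried through as-is.
        let projected = stats.project(Some(&vec![2, 1]));
        assert_eq!(projected.column_statistics.len(), 2);
    }
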
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 97d8159137..07898e8d22 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -371,7 +371,12 @@ impl ExecutionPlan for FilterExec {
     /// The output statistics of a filtering operation can be estimated if the
     /// predicate's selectivity value can be determined for the incoming data.
     fn statistics(&self) -> Result<Statistics> {
-        Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity)
+        let stats = Self::statistics_helper(
+            &self.input,
+            self.predicate(),
+            self.default_selectivity,
+        )?;
+        Ok(stats.project(self.projection.as_ref()))
     }
 
     fn cardinality_effect(&self) -> CardinalityEffect {
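
The practical effect in `FilterExec`: when the operator carries an embedded projection, its reported statistics now have one `ColumnStatistics` entry per projected column rather than one per input column. A hypothetical invariant check (not in this commit), assuming `ExecutionPlan` as re-exported by the `datafusion_physical_plan` crate:

    use datafusion_common::Result;
    use datafusion_physical_plan::ExecutionPlan;

    /// Hypothetical check: reported column statistics should line up with
    /// the plan's output schema.
    fn stats_match_schema(plan: &dyn ExecutionPlan) -> Result<bool> {
        Ok(plan.statistics()?.column_statistics.len() == plan.schema().fields().len())
    }
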
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 57d8a9ce7b..ae872e13a9 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -785,7 +785,7 @@ impl ExecutionPlan for HashJoinExec {
         // TODO stats: it is not possible in general to know the output size of joins
         // There are some special cases though, for example:
         // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
-        let mut stats = estimate_join_statistics(
+        let stats = estimate_join_statistics(
             Arc::clone(&self.left),
             Arc::clone(&self.right),
             self.on.clone(),
@@ -793,16 +793,7 @@ impl ExecutionPlan for HashJoinExec {
             &self.join_schema,
         )?;
         // Project statistics if there is a projection
-        if let Some(projection) = &self.projection {
-            stats.column_statistics = stats
-                .column_statistics
-                .into_iter()
-                .enumerate()
-                .filter(|(i, _)| projection.contains(i))
-                .map(|(_, s)| s)
-                .collect();
-        }
-        Ok(stats)
+        Ok(stats.project(self.projection.as_ref()))
     }
 }
 
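For context, the replaced inline code and `Statistics::project` differ in one detail, sketched below with plain integers standing in for `ColumnStatistics` (a standalone illustration, not commit code): the old membership-based filter kept column statistics in input order, while `project` follows the order of the projection indices, so the statistics line up with a reordered output schema.

    fn main() {
        // Plain integers stand in for per-column statistics.
        let column_stats = vec![10, 20, 30];
        let projection: Vec<usize> = vec![2, 0];

        // Old HashJoinExec logic: filter by membership, preserving input order.
        let old_stats: Vec<i32> = column_stats
            .iter()
            .enumerate()
            .filter(|(i, _)| projection.contains(i))
            .map(|(_, s)| *s)
            .collect();
        assert_eq!(old_stats, vec![10, 30]);

        // New Statistics::project logic: follow the projection's own order.
        let new_stats: Vec<i32> = projection.iter().map(|&i| column_stats[i]).collect();
        assert_eq!(new_stats, vec![30, 10]);
    }
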
diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt
index ed963466fc..253ebb9ea0 100644
--- a/datafusion/sqllogictest/test_files/parquet.slt
+++ b/datafusion/sqllogictest/test_files/parquet.slt
@@ -549,3 +549,52 @@ FixedSizeBinary(16) 0166ce1d46129ad104fa4990c6057c91
 
 statement ok
 DROP TABLE test_non_utf8_binary;
+
+
+## Tests for https://github.com/apache/datafusion/issues/13186
+statement ok
+create table cpu (time timestamp, usage_idle float, usage_user float, cpu int);
+
+statement ok
+insert into cpu values ('1970-01-01 00:00:00', 1.0, 2.0, 3);
+
+# must put it into a parquet file to get statistics
+statement ok
+copy (select * from cpu) to 'test_files/scratch/parquet/cpu.parquet';
+
+# Run queries against parquet files
+statement ok
+create external table cpu_parquet
+stored as parquet
+location 'test_files/scratch/parquet/cpu.parquet';
+
+# Double filtering
+#
+# Expect 1 row for both queries
+query PI
+select time, rn
+from (
+  select time, row_number() OVER (ORDER BY usage_idle, time) as rn
+  from cpu
+  where cpu = 3
+) where rn > 0;
+----
+1970-01-01T00:00:00 1
+
+query PI
+select time, rn
+from (
+  select time, row_number() OVER (ORDER BY usage_idle, time) as rn
+  from cpu_parquet
+  where cpu = 3
+) where rn > 0;
+----
+1970-01-01T00:00:00 1
+
+
+# Clean up
+statement ok
+drop table cpu;
+
+statement ok
+drop table cpu_parquet;

