alamb commented on code in PR #4162:
URL: https://github.com/apache/arrow-datafusion/pull/4162#discussion_r1023174920


##########
datafusion/core/tests/statistics.rs:
##########
@@ -238,8 +238,9 @@ async fn sql_filter() -> Result<()> {
         .await
         .unwrap();
 
-    // with a filtering condition we loose all knowledge about the statistics
-    assert_eq!(Statistics::default(), physical_plan.statistics());
+    let stats = physical_plan.statistics();
+    assert!(!stats.is_exact);
+    assert_eq!(stats.num_rows, Some(1));

Review Comment:
   Nice



##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -380,4 +403,108 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_filter_statistics_basic_expr() -> Result<()> {
+        // Table:
+        //      a: min=1, max=100
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(100),
+                column_statistics: Some(vec![ColumnStatistics {
+                    min_value: Some(ScalarValue::Int32(Some(1))),
+                    max_value: Some(ScalarValue::Int32(Some(100))),
+                    ..Default::default()
+                }]),
+                ..Default::default()
+            },
+            schema.clone(),
+        ));
+
+        // a <= 25
+        let predicate: Arc<dyn PhysicalExpr> =
+            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
+
+        // WHERE a <= 25
+        let filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(predicate, input)?);
+
+        let statistics = filter.statistics();
+        assert_eq!(statistics.num_rows, Some(25));

Review Comment:
   👨‍🍳 👌 
   
   Very nice



##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -168,9 +168,27 @@ impl ExecutionPlan for FilterExec {
         Some(self.metrics.clone_inner())
     }
 
-    /// The output statistics of a filtering operation are unknown
+    /// The output statistics of a filtering operation can be estimated if the
+    /// predicate's selectivity value can be determined for the incoming data.
     fn statistics(&self) -> Statistics {
-        Statistics::default()
+        let input_stats = self.input.statistics();
+        let analysis_ctx =
+            AnalysisContext::from_statistics(self.input.schema().as_ref(), &input_stats);
+
+        let predicate_selectivity = self
+            .predicate
+            .boundaries(&analysis_ctx)
+            .and_then(|bounds| bounds.selectivity);
+
+        match predicate_selectivity {
+            Some(selectivity) => Statistics {
+                num_rows: input_stats
+                    .num_rows
+                    .map(|num_rows| (num_rows as f64 * selectivity).ceil() as usize),
+                ..Default::default()

Review Comment:
   I wonder if we should explicitly list out `is_exact: false` here? 
`Default::default()` gives the same result, but being explicit might be 
clearer 🤔 



##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -380,4 +403,108 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_filter_statistics_basic_expr() -> Result<()> {
+        // Table:
+        //      a: min=1, max=100
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(100),
+                column_statistics: Some(vec![ColumnStatistics {
+                    min_value: Some(ScalarValue::Int32(Some(1))),
+                    max_value: Some(ScalarValue::Int32(Some(100))),
+                    ..Default::default()
+                }]),
+                ..Default::default()
+            },
+            schema.clone(),
+        ));
+
+        // a <= 25
+        let predicate: Arc<dyn PhysicalExpr> =
+            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
+
+        // WHERE a <= 25
+        let filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(predicate, input)?);
+
+        let statistics = filter.statistics();
+        assert_eq!(statistics.num_rows, Some(25));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    #[ignore]
+    // This test requires propagation of column boundaries from the comparison analysis
+    // to the analysis context. This is not yet implemented.
+    async fn test_filter_statistics_column_level_basic_expr() -> Result<()> {

Review Comment:
   I don't understand what about this test requires column-level analysis -- 
your figure has a join in it, but the test just seems to be the same as 
`test_filter_statistics_basic_expr` above it. I will look at 
https://github.com/isidentical/arrow-datafusion/pull/5 shortly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to