alamb commented on code in PR #5430:
URL: https://github.com/apache/arrow-datafusion/pull/5430#discussion_r1120401410


##########
datafusion/optimizer/src/decorrelate_where_exists.rs:
##########
@@ -670,4 +678,56 @@ mod tests {
 
         assert_plan_eq(&plan, expected)
     }
+
+    #[test]
+    fn exists_subquery_non_distinct_aggregate() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let subquery_scan = test_table_scan_with_name("sq")?;
+
+        let subquery = LogicalPlanBuilder::from(subquery_scan)
+            .filter(col("sq.a").gt(col("test.b")))?
+            .project(vec![col("sq.a"), col("sq.c")])?
+            .aggregate(vec![col("a"), col("c")], vec![count(col("a"))])?
+            .build()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .filter(exists(Arc::new(subquery)))?
+            .project(vec![col("test.b")])?
+            .build()?;
+
+        // Should not be optimized to join.
+        let expected = "Projection: test.b [b:UInt32]\
+                        \n  Filter: EXISTS (<subquery>) [a:UInt32, b:UInt32, 
c:UInt32]\
+                        \n    Subquery: [a:UInt32, c:UInt32, 
COUNT(sq.a):Int64;N]\
+                        \n      Aggregate: groupBy=[[sq.a, sq.c]], 
aggr=[[COUNT(sq.a)]] [a:UInt32, c:UInt32, COUNT(sq.a):Int64;N]\
+                        \n        Projection: sq.a, sq.c [a:UInt32, c:UInt32]\
+                        \n          Filter: sq.a > test.b [a:UInt32, b:UInt32, 
c:UInt32]\
+                        \n            TableScan: sq [a:UInt32, b:UInt32, 
c:UInt32]\
+                        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_plan_eq(&plan, expected)
+    }
+
+    #[test]
+    fn exists_subquery_aggragte_distinct() -> Result<()> {

Review Comment:
   ```suggestion
       fn exists_subquery_aggregate_distinct() -> Result<()> {
   ```



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -1776,6 +1776,40 @@ impl Aggregate {
             _ => plan_err!("Could not coerce into Aggregate!"),
         }
     }
+
+    /// Check whether it is a Distinct.
+    /// A Distinct means all fields of the schema are the expressions of group 
by.

Review Comment:
   I am not quite sure what this means -- is this check designed to check the input expressions to the GroupBy or the output expressions?
   
   If it is meant to check the output expressions, here is an alternate description:
   
   ```suggestion
       /// Return true if the output values are distinct (have no duplicates)
       ///
       /// In order for this to return true, all fields of the output schema 
must be expressions of the group by
   ```



##########
datafusion/optimizer/src/replace_distinct_aggregate.rs:
##########
@@ -100,4 +173,164 @@ mod tests {
             expected,
         )
     }
+
+    #[test]
+    fn replace_single_distinct_where_in() -> datafusion_common::Result<()> {
+        let table_scan = test_table_scan()?;
+        let subquery_scan = test_table_scan_with_name("sq")?;
+
+        // distinct in where-in subquery
+        let subquery = LogicalPlanBuilder::from(subquery_scan)
+            .filter(col("test.a").eq(col("sq.a")))?
+            .project(vec![col("sq.b")])?
+            .distinct()?
+            .build()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .filter(in_subquery(col("test.c"), Arc::new(subquery)))?
+            .project(vec![col("test.b")])?
+            .build()?;
+
+        let expected = "Projection: test.b\
+                       \n  Filter: test.c IN (<subquery>)\
+                       \n    Subquery:\
+                       \n      Aggregate: groupBy=[[sq.b]], aggr=[[]]\
+                       \n        Projection: sq.b\
+                       \n          Filter: test.a = sq.a\
+                       \n            TableScan: sq\
+                       \n    TableScan: test";
+
+        assert_optimized_plan_eq(
+            Arc::new(ReplaceDistinctWithAggregate::new()),
+            &plan,
+            expected,
+        )
+    }
+
+    #[test]
+    fn replace_distinct_in_where_in() -> datafusion_common::Result<()> {
+        let table_scan = test_table_scan()?;
+        let subquery_scan = test_table_scan_with_name("sq")?;
+
+        // distinct in where-in subquery
+        let subquery = LogicalPlanBuilder::from(subquery_scan)
+            .filter(col("test.a").eq(col("sq.a")))?
+            .project(vec![col("sq.b"), col("sq.c")])?

Review Comment:
   In these tests, too, I recommend projecting an expression (not just columns).



##########
datafusion/optimizer/src/decorrelate_where_exists.rs:
##########
@@ -670,4 +678,56 @@ mod tests {
 
         assert_plan_eq(&plan, expected)
     }
+
+    #[test]
+    fn exists_subquery_non_distinct_aggregate() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let subquery_scan = test_table_scan_with_name("sq")?;
+
+        let subquery = LogicalPlanBuilder::from(subquery_scan)
+            .filter(col("sq.a").gt(col("test.b")))?
+            .project(vec![col("sq.a"), col("sq.c")])?
+            .aggregate(vec![col("a"), col("c")], vec![count(col("a"))])?
+            .build()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .filter(exists(Arc::new(subquery)))?
+            .project(vec![col("test.b")])?
+            .build()?;
+
+        // Should not be optimized to join.
+        let expected = "Projection: test.b [b:UInt32]\
+                        \n  Filter: EXISTS (<subquery>) [a:UInt32, b:UInt32, 
c:UInt32]\
+                        \n    Subquery: [a:UInt32, c:UInt32, 
COUNT(sq.a):Int64;N]\
+                        \n      Aggregate: groupBy=[[sq.a, sq.c]], 
aggr=[[COUNT(sq.a)]] [a:UInt32, c:UInt32, COUNT(sq.a):Int64;N]\
+                        \n        Projection: sq.a, sq.c [a:UInt32, c:UInt32]\
+                        \n          Filter: sq.a > test.b [a:UInt32, b:UInt32, 
c:UInt32]\
+                        \n            TableScan: sq [a:UInt32, b:UInt32, 
c:UInt32]\
+                        \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
+
+        assert_plan_eq(&plan, expected)
+    }
+
+    #[test]
+    fn exists_subquery_aggragte_distinct() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let subquery_scan = test_table_scan_with_name("sq")?;
+
+        let subquery = LogicalPlanBuilder::from(subquery_scan)
+            .filter(col("sq.a").gt(col("test.b")))?
+            .project(vec![col("sq.a"), col("sq.c")])?

Review Comment:
   Can you add a test for:
   1. When there is no projection in the subquery?
   2. When the projection in the subquery is an expression (like `sq.a + sq.b`)?



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -1776,6 +1776,40 @@ impl Aggregate {
             _ => plan_err!("Could not coerce into Aggregate!"),
         }
     }
+
+    /// Check whether it is a Distinct.
+    /// A Distinct means all fields of the schema are the expressions of group 
by.

Review Comment:
   Is this correct? Or is this check designed to check the input expressions only?



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -1776,6 +1776,40 @@ impl Aggregate {
             _ => plan_err!("Could not coerce into Aggregate!"),
         }
     }
+
+    /// Check whether it is a Distinct.
+    /// A Distinct means all fields of the schema are the expressions of group 
by.
+    pub fn is_distinct(&self) -> datafusion_common::Result<bool> {
+        let group_expr_size = self.group_expr.len();
+        if !self.aggr_expr.is_empty() || group_expr_size != 
self.schema.fields().len() {

Review Comment:
   Wouldn't `SELECT a ... GROUP BY a, a` be distinct even though the number of group expressions doesn't match the number of output fields? Maybe this case isn't important to handle now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to