This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 016dfeb760d [SPARK-37527][SQL][FOLLOWUP] Cannot compile COVAR_POP,
COVAR_SAMP and CORR in `H2Dialect` if they are used with `DISTINCT`
016dfeb760d is described below

commit 016dfeb760dbe1109e3c81c39bcd1bf3316a3e20
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Thu Jul 7 09:55:45 2022 +0800

    [SPARK-37527][SQL][FOLLOWUP] Cannot compile COVAR_POP, COVAR_SAMP and CORR
in `H2Dialect` if they are used with `DISTINCT`
    
    https://github.com/apache/spark/pull/35145 added support for compiling
COVAR_POP, COVAR_SAMP and CORR in H2Dialect.
    However, H2 doesn't support COVAR_POP, COVAR_SAMP and CORR when they are
used with DISTINCT.
    So https://github.com/apache/spark/pull/35145 introduced a bug: it compiles
COVAR_POP, COVAR_SAMP and CORR even when these aggregate functions are used
with DISTINCT.
    
    Fixes the bug so that COVAR_POP, COVAR_SAMP and CORR are no longer compiled
when these aggregate functions are used with DISTINCT.
    
    'Yes'.
    The bug will be fixed.
    
    New test cases.
    
    Closes #37090 from beliefer/SPARK-37527_followup2.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 14f2bae208c093dea58e3f947fb660e8345fb256)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/jdbc/H2Dialect.scala      | 15 ++++-----
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala    | 38 +++++++++++++++-------
 2 files changed, 32 insertions(+), 21 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 4a88203ec59..967df112af2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -55,18 +55,15 @@ private object H2Dialect extends JdbcDialect {
           assert(f.children().length == 1)
           val distinct = if (f.isDistinct) "DISTINCT " else ""
           Some(s"STDDEV_SAMP($distinct${f.children().head})")
-        case f: GeneralAggregateFunc if f.name() == "COVAR_POP" =>
+        case f: GeneralAggregateFunc if f.name() == "COVAR_POP" && 
!f.isDistinct =>
           assert(f.children().length == 2)
-          val distinct = if (f.isDistinct) "DISTINCT " else ""
-          Some(s"COVAR_POP($distinct${f.children().head}, 
${f.children().last})")
-        case f: GeneralAggregateFunc if f.name() == "COVAR_SAMP" =>
+          Some(s"COVAR_POP(${f.children().head}, ${f.children().last})")
+        case f: GeneralAggregateFunc if f.name() == "COVAR_SAMP" && 
!f.isDistinct =>
           assert(f.children().length == 2)
-          val distinct = if (f.isDistinct) "DISTINCT " else ""
-          Some(s"COVAR_SAMP($distinct${f.children().head}, 
${f.children().last})")
-        case f: GeneralAggregateFunc if f.name() == "CORR" =>
+          Some(s"COVAR_SAMP(${f.children().head}, ${f.children().last})")
+        case f: GeneralAggregateFunc if f.name() == "CORR" && !f.isDistinct =>
           assert(f.children().length == 2)
-          val distinct = if (f.isDistinct) "DISTINCT " else ""
-          Some(s"CORR($distinct${f.children().head}, ${f.children().last})")
+          Some(s"CORR(${f.children().head}, ${f.children().last})")
         case _ => None
       }
     )
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 2f94f9ef31e..293334084af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -1028,23 +1028,37 @@ class JDBCV2Suite extends QueryTest with 
SharedSparkSession with ExplainSuiteHel
   }
 
   test("scan with aggregate push-down: COVAR_POP COVAR_SAMP with filter and 
group by") {
-    val df = sql("select COVAR_POP(bonus, bonus), COVAR_SAMP(bonus, bonus)" +
-      " FROM h2.test.employee where dept > 0 group by DePt")
-    checkFiltersRemoved(df)
-    checkAggregateRemoved(df)
-    checkPushedInfo(df, "PushedAggregates: [COVAR_POP(BONUS, BONUS), 
COVAR_SAMP(BONUS, BONUS)], " +
+    val df1 = sql("SELECT COVAR_POP(bonus, bonus), COVAR_SAMP(bonus, bonus)" +
+      " FROM h2.test.employee WHERE dept > 0 GROUP BY DePt")
+    checkFiltersRemoved(df1)
+    checkAggregateRemoved(df1)
+    checkPushedInfo(df1, "PushedAggregates: [COVAR_POP(BONUS, BONUS), 
COVAR_SAMP(BONUS, BONUS)], " +
       "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByExpressions: 
[DEPT]")
-    checkAnswer(df, Seq(Row(10000d, 20000d), Row(2500d, 5000d), Row(0d, null)))
+    checkAnswer(df1, Seq(Row(10000d, 20000d), Row(2500d, 5000d), Row(0d, 
null)))
+
+    val df2 = sql("SELECT COVAR_POP(DISTINCT bonus, bonus), 
COVAR_SAMP(DISTINCT bonus, bonus)" +
+      " FROM h2.test.employee WHERE dept > 0 GROUP BY DePt")
+    checkFiltersRemoved(df2)
+    checkAggregateRemoved(df2, false)
+    checkPushedInfo(df2, "PushedFilters: [DEPT IS NOT NULL, DEPT > 0]")
+    checkAnswer(df2, Seq(Row(10000d, 20000d), Row(2500d, 5000d), Row(0d, 
null)))
   }
 
   test("scan with aggregate push-down: CORR with filter and group by") {
-    val df = sql("select CORR(bonus, bonus) FROM h2.test.employee where dept > 
0" +
-      " group by DePt")
-    checkFiltersRemoved(df)
-    checkAggregateRemoved(df)
-    checkPushedInfo(df, "PushedAggregates: [CORR(BONUS, BONUS)], " +
+    val df1 = sql("SELECT CORR(bonus, bonus) FROM h2.test.employee WHERE dept 
> 0" +
+      " GROUP BY DePt")
+    checkFiltersRemoved(df1)
+    checkAggregateRemoved(df1)
+    checkPushedInfo(df1, "PushedAggregates: [CORR(BONUS, BONUS)], " +
       "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], PushedGroupByExpressions: 
[DEPT]")
-    checkAnswer(df, Seq(Row(1d), Row(1d), Row(null)))
+    checkAnswer(df1, Seq(Row(1d), Row(1d), Row(null)))
+
+    val df2 = sql("SELECT CORR(DISTINCT bonus, bonus) FROM h2.test.employee 
WHERE dept > 0" +
+      " GROUP BY DePt")
+    checkFiltersRemoved(df2)
+    checkAggregateRemoved(df2, false)
+    checkPushedInfo(df2, "PushedFilters: [DEPT IS NOT NULL, DEPT > 0]")
+    checkAnswer(df2, Seq(Row(1d), Row(1d), Row(null)))
   }
 
   test("scan with aggregate push-down: aggregate over alias push down") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to