This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c8cfe118b1 [GLUTEN-7096] [CH] fix exception when same names in group 
by (#7101)
c8cfe118b1 is described below

commit c8cfe118b19b2119ca7b5a24c743e0848c13805e
Author: shuai.xu <[email protected]>
AuthorDate: Tue Sep 24 14:47:25 2024 +0800

    [GLUTEN-7096] [CH] fix exception when same names in group by (#7101)
    
    * [GLUTEN-7096] [CH] fix exception when same names in group by
    
    * move case to hive
    
    * fix test failure
    
    * fix test failure
---
 .../clickhouse/CHSparkPlanExecApi.scala            |  4 +-
 .../execution/CHHashAggregateExecTransformer.scala | 10 +++++
 .../scala/org/apache/gluten/utils/CHAggUtil.scala  | 46 ++++++++++++++++++++++
 .../hive/execution/GlutenHiveSQLQueryCHSuite.scala | 46 ++++++++++++++++++++++
 .../hive/execution/GlutenHiveSQLQueryCHSuite.scala | 46 ++++++++++++++++++++++
 .../hive/execution/GlutenHiveSQLQueryCHSuite.scala | 46 ++++++++++++++++++++++
 .../hive/execution/GlutenHiveSQLQueryCHSuite.scala | 46 ++++++++++++++++++++++
 7 files changed, 242 insertions(+), 2 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 903523791a..659c3cbcad 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -26,7 +26,7 @@ import org.apache.gluten.extension.columnar.AddFallbackTagRule
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.expression.{ExpressionBuilder, 
ExpressionNode, WindowFunctionNode}
-import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
+import org.apache.gluten.utils.{CHAggUtil, CHJoinValidateUtil, 
UnknownJoinStrategy}
 import org.apache.gluten.vectorized.CHColumnarBatchSerializer
 
 import org.apache.spark.ShuffleDependency
@@ -160,7 +160,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
       child: SparkPlan): HashAggregateExecBaseTransformer =
     CHHashAggregateExecTransformer(
       requiredChildDistributionExpressions,
-      groupingExpressions.distinct,
+      CHAggUtil.distinctIgnoreQualifier(groupingExpressions),
       aggregateExpressions,
       aggregateAttributes,
       initialInputBufferOffset,
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala
index d641c05cd6..b8ddedd300 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala
@@ -26,6 +26,7 @@ import org.apache.gluten.substrait.{AggregationParams, 
SubstraitContext}
 import org.apache.gluten.substrait.expression.{AggregateFunctionNode, 
ExpressionBuilder, ExpressionNode}
 import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode, 
ExtensionBuilder}
 import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
+import org.apache.gluten.utils.CHAggUtil
 
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -429,6 +430,15 @@ case class CHHashAggregateExecPullOutHelper(
     aggregateAttr.toList
   }
 
+  override def allAggregateResultAttributes(
+      groupingExpressions: Seq[NamedExpression]): List[Attribute] = {
+    if (aggregateExpressions.nonEmpty) {
+      super.allAggregateResultAttributes(groupingExpressions)
+    } else {
+      
super.allAggregateResultAttributes(CHAggUtil.distinctIgnoreQualifier(groupingExpressions))
+    }
+  }
+
   protected def getAttrForAggregateExpr(
       exp: AggregateExpression,
       aggregateAttributeList: Seq[Attribute],
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHAggUtil.scala 
b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHAggUtil.scala
new file mode 100644
index 0000000000..ccab7c2950
--- /dev/null
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHAggUtil.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+
+import scala.util.control.Breaks.{break, breakable}
+
+object CHAggUtil extends Logging {
+  def distinctIgnoreQualifier(expressions: Seq[NamedExpression]): 
Seq[NamedExpression] = {
+    var dist = List[NamedExpression]()
+    for (i <- expressions.indices) {
+      var k = -1
+      breakable {
+            for (j <- 0 to i - 1)
+          if (
+            j != i &&
+            expressions(i).name == expressions(j).name &&
+            expressions(i).exprId == expressions(j).exprId &&
+            expressions(i).dataType == expressions(j).dataType &&
+            expressions(i).nullable == expressions(j).nullable
+          ) {
+            k = j
+            break
+          }
+      }
+      if (k < 0) dist = dist :+ expressions(i)
+    }
+    dist
+  }
+}
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
index e7d573ca5e..958dbf6397 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
@@ -122,4 +122,50 @@ class GlutenHiveSQLQueryCHSuite extends 
GlutenHiveSQLQuerySuiteBase {
       ignoreIfNotExists = true,
       purge = false)
   }
+
+  testGluten("GLUTEN-7096: Same names in group by may cause exception") {
+    sql("create table if not exists test_7096 (day string, rtime int, uid 
string, owner string)")
+    sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+    sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+    sql("insert into test_7096 values ('2024-09-02', 567, 'user2', 'owner2')")
+    val query =
+      """
+        |select days, rtime, uid, owner, day1
+        |from (
+        | select day1 as days, rtime, uid, owner, day1
+        | from (
+        |   select distinct coalesce(day, "today") as day1, rtime, uid, owner
+        |   from test_7096 where day = '2024-09-01'
+        | )) group by days, rtime, uid, owner, day1
+        |""".stripMargin
+    val df = sql(query)
+    checkAnswer(df, Seq(Row("2024-09-01", 123, "user1", "owner1", 
"2024-09-01")))
+    spark.sessionState.catalog.dropTable(
+      TableIdentifier("test_7096"),
+      ignoreIfNotExists = true,
+      purge = false)
+  }
+
+  testGluten("GLUTEN-7096: Same names with different qualifier in group by may 
cause exception") {
+    sql("create table if not exists test_7096 (day string, rtime int, uid 
string, owner string)")
+    sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+    sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+    sql("insert into test_7096 values ('2024-09-02', 567, 'user2', 'owner2')")
+    val query =
+      """
+        |select days, rtime, uid, owner, day1
+        |from (
+        | select day1 as days, rtime, uid, owner, day1
+        | from (
+        |   select distinct coalesce(day, "today") as day1, rtime, uid, owner
+        |   from test_7096 where day = '2024-09-01'
+        | ) t1 ) t2 group by days, rtime, uid, owner, day1
+        |""".stripMargin
+    val df = sql(query)
+    checkAnswer(df, Seq(Row("2024-09-01", 123, "user1", "owner1", 
"2024-09-01")))
+    spark.sessionState.catalog.dropTable(
+      TableIdentifier("test_7096"),
+      ignoreIfNotExists = true,
+      purge = false)
+  }
 }
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
index e7d573ca5e..958dbf6397 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
@@ -122,4 +122,50 @@ class GlutenHiveSQLQueryCHSuite extends 
GlutenHiveSQLQuerySuiteBase {
       ignoreIfNotExists = true,
       purge = false)
   }
+
+  testGluten("GLUTEN-7096: Same names in group by may cause exception") {
+    sql("create table if not exists test_7096 (day string, rtime int, uid 
string, owner string)")
+    sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+    sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+    sql("insert into test_7096 values ('2024-09-02', 567, 'user2', 'owner2')")
+    val query =
+      """
+        |select days, rtime, uid, owner, day1
+        |from (
+        | select day1 as days, rtime, uid, owner, day1
+        | from (
+        |   select distinct coalesce(day, "today") as day1, rtime, uid, owner
+        |   from test_7096 where day = '2024-09-01'
+        | )) group by days, rtime, uid, owner, day1
+        |""".stripMargin
+    val df = sql(query)
+    checkAnswer(df, Seq(Row("2024-09-01", 123, "user1", "owner1", 
"2024-09-01")))
+    spark.sessionState.catalog.dropTable(
+      TableIdentifier("test_7096"),
+      ignoreIfNotExists = true,
+      purge = false)
+  }
+
+  testGluten("GLUTEN-7096: Same names with different qualifier in group by may 
cause exception") {
+    sql("create table if not exists test_7096 (day string, rtime int, uid 
string, owner string)")
+    sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+    sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+    sql("insert into test_7096 values ('2024-09-02', 567, 'user2', 'owner2')")
+    val query =
+      """
+        |select days, rtime, uid, owner, day1
+        |from (
+        | select day1 as days, rtime, uid, owner, day1
+        | from (
+        |   select distinct coalesce(day, "today") as day1, rtime, uid, owner
+        |   from test_7096 where day = '2024-09-01'
+        | ) t1 ) t2 group by days, rtime, uid, owner, day1
+        |""".stripMargin
+    val df = sql(query)
+    checkAnswer(df, Seq(Row("2024-09-01", 123, "user1", "owner1", 
"2024-09-01")))
+    spark.sessionState.catalog.dropTable(
+      TableIdentifier("test_7096"),
+      ignoreIfNotExists = true,
+      purge = false)
+  }
 }
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
index e7d573ca5e..958dbf6397 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
@@ -122,4 +122,50 @@ class GlutenHiveSQLQueryCHSuite extends 
GlutenHiveSQLQuerySuiteBase {
       ignoreIfNotExists = true,
       purge = false)
   }
+
+  testGluten("GLUTEN-7096: Same names in group by may cause exception") {
+    sql("create table if not exists test_7096 (day string, rtime int, uid 
string, owner string)")
+    sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+    sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+    sql("insert into test_7096 values ('2024-09-02', 567, 'user2', 'owner2')")
+    val query =
+      """
+        |select days, rtime, uid, owner, day1
+        |from (
+        | select day1 as days, rtime, uid, owner, day1
+        | from (
+        |   select distinct coalesce(day, "today") as day1, rtime, uid, owner
+        |   from test_7096 where day = '2024-09-01'
+        | )) group by days, rtime, uid, owner, day1
+        |""".stripMargin
+    val df = sql(query)
+    checkAnswer(df, Seq(Row("2024-09-01", 123, "user1", "owner1", 
"2024-09-01")))
+    spark.sessionState.catalog.dropTable(
+      TableIdentifier("test_7096"),
+      ignoreIfNotExists = true,
+      purge = false)
+  }
+
+  testGluten("GLUTEN-7096: Same names with different qualifier in group by may 
cause exception") {
+    sql("create table if not exists test_7096 (day string, rtime int, uid 
string, owner string)")
+    sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+    sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+    sql("insert into test_7096 values ('2024-09-02', 567, 'user2', 'owner2')")
+    val query =
+      """
+        |select days, rtime, uid, owner, day1
+        |from (
+        | select day1 as days, rtime, uid, owner, day1
+        | from (
+        |   select distinct coalesce(day, "today") as day1, rtime, uid, owner
+        |   from test_7096 where day = '2024-09-01'
+        | ) t1 ) t2 group by days, rtime, uid, owner, day1
+        |""".stripMargin
+    val df = sql(query)
+    checkAnswer(df, Seq(Row("2024-09-01", 123, "user1", "owner1", 
"2024-09-01")))
+    spark.sessionState.catalog.dropTable(
+      TableIdentifier("test_7096"),
+      ignoreIfNotExists = true,
+      purge = false)
+  }
 }
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
index e7d573ca5e..958dbf6397 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
@@ -122,4 +122,50 @@ class GlutenHiveSQLQueryCHSuite extends 
GlutenHiveSQLQuerySuiteBase {
       ignoreIfNotExists = true,
       purge = false)
   }
+
+  testGluten("GLUTEN-7096: Same names in group by may cause exception") {
+    sql("create table if not exists test_7096 (day string, rtime int, uid 
string, owner string)")
+    sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+    sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+    sql("insert into test_7096 values ('2024-09-02', 567, 'user2', 'owner2')")
+    val query =
+      """
+        |select days, rtime, uid, owner, day1
+        |from (
+        | select day1 as days, rtime, uid, owner, day1
+        | from (
+        |   select distinct coalesce(day, "today") as day1, rtime, uid, owner
+        |   from test_7096 where day = '2024-09-01'
+        | )) group by days, rtime, uid, owner, day1
+        |""".stripMargin
+    val df = sql(query)
+    checkAnswer(df, Seq(Row("2024-09-01", 123, "user1", "owner1", 
"2024-09-01")))
+    spark.sessionState.catalog.dropTable(
+      TableIdentifier("test_7096"),
+      ignoreIfNotExists = true,
+      purge = false)
+  }
+
+  testGluten("GLUTEN-7096: Same names with different qualifier in group by may 
cause exception") {
+    sql("create table if not exists test_7096 (day string, rtime int, uid 
string, owner string)")
+    sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+    sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+    sql("insert into test_7096 values ('2024-09-02', 567, 'user2', 'owner2')")
+    val query =
+      """
+        |select days, rtime, uid, owner, day1
+        |from (
+        | select day1 as days, rtime, uid, owner, day1
+        | from (
+        |   select distinct coalesce(day, "today") as day1, rtime, uid, owner
+        |   from test_7096 where day = '2024-09-01'
+        | ) t1 ) t2 group by days, rtime, uid, owner, day1
+        |""".stripMargin
+    val df = sql(query)
+    checkAnswer(df, Seq(Row("2024-09-01", 123, "user1", "owner1", 
"2024-09-01")))
+    spark.sessionState.catalog.dropTable(
+      TableIdentifier("test_7096"),
+      ignoreIfNotExists = true,
+      purge = false)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to