This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c8cfe118b1 [GLUTEN-7096] [CH] fix exception when same names in group
by (#7101)
c8cfe118b1 is described below
commit c8cfe118b19b2119ca7b5a24c743e0848c13805e
Author: shuai.xu <[email protected]>
AuthorDate: Tue Sep 24 14:47:25 2024 +0800
[GLUTEN-7096] [CH] fix exception when same names in group by (#7101)
* [GLUTEN-7096] [CH] fix exception when same names in group by
* move case to hive
* fix test failure
* fix test failure
---
.../clickhouse/CHSparkPlanExecApi.scala | 4 +-
.../execution/CHHashAggregateExecTransformer.scala | 10 +++++
.../scala/org/apache/gluten/utils/CHAggUtil.scala | 46 ++++++++++++++++++++++
.../hive/execution/GlutenHiveSQLQueryCHSuite.scala | 46 ++++++++++++++++++++++
.../hive/execution/GlutenHiveSQLQueryCHSuite.scala | 46 ++++++++++++++++++++++
.../hive/execution/GlutenHiveSQLQueryCHSuite.scala | 46 ++++++++++++++++++++++
.../hive/execution/GlutenHiveSQLQueryCHSuite.scala | 46 ++++++++++++++++++++++
7 files changed, 242 insertions(+), 2 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 903523791a..659c3cbcad 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -26,7 +26,7 @@ import org.apache.gluten.extension.columnar.AddFallbackTagRule
import
org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.expression.{ExpressionBuilder,
ExpressionNode, WindowFunctionNode}
-import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
+import org.apache.gluten.utils.{CHAggUtil, CHJoinValidateUtil,
UnknownJoinStrategy}
import org.apache.gluten.vectorized.CHColumnarBatchSerializer
import org.apache.spark.ShuffleDependency
@@ -160,7 +160,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with
Logging {
child: SparkPlan): HashAggregateExecBaseTransformer =
CHHashAggregateExecTransformer(
requiredChildDistributionExpressions,
- groupingExpressions.distinct,
+ CHAggUtil.distinctIgnoreQualifier(groupingExpressions),
aggregateExpressions,
aggregateAttributes,
initialInputBufferOffset,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala
index d641c05cd6..b8ddedd300 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashAggregateExecTransformer.scala
@@ -26,6 +26,7 @@ import org.apache.gluten.substrait.{AggregationParams,
SubstraitContext}
import org.apache.gluten.substrait.expression.{AggregateFunctionNode,
ExpressionBuilder, ExpressionNode}
import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode,
ExtensionBuilder}
import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
+import org.apache.gluten.utils.CHAggUtil
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -429,6 +430,15 @@ case class CHHashAggregateExecPullOutHelper(
aggregateAttr.toList
}
+ override def allAggregateResultAttributes(
+ groupingExpressions: Seq[NamedExpression]): List[Attribute] = {
+ if (aggregateExpressions.nonEmpty) {
+ super.allAggregateResultAttributes(groupingExpressions)
+ } else {
+
super.allAggregateResultAttributes(CHAggUtil.distinctIgnoreQualifier(groupingExpressions))
+ }
+ }
+
protected def getAttrForAggregateExpr(
exp: AggregateExpression,
aggregateAttributeList: Seq[Attribute],
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHAggUtil.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHAggUtil.scala
new file mode 100644
index 0000000000..ccab7c2950
--- /dev/null
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/utils/CHAggUtil.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+
+import scala.util.control.Breaks.{break, breakable}
+
+object CHAggUtil extends Logging {
+ def distinctIgnoreQualifier(expressions: Seq[NamedExpression]):
Seq[NamedExpression] = {
+ var dist = List[NamedExpression]()
+ for (i <- expressions.indices) {
+ var k = -1
+ breakable {
+      for (j <- 0 to i - 1)
+ if (
+ j != i &&
+ expressions(i).name == expressions(j).name &&
+ expressions(i).exprId == expressions(j).exprId &&
+ expressions(i).dataType == expressions(j).dataType &&
+ expressions(i).nullable == expressions(j).nullable
+ ) {
+ k = j
+ break
+ }
+ }
+ if (k < 0) dist = dist :+ expressions(i)
+ }
+ dist
+ }
+}
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
index e7d573ca5e..958dbf6397 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
@@ -122,4 +122,50 @@ class GlutenHiveSQLQueryCHSuite extends
GlutenHiveSQLQuerySuiteBase {
ignoreIfNotExists = true,
purge = false)
}
+
+ testGluten("GLUTEN-7096: Same names in group by may cause exception") {
+ sql("create table if not exists test_7096 (day string, rtime int, uid
string, owner string)")
+ sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+ sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+ sql("insert into test_7096 values ('2024-09-02', 567, 'user2', 'owner2')")
+ val query =
+ """
+ |select days, rtime, uid, owner, day1
+ |from (
+ | select day1 as days, rtime, uid, owner, day1
+ | from (
+ | select distinct coalesce(day, "today") as day1, rtime, uid, owner
+ | from test_7096 where day = '2024-09-01'
+ | )) group by days, rtime, uid, owner, day1
+ |""".stripMargin
+ val df = sql(query)
+ checkAnswer(df, Seq(Row("2024-09-01", 123, "user1", "owner1",
"2024-09-01")))
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_7096"),
+ ignoreIfNotExists = true,
+ purge = false)
+ }
+
+ testGluten("GLUTEN-7096: Same names with different qualifier in group by may
cause exception") {
+ sql("create table if not exists test_7096 (day string, rtime int, uid
string, owner string)")
+ sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+ sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+ sql("insert into test_7096 values ('2024-09-02', 567, 'user2', 'owner2')")
+ val query =
+ """
+ |select days, rtime, uid, owner, day1
+ |from (
+ | select day1 as days, rtime, uid, owner, day1
+ | from (
+ | select distinct coalesce(day, "today") as day1, rtime, uid, owner
+ | from test_7096 where day = '2024-09-01'
+ | ) t1 ) t2 group by days, rtime, uid, owner, day1
+ |""".stripMargin
+ val df = sql(query)
+ checkAnswer(df, Seq(Row("2024-09-01", 123, "user1", "owner1",
"2024-09-01")))
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_7096"),
+ ignoreIfNotExists = true,
+ purge = false)
+ }
}
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
index e7d573ca5e..958dbf6397 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
@@ -122,4 +122,50 @@ class GlutenHiveSQLQueryCHSuite extends
GlutenHiveSQLQuerySuiteBase {
ignoreIfNotExists = true,
purge = false)
}
+
+ testGluten("GLUTEN-7096: Same names in group by may cause exception") {
+ sql("create table if not exists test_7096 (day string, rtime int, uid
string, owner string)")
+ sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+ sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+ sql("insert into test_7096 values ('2024-09-02', 567, 'user2', 'owner2')")
+ val query =
+ """
+ |select days, rtime, uid, owner, day1
+ |from (
+ | select day1 as days, rtime, uid, owner, day1
+ | from (
+ | select distinct coalesce(day, "today") as day1, rtime, uid, owner
+ | from test_7096 where day = '2024-09-01'
+ | )) group by days, rtime, uid, owner, day1
+ |""".stripMargin
+ val df = sql(query)
+ checkAnswer(df, Seq(Row("2024-09-01", 123, "user1", "owner1",
"2024-09-01")))
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_7096"),
+ ignoreIfNotExists = true,
+ purge = false)
+ }
+
+ testGluten("GLUTEN-7096: Same names with different qualifier in group by may
cause exception") {
+ sql("create table if not exists test_7096 (day string, rtime int, uid
string, owner string)")
+ sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+ sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+ sql("insert into test_7096 values ('2024-09-02', 567, 'user2', 'owner2')")
+ val query =
+ """
+ |select days, rtime, uid, owner, day1
+ |from (
+ | select day1 as days, rtime, uid, owner, day1
+ | from (
+ | select distinct coalesce(day, "today") as day1, rtime, uid, owner
+ | from test_7096 where day = '2024-09-01'
+ | ) t1 ) t2 group by days, rtime, uid, owner, day1
+ |""".stripMargin
+ val df = sql(query)
+ checkAnswer(df, Seq(Row("2024-09-01", 123, "user1", "owner1",
"2024-09-01")))
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_7096"),
+ ignoreIfNotExists = true,
+ purge = false)
+ }
}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
index e7d573ca5e..958dbf6397 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
@@ -122,4 +122,50 @@ class GlutenHiveSQLQueryCHSuite extends
GlutenHiveSQLQuerySuiteBase {
ignoreIfNotExists = true,
purge = false)
}
+
+ testGluten("GLUTEN-7096: Same names in group by may cause exception") {
+ sql("create table if not exists test_7096 (day string, rtime int, uid
string, owner string)")
+ sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+ sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+ sql("insert into test_7096 values ('2024-09-02', 567, 'user2', 'owner2')")
+ val query =
+ """
+ |select days, rtime, uid, owner, day1
+ |from (
+ | select day1 as days, rtime, uid, owner, day1
+ | from (
+ | select distinct coalesce(day, "today") as day1, rtime, uid, owner
+ | from test_7096 where day = '2024-09-01'
+ | )) group by days, rtime, uid, owner, day1
+ |""".stripMargin
+ val df = sql(query)
+ checkAnswer(df, Seq(Row("2024-09-01", 123, "user1", "owner1",
"2024-09-01")))
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_7096"),
+ ignoreIfNotExists = true,
+ purge = false)
+ }
+
+ testGluten("GLUTEN-7096: Same names with different qualifier in group by may
cause exception") {
+ sql("create table if not exists test_7096 (day string, rtime int, uid
string, owner string)")
+ sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+ sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+ sql("insert into test_7096 values ('2024-09-02', 567, 'user2', 'owner2')")
+ val query =
+ """
+ |select days, rtime, uid, owner, day1
+ |from (
+ | select day1 as days, rtime, uid, owner, day1
+ | from (
+ | select distinct coalesce(day, "today") as day1, rtime, uid, owner
+ | from test_7096 where day = '2024-09-01'
+ | ) t1 ) t2 group by days, rtime, uid, owner, day1
+ |""".stripMargin
+ val df = sql(query)
+ checkAnswer(df, Seq(Row("2024-09-01", 123, "user1", "owner1",
"2024-09-01")))
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_7096"),
+ ignoreIfNotExists = true,
+ purge = false)
+ }
}
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
index e7d573ca5e..958dbf6397 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
@@ -122,4 +122,50 @@ class GlutenHiveSQLQueryCHSuite extends
GlutenHiveSQLQuerySuiteBase {
ignoreIfNotExists = true,
purge = false)
}
+
+ testGluten("GLUTEN-7096: Same names in group by may cause exception") {
+ sql("create table if not exists test_7096 (day string, rtime int, uid
string, owner string)")
+ sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+ sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+ sql("insert into test_7096 values ('2024-09-02', 567, 'user2', 'owner2')")
+ val query =
+ """
+ |select days, rtime, uid, owner, day1
+ |from (
+ | select day1 as days, rtime, uid, owner, day1
+ | from (
+ | select distinct coalesce(day, "today") as day1, rtime, uid, owner
+ | from test_7096 where day = '2024-09-01'
+ | )) group by days, rtime, uid, owner, day1
+ |""".stripMargin
+ val df = sql(query)
+ checkAnswer(df, Seq(Row("2024-09-01", 123, "user1", "owner1",
"2024-09-01")))
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_7096"),
+ ignoreIfNotExists = true,
+ purge = false)
+ }
+
+ testGluten("GLUTEN-7096: Same names with different qualifier in group by may
cause exception") {
+ sql("create table if not exists test_7096 (day string, rtime int, uid
string, owner string)")
+ sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+ sql("insert into test_7096 values ('2024-09-01', 123, 'user1', 'owner1')")
+ sql("insert into test_7096 values ('2024-09-02', 567, 'user2', 'owner2')")
+ val query =
+ """
+ |select days, rtime, uid, owner, day1
+ |from (
+ | select day1 as days, rtime, uid, owner, day1
+ | from (
+ | select distinct coalesce(day, "today") as day1, rtime, uid, owner
+ | from test_7096 where day = '2024-09-01'
+ | ) t1 ) t2 group by days, rtime, uid, owner, day1
+ |""".stripMargin
+ val df = sql(query)
+ checkAnswer(df, Seq(Row("2024-09-01", 123, "user1", "owner1",
"2024-09-01")))
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_7096"),
+ ignoreIfNotExists = true,
+ purge = false)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]