This is an automated email from the ASF dual-hosted git repository.

lgbo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 5002106d28 [GLUTEN-8142][CH] Duplicated columns in group by (#8164)
5002106d28 is described below

commit 5002106d28a3ab782c5286438277fc15cd5686a5
Author: lgbo <[email protected]>
AuthorDate: Fri Dec 6 16:50:59 2024 +0800

    [GLUTEN-8142][CH] Duplicated columns in group by (#8164)
    
    * fixed: duplicated cols in group by
    
    * do not change hash shuffle
---
 .../gluten/backendsapi/clickhouse/CHRuleApi.scala  |   1 +
 .../clickhouse/CHSparkPlanExecApi.scala            |   4 +-
 .../gluten/extension/RemoveDuplicatedColumns.scala | 126 +++++++++++++++++++++
 .../execution/GlutenClickHouseTPCHSuite.scala      |  20 ++++
 4 files changed, 149 insertions(+), 2 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 98cfa0e754..1417786889 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -118,6 +118,7 @@ object CHRuleApi {
           
SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarTransformRules)(
             c.session)))
     injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, CHBatch))
+    injector.injectPostTransform(c => RemoveDuplicatedColumns.apply(c.session))
 
     // Gluten columnar: Fallback policies.
     injector.injectFallbackPolicy(
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index c2f91fa152..de0680df10 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -164,11 +164,11 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
       resultExpressions)
     CHHashAggregateExecTransformer(
       requiredChildDistributionExpressions,
-      groupingExpressions.distinct,
+      groupingExpressions,
       aggregateExpressions,
       aggregateAttributes,
       initialInputBufferOffset,
-      replacedResultExpressions.distinct,
+      replacedResultExpressions,
       child
     )
   }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RemoveDuplicatedColumns.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RemoveDuplicatedColumns.scala
new file mode 100644
index 0000000000..7f378b5a41
--- /dev/null
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RemoveDuplicatedColumns.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.execution._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.CHColumnarToRowExec
+
+/**
+ * CH does not handle duplicated columns within a block well. Most of the cases that
+ * introduce duplicated columns come from group by, so this rule de-duplicates the grouping
+ * and result expressions of every [[CHHashAggregateExecTransformer]].
+ *
+ * When such an aggregate feeds a [[CHColumnarToRowExec]] directly, a projection of the
+ * original result expressions is put back on top so that the output schema is unchanged.
+ *
+ * @param session
+ *   the Spark session handed over when the rule is created; the rule itself never reads it
+ */
+case class RemoveDuplicatedColumns(session: SparkSession) extends Rule[SparkPlan] with Logging {
+
+  /** Rewrites `plan` so that its hash aggregates carry no duplicated columns. */
+  override def apply(plan: SparkPlan): SparkPlan = visitPlan(plan)
+
+  /**
+   * Rebuilds `plan` recursively, de-duplicating the columns of every hash aggregate in it.
+   *
+   * @param plan
+   *   the (sub)plan to rewrite
+   * @return
+   *   the rewritten plan; nodes that are not hash aggregates only get rewritten children
+   */
+  private def visitPlan(plan: SparkPlan): SparkPlan = plan match {
+    case c2r @ CHColumnarToRowExec(hashAgg: CHHashAggregateExecTransformer) =>
+      // This is a special case: the result of the aggregation is used as the input of the
+      // sink, so the schema has to stay the same as the input of the sink.
+      val dedupedAgg = uniqueHashAggregateColumns(hashAgg)
+      // withNewChildren only replaces the children, so dedupedAgg already tells whether
+      // result columns were dropped and newHashAgg needs no downcast.
+      val newHashAgg = dedupedAgg.withNewChildren(hashAgg.children.map(visitPlan))
+      if (dedupedAgg.resultExpressions.length != hashAgg.resultExpressions.length) {
+        // Columns were dropped: project the original result expressions back on top.
+        c2r.copy(child = ProjectExecTransformer(hashAgg.resultExpressions, newHashAgg))
+      } else {
+        c2r.copy(child = newHashAgg)
+      }
+    case hashAgg: CHHashAggregateExecTransformer =>
+      uniqueHashAggregateColumns(hashAgg).withNewChildren(hashAgg.children.map(visitPlan))
+    case _ =>
+      plan.withNewChildren(plan.children.map(visitPlan))
+  }
+
+  /**
+   * Strips an alias that merely renames another named expression.
+   *
+   * @param e
+   *   the expression to unwrap
+   * @return
+   *   the aliased child when it is itself a [[NamedExpression]], otherwise `e` unchanged
+   */
+  private def unwrapAliasNamedExpression(e: NamedExpression): NamedExpression = e match {
+    case Alias(child: NamedExpression, _) => child
+    case _ => e
+  }
+
+  /**
+   * Unwraps aliases, then drops literals and every expression that is semantically equal
+   * to one already kept. The order of first occurrence is preserved.
+   *
+   * @param groupingExpressions
+   *   the grouping or result expressions of a hash aggregate
+   * @return
+   *   the unwrapped, de-duplicated expressions
+   */
+  private def uniqueNamedExpressions(
+      groupingExpressions: Seq[NamedExpression]): Seq[NamedExpression] = {
+    groupingExpressions
+      .map(unwrapAliasNamedExpression)
+      .foldLeft(Seq.empty[NamedExpression]) {
+        (kept, expr) =>
+          if (expr.isInstanceOf[Literal] || kept.exists(_.semanticEquals(expr))) kept
+          else kept :+ expr
+      }
+  }
+
+  /**
+   * De-duplicates the grouping and result expressions of `hashAgg`.
+   *
+   * @param hashAgg
+   *   the aggregate to rewrite
+   * @return
+   *   a copy with de-duplicated expressions, or `hashAgg` itself when de-duplication leaves
+   *   the number of result expressions unchanged
+   */
+  private def uniqueHashAggregateColumns(
+      hashAgg: CHHashAggregateExecTransformer): CHHashAggregateExecTransformer = {
+    val newGroupingExpressions = uniqueNamedExpressions(hashAgg.groupingExpressions)
+    val newResultExpressions = uniqueNamedExpressions(hashAgg.resultExpressions)
+    if (newResultExpressions.length != hashAgg.resultExpressions.length) {
+      hashAgg.copy(
+        groupingExpressions = newGroupingExpressions,
+        resultExpressions = newResultExpressions)
+    } else {
+      hashAgg
+    }
+  }
+}
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala
index bbe51ef389..65a01dea30 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala
@@ -570,5 +570,25 @@ class GlutenClickHouseTPCHSuite extends 
GlutenClickHouseTPCHAbstractSuite {
       ", split(concat('a|b|c', cast(id as string)), '|') from range(10)"
     compareResultsAgainstVanillaSpark(sql, true, { _ => })
   }
+  test("GLUTEN-8142 duplicated columns in group by") {
+    // `days` and `day1` are the same column of the sub-query, so the group by repeats it.
+    sql("create table test_8142 (day string, rtime int, uid string, owner string) using parquet")
+    try {
+      sql("insert into test_8142 values ('2024-09-01', 123, 'user1', 'owner1')")
+      sql("insert into test_8142 values ('2024-09-01', 123, 'user1', 'owner1')")
+      sql("insert into test_8142 values ('2024-09-02', 567, 'user2', 'owner2')")
+      compareResultsAgainstVanillaSpark(
+        """
+          |select days, rtime, uid, owner, day1
+          |from (
+          | select day1 as days, rtime, uid, owner, day1
+          | from (
+          |   select distinct coalesce(day, "today") as day1, rtime, uid, owner
+          |   from test_8142 where day = '2024-09-01'
+          | )) group by days, rtime, uid, owner, day1
+          |""".stripMargin,
+        true, { _ => })
+    } finally sql("drop table test_8142") // drop the table even when the comparison fails
+  }
 }
 // scalastyle:off line.size.limit


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to