This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 20d108a5c [CH] Move project transformer rewriting code to CH backend (#5171)
20d108a5c is described below

commit 20d108a5ccc4e6dd2bebcfedffed448043568235
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Mar 29 15:47:30 2024 +0800

    [CH] Move project transformer rewriting code to CH backend (#5171)
---
 .../clickhouse/CHSparkPlanExecApi.scala            | 43 +++++++++++++++++++++
 .../backendsapi/SparkPlanExecApi.scala             |  5 +++
 .../BasicPhysicalOperatorTransformer.scala         | 45 +++-------------------
 3 files changed, 54 insertions(+), 39 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 781884ad5..eef27665f 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -108,6 +108,49 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
     RowToCHNativeColumnarExec(child)
   }
 
+  override def genProjectExecTransformer(
+      projectList: Seq[NamedExpression],
+      child: SparkPlan): ProjectExecTransformer = {
+    def processProjectExecTransformer(projectList: Seq[NamedExpression]): 
Seq[NamedExpression] = {
+      // When there is a MergeScalarSubqueries which will create the named_struct with the
+      // same name, looks like {'bloomFilter', BF1, 'bloomFilter', BF2}
+      // or {'count(1)', count(1)#111L, 'avg(a)', avg(a)#222L, 'count(1)', count(1)#333L},
+      // it will cause problem for ClickHouse backend,
+      // which cannot tolerate duplicate type names in struct type,
+      // so we need to rename 'nameExpr' in the named_struct to make them unique
+      // after executing the MergeScalarSubqueries.
+      var needToReplace = false
+      val newProjectList = projectList.map {
+        case alias @ Alias(cns @ CreateNamedStruct(children: Seq[Expression]), 
"mergedValue") =>
+          // check whether there are some duplicate names
+          if (cns.nameExprs.distinct.size == cns.nameExprs.size) {
+            alias
+          } else {
+            val newChildren = children
+              .grouped(2)
+              .flatMap {
+                case Seq(name: Literal, value: NamedExpression) =>
+                  val newLiteral = Literal(name.toString() + "#" + 
value.exprId.id)
+                  Seq(newLiteral, value)
+                case Seq(name, value) => Seq(name, value)
+              }
+              .toSeq
+            needToReplace = true
+            Alias.apply(CreateNamedStruct(newChildren), 
"mergedValue")(alias.exprId)
+          }
+        case other: NamedExpression => other
+      }
+
+      if (!needToReplace) {
+        projectList
+      } else {
+        newProjectList
+      }
+    }
+
+    
ProjectExecTransformer.createUnsafe(processProjectExecTransformer(projectList), 
child)
+  }
+
   /**
    * Generate FilterExecTransformer.
    *
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
index 759f7cfad..711b49645 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
@@ -85,6 +85,11 @@ trait SparkPlanExecApi {
   def genHiveTableScanExecTransformer(plan: SparkPlan): 
HiveTableScanExecTransformer =
     HiveTableScanExecTransformer(plan)
 
+  def genProjectExecTransformer(
+      projectList: Seq[NamedExpression],
+      child: SparkPlan): ProjectExecTransformer =
+    ProjectExecTransformer.createUnsafe(projectList, child)
+
   /** Generate HashAggregateExecTransformer. */
   def genHashAggregateExecTransformer(
       requiredChildDistributionExpressions: Option[Seq[Expression]],
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala
index 9fa252016..3a32b5eb8 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala
@@ -248,47 +248,14 @@ case class ProjectExecTransformer private (projectList: 
Seq[NamedExpression], ch
     copy(child = newChild)
 }
 object ProjectExecTransformer {
-  private def processProjectExecTransformer(
-      projectList: Seq[NamedExpression]): Seq[NamedExpression] = {
-
-    // When there is a MergeScalarSubqueries which will create the named_struct with the same name,
-    // looks like {'bloomFilter', BF1, 'bloomFilter', BF2}
-    // or {'count(1)', count(1)#111L, 'avg(a)', avg(a)#222L, 'count(1)', count(1)#333L},
-    // it will cause problem for some backends, e.g. ClickHouse,
-    // which cannot tolerate duplicate type names in struct type,
-    // so we need to rename 'nameExpr' in the named_struct to make them unique
-    // after executing the MergeScalarSubqueries.
-    var needToReplace = false
-    val newProjectList = projectList.map {
-      case alias @ Alias(cns @ CreateNamedStruct(children: Seq[Expression]), 
"mergedValue") =>
-        // check whether there are some duplicate names
-        if (cns.nameExprs.distinct.size == cns.nameExprs.size) {
-          alias
-        } else {
-          val newChildren = children
-            .grouped(2)
-            .flatMap {
-              case Seq(name: Literal, value: NamedExpression) =>
-                val newLiteral = Literal(name.toString() + "#" + 
value.exprId.id)
-                Seq(newLiteral, value)
-              case Seq(name, value) => Seq(name, value)
-            }
-            .toSeq
-          needToReplace = true
-          Alias.apply(CreateNamedStruct(newChildren), 
"mergedValue")(alias.exprId)
-        }
-      case other: NamedExpression => other
-    }
-
-    if (!needToReplace) {
-      projectList
-    } else {
-      newProjectList
-    }
+  def apply(projectList: Seq[NamedExpression], child: SparkPlan): 
ProjectExecTransformer = {
+    
BackendsApiManager.getSparkPlanExecApiInstance.genProjectExecTransformer(projectList,
 child)
   }
 
-  def apply(projectList: Seq[NamedExpression], child: SparkPlan): 
ProjectExecTransformer =
-    new ProjectExecTransformer(processProjectExecTransformer(projectList), 
child)
+  // Directly creating a project transformer may not be considered safe since some backends, E.g.,
+  // Clickhouse may require to intercept the instantiation procedure.
+  def createUnsafe(projectList: Seq[NamedExpression], child: SparkPlan): 
ProjectExecTransformer =
+    new ProjectExecTransformer(projectList, child)
 }
 
 // An alternatives for UnionExec.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to