This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 20d108a5c [CH] Move project transformer rewriting code to CH backend
(#5171)
20d108a5c is described below
commit 20d108a5ccc4e6dd2bebcfedffed448043568235
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Mar 29 15:47:30 2024 +0800
[CH] Move project transformer rewriting code to CH backend (#5171)
---
.../clickhouse/CHSparkPlanExecApi.scala | 43 +++++++++++++++++++++
.../backendsapi/SparkPlanExecApi.scala | 5 +++
.../BasicPhysicalOperatorTransformer.scala | 45 +++-------------------
3 files changed, 54 insertions(+), 39 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 781884ad5..eef27665f 100644
---
a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -108,6 +108,49 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
RowToCHNativeColumnarExec(child)
}
+ override def genProjectExecTransformer(
+ projectList: Seq[NamedExpression],
+ child: SparkPlan): ProjectExecTransformer = {
+ def processProjectExecTransformer(projectList: Seq[NamedExpression]):
Seq[NamedExpression] = {
+ // When there is a MergeScalarSubqueries which will create the
named_struct with the
+ // same name, looks like {'bloomFilter', BF1, 'bloomFilter', BF2}
+ // or {'count(1)', count(1)#111L, 'avg(a)', avg(a)#222L, 'count(1)',
count(1)#333L},
+ // it will cause a problem for the ClickHouse backend,
+ // which cannot tolerate duplicate type names in struct type,
+ // so we need to rename 'nameExpr' in the named_struct to make them
unique
+ // after executing the MergeScalarSubqueries.
+ var needToReplace = false
+ val newProjectList = projectList.map {
+ case alias @ Alias(cns @ CreateNamedStruct(children: Seq[Expression]),
"mergedValue") =>
+ // check whether there are some duplicate names
+ if (cns.nameExprs.distinct.size == cns.nameExprs.size) {
+ alias
+ } else {
+ val newChildren = children
+ .grouped(2)
+ .flatMap {
+ case Seq(name: Literal, value: NamedExpression) =>
+ val newLiteral = Literal(name.toString() + "#" +
value.exprId.id)
+ Seq(newLiteral, value)
+ case Seq(name, value) => Seq(name, value)
+ }
+ .toSeq
+ needToReplace = true
+ Alias.apply(CreateNamedStruct(newChildren),
"mergedValue")(alias.exprId)
+ }
+ case other: NamedExpression => other
+ }
+
+ if (!needToReplace) {
+ projectList
+ } else {
+ newProjectList
+ }
+ }
+
+
ProjectExecTransformer.createUnsafe(processProjectExecTransformer(projectList),
child)
+ }
+
/**
* Generate FilterExecTransformer.
*
diff --git
a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
index 759f7cfad..711b49645 100644
---
a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
+++
b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
@@ -85,6 +85,11 @@ trait SparkPlanExecApi {
def genHiveTableScanExecTransformer(plan: SparkPlan):
HiveTableScanExecTransformer =
HiveTableScanExecTransformer(plan)
+ def genProjectExecTransformer(
+ projectList: Seq[NamedExpression],
+ child: SparkPlan): ProjectExecTransformer =
+ ProjectExecTransformer.createUnsafe(projectList, child)
+
/** Generate HashAggregateExecTransformer. */
def genHashAggregateExecTransformer(
requiredChildDistributionExpressions: Option[Seq[Expression]],
diff --git
a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala
b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala
index 9fa252016..3a32b5eb8 100644
---
a/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala
+++
b/gluten-core/src/main/scala/io/glutenproject/execution/BasicPhysicalOperatorTransformer.scala
@@ -248,47 +248,14 @@ case class ProjectExecTransformer private (projectList:
Seq[NamedExpression], ch
copy(child = newChild)
}
object ProjectExecTransformer {
- private def processProjectExecTransformer(
- projectList: Seq[NamedExpression]): Seq[NamedExpression] = {
-
- // When there is a MergeScalarSubqueries which will create the
named_struct with the same name,
- // looks like {'bloomFilter', BF1, 'bloomFilter', BF2}
- // or {'count(1)', count(1)#111L, 'avg(a)', avg(a)#222L, 'count(1)',
count(1)#333L},
- // it will cause problem for some backends, e.g. ClickHouse,
- // which cannot tolerate duplicate type names in struct type,
- // so we need to rename 'nameExpr' in the named_struct to make them unique
- // after executing the MergeScalarSubqueries.
- var needToReplace = false
- val newProjectList = projectList.map {
- case alias @ Alias(cns @ CreateNamedStruct(children: Seq[Expression]),
"mergedValue") =>
- // check whether there are some duplicate names
- if (cns.nameExprs.distinct.size == cns.nameExprs.size) {
- alias
- } else {
- val newChildren = children
- .grouped(2)
- .flatMap {
- case Seq(name: Literal, value: NamedExpression) =>
- val newLiteral = Literal(name.toString() + "#" +
value.exprId.id)
- Seq(newLiteral, value)
- case Seq(name, value) => Seq(name, value)
- }
- .toSeq
- needToReplace = true
- Alias.apply(CreateNamedStruct(newChildren),
"mergedValue")(alias.exprId)
- }
- case other: NamedExpression => other
- }
-
- if (!needToReplace) {
- projectList
- } else {
- newProjectList
- }
+ def apply(projectList: Seq[NamedExpression], child: SparkPlan):
ProjectExecTransformer = {
+
BackendsApiManager.getSparkPlanExecApiInstance.genProjectExecTransformer(projectList,
child)
}
- def apply(projectList: Seq[NamedExpression], child: SparkPlan):
ProjectExecTransformer =
- new ProjectExecTransformer(processProjectExecTransformer(projectList),
child)
+ // Directly creating a project transformer may not be considered safe, since
some backends, e.g.,
+ // ClickHouse, may need to intercept the instantiation procedure.
+ def createUnsafe(projectList: Seq[NamedExpression], child: SparkPlan):
ProjectExecTransformer =
+ new ProjectExecTransformer(projectList, child)
}
// An alternative for UnionExec.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]