This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 1dfbdb43f [GLUTEN-5405][CH] Add rewrite todate function (#5406)
1dfbdb43f is described below
commit 1dfbdb43f8c2d853db8357051e942df2184859fc
Author: Shuai li <[email protected]>
AuthorDate: Fri Apr 19 11:03:33 2024 +0800
[GLUTEN-5405][CH] Add rewrite todate function (#5406)
[CH] Add rewrite todate function
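    A minimal usage sketch (illustration only, not part of the patch): assuming a
    SparkSession `spark` running on the Gluten ClickHouse backend, the new analyzer
    rule rewrites the BI-generated pattern below. The query is the one used in the
    test added by this change, and the config key and its default (true) come from
    the GlutenConfig entry added below.

      // Toggle the new rewrite (enabled by default).
      spark.conf.set("spark.gluten.sql.columnar.backend.ch.rewrite.dateConversion", "true")
      spark.sql(
        """
          |SELECT
          |  to_date(
          |    from_unixtime(
          |      unix_timestamp(date_format(l_shipdate, 'yyyyMMdd'), 'yyyyMMdd')
          |    )
          |  )
          |FROM lineitem
          |""".stripMargin)
      // With RewriteToDateExpresstionRule applied, the from_unixtime/unix_timestamp
      // pair is dropped and the projection becomes the equivalent
      // to_date(date_format(l_shipdate, 'yyyyMMdd')); the new suite test checks that
      // `from_unixtime` no longer appears in the project list.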
---
.../clickhouse/CHSparkPlanExecApi.scala | 6 +-
.../GlutenClickHouseTPCHNullableSuite.scala | 30 +++++++
.../benchmarks/CHOptimizeRuleBenchmark.scala | 78 +++++++++++++++++++
.../gluten/expression/ExpressionConverter.scala | 2 +-
...ansform.scala => TimestampAddTransformer.scala} | 2 +-
.../extension/RewriteToDateExpresstionRule.scala | 91 ++++++++++++++++++++++
.../scala/org/apache/gluten/GlutenConfig.scala | 13 ++++
7 files changed, 218 insertions(+), 4 deletions(-)
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index a2f7ae984..fb564c9e2 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -22,7 +22,7 @@ import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
import org.apache.gluten.expression._
import org.apache.gluten.expression.ConverterUtils.FunctionConfig
-import org.apache.gluten.extension.{CountDistinctWithoutExpand, FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage}
+import org.apache.gluten.extension.{CountDistinctWithoutExpand, FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage, RewriteToDateExpresstionRule}
import org.apache.gluten.extension.columnar.AddTransformHintRule
import org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode}
@@ -573,7 +573,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
* @return
*/
override def genExtendedAnalyzers(): List[SparkSession => Rule[LogicalPlan]] = {
- List(spark => new RewriteDateTimestampComparisonRule(spark, spark.sessionState.conf))
+ List(
+ spark => new RewriteToDateExpresstionRule(spark, spark.sessionState.conf),
+ spark => new RewriteDateTimestampComparisonRule(spark, spark.sessionState.conf))
}
/**
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala
index 1241f0bcd..fe6afedf4 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSuite.scala
@@ -16,6 +16,8 @@
*/
package org.apache.gluten.execution
+import org.apache.gluten.GlutenConfig
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.optimizer.BuildLeft
@@ -210,4 +212,32 @@ class GlutenClickHouseTPCHNullableSuite extends GlutenClickHouseTPCHAbstractSuit
runSql(sql, noFallBack = true) { _ => }
}
}
+
+ test("test rewrite date conversion") {
+ val sqlStr =
+ """
+ |SELECT
+ |to_date(
+ | from_unixtime(
+ | unix_timestamp(date_format(l_shipdate, 'yyyyMMdd'), 'yyyyMMdd')
+ | )
+ |)
+ |FROM lineitem
+ |limit 10
+ |""".stripMargin
+
+ Seq(("true", false), ("false", true)).foreach(
+ conf => {
+ withSQLConf((GlutenConfig.ENABLE_CH_REWRITE_DATE_CONVERSION.key, conf._1)) {
+ runSql(sqlStr)(
+ df => {
+ val project = df.queryExecution.executedPlan.collect {
+ case project: ProjectExecTransformer => project
+ }
+ assert(project.size == 1)
+
+ assert(project.apply(0).projectList.toString().contains("from_unixtime") == conf._2)
+ })
+ }
+ })
+ }
}
diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHOptimizeRuleBenchmark.scala b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHOptimizeRuleBenchmark.scala
new file mode 100644
index 000000000..8d6d749fd
--- /dev/null
+++ b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHOptimizeRuleBenchmark.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmarks
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
+
+object CHOptimizeRuleBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark {
+
+ protected lazy val appName = "CHOptimizeRuleBenchmark"
+ protected lazy val thrdNum = "1"
+ protected lazy val memorySize = "4G"
+ protected lazy val offheapSize = "4G"
+
+ def beforeAll(): Unit = {}
+
+ override def getSparkSession: SparkSession = {
+ beforeAll()
+ val conf = getSparkConf
+ .setIfMissing("spark.sql.columnVector.offheap.enabled", "true")
+
+ SparkSession.builder.config(conf).getOrCreate()
+ }
+
+ override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+ val (parquetDir, readFileCnt, scanSchema, executedCnt, executedVanilla) =
+ if (mainArgs.isEmpty) {
+ ("/data/tpch-data-sf1/parquet/lineitem", 3,
"l_orderkey,l_receiptdate", 5, true)
+ } else {
+ (mainArgs(0), mainArgs(1).toInt, mainArgs(2), mainArgs(3).toInt,
mainArgs(4).toBoolean)
+ }
+
+ val parquetReadBenchmark =
+ new Benchmark(s"OptimizeRuleBenchmark", 10, output = output)
+
+ parquetReadBenchmark.addCase(s"ClickHouse rewrite dateConversion: false", executedCnt) {
+ _ => testToDateOptimize(parquetDir, "false")
+ }
+
+ parquetReadBenchmark.addCase(s"ClickHouse rewrite dateConversion: true", executedCnt) {
+ _ => testToDateOptimize(parquetDir, "true")
+ }
+
+ parquetReadBenchmark.run()
+ }
+
+ def testToDateOptimize(parquetDir: String, enable: String): Unit = {
+
withSQLConf(("spark.gluten.sql.columnar.backend.ch.rewrite.dateConversion",
enable)) {
+ spark
+ .sql(s"""
+ |select
+ |to_date(
+ | from_unixtime(
+ | unix_timestamp(date_format(l_shipdate, 'yyyyMMdd'), 'yyyyMMdd')
+ | )
+ |)
+ |from parquet.`$parquetDir`
+ |
+ |""".stripMargin)
+ .collect()
+ }
+ }
+}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
index d70bb8fed..8c2427509 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
@@ -566,7 +566,7 @@ object ExpressionConverter extends SQLConfHelper with Logging {
throw new UnsupportedOperationException(s"Not support expression TimestampAdd.")
}
val add = timestampAdd.asInstanceOf[BinaryExpression]
- TimestampAddTransform(
+ TimestampAddTransformer(
substraitExprName,
extract.get.head,
replaceWithExpressionTransformerInternal(add.left, attributeSeq, expressionsMap),
diff --git a/gluten-core/src/main/scala/org/apache/gluten/expression/TimestampAddTransform.scala b/gluten-core/src/main/scala/org/apache/gluten/expression/TimestampAddTransformer.scala
similarity index 98%
rename from gluten-core/src/main/scala/org/apache/gluten/expression/TimestampAddTransform.scala
rename to gluten-core/src/main/scala/org/apache/gluten/expression/TimestampAddTransformer.scala
index b3b3730b7..acede4523 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/expression/TimestampAddTransform.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/expression/TimestampAddTransformer.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.types.DataType
import com.google.common.collect.Lists
-case class TimestampAddTransform(
+case class TimestampAddTransformer(
substraitExprName: String,
unit: String,
left: ExpressionTransformer,
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
new file mode 100644
index 000000000..f809bb70f
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.GlutenConfig
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+// If users query data through BI tools,
+// the BI tools may generate SQL similar to
+// `to_date(
+//   from_unixtime(
+//     unix_timestamp(stringType, 'yyyyMMdd')
+//   )
+// )`
+// to convert strings to dates.
+// Under the CH backend, the StringType can be directly converted into DateType,
+// so the functions `from_unixtime` and `unix_timestamp` can be optimized away here.
+// The optimized result is `to_date(stringType)`.
+class RewriteToDateExpresstionRule(session: SparkSession, conf: SQLConf)
+ extends Rule[LogicalPlan]
+ with Logging {
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ if (
+ plan.resolved &&
+ GlutenConfig.getConf.enableGluten &&
+ GlutenConfig.getConf.enableCHRewriteDateConversion
+ ) {
+ visitPlan(plan)
+ } else {
+ plan
+ }
+ }
+
+ private def visitPlan(plan: LogicalPlan): LogicalPlan = plan match {
+ case project: Project if canRewrite(project) =>
+ val newProjectList = project.projectList.map(expr => visitExpression(expr))
+ val newProject = Project(newProjectList, project.child)
+ newProject
+ case other =>
+ val children = other.children.map(visitPlan)
+ other.withNewChildren(children)
+ }
+
+ private def visitExpression(expression: NamedExpression): NamedExpression = expression match {
+ case Alias(c, _) if c.isInstanceOf[ParseToDate] =>
+ val newToDate = rewriteParseToDate(c.asInstanceOf[ParseToDate])
+ if (!newToDate.fastEquals(c)) {
+ Alias(newToDate, newToDate.toString())()
+ } else {
+ expression
+ }
+ case _ => expression
+ }
+
+ private def rewriteParseToDate(toDate: ParseToDate): Expression = toDate.left match {
+ case fromUnixTime: FromUnixTime
+ if fromUnixTime.left.isInstanceOf[UnixTimestamp]
+ && fromUnixTime.left.asInstanceOf[UnixTimestamp].left.dataType.isInstanceOf[StringType] =>
+ val unixTimestamp = fromUnixTime.left.asInstanceOf[UnixTimestamp]
+ val newLeft = unixTimestamp.left
+ new ParseToDate(newLeft)
+ case _ => toDate
+ }
+
+ private def canRewrite(project: Project): Boolean = {
+ project.projectList.exists(
+ expr => expr.isInstanceOf[Alias] && expr.asInstanceOf[Alias].child.isInstanceOf[ParseToDate])
+ }
+}
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 60ff95a7e..437cea3cf 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -94,6 +94,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def enableRewriteDateTimestampComparison: Boolean =
conf.getConf(ENABLE_REWRITE_DATE_TIMESTAMP_COMPARISON)
+ def enableCHRewriteDateConversion: Boolean =
+ conf.getConf(ENABLE_CH_REWRITE_DATE_CONVERSION)
+
def enableCommonSubexpressionEliminate: Boolean =
conf.getConf(ENABLE_COMMON_SUBEXPRESSION_ELIMINATE)
@@ -1588,6 +1591,16 @@ object GlutenConfig {
.booleanConf
.createWithDefault(true)
+ val ENABLE_CH_REWRITE_DATE_CONVERSION =
+ buildConf("spark.gluten.sql.columnar.backend.ch.rewrite.dateConversion")
+ .internal()
+ .doc(
+   "Rewrite the conversion between date and string."
+     + " For example `to_date(from_unixtime(unix_timestamp(stringType, 'yyyyMMdd')))`"
+     + " will be rewritten to `to_date(stringType)`")
+ .booleanConf
+ .createWithDefault(true)
+
val ENABLE_COLUMNAR_PROJECT_COLLAPSE =
buildConf("spark.gluten.sql.columnar.project.collapse")
.internal()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]