This is an automated email from the ASF dual-hosted git repository.
loneylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new ef5e8af42c [Gluten-8792][CH] Support delta project incrementMetric
expr (#9353)
ef5e8af42c is described below
commit ef5e8af42cb3c1416a8e6525d8a6d7a925bb803e
Author: Shuai li <[email protected]>
AuthorDate: Tue Apr 22 10:42:04 2025 +0800
[Gluten-8792][CH] Support delta project incrementMetric expr (#9353)
* [Gluten-8792][CH] Support delta project incrementMetric expr
* rename
* add ut
* add filter
* add ut2
* fix ci
---
.../sql/execution/GlutenDeltaExpressionSuite.scala | 92 ++++++++++++++++++++++
.../apache/gluten/component/CHDeltaComponent.scala | 21 ++++-
.../backendsapi/clickhouse/CHMetricsApi.scala | 4 +-
.../gluten/metrics/FilterMetricsUpdater.scala | 31 +++++++-
.../org/apache/gluten/metrics/MetricsUtil.scala | 24 +++++-
.../gluten/metrics/ProjectMetricsUpdater.scala | 52 ++++++++++--
.../Parser/RelParsers/FilterRelParser.cpp | 2 +-
.../execution/DeltaProjectExecTransformer.scala | 35 ++------
8 files changed, 218 insertions(+), 43 deletions(-)
diff --git
a/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/sql/execution/GlutenDeltaExpressionSuite.scala
b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/sql/execution/GlutenDeltaExpressionSuite.scala
new file mode 100644
index 0000000000..40b591882d
--- /dev/null
+++
b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/sql/execution/GlutenDeltaExpressionSuite.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.execution.{DeltaFilterExecTransformer,
DeltaProjectExecTransformer, GlutenClickHouseTPCHAbstractSuite}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.delta.metric.IncrementMetric
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
+
+// Some sqls' line length exceeds 100
+// scalastyle:off line.size.limit
+
+class GlutenDeltaExpressionSuite
+ extends GlutenClickHouseTPCHAbstractSuite
+ with AdaptiveSparkPlanHelper {
+
+ override protected val needCopyParquetToTablePath = true
+
+ override protected val tablesPath: String = basePath + "/tpch-data"
+ override protected val tpchQueries: String = rootPath +
"queries/tpch-queries-ch"
+ override protected val queriesResults: String = rootPath +
"mergetree-queries-output"
+
+ // import org.apache.gluten.backendsapi.clickhouse.CHConfig._
+
+ /** Run Gluten + ClickHouse Backend with SortShuffleManager */
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ .set("spark.io.compression.codec", "LZ4")
+ .set("spark.sql.shuffle.partitions", "5")
+ .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+ .set("spark.sql.adaptive.enabled", "true")
+ .set("spark.sql.files.maxPartitionBytes", "20000000")
+ .set("spark.sql.storeAssignmentPolicy", "legacy")
+ .set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
+ }
+
+ override protected def createTPCHNotNullTables(): Unit = {
+ createNotNullTPCHTablesInParquet(tablesPath)
+ }
+
+ test("test project IncrementMetric not fallback") {
+ val table_name = "project_increment_metric"
+ withTable(table_name) {
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS $table_name
+ |($lineitemNullableSchema)
+ |USING delta
+ |TBLPROPERTIES (delta.enableDeletionVectors='true')
+ |LOCATION '$basePath/$table_name'
+ |""".stripMargin)
+
+ spark.sql(s"""insert into table $table_name select * from
lineitem""".stripMargin)
+ val metric = createMetric(sparkContext, "number of source rows")
+ val metricFilter = createMetric(sparkContext, "number of source rows
(during repeated scan)")
+ val df = sql(s"select l_orderkey,l_shipdate from $table_name")
+ .withColumn("im", Column(IncrementMetric(Literal(true), metric)))
+ .filter("im")
+ .filter(Column(IncrementMetric(Literal(true), metricFilter)))
+ .drop("im")
+ df.collect()
+
+ val cnt = df.queryExecution.executedPlan.collect {
+ case _: DeltaProjectExecTransformer => true
+ case _: DeltaFilterExecTransformer => true
+ }
+
+ assertResult(2)(cnt.size)
+ assertResult(600572)(metric.value)
+ assertResult(600572)(metricFilter.value)
+ }
+ }
+}
+// scalastyle:off line.size.limit
diff --git
a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
index 8acdcee777..85caf73546 100644
---
a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
+++
b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/component/CHDeltaComponent.scala
@@ -17,7 +17,9 @@
package org.apache.gluten.component
import org.apache.gluten.backendsapi.clickhouse.CHBackend
-import org.apache.gluten.execution.OffloadDeltaNode
+import org.apache.gluten.execution.{OffloadDeltaFilter, OffloadDeltaNode,
OffloadDeltaProject}
+import org.apache.gluten.extension.DeltaPostTransformRules
+import org.apache.gluten.extension.columnar.enumerated.RasOffload
import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.extension.columnar.validator.Validators
import org.apache.gluten.extension.injector.Injector
@@ -25,6 +27,7 @@ import org.apache.gluten.sql.shims.DeltaShimLoader
import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.PluginContext
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec}
class CHDeltaComponent extends Component {
override def name(): String = "ch-delta"
@@ -37,11 +40,25 @@ class CHDeltaComponent extends Component {
override def injectRules(injector: Injector): Unit = {
val legacy = injector.gluten.legacy
+ val ras = injector.gluten.ras
legacy.injectTransform {
c =>
- val offload = Seq(OffloadDeltaNode())
+ val offload = Seq(OffloadDeltaNode(), OffloadDeltaProject(),
OffloadDeltaFilter())
HeuristicTransform.Simple(Validators.newValidator(c.glutenConf,
offload), offload)
}
+ val offloads: Seq[RasOffload] = Seq(
+ RasOffload.from[ProjectExec](OffloadDeltaProject()),
+ RasOffload.from[FilterExec](OffloadDeltaFilter())
+ )
+ offloads.foreach(
+ offload =>
+ ras.injectRasRule(
+ c => RasOffload.Rule(offload, Validators.newValidator(c.glutenConf),
Nil)))
+ DeltaPostTransformRules.rules.foreach {
+ r =>
+ legacy.injectPostTransform(_ => r)
+ ras.injectPostTransform(_ => r)
+ }
DeltaShimLoader.getDeltaShims.registerExpressionExtension()
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index 0789155951..6ff629aafb 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -188,7 +188,7 @@ class CHMetricsApi extends MetricsApi with Logging with
LogLevelUtil {
override def genFilterTransformerMetricsUpdater(
metrics: Map[String, SQLMetric],
extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
- new FilterMetricsUpdater(metrics)
+ new FilterMetricsUpdater(metrics, extraMetrics)
override def genProjectTransformerMetrics(sparkContext: SparkContext):
Map[String, SQLMetric] =
Map(
@@ -206,7 +206,7 @@ class CHMetricsApi extends MetricsApi with Logging with
LogLevelUtil {
override def genProjectTransformerMetricsUpdater(
metrics: Map[String, SQLMetric],
extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
- new ProjectMetricsUpdater(metrics)
+ new ProjectMetricsUpdater(metrics, extraMetrics)
override def genHashAggregateTransformerMetrics(
sparkContext: SparkContext): Map[String, SQLMetric] =
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
index d41957c860..7afb249ffd 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
@@ -16,14 +16,41 @@
*/
package org.apache.gluten.metrics
+import
org.apache.gluten.metrics.ProjectMetricsUpdater.{DELTA_INPUT_ROW_METRIC_NAMES,
UNSUPPORTED_METRIC_NAMES}
+
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.metric.SQLMetric
-class FilterMetricsUpdater(val metrics: Map[String, SQLMetric]) extends
MetricsUpdater {
+class FilterMetricsUpdater(
+ val metrics: Map[String, SQLMetric],
+ val extraMetrics: Seq[(String, SQLMetric)]
+) extends MetricsUpdater
+ with Logging {
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
if (!operatorMetrics.metricsList.isEmpty) {
+ var numInputRows = Seq(metrics("numInputRows"))
+ extraMetrics.foreach {
+ case (name, metric) =>
+ name match {
+ case "increment_metric" =>
+ metric.name match {
+ case Some(input) if
DELTA_INPUT_ROW_METRIC_NAMES.contains(input) =>
+ numInputRows = numInputRows :+ metric
+ case Some(unSupport) if
UNSUPPORTED_METRIC_NAMES.contains(unSupport) =>
+ logTrace(s"Unsupported metric name: $unSupport")
+ case Some(other) =>
+ logTrace(s"Unknown metric name: $other")
+ case _ => // do nothing
+ }
+ case o: String =>
+ logTrace(s"Unknown metric name: $o")
+ case _ => // do nothing
+ }
+ }
+
val metricsData = operatorMetrics.metricsList.get(0)
metrics("totalTime") += (metricsData.time / 1000L).toLong
metrics("inputWaitTime") += (metricsData.inputWaitTime / 1000L).toLong
@@ -35,7 +62,7 @@ class FilterMetricsUpdater(val metrics: Map[String,
SQLMetric]) extends MetricsU
metrics("extraTime"),
metrics("numOutputRows"),
metrics("outputBytes"),
- metrics("numInputRows"),
+ numInputRows,
metrics("inputBytes"),
FilterMetricsUpdater.INCLUDING_PROCESSORS,
FilterMetricsUpdater.INCLUDING_PROCESSORS
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index 7d81467e97..3630205fd0 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -194,7 +194,7 @@ object MetricsUtil extends Logging {
extraTime: SQLMetric,
outputRows: SQLMetric,
outputBytes: SQLMetric,
- inputRows: SQLMetric,
+ inputRows: Seq[SQLMetric],
inputBytes: SQLMetric,
includingMetrics: Array[String],
planNodeNames: Array[String]): Unit = {
@@ -207,9 +207,29 @@ object MetricsUtil extends Logging {
if (planNodeNames.exists(processor.name.startsWith(_))) {
outputRows += processor.outputRows
outputBytes += processor.outputBytes
- inputRows += processor.inputRows
+ inputRows.foreach(inputRow => inputRow += processor.inputRows)
inputBytes += processor.inputBytes
}
})
}
+
+ def updateExtraTimeMetric(
+ metricData: MetricsData,
+ extraTime: SQLMetric,
+ outputRows: SQLMetric,
+ outputBytes: SQLMetric,
+ inputRows: SQLMetric,
+ inputBytes: SQLMetric,
+ includingMetrics: Array[String],
+ planNodeNames: Array[String]): Unit = {
+ updateExtraTimeMetric(
+ metricData,
+ extraTime,
+ outputRows,
+ outputBytes,
+ Seq(inputRows),
+ inputBytes,
+ includingMetrics,
+ planNodeNames)
+ }
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala
index f307f3c7dc..89f4337f34 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala
@@ -16,14 +16,41 @@
*/
package org.apache.gluten.metrics
-import org.apache.spark.sql.execution.metric.SQLMetric
+import
org.apache.gluten.metrics.ProjectMetricsUpdater.{DELTA_INPUT_ROW_METRIC_NAMES,
UNSUPPORTED_METRIC_NAMES}
-class ProjectMetricsUpdater(val metrics: Map[String, SQLMetric]) extends
MetricsUpdater {
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.metric.SQLMetric
+class ProjectMetricsUpdater(
+ val metrics: Map[String, SQLMetric],
+ val extraMetrics: Seq[(String, SQLMetric)])
+ extends MetricsUpdater
+ with Logging {
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
if (!operatorMetrics.metricsList.isEmpty) {
+ var numInputRows = Seq(metrics("numInputRows"))
+
+ extraMetrics.foreach {
+ case (name, metric) =>
+ name match {
+ case "increment_metric" =>
+ metric.name match {
+ case Some(input) if
DELTA_INPUT_ROW_METRIC_NAMES.contains(input) =>
+ numInputRows = numInputRows :+ metric
+ case Some(unSupport) if
UNSUPPORTED_METRIC_NAMES.contains(unSupport) =>
+ logTrace(s"Unsupported metric name: $unSupport")
+ case Some(other) =>
+ logTrace(s"Unknown metric name: $other")
+ case _ => // do nothing
+ }
+ case o: String =>
+ logTrace(s"Unknown metric name: $o")
+ case _ => // do nothing
+ }
+ }
+
val metricsData = operatorMetrics.metricsList.get(0)
metrics("totalTime") += (metricsData.time / 1000L).toLong
metrics("inputWaitTime") += (metricsData.inputWaitTime / 1000L).toLong
@@ -35,7 +62,7 @@ class ProjectMetricsUpdater(val metrics: Map[String,
SQLMetric]) extends Metrics
metrics("extraTime"),
metrics("numOutputRows"),
metrics("outputBytes"),
- metrics("numInputRows"),
+ numInputRows,
metrics("inputBytes"),
ProjectMetricsUpdater.INCLUDING_PROCESSORS,
ProjectMetricsUpdater.CH_PLAN_NODE_NAME
@@ -46,6 +73,21 @@ class ProjectMetricsUpdater(val metrics: Map[String,
SQLMetric]) extends Metrics
}
object ProjectMetricsUpdater {
- val INCLUDING_PROCESSORS = Array("ExpressionTransform")
- val CH_PLAN_NODE_NAME = Array("ExpressionTransform")
+ val INCLUDING_PROCESSORS: Array[String] = Array("ExpressionTransform")
+ val CH_PLAN_NODE_NAME: Array[String] = Array("ExpressionTransform")
+
+ val UNSUPPORTED_METRIC_NAMES: Set[String] =
+ Set(
+ "number of updated rows",
+ "number of deleted rows",
+ "number of inserted rows",
+ "number of rows updated by a matched clause",
+ "number of rows deleted by a matched clause"
+ )
+
+ val DELTA_INPUT_ROW_METRIC_NAMES: Set[String] = Set(
+ "number of source rows",
+ "number of target rows rewritten unmodified",
+ "number of source rows (during repeated scan)"
+ )
}
diff --git a/cpp-ch/local-engine/Parser/RelParsers/FilterRelParser.cpp
b/cpp-ch/local-engine/Parser/RelParsers/FilterRelParser.cpp
index 64546a4ff1..14c41e15b8 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/FilterRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/FilterRelParser.cpp
@@ -39,7 +39,7 @@ FilterRelParser::parse(DB::QueryPlanPtr query_plan, const
substrait::Rel & rel,
auto input_header = query_plan->getCurrentHeader();
DB::ActionsDAG actions_dag{input_header.getColumnsWithTypeAndName()};
const auto condition_node = parseExpression(actions_dag,
filter_rel.condition());
- if (filter_rel.condition().has_scalar_function())
+ if (filter_rel.condition().has_scalar_function() ||
filter_rel.condition().has_literal())
{
actions_dag.addOrReplaceInOutputs(*condition_node);
}
diff --git
a/gluten-delta/src-delta-32/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
b/gluten-delta/src-delta-32/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
index 39e8d5bfa9..dc709dad2d 100644
---
a/gluten-delta/src-delta-32/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
+++
b/gluten-delta/src-delta-32/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.extensions.ExtensionBuilder
import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CaseWhen,
NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
NamedExpression}
import org.apache.spark.sql.delta.metric.IncrementMetric
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -80,35 +80,12 @@ case class DeltaProjectExecTransformer(projectList:
Seq[NamedExpression], child:
def genNewProjectList(projectList: Seq[NamedExpression]):
Seq[NamedExpression] = {
projectList.map {
case alias: Alias =>
- alias.child match {
- case IncrementMetric(child, metric) =>
- extraMetrics :+= (alias.child.prettyName, metric)
- Alias(child = child, name = alias.name)()
-
- case CaseWhen(branches, elseValue) =>
- val newBranches = branches.map {
- case (expr1, expr2: IncrementMetric) =>
- extraMetrics :+= (expr2.prettyName, expr2.metric)
- (expr1, expr2.child)
- case other => other
- }
-
- val newElseValue = elseValue match {
- case Some(IncrementMetric(child: IncrementMetric, metric)) =>
- extraMetrics :+= (child.prettyName, metric)
- extraMetrics :+= (child.prettyName, child.metric)
- Some(child.child)
- case _ => elseValue
- }
-
- Alias(
- child = CaseWhen(newBranches, newElseValue),
- name = alias.name
- )(alias.exprId)
-
- case _ =>
- alias
+ val newChild = alias.child.transformUp {
+ case im @ IncrementMetric(child, metric) =>
+ extraMetrics :+= (im.prettyName, metric)
+ child
}
+ Alias(child = newChild, name = alias.name)(alias.exprId)
case other => other
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]