This is an automated email from the ASF dual-hosted git repository.

yma pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 27f30ad826 [GLUTEN-7110][VL][DELTA] support IncrementMetric in gluten 
(#7111)
27f30ad826 is described below

commit 27f30ad826be0e085d321c302b54eb789ad9476e
Author: Qian Sun <[email protected]>
AuthorDate: Tue Oct 15 09:16:11 2024 +0800

    [GLUTEN-7110][VL][DELTA] support IncrementMetric in gluten (#7111)
    
    * [GLUTEN-7110][VL][DELTA] support IncrementMetric in gluten
    
    * init DeltaFilterExecTransformer
    
    * init DeltaProjectExecTransformer
    
    * update metric
    
    * add FilterTransformerRegister
    
    * fix scala 213 compile error
    
    * add ProjectTransformerRegister
    
    * remove redundant function
    
    * update supportedDelta
    
    * update veloxSparkPlanExecApi
    
    * use createUnsafe for ProjectExec
    
    * init ReplaceDeltaTransformer
    
    * use delta write rule
    
    * fix error
    
    * use copy-resources to reduce redundant resources
---
 .../backendsapi/clickhouse/CHMetricsApi.scala      |   8 +-
 .../gluten/backendsapi/velox/VeloxMetricsApi.scala |  10 +-
 .../gluten/execution/FilterExecTransformer.scala   |  21 +---
 .../gluten/metrics/FilterMetricsUpdater.scala      |  12 ++-
 .../gluten/metrics/ProjectMetricsUpdater.scala     |  12 ++-
 gluten-delta/pom.xml                               |  31 ++++++
 .../execution/DeltaFilterExecTransformer.scala     |  81 ++++++++++++++
 .../execution/DeltaProjectExecTransformer.scala    | 116 +++++++++++++++++++++
 .../execution/DeltaFilterExecTransformer.scala     |  25 +----
 .../execution/DeltaProjectExecTransformer.scala    |  22 ++--
 .../extension/DeltaRewriteTransformerRules.scala   |  28 ++++-
 .../org/apache/gluten/backendsapi/MetricsApi.scala |   8 +-
 .../BasicPhysicalOperatorTransformer.scala         |  47 +++++----
 .../gluten/execution/ProjectExecTransformer.scala  |  41 ++++++++
 14 files changed, 371 insertions(+), 91 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index d84181fec1..6a4f0c9a6f 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -165,7 +165,9 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
       "totalTime" -> SQLMetrics.createTimingMetric(sparkContext, "time")
     )
 
-  override def genFilterTransformerMetricsUpdater(metrics: Map[String, 
SQLMetric]): MetricsUpdater =
+  override def genFilterTransformerMetricsUpdater(
+      metrics: Map[String, SQLMetric],
+      extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
     new FilterMetricsUpdater(metrics)
 
   override def genProjectTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
@@ -182,7 +184,9 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
     )
 
   override def genProjectTransformerMetricsUpdater(
-      metrics: Map[String, SQLMetric]): MetricsUpdater = new 
ProjectMetricsUpdater(metrics)
+      metrics: Map[String, SQLMetric],
+      extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
+    new ProjectMetricsUpdater(metrics)
 
   override def genHashAggregateTransformerMetrics(
       sparkContext: SparkContext): Map[String, SQLMetric] =
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 00cba43728..1f87ffdba5 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -175,8 +175,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
         "number of memory allocations")
     )
 
-  override def genFilterTransformerMetricsUpdater(metrics: Map[String, 
SQLMetric]): MetricsUpdater =
-    new FilterMetricsUpdater(metrics)
+  override def genFilterTransformerMetricsUpdater(
+      metrics: Map[String, SQLMetric],
+      extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
+    new FilterMetricsUpdater(metrics, extraMetrics)
 
   override def genProjectTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric] =
     Map(
@@ -192,7 +194,9 @@ class VeloxMetricsApi extends MetricsApi with Logging {
     )
 
   override def genProjectTransformerMetricsUpdater(
-      metrics: Map[String, SQLMetric]): MetricsUpdater = new 
ProjectMetricsUpdater(metrics)
+      metrics: Map[String, SQLMetric],
+      extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
+    new ProjectMetricsUpdater(metrics, extraMetrics)
 
   override def genHashAggregateTransformerMetrics(
       sparkContext: SparkContext): Map[String, SQLMetric] =
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala
index daa08498be..fdfe1d7ea9 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala
@@ -16,30 +16,11 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.spark.sql.catalyst.expressions.{And, Expression}
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.SparkPlan
 
 case class FilterExecTransformer(condition: Expression, child: SparkPlan)
   extends FilterExecTransformerBase(condition, child) {
-  // FIXME: Should use field "condition" to store the actual executed filter 
expressions.
-  //  To make optimization easier (like to remove filter when it actually does 
nothing)
-  override protected def getRemainingCondition: Expression = {
-    val scanFilters = child match {
-      // Get the filters including the manually pushed down ones.
-      case basicScanExecTransformer: BasicScanExecTransformer =>
-        basicScanExecTransformer.filterExprs()
-      // For fallback scan, we need to keep original filter.
-      case _ =>
-        Seq.empty[Expression]
-    }
-    if (scanFilters.isEmpty) {
-      condition
-    } else {
-      val remainingFilters =
-        FilterHandler.getRemainingFilters(scanFilters, 
splitConjunctivePredicates(condition))
-      remainingFilters.reduceLeftOption(And).orNull
-    }
-  }
 
   override protected def withNewChildInternal(newChild: SparkPlan): 
FilterExecTransformer =
     copy(child = newChild)
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
index c877fdbb07..e8c27f6a43 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
@@ -18,7 +18,10 @@ package org.apache.gluten.metrics
 
 import org.apache.spark.sql.execution.metric.SQLMetric
 
-class FilterMetricsUpdater(val metrics: Map[String, SQLMetric]) extends 
MetricsUpdater {
+class FilterMetricsUpdater(
+    val metrics: Map[String, SQLMetric],
+    val extraMetrics: Seq[(String, SQLMetric)])
+  extends MetricsUpdater {
 
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
     if (opMetrics != null) {
@@ -30,6 +33,13 @@ class FilterMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends MetricsU
       metrics("wallNanos") += operatorMetrics.wallNanos
       metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
       metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
+      extraMetrics.foreach {
+        case (name, metric) =>
+          name match {
+            case "increment_metric" => metric += operatorMetrics.outputRows
+            case _ => // do nothing
+          }
+      }
     }
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala
index ff8335c861..cbd195bb80 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala
@@ -18,7 +18,10 @@ package org.apache.gluten.metrics
 
 import org.apache.spark.sql.execution.metric.SQLMetric
 
-class ProjectMetricsUpdater(val metrics: Map[String, SQLMetric]) extends 
MetricsUpdater {
+class ProjectMetricsUpdater(
+    val metrics: Map[String, SQLMetric],
+    val extraMetrics: Seq[(String, SQLMetric)])
+  extends MetricsUpdater {
 
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
     if (opMetrics != null) {
@@ -30,6 +33,13 @@ class ProjectMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends Metrics
       metrics("wallNanos") += operatorMetrics.wallNanos
       metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
       metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
+      extraMetrics.foreach {
+        case (name, metric) =>
+          name match {
+            case "increment_metric" => metric += operatorMetrics.outputRows
+            case _ => // do nothing
+          }
+      }
     }
   }
 }
diff --git a/gluten-delta/pom.xml b/gluten-delta/pom.xml
index 48d47d906b..6a6b7291d7 100755
--- a/gluten-delta/pom.xml
+++ b/gluten-delta/pom.xml
@@ -130,6 +130,19 @@
       <plugin>
         <groupId>com.diffplug.spotless</groupId>
         <artifactId>spotless-maven-plugin</artifactId>
+        <configuration>
+          <scala>
+            <scalafmt>
+              <file>${project.basedir}/../.scalafmt.conf</file>
+            </scalafmt>
+            <includes>
+              <include>src/main/scala/**/*.scala</include>
+              <include>src/test/scala/**/*.scala</include>
+              
<include>src/main/delta-${delta.binary.version}/**/*.scala</include>
+              
<include>src/test/delta-${delta.binary.version}/**/*.scala</include>
+            </includes>
+          </scala>
+        </configuration>
       </plugin>
       <plugin>
         <groupId>org.scalatest</groupId>
@@ -154,6 +167,24 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-resources</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              
<outputDirectory>src/main/scala/org/apache/gluten/execution</outputDirectory>
+              <resources>
+                <resource>
+                  
<directory>src/main/delta-${delta.binary.version}/org/apache/gluten/execution</directory>
+                </resource>
+              </resources>
+              <overwrite>true</overwrite>
+            </configuration>
+          </execution>
+        </executions>
       </plugin>
     </plugins>
   </build>
diff --git 
a/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
 
b/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
new file mode 100644
index 0000000000..0c8cd54902
--- /dev/null
+++ 
b/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter}
+import org.apache.gluten.metrics.MetricsUpdater
+import org.apache.gluten.substrait.`type`.TypeBuilder
+import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.extensions.ExtensionBuilder
+import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.delta.metric.IncrementMetric
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+import scala.collection.JavaConverters._
+
+case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan)
+  extends FilterExecTransformerBase(condition, child) {
+
+  private var extraMetrics: Seq[(String, SQLMetric)] = Seq.empty
+
+  override def metricsUpdater(): MetricsUpdater =
+    
BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetricsUpdater(
+      metrics,
+      extraMetrics.toSeq)
+
+  override def getRelNode(
+      context: SubstraitContext,
+      condExpr: Expression,
+      originalInputAttributes: Seq[Attribute],
+      operatorId: Long,
+      input: RelNode,
+      validation: Boolean): RelNode = {
+    assert(condExpr != null)
+    val args = context.registeredFunction
+    val condExprNode = condExpr match {
+      case IncrementMetric(child, metric) =>
+        extraMetrics :+= (condExpr.prettyName, metric)
+        ExpressionConverter
+          .replaceWithExpressionTransformer(child, attributeSeq = 
originalInputAttributes)
+          .doTransform(args)
+      case _ =>
+        ExpressionConverter
+          .replaceWithExpressionTransformer(condExpr, attributeSeq = 
originalInputAttributes)
+          .doTransform(args)
+    }
+
+    if (!validation) {
+      RelBuilder.makeFilterRel(input, condExprNode, context, operatorId)
+    } else {
+      // Use a extension node to send the input types through Substrait plan 
for validation.
+      val inputTypeNodeList = originalInputAttributes
+        .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
+        .asJava
+      val extensionNode = ExtensionBuilder.makeAdvancedExtension(
+        BackendsApiManager.getTransformerApiInstance.packPBMessage(
+          TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf))
+      RelBuilder.makeFilterRel(input, condExprNode, extensionNode, context, 
operatorId)
+    }
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): 
DeltaFilterExecTransformer =
+    copy(child = newChild)
+}
diff --git 
a/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
 
b/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
new file mode 100644
index 0000000000..a2be01a1f0
--- /dev/null
+++ 
b/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter, 
ExpressionTransformer}
+import org.apache.gluten.metrics.MetricsUpdater
+import org.apache.gluten.substrait.`type`.TypeBuilder
+import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.extensions.ExtensionBuilder
+import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CaseWhen, 
NamedExpression}
+import org.apache.spark.sql.delta.metric.IncrementMetric
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+case class DeltaProjectExecTransformer(projectList: Seq[NamedExpression], 
child: SparkPlan)
+  extends ProjectExecTransformerBase(projectList, child) {
+
+  private var extraMetrics = mutable.Seq.empty[(String, SQLMetric)]
+
+  override def metricsUpdater(): MetricsUpdater =
+    
BackendsApiManager.getMetricsApiInstance.genProjectTransformerMetricsUpdater(
+      metrics,
+      extraMetrics.toSeq)
+
+  override def getRelNode(
+      context: SubstraitContext,
+      projectList: Seq[NamedExpression],
+      originalInputAttributes: Seq[Attribute],
+      operatorId: Long,
+      input: RelNode,
+      validation: Boolean): RelNode = {
+    val args = context.registeredFunction
+    val newProjectList = genNewProjectList(projectList)
+    val columnarProjExprs: Seq[ExpressionTransformer] = ExpressionConverter
+      .replaceWithExpressionTransformer(newProjectList, attributeSeq = 
originalInputAttributes)
+    val projExprNodeList = columnarProjExprs.map(_.doTransform(args)).asJava
+    val emitStartIndex = originalInputAttributes.size
+    if (!validation) {
+      RelBuilder.makeProjectRel(input, projExprNodeList, context, operatorId, 
emitStartIndex)
+    } else {
+      // Use a extension node to send the input types through Substrait plan 
for validation.
+      val inputTypeNodeList = originalInputAttributes
+        .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
+        .asJava
+      val extensionNode = ExtensionBuilder.makeAdvancedExtension(
+        BackendsApiManager.getTransformerApiInstance.packPBMessage(
+          TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf))
+      RelBuilder.makeProjectRel(
+        input,
+        projExprNodeList,
+        extensionNode,
+        context,
+        operatorId,
+        emitStartIndex)
+    }
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): 
DeltaProjectExecTransformer =
+    copy(child = newChild)
+
+  def genNewProjectList(projectList: Seq[NamedExpression]): 
Seq[NamedExpression] = {
+    projectList.map {
+      case alias: Alias =>
+        alias.child match {
+          case IncrementMetric(child, metric) =>
+            extraMetrics :+= (alias.child.prettyName, metric)
+            Alias(child = child, name = alias.name)()
+
+          case CaseWhen(branches, elseValue) =>
+            val newBranches = branches.map {
+              case (expr1, expr2: IncrementMetric) =>
+                extraMetrics :+= (expr2.prettyName, expr2.metric)
+                (expr1, expr2.child)
+              case other => other
+            }
+
+            val newElseValue = elseValue match {
+              case Some(IncrementMetric(child: IncrementMetric, metric)) =>
+                extraMetrics :+= (child.prettyName, metric)
+                extraMetrics :+= (child.prettyName, child.metric)
+                Some(child.child)
+              case _ => elseValue
+            }
+
+            Alias(
+              child = CaseWhen(newBranches, newElseValue),
+              name = alias.name
+            )(alias.exprId)
+
+          case _ =>
+            alias
+        }
+      case other => other
+    }
+  }
+}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala
 
b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
similarity index 51%
copy from 
backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala
copy to 
gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
index daa08498be..ca4665c0d0 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala
+++ 
b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
@@ -16,31 +16,12 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.spark.sql.catalyst.expressions.{And, Expression}
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.SparkPlan
 
-case class FilterExecTransformer(condition: Expression, child: SparkPlan)
+case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan)
   extends FilterExecTransformerBase(condition, child) {
-  // FIXME: Should use field "condition" to store the actual executed filter 
expressions.
-  //  To make optimization easier (like to remove filter when it actually does 
nothing)
-  override protected def getRemainingCondition: Expression = {
-    val scanFilters = child match {
-      // Get the filters including the manually pushed down ones.
-      case basicScanExecTransformer: BasicScanExecTransformer =>
-        basicScanExecTransformer.filterExprs()
-      // For fallback scan, we need to keep original filter.
-      case _ =>
-        Seq.empty[Expression]
-    }
-    if (scanFilters.isEmpty) {
-      condition
-    } else {
-      val remainingFilters =
-        FilterHandler.getRemainingFilters(scanFilters, 
splitConjunctivePredicates(condition))
-      remainingFilters.reduceLeftOption(And).orNull
-    }
-  }
 
-  override protected def withNewChildInternal(newChild: SparkPlan): 
FilterExecTransformer =
+  override protected def withNewChildInternal(newChild: SparkPlan): 
DeltaFilterExecTransformer =
     copy(child = newChild)
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
 
b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
similarity index 50%
copy from 
backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
copy to 
gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
index c877fdbb07..9b720b19c5 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
+++ 
b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
@@ -14,22 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.metrics
+package org.apache.gluten.execution
 
-import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.execution.SparkPlan
 
-class FilterMetricsUpdater(val metrics: Map[String, SQLMetric]) extends 
MetricsUpdater {
+case class DeltaProjectExecTransformer(projectList: Seq[NamedExpression], 
child: SparkPlan)
+  extends ProjectExecTransformerBase(projectList, child) {
 
-  override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
-    if (opMetrics != null) {
-      val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
-      metrics("numOutputRows") += operatorMetrics.outputRows
-      metrics("outputVectors") += operatorMetrics.outputVectors
-      metrics("outputBytes") += operatorMetrics.outputBytes
-      metrics("cpuCount") += operatorMetrics.cpuCount
-      metrics("wallNanos") += operatorMetrics.wallNanos
-      metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
-      metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
-    }
-  }
+  override protected def withNewChildInternal(newChild: SparkPlan): 
DeltaProjectExecTransformer =
+    copy(child = newChild)
 }
diff --git 
a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
 
b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
index 3dc1260321..011184724e 100644
--- 
a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
+++ 
b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
@@ -16,8 +16,8 @@
  */
 package org.apache.gluten.extension
 
-import org.apache.gluten.execution.{DeltaScanTransformer, 
ProjectExecTransformer}
-import 
org.apache.gluten.extension.DeltaRewriteTransformerRules.{columnMappingRule, 
pushDownInputFileExprRule}
+import org.apache.gluten.execution.{DeltaFilterExecTransformer, 
DeltaProjectExecTransformer, DeltaScanTransformer, ProjectExecTransformer}
+import 
org.apache.gluten.extension.DeltaRewriteTransformerRules.{columnMappingRule, 
filterRule, projectRule, pushDownInputFileExprRule}
 import org.apache.gluten.extension.columnar.RewriteTransformerRules
 
 import org.apache.spark.sql.SparkSession
@@ -25,13 +25,14 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, 
Attribute, AttributeRef
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaParquetFileFormat, 
NoMapping}
-import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.FileFormat
 
 import scala.collection.mutable.ListBuffer
 
 class DeltaRewriteTransformerRules extends RewriteTransformerRules {
-  override def rules: Seq[Rule[SparkPlan]] = columnMappingRule :: 
pushDownInputFileExprRule :: Nil
+  override def rules: Seq[Rule[SparkPlan]] =
+    columnMappingRule :: filterRule :: projectRule :: 
pushDownInputFileExprRule :: Nil
 }
 
 object DeltaRewriteTransformerRules {
@@ -58,6 +59,18 @@ object DeltaRewriteTransformerRules {
         transformColumnMappingPlan(p)
     }
 
+  val filterRule: Rule[SparkPlan] = (plan: SparkPlan) =>
+    plan.transformUp {
+      case FilterExec(condition, child) if 
containsIncrementMetricExpr(condition) =>
+        DeltaFilterExecTransformer(condition, child)
+    }
+
+  val projectRule: Rule[SparkPlan] = (plan: SparkPlan) =>
+    plan.transformUp {
+      case ProjectExec(projectList, child) if 
projectList.exists(containsIncrementMetricExpr) =>
+        DeltaProjectExecTransformer(projectList, child)
+    }
+
   val pushDownInputFileExprRule: Rule[SparkPlan] = (plan: SparkPlan) =>
     plan.transformUp {
       case p @ ProjectExec(projectList, child: DeltaScanTransformer)
@@ -79,6 +92,13 @@ object DeltaRewriteTransformerRules {
     }
   }
 
+  private def containsIncrementMetricExpr(expr: Expression): Boolean = {
+    expr match {
+      case e if e.prettyName == "increment_metric" => true
+      case _ => expr.children.exists(containsIncrementMetricExpr)
+    }
+  }
+
   /**
    * This method is only used for Delta ColumnMapping FileFormat(e.g. 
nameMapping and idMapping)
    * transform the metadata of Delta into Parquet's, each plan should only be 
transformed once.
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
index a96f27f5a8..62008767f5 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
@@ -57,11 +57,15 @@ trait MetricsApi extends Serializable {
 
   def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, 
SQLMetric]
 
-  def genFilterTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): 
MetricsUpdater
+  def genFilterTransformerMetricsUpdater(
+      metrics: Map[String, SQLMetric],
+      extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater
 
   def genProjectTransformerMetrics(sparkContext: SparkContext): Map[String, 
SQLMetric]
 
-  def genProjectTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): 
MetricsUpdater
+  def genProjectTransformerMetricsUpdater(
+      metrics: Map[String, SQLMetric],
+      extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater
 
   def genHashAggregateTransformerMetrics(sparkContext: SparkContext): 
Map[String, SQLMetric]
 
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 0801ffb27a..cc5c2325dc 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -106,7 +106,25 @@ abstract class FilterExecTransformerBase(val cond: 
Expression, val input: SparkP
 
   override protected def outputExpressions: Seq[NamedExpression] = child.output
 
-  protected def getRemainingCondition: Expression
+  // FIXME: Should use field "condition" to store the actual executed filter 
expressions.
+  //  To make optimization easier (like to remove filter when it actually does 
nothing)
+  protected def getRemainingCondition: Expression = {
+    val scanFilters = child match {
+      // Get the filters including the manually pushed down ones.
+      case basicScanExecTransformer: BasicScanExecTransformer =>
+        basicScanExecTransformer.filterExprs()
+      // For fallback scan, we need to keep original filter.
+      case _ =>
+        Seq.empty[Expression]
+    }
+    if (scanFilters.isEmpty) {
+      cond
+    } else {
+      val remainingFilters =
+        FilterHandler.getRemainingFilters(scanFilters, 
splitConjunctivePredicates(cond))
+      remainingFilters.reduceLeftOption(And).orNull
+    }
+  }
 
   override protected def doValidateInternal(): ValidationResult = {
     val remainingCondition = getRemainingCondition
@@ -160,7 +178,7 @@ object FilterExecTransformerBase {
   }
 }
 
-case class ProjectExecTransformer private (projectList: Seq[NamedExpression], 
child: SparkPlan)
+abstract class ProjectExecTransformerBase(val list: Seq[NamedExpression], val 
input: SparkPlan)
   extends UnaryTransformSupport
   with OrderPreservingNodeShim
   with PartitioningPreservingNodeShim
@@ -176,7 +194,7 @@ case class ProjectExecTransformer private (projectList: 
Seq[NamedExpression], ch
     // Firstly, need to check if the Substrait plan for this operator can be 
successfully generated.
     val operatorId = substraitContext.nextOperatorId(this.nodeName)
     val relNode =
-      getRelNode(substraitContext, projectList, child.output, operatorId, 
null, validation = true)
+      getRelNode(substraitContext, list, child.output, operatorId, null, 
validation = true)
     // Then, validate the generated plan in native engine.
     doNativeValidation(substraitContext, relNode)
   }
@@ -192,7 +210,7 @@ case class ProjectExecTransformer private (projectList: 
Seq[NamedExpression], ch
   override def doTransform(context: SubstraitContext): TransformContext = {
     val childCtx = child.asInstanceOf[TransformSupport].transform(context)
     val operatorId = context.nextOperatorId(this.nodeName)
-    if ((projectList == null || projectList.isEmpty) && childCtx != null) {
+    if ((list == null || list.isEmpty) && childCtx != null) {
       // The computing for this project is not needed.
       // the child may be an input adapter and childCtx is null. In this case 
we want to
       // make a read node with non-empty base_schema.
@@ -201,16 +219,16 @@ case class ProjectExecTransformer private (projectList: 
Seq[NamedExpression], ch
     }
 
     val currRel =
-      getRelNode(context, projectList, child.output, operatorId, 
childCtx.root, validation = false)
+      getRelNode(context, list, child.output, operatorId, childCtx.root, 
validation = false)
     assert(currRel != null, "Project Rel should be valid")
     TransformContext(childCtx.outputAttributes, output, currRel)
   }
 
-  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+  override def output: Seq[Attribute] = list.map(_.toAttribute)
 
   override protected def orderingExpressions: Seq[SortOrder] = 
child.outputOrdering
 
-  override protected def outputExpressions: Seq[NamedExpression] = projectList
+  override protected def outputExpressions: Seq[NamedExpression] = list
 
   def getRelNode(
       context: SubstraitContext,
@@ -247,23 +265,10 @@ case class ProjectExecTransformer private (projectList: 
Seq[NamedExpression], ch
   override def verboseStringWithOperatorId(): String = {
     s"""
        |$formattedNodeName
-       |${ExplainUtils.generateFieldString("Output", projectList)}
+       |${ExplainUtils.generateFieldString("Output", list)}
        |${ExplainUtils.generateFieldString("Input", child.output)}
        |""".stripMargin
   }
-
-  override protected def withNewChildInternal(newChild: SparkPlan): 
ProjectExecTransformer =
-    copy(child = newChild)
-}
-object ProjectExecTransformer {
-  def apply(projectList: Seq[NamedExpression], child: SparkPlan): 
ProjectExecTransformer = {
-    
BackendsApiManager.getSparkPlanExecApiInstance.genProjectExecTransformer(projectList,
 child)
-  }
-
-  // Directly creating a project transformer may not be considered safe since 
some backends, E.g.,
-  // Clickhouse may require to intercept the instantiation procedure.
-  def createUnsafe(projectList: Seq[NamedExpression], child: SparkPlan): 
ProjectExecTransformer =
-    new ProjectExecTransformer(projectList, child)
 }
 
// An alternative for UnionExec.
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ProjectExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ProjectExecTransformer.scala
new file mode 100644
index 0000000000..bb8361d993
--- /dev/null
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ProjectExecTransformer.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.execution.SparkPlan
+
+case class ProjectExecTransformer(projectList: Seq[NamedExpression], child: 
SparkPlan)
+  extends ProjectExecTransformerBase(projectList, child) {
+
+  override protected def withNewChildInternal(newChild: SparkPlan): 
ProjectExecTransformer =
+    copy(child = newChild)
+}
+
+object ProjectExecTransformer {
+
+  def apply(projectList: Seq[NamedExpression], child: SparkPlan): 
ProjectExecTransformer = {
+    
BackendsApiManager.getSparkPlanExecApiInstance.genProjectExecTransformer(projectList,
 child)
+  }
+
+  // Directly creating a project transformer may not be considered safe since 
some backends, E.g.,
+  // Clickhouse may require to intercept the instantiation procedure.
+  def createUnsafe(projectList: Seq[NamedExpression], child: SparkPlan): 
ProjectExecTransformer =
+    new ProjectExecTransformer(projectList, child)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to