This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 9d4111ebee [CORE] Support batch scan customMetrics (#9450)
9d4111ebee is described below

commit 9d4111ebee8bb2169597224890fd6c958b2a78b0
Author: Zouxxyy <[email protected]>
AuthorDate: Wed May 7 19:05:33 2025 +0800

    [CORE] Support batch scan customMetrics (#9450)
---
 .../org/apache/gluten/execution/BatchScanExecTransformer.scala     | 7 ++++++-
 .../scala/org/apache/gluten/execution/WholeStageTransformer.scala  | 6 ++++++
 .../spark/sql/execution/datasources/v2/BatchScanExecShim.scala     | 2 ++
 .../spark/sql/execution/datasources/v2/BatchScanExecShim.scala     | 2 ++
 4 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index 2b1f90b726..19ab0b1c72 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -95,7 +95,12 @@ abstract class BatchScanExecTransformerBase(
 
   // Note: "metrics" is made transient to avoid sending driver-side metrics to 
tasks.
   @transient override lazy val metrics: Map[String, SQLMetric] =
-    
BackendsApiManager.getMetricsApiInstance.genBatchScanTransformerMetrics(sparkContext)
+    BackendsApiManager.getMetricsApiInstance.genBatchScanTransformerMetrics(
+      sparkContext) ++ customMetrics
+
+  def doPostDriverMetrics(): Unit = {
+    postDriverMetrics()
+  }
 
   // Similar to the problem encountered in 
https://github.com/oap-project/gluten/pull/3184,
   // we cannot add member variables to BatchScanExecTransformerBase, which 
inherits from case
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index bdf425f78d..2670655593 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -431,6 +431,12 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
 
     SoftAffinity.updateFilePartitionLocations(allInputPartitions, rdd.id)
 
+    leafTransformers.foreach {
+      case batchScan: BatchScanExecTransformerBase =>
+        batchScan.doPostDriverMetrics()
+      case _ =>
+    }
+
     rdd
   }
 
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index dcfb1c1447..760a584cf5 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -46,6 +46,8 @@ abstract class BatchScanExecShim(
 
   def hasUnsupportedColumns: Boolean = false
 
+  def postDriverMetrics(): Unit = {}
+
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     throw new UnsupportedOperationException("Need to implement this method")
   }
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index ec5c097d97..ae60134f88 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -59,6 +59,8 @@ abstract class BatchScanExecShim(
       .exists(v => metadataColumnsNames.contains(v.name))
   }
 
+  def postDriverMetrics(): Unit = {}
+
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     throw new UnsupportedOperationException("Need to implement this method")
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to