This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e9dc618 [AURON #1468] Introduce SparkMetricNode (#1469)
9e9dc618 is described below

commit 9e9dc618a5a2675aba32cf6b235d51dd53f3e5b5
Author: zhangmang <[email protected]>
AuthorDate: Mon Oct 20 17:26:28 2025 +0800

    [AURON #1468] Introduce SparkMetricNode (#1469)
    
    * [AURON #1468] Introduce SparkMetricNode
    
    * fix ci fail
    
    * fix ci
    
    * fix checkstyle
---
 .../src/main/java/org/apache/auron/metric/MetricNode.java |  3 ++-
 native-engine/auron-jni-bridge/src/jni_bridge.rs          |  6 +++---
 .../main/scala/org/apache/spark/sql/auron/ShimsImpl.scala |  7 ++++---
 .../metric/SparkMetricNode.scala}                         | 15 +++++++++------
 .../apache/spark/sql/auron/AuronCallNativeWrapper.scala   |  3 ++-
 .../org/apache/spark/sql/auron/AuronConverters.scala      |  3 ++-
 .../scala/org/apache/spark/sql/auron/NativeHelper.scala   |  3 ++-
 .../main/scala/org/apache/spark/sql/auron/NativeRDD.scala |  5 +++--
 .../sql/execution/auron/plan/ConvertToNativeBase.scala    |  4 ++--
 .../spark/sql/execution/auron/plan/NativeAggBase.scala    |  4 ++--
 .../auron/plan/NativeBroadcastExchangeBase.scala          |  8 ++++----
 .../execution/auron/plan/NativeBroadcastJoinBase.scala    |  4 ++--
 .../spark/sql/execution/auron/plan/NativeExpandBase.scala |  4 ++--
 .../spark/sql/execution/auron/plan/NativeFilterBase.scala |  4 ++--
 .../sql/execution/auron/plan/NativeGenerateBase.scala     |  4 ++--
 .../sql/execution/auron/plan/NativeGlobalLimitBase.scala  |  4 ++--
 .../sql/execution/auron/plan/NativeLocalLimitBase.scala   |  4 ++--
 .../sql/execution/auron/plan/NativeOrcScanBase.scala      |  5 +++--
 .../sql/execution/auron/plan/NativeParquetScanBase.scala  |  5 +++--
 .../sql/execution/auron/plan/NativeParquetSinkBase.scala  |  4 ++--
 .../sql/execution/auron/plan/NativeProjectBase.scala      |  4 ++--
 .../execution/auron/plan/NativeRenameColumnsBase.scala    |  4 ++--
 .../execution/auron/plan/NativeShuffleExchangeBase.scala  |  6 +++---
 .../execution/auron/plan/NativeShuffledHashJoinBase.scala |  4 ++--
 .../spark/sql/execution/auron/plan/NativeSortBase.scala   |  4 ++--
 .../execution/auron/plan/NativeSortMergeJoinBase.scala    |  4 ++--
 .../sql/execution/auron/plan/NativeTakeOrderedBase.scala  |  6 +++---
 .../spark/sql/execution/auron/plan/NativeUnionBase.scala  |  5 +++--
 .../spark/sql/execution/auron/plan/NativeWindowBase.scala |  4 ++--
 .../execution/auron/plan/NativePaimonTableScanExec.scala  |  4 ++--
 30 files changed, 78 insertions(+), 66 deletions(-)

diff --git a/auron-core/src/main/java/org/apache/auron/metric/MetricNode.java 
b/auron-core/src/main/java/org/apache/auron/metric/MetricNode.java
index 06e517fb..47edc04d 100644
--- a/auron-core/src/main/java/org/apache/auron/metric/MetricNode.java
+++ b/auron-core/src/main/java/org/apache/auron/metric/MetricNode.java
@@ -16,6 +16,7 @@
  */
 package org.apache.auron.metric;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -23,7 +24,7 @@ import java.util.List;
  * Abstract class representing a metric node in the Auron system.
  * This class provides functionality for hierarchical metrics tracking.
  */
-public abstract class MetricNode {
+public abstract class MetricNode implements Serializable {
     private final List<MetricNode> children = new ArrayList<>();
 
     public MetricNode(List<MetricNode> children) {
diff --git a/native-engine/auron-jni-bridge/src/jni_bridge.rs 
b/native-engine/auron-jni-bridge/src/jni_bridge.rs
index ee178a49..85cb4d0c 100644
--- a/native-engine/auron-jni-bridge/src/jni_bridge.rs
+++ b/native-engine/auron-jni-bridge/src/jni_bridge.rs
@@ -1111,7 +1111,7 @@ pub struct SparkMetricNode<'a> {
     pub method_add_ret: ReturnType,
 }
 impl<'a> SparkMetricNode<'a> {
-    pub const SIG_TYPE: &'static str = "org/apache/spark/sql/auron/MetricNode";
+    pub const SIG_TYPE: &'static str = 
"org/apache/auron/metric/SparkMetricNode";
 
     pub fn new(env: &JNIEnv<'a>) -> JniResult<SparkMetricNode<'a>> {
         let class = get_global_jclass(env, Self::SIG_TYPE)?;
@@ -1120,7 +1120,7 @@ impl<'a> SparkMetricNode<'a> {
             method_getChild: env.get_method_id(
                 class,
                 "getChild",
-                "(I)Lorg/apache/spark/sql/auron/MetricNode;",
+                "(I)Lorg/apache/auron/metric/MetricNode;",
             )?,
             method_getChild_ret: ReturnType::Object,
             method_add: env.get_method_id(class, "add", 
"(Ljava/lang/String;J)V")?,
@@ -1435,7 +1435,7 @@ impl<'a> AuronCallNativeWrapper<'a> {
             method_getMetrics: env.get_method_id(
                 class,
                 "getMetrics",
-                "()Lorg/apache/spark/sql/auron/MetricNode;",
+                "()Lorg/apache/auron/metric/MetricNode;",
             )?,
             method_getMetrics_ret: ReturnType::Object,
             method_importSchema: env.get_method_id(class, "importSchema", 
"(J)V")?,
diff --git 
a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
 
b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
index 1e2533f1..68cf3698 100644
--- 
a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
+++ 
b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
@@ -111,6 +111,7 @@ import org.apache.spark.storage.FileSegment
 
 import org.apache.auron.{protobuf => pb, sparkver}
 import org.apache.auron.common.AuronBuildInfo
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.spark.ui.AuronBuildInfoEvent
 
 class ShimsImpl extends Shims with Logging {
@@ -616,7 +617,7 @@ class ShimsImpl extends Shims with Logging {
 
         val requiredMetrics = nativeShuffle.readMetrics ++
           nativeShuffle.metrics.filterKeys(_ == "shuffle_read_total_time")
-        val metrics = MetricNode(
+        val metrics = SparkMetricNode(
           requiredMetrics,
           inputRDD.metrics :: Nil,
           Some({
@@ -719,7 +720,7 @@ class ShimsImpl extends Shims with Logging {
 
         val requiredMetrics = nativeShuffle.readMetrics ++
           nativeShuffle.metrics.filterKeys(_ == "shuffle_read_total_time")
-        val metrics = MetricNode(
+        val metrics = SparkMetricNode(
           requiredMetrics,
           inputRDD.metrics :: Nil,
           Some({
@@ -812,7 +813,7 @@ class ShimsImpl extends Shims with Logging {
 
         val requiredMetrics = nativeShuffle.readMetrics ++
           nativeShuffle.metrics.filterKeys(_ == "shuffle_read_total_time")
-        val metrics = MetricNode(
+        val metrics = SparkMetricNode(
           requiredMetrics,
           inputRDD.metrics :: Nil,
           Some({
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/MetricNode.scala 
b/spark-extension/src/main/scala/org/apache/auron/metric/SparkMetricNode.scala
similarity index 79%
rename from 
spark-extension/src/main/scala/org/apache/spark/sql/auron/MetricNode.scala
rename to 
spark-extension/src/main/scala/org/apache/auron/metric/SparkMetricNode.scala
index 4198bff3..8c3925ae 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/MetricNode.scala
+++ 
b/spark-extension/src/main/scala/org/apache/auron/metric/SparkMetricNode.scala
@@ -14,18 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.auron
+package org.apache.auron.metric
+
+import scala.collection.JavaConverters._
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.metric.SQLMetric
 
-case class MetricNode(
+case class SparkMetricNode(
     metrics: Map[String, SQLMetric],
     children: Seq[MetricNode],
     metricValueHandler: Option[(String, Long) => Unit] = None)
-    extends Logging {
+    extends MetricNode(children.asJava)
+    with Logging {
 
-  def getChild(i: Int): MetricNode = {
+  override def getChild(i: Int): MetricNode = {
     if (i < children.length) {
       children(i)
     } else {
@@ -40,8 +43,8 @@ case class MetricNode(
     }
   }
 
-  def foreach(fn: MetricNode => Unit): Unit = {
+  def foreach(fn: SparkMetricNode => Unit): Unit = {
     fn(this)
-    this.children.foreach(_.foreach(fn))
+    this.children.foreach(_.asInstanceOf[SparkMetricNode].foreach(fn))
   }
 }
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
index c827db32..5cf2131b 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
@@ -44,6 +44,7 @@ import org.apache.spark.util.CompletionIterator
 import org.apache.spark.util.ShutdownHookManager
 import org.apache.spark.util.Utils
 
+import org.apache.auron.metric.{MetricNode, SparkMetricNode}
 import org.apache.auron.protobuf.PartitionId
 import org.apache.auron.protobuf.PhysicalPlanNode
 import org.apache.auron.protobuf.TaskDefinition
@@ -52,7 +53,7 @@ case class AuronCallNativeWrapper(
     nativePlan: PhysicalPlanNode,
     partition: Partition,
     context: Option[TaskContext],
-    metrics: MetricNode)
+    metrics: SparkMetricNode)
     extends Logging {
 
   AuronCallNativeWrapper.initNative()
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index 556ac7f3..db3b710b 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -86,6 +86,7 @@ import 
org.apache.spark.sql.hive.execution.auron.plan.NativeHiveTableScanBase
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.LongType
 
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.EmptyPartitionsExecNode
 import org.apache.auron.protobuf.PhysicalPlanNode
 import org.apache.auron.sparkver
@@ -1089,7 +1090,7 @@ object AuronConverters extends Logging {
 
         new NativeRDD(
           sparkContext,
-          MetricNode(Map(), Nil),
+          SparkMetricNode(Map(), Nil),
           rddPartitions = partitions,
           rddPartitioner = None,
           rddDependencies = Nil,
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala
index 64a37f6d..eabe7ff6 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.metric.SQLMetrics
 
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.PhysicalPlanNode
 
 object NativeHelper extends Logging {
@@ -78,7 +79,7 @@ object NativeHelper extends Logging {
 
   def executeNativePlan(
       nativePlan: PhysicalPlanNode,
-      metrics: MetricNode,
+      metrics: SparkMetricNode,
       partition: Partition,
       context: Option[TaskContext]): Iterator[InternalRow] = {
 
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala
index 5da12a44..4000035e 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala
@@ -29,11 +29,12 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.PhysicalPlanNode
 
 class NativeRDD(
     @transient private val rddSparkContext: SparkContext,
-    val metrics: MetricNode,
+    val metrics: SparkMetricNode,
     private val rddPartitions: Array[Partition],
     private val rddPartitioner: Option[Partitioner],
     private val rddDependencies: Seq[Dependency[_]],
@@ -71,7 +72,7 @@ class NativeRDD(
 class EmptyNativeRDD(@transient private val rddSparkContext: SparkContext)
     extends NativeRDD(
       rddSparkContext = rddSparkContext,
-      metrics = MetricNode(Map.empty, Seq(), None),
+      metrics = SparkMetricNode(Map.empty, Seq(), None),
       rddPartitions = Array.empty,
       rddPartitioner = None,
       rddDependencies = Seq.empty,
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeBase.scala
index 1e741160..6ea291a2 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeBase.scala
@@ -22,7 +22,6 @@ import scala.collection.immutable.SortedMap
 
 import org.apache.spark.OneToOneDependency
 import org.apache.spark.sql.auron.JniBridge
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
@@ -38,6 +37,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.StructType
 
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.FFIReaderExecNode
 import org.apache.auron.protobuf.PhysicalPlanNode
 import org.apache.auron.protobuf.Schema
@@ -63,7 +63,7 @@ abstract class ConvertToNativeBase(override val child: 
SparkPlan)
   override def doExecuteNative(): NativeRDD = {
     val inputRDD = child.execute()
     val numInputPartitions = inputRDD.getNumPartitions
-    val nativeMetrics = MetricNode(metrics, Nil)
+    val nativeMetrics = SparkMetricNode(metrics, Nil)
 
     new NativeRDD(
       sparkContext,
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
index c5c382ac..e470a0f4 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
@@ -23,7 +23,6 @@ import scala.collection.immutable.SortedMap
 import org.apache.spark.OneToOneDependency
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.auron.AuronConf
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
@@ -58,6 +57,7 @@ import org.apache.spark.sql.types.BinaryType
 import org.apache.spark.sql.types.DataType
 
 import org.apache.auron.{protobuf => pb}
+import org.apache.auron.metric.SparkMetricNode
 
 abstract class NativeAggBase(
     execMode: AggExecMode,
@@ -162,7 +162,7 @@ abstract class NativeAggBase(
 
   override def doExecuteNative(): NativeRDD = {
     val inputRDD = NativeHelper.executeNative(child)
-    val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+    val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
     val nativeExecMode = this.nativeExecMode
     val nativeAggrNames = this.nativeAggrNames
     val nativeGroupingNames = this.nativeGroupingNames
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala
index 7dafd564..b727a929 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala
@@ -37,7 +37,6 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.auron.JniBridge
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
@@ -66,6 +65,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.BinaryType
 
 import org.apache.auron.{protobuf => pb}
+import org.apache.auron.metric.SparkMetricNode
 
 abstract class NativeBroadcastExchangeBase(mode: BroadcastMode, override val 
child: SparkPlan)
     extends BroadcastExchangeLike
@@ -110,7 +110,7 @@ abstract class NativeBroadcastExchangeBase(mode: 
BroadcastMode, override val chi
     val broadcastReadNativePlan = 
doExecuteNative().nativePlan(singlePartition, null)
     val rowsIter = NativeHelper.executeNativePlan(
       broadcastReadNativePlan,
-      MetricNode(Map(), Nil, None),
+      SparkMetricNode(Map(), Nil, None),
       singlePartition,
       None)
     val pruneKeyField = new InterpretedUnsafeProjection(
@@ -155,7 +155,7 @@ abstract class NativeBroadcastExchangeBase(mode: 
BroadcastMode, override val chi
 
   override def doExecuteNative(): NativeRDD = {
     val broadcast = doExecuteBroadcastNative[Array[Array[Byte]]]()
-    val nativeMetrics = MetricNode(metrics, Nil)
+    val nativeMetrics = SparkMetricNode(metrics, Nil)
     val partitions = Array(new Partition() {
       override def index: Int = 0
     })
@@ -199,7 +199,7 @@ abstract class NativeBroadcastExchangeBase(mode: 
BroadcastMode, override val chi
       case child => Shims.get.createConvertToNativeExec(child)
     })
     val modifiedMetrics = metrics ++ Map("output_rows" -> 
metrics("numOutputRows"))
-    val nativeMetrics = MetricNode(modifiedMetrics, inputRDD.metrics :: Nil)
+    val nativeMetrics = SparkMetricNode(modifiedMetrics, inputRDD.metrics :: 
Nil)
 
     val ipcRDD =
       new RDD[Array[Byte]](sparkContext, new OneToOneDependency(inputRDD) :: 
Nil) {
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala
index e41e5676..dabeba3f 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala
@@ -21,7 +21,6 @@ import scala.collection.immutable.SortedMap
 
 import org.apache.spark.OneToOneDependency
 import org.apache.spark.Partition
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
@@ -43,6 +42,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.LongType
 
 import org.apache.auron.{protobuf => pb}
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.JoinOn
 
 abstract class NativeBroadcastJoinBase(
@@ -126,7 +126,7 @@ abstract class NativeBroadcastJoinBase(
   override def doExecuteNative(): NativeRDD = {
     val leftRDD = NativeHelper.executeNative(left)
     val rightRDD = NativeHelper.executeNative(right)
-    val nativeMetrics = MetricNode(metrics, leftRDD.metrics :: 
rightRDD.metrics :: Nil)
+    val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics :: 
rightRDD.metrics :: Nil)
     val nativeSchema = this.nativeSchema
     val nativeJoinType = this.nativeJoinType
     val nativeJoinOn = this.nativeJoinOn
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandBase.scala
index 9b7a3b66..217ed140 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandBase.scala
@@ -20,7 +20,6 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable.SortedMap
 
 import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
@@ -32,6 +31,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.UnaryExecNode
 import org.apache.spark.sql.execution.metric.SQLMetric
 
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.ExpandExecNode
 import org.apache.auron.protobuf.ExpandProjection
 import org.apache.auron.protobuf.PhysicalPlanNode
@@ -72,7 +72,7 @@ abstract class NativeExpandBase(
 
   override def doExecuteNative(): NativeRDD = {
     val inputRDD = NativeHelper.executeNative(child)
-    val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+    val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
     val nativeSchema = this.nativeSchema
     val nativeProjections = this.nativeProjections
 
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterBase.scala
index a18b3d58..56d46c46 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterBase.scala
@@ -21,7 +21,6 @@ import scala.collection.immutable.SortedMap
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
@@ -38,6 +37,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.UnaryExecNode
 import org.apache.spark.sql.execution.metric.SQLMetric
 
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.FilterExecNode
 import org.apache.auron.protobuf.PhysicalExprNode
 import org.apache.auron.protobuf.PhysicalPlanNode
@@ -91,7 +91,7 @@ abstract class NativeFilterBase(condition: Expression, 
override val child: Spark
 
   override def doExecuteNative(): NativeRDD = {
     val inputRDD = NativeHelper.executeNative(child)
-    val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+    val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
     val nativeFilterExprs = this.nativeFilterExprs
     new NativeRDD(
       sparkContext,
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala
index fbdcaa4e..76e500aa 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala
@@ -22,7 +22,6 @@ import scala.collection.mutable.ArrayBuffer
 
 import com.google.protobuf.ByteString
 import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
@@ -44,6 +43,7 @@ import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.types.StructType
 
 import org.apache.auron.{protobuf => pb}
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.PhysicalPlanNode
 
 abstract class NativeGenerateBase(
@@ -130,7 +130,7 @@ abstract class NativeGenerateBase(
 
   override def doExecuteNative(): NativeRDD = {
     val inputRDD = NativeHelper.executeNative(child)
-    val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+    val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
     val nativeGenerator = this.nativeGenerator
     val nativeGeneratorOutput = this.nativeGeneratorOutput
     val nativeRequiredChildOutput = this.nativeRequiredChildOutput
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala
index 1d48005a..83c1f7d8 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.auron.plan
 import scala.collection.immutable.SortedMap
 
 import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
 import org.apache.spark.sql.auron.NativeSupports
@@ -32,6 +31,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.UnaryExecNode
 import org.apache.spark.sql.execution.metric.SQLMetric
 
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.LimitExecNode
 import org.apache.auron.protobuf.PhysicalPlanNode
 
@@ -52,7 +52,7 @@ abstract class NativeGlobalLimitBase(limit: Long, override 
val child: SparkPlan)
 
   override def doExecuteNative(): NativeRDD = {
     val inputRDD = NativeHelper.executeNative(child)
-    val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+    val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
 
     new NativeRDD(
       sparkContext,
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala
index 0d1b0054..f0ba9e3b 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.auron.plan
 import scala.collection.immutable.SortedMap
 
 import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
 import org.apache.spark.sql.auron.NativeSupports
@@ -30,6 +29,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.UnaryExecNode
 import org.apache.spark.sql.execution.metric.SQLMetric
 
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.LimitExecNode
 import org.apache.auron.protobuf.PhysicalPlanNode
 
@@ -49,7 +49,7 @@ abstract class NativeLocalLimitBase(limit: Long, override val 
child: SparkPlan)
 
   override def doExecuteNative(): NativeRDD = {
     val inputRDD = NativeHelper.executeNative(child)
-    val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+    val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
 
     new NativeRDD(
       sparkContext,
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala
index b1f5e8c4..36132e44 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala
@@ -22,11 +22,12 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.Partition
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.auron.{EmptyNativeRDD, MetricNode, NativeRDD}
+import org.apache.spark.sql.auron.{EmptyNativeRDD, NativeRDD}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.FilePartition
 
 import org.apache.auron.{protobuf => pb}
+import org.apache.auron.metric.SparkMetricNode
 
 abstract class NativeOrcScanBase(basedFileScan: FileSourceScanExec)
     extends NativeFileSourceScanBase(basedFileScan) {
@@ -34,7 +35,7 @@ abstract class NativeOrcScanBase(basedFileScan: 
FileSourceScanExec)
   override def doExecuteNative(): NativeRDD = {
     val partitions = inputFileScanRDD.filePartitions.toArray
     if (partitions.length > 0) {
-      val nativeMetrics = MetricNode(
+      val nativeMetrics = SparkMetricNode(
         metrics,
         Nil,
         Some({
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala
index f4129d0e..591e31a1 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala
@@ -22,11 +22,12 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.Partition
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.auron.{EmptyNativeRDD, MetricNode, NativeRDD}
+import org.apache.spark.sql.auron.{EmptyNativeRDD, NativeRDD}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.FilePartition
 
 import org.apache.auron.{protobuf => pb}
+import org.apache.auron.metric.SparkMetricNode
 
 abstract class NativeParquetScanBase(basedFileScan: FileSourceScanExec)
     extends NativeFileSourceScanBase(basedFileScan) {
@@ -34,7 +35,7 @@ abstract class NativeParquetScanBase(basedFileScan: 
FileSourceScanExec)
   override def doExecuteNative(): NativeRDD = {
     val partitions = inputFileScanRDD.filePartitions.toArray
     if (partitions.length > 0) {
-      val nativeMetrics = MetricNode(
+      val nativeMetrics = SparkMetricNode(
         metrics,
         Nil,
         Some({
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkBase.scala
index aa1b8dbf..c2bc49ca 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkBase.scala
@@ -38,7 +38,6 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.OneToOneDependency
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.auron.JniBridge
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
 import org.apache.spark.sql.auron.NativeSupports
@@ -53,6 +52,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.hive.auron.HiveClientHelper
 import org.apache.spark.util.SerializableConfiguration
 
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.ParquetProp
 import org.apache.auron.protobuf.ParquetSinkExecNode
 import org.apache.auron.protobuf.PhysicalPlanNode
@@ -88,7 +88,7 @@ abstract class NativeParquetSinkBase(
     val numDynParts = partition.count(_._2.isEmpty)
 
     val inputRDD = NativeHelper.executeNative(child)
-    val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+    val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
     val nativeDependencies = new OneToOneDependency(inputRDD) :: Nil
     new NativeRDD(
       sparkSession.sparkContext,
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectBase.scala
index 5edf60a2..4023ae68 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectBase.scala
@@ -21,7 +21,6 @@ import scala.collection.immutable.SortedMap
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
@@ -37,6 +36,7 @@ import org.apache.spark.sql.execution.UnaryExecNode
 import org.apache.spark.sql.execution.auron.plan.NativeProjectBase.getNativeProjectBuilder
 import org.apache.spark.sql.execution.metric.SQLMetric
 
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.ArrowType
 import org.apache.auron.protobuf.PhysicalExprNode
 import org.apache.auron.protobuf.PhysicalPlanNode
@@ -70,7 +70,7 @@ abstract class NativeProjectBase(projectList: Seq[NamedExpression], override val
 
   override def doExecuteNative(): NativeRDD = {
     val inputRDD = NativeHelper.executeNative(child)
-    val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+    val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
     val nativeProject = this.nativeProject
 
     new NativeRDD(
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsBase.scala
index 487cb199..f8bf6cb9 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsBase.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.auron.plan
 import scala.collection.JavaConverters._
 
 import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
 import org.apache.spark.sql.auron.NativeSupports
@@ -31,6 +30,7 @@ import org.apache.spark.sql.execution.UnaryExecNode
 import org.apache.spark.sql.execution.auron.plan.NativeRenameColumnsBase.buildRenameColumnsExec
 import org.apache.spark.sql.execution.metric.SQLMetric
 
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.PhysicalPlanNode
 import org.apache.auron.protobuf.RenameColumnsExecNode
 
@@ -53,7 +53,7 @@ abstract class NativeRenameColumnsBase(
 
   override def doExecuteNative(): NativeRDD = {
     val inputRDD = NativeHelper.executeNative(child)
-    val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+    val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
 
     new NativeRDD(
       sparkContext,
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala
index 789c7310..47457a41 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala
@@ -29,7 +29,6 @@ import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleWriteProcessor
 import org.apache.spark.sql.auron.JniBridge
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
@@ -50,6 +49,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.ArrayType
 import org.apache.spark.util.{CompletionIterator, MutablePair}
 
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.{IpcReaderExecNode, PhysicalExprNode, PhysicalHashRepartition, PhysicalPlanNode, PhysicalRangeRepartition, PhysicalRepartition, PhysicalRoundRobinRepartition, PhysicalSingleRepartition, PhysicalSortExprNode, Schema, SortExecNode}
 
 abstract class NativeShuffleExchangeBase(
@@ -125,7 +125,7 @@ abstract class NativeShuffleExchangeBase(
     val shuffleHandle = shuffleDependency.shuffleHandle
     val rdd = doExecuteNonNative()
 
-    val nativeMetrics = MetricNode(
+    val nativeMetrics = SparkMetricNode(
       Map(),
       Nil,
       Some({
@@ -192,7 +192,7 @@ abstract class NativeShuffleExchangeBase(
 
     val nativeInputRDD = rdd.asInstanceOf[NativeRDD]
     val numPartitions = outputPartitioning.numPartitions
-    val nativeMetrics = MetricNode(
+    val nativeMetrics = SparkMetricNode(
       metrics,
       nativeInputRDD.metrics :: Nil,
       Some({
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffledHashJoinBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffledHashJoinBase.scala
index 1a4ceec4..b9fc8de8 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffledHashJoinBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffledHashJoinBase.scala
@@ -20,7 +20,6 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable.SortedMap
 
 import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
@@ -33,6 +32,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.metric.SQLMetric
 
 import org.apache.auron.{protobuf => pb}
+import org.apache.auron.metric.SparkMetricNode
 
 abstract class NativeShuffledHashJoinBase(
     override val left: SparkPlan,
@@ -94,7 +94,7 @@ abstract class NativeShuffledHashJoinBase(
   override def doExecuteNative(): NativeRDD = {
     val leftRDD = NativeHelper.executeNative(left)
     val rightRDD = NativeHelper.executeNative(right)
-    val nativeMetrics = MetricNode(metrics, leftRDD.metrics :: rightRDD.metrics :: Nil)
+    val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics :: rightRDD.metrics :: Nil)
     val nativeJoinOn = this.nativeJoinOn
     val nativeJoinType = this.nativeJoinType
     val nativeBuildSide = this.nativeBuildSide
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala
index 7d80186d..eea92cdb 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala
@@ -20,7 +20,6 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable.SortedMap
 
 import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
@@ -37,6 +36,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.UnaryExecNode
 import org.apache.spark.sql.execution.metric.SQLMetric
 
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.PhysicalExprNode
 import org.apache.auron.protobuf.PhysicalPlanNode
 import org.apache.auron.protobuf.PhysicalSortExprNode
@@ -95,7 +95,7 @@ abstract class NativeSortBase(
 
   override def doExecuteNative(): NativeRDD = {
     val inputRDD = NativeHelper.executeNative(child)
-    val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+    val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
     val nativeSortExprs = this.nativeSortExprs
 
     new NativeRDD(
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortMergeJoinBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortMergeJoinBase.scala
index 42a5c492..1122093c 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortMergeJoinBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortMergeJoinBase.scala
@@ -20,7 +20,6 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable.SortedMap
 
 import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
@@ -38,6 +37,7 @@ import org.apache.spark.sql.execution.BinaryExecNode
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.metric.SQLMetric
 
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.JoinOn
 import org.apache.auron.protobuf.PhysicalPlanNode
 import org.apache.auron.protobuf.SortMergeJoinExecNode
@@ -104,7 +104,7 @@ abstract class NativeSortMergeJoinBase(
   override def doExecuteNative(): NativeRDD = {
     val leftRDD = NativeHelper.executeNative(left)
     val rightRDD = NativeHelper.executeNative(right)
-    val nativeMetrics = MetricNode(metrics, leftRDD.metrics :: rightRDD.metrics :: Nil)
+    val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics :: rightRDD.metrics :: Nil)
     val nativeSortOptions = this.nativeSortOptions
     val nativeJoinOn = this.nativeJoinOn
     val nativeJoinType = this.nativeJoinType
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
index f8886510..b3a5b7fe 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
@@ -21,7 +21,6 @@ import scala.collection.immutable.SortedMap
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
@@ -40,6 +39,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.UnaryExecNode
 import org.apache.spark.sql.execution.metric.SQLMetric
 
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.FetchLimit
 import org.apache.auron.protobuf.PhysicalExprNode
 import org.apache.auron.protobuf.PhysicalPlanNode
@@ -130,7 +130,7 @@ abstract class NativeTakeOrderedBase(
     // take top-K from the final partition
     new NativeRDD(
       sparkContext,
-      metrics = MetricNode(metrics, shuffledRDD.metrics :: Nil),
+      metrics = SparkMetricNode(metrics, shuffledRDD.metrics :: Nil),
       shuffledRDD.partitions,
       shuffledRDD.partitioner,
       new OneToOneDependency(shuffledRDD) :: Nil,
@@ -182,7 +182,7 @@ abstract class NativePartialTakeOrderedBase(
 
     new NativeRDD(
       sparkContext,
-      metrics = MetricNode(metrics, inputRDD.metrics :: Nil),
+      metrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil),
       inputRDD.partitions,
       inputRDD.partitioner,
       new OneToOneDependency(inputRDD) :: Nil,
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionBase.scala
index aad5d876..a62ea6f1 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionBase.scala
@@ -24,7 +24,6 @@ import org.apache.spark.rdd.PartitionerAwareUnionRDD
 import org.apache.spark.rdd.PartitionerAwareUnionRDDPartition
 import org.apache.spark.rdd.UnionPartition
 import org.apache.spark.rdd.UnionRDD
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
 import org.apache.spark.sql.auron.NativeSupports
@@ -32,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.metric.SQLMetric
 
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.EmptyPartitionsExecNode
 import org.apache.auron.protobuf.PhysicalPlanNode
 import org.apache.auron.protobuf.Schema
@@ -52,7 +52,8 @@ abstract class NativeUnionBase(
 
   override def doExecuteNative(): NativeRDD = {
     val rdds = children.map(c => NativeHelper.executeNative(c))
-    val nativeMetrics = MetricNode(metrics, rdds.filter(_.partitions.nonEmpty).map(_.metrics))
+    val nativeMetrics =
+      SparkMetricNode(metrics, rdds.filter(_.partitions.nonEmpty).map(_.metrics))
     val unionRDD = sparkContext.union(rdds)
     val unionedPartitions = unionRDD.partitions
     val unionedPartitioner = unionRDD.partitioner
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala
index 44c786da..2e849536 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala
@@ -20,7 +20,6 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable.SortedMap
 
 import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeConverters
 import org.apache.spark.sql.auron.NativeHelper
 import org.apache.spark.sql.auron.NativeRDD
@@ -49,6 +48,7 @@ import org.apache.spark.sql.execution.UnaryExecNode
 import org.apache.spark.sql.execution.metric.SQLMetric
 
 import org.apache.auron.{protobuf => pb}
+import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.WindowGroupLimit
 
 abstract class NativeWindowBase(
@@ -191,7 +191,7 @@ abstract class NativeWindowBase(
 
   override def doExecuteNative(): NativeRDD = {
     val inputRDD = NativeHelper.executeNative(child)
-    val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+    val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
     val nativeWindowExprs = this.nativeWindowExprs
     val nativeOrderSpecExprs = this.nativeOrderSpecExprs
     val nativePartitionSpecExprs = this.nativePartitionSpecExprs
diff --git a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativePaimonTableScanExec.scala b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativePaimonTableScanExec.scala
index 8ce3a62f..2ae7efef 100644
--- a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativePaimonTableScanExec.scala
+++ b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativePaimonTableScanExec.scala
@@ -29,7 +29,6 @@ import org.apache.spark.Partition
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.auron.MetricNode
 import org.apache.spark.sql.auron.NativeRDD
 import org.apache.spark.sql.auron.Shims
 import org.apache.spark.sql.catalyst.InternalRow
@@ -45,6 +44,7 @@ import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.types.StructType
 
 import org.apache.auron.{protobuf => pb}
+import org.apache.auron.metric.SparkMetricNode
 
 case class NativePaimonTableScanExec(basedHiveScan: HiveTableScanExec)
     extends NativeHiveTableScanBase(basedHiveScan)
@@ -55,7 +55,7 @@ case class NativePaimonTableScanExec(basedHiveScan: HiveTableScanExec)
   private lazy val fileFormat = PaimonUtil.paimonFileFormat(table)
 
   override def doExecuteNative(): NativeRDD = {
-    val nativeMetrics = MetricNode(
+    val nativeMetrics = SparkMetricNode(
       metrics,
       Nil,
       Some({

Reply via email to