This is an automated email from the ASF dual-hosted git repository.
richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 9e9dc618 [AURON #1468] Introduce SparkMetricNode (#1469)
9e9dc618 is described below
commit 9e9dc618a5a2675aba32cf6b235d51dd53f3e5b5
Author: zhangmang <[email protected]>
AuthorDate: Mon Oct 20 17:26:28 2025 +0800
[AURON #1468] Introduce SparkMetricNode (#1469)
* [AURON #1468] Introduce SparkMetricNode
* fix ci fail
* fix ci
* fix checkstyle
---
.../src/main/java/org/apache/auron/metric/MetricNode.java | 3 ++-
native-engine/auron-jni-bridge/src/jni_bridge.rs | 6 +++---
.../main/scala/org/apache/spark/sql/auron/ShimsImpl.scala | 7 ++++---
.../metric/SparkMetricNode.scala} | 15 +++++++++------
.../apache/spark/sql/auron/AuronCallNativeWrapper.scala | 3 ++-
.../org/apache/spark/sql/auron/AuronConverters.scala | 3 ++-
.../scala/org/apache/spark/sql/auron/NativeHelper.scala | 3 ++-
.../main/scala/org/apache/spark/sql/auron/NativeRDD.scala | 5 +++--
.../sql/execution/auron/plan/ConvertToNativeBase.scala | 4 ++--
.../spark/sql/execution/auron/plan/NativeAggBase.scala | 4 ++--
.../auron/plan/NativeBroadcastExchangeBase.scala | 8 ++++----
.../execution/auron/plan/NativeBroadcastJoinBase.scala | 4 ++--
.../spark/sql/execution/auron/plan/NativeExpandBase.scala | 4 ++--
.../spark/sql/execution/auron/plan/NativeFilterBase.scala | 4 ++--
.../sql/execution/auron/plan/NativeGenerateBase.scala | 4 ++--
.../sql/execution/auron/plan/NativeGlobalLimitBase.scala | 4 ++--
.../sql/execution/auron/plan/NativeLocalLimitBase.scala | 4 ++--
.../sql/execution/auron/plan/NativeOrcScanBase.scala | 5 +++--
.../sql/execution/auron/plan/NativeParquetScanBase.scala | 5 +++--
.../sql/execution/auron/plan/NativeParquetSinkBase.scala | 4 ++--
.../sql/execution/auron/plan/NativeProjectBase.scala | 4 ++--
.../execution/auron/plan/NativeRenameColumnsBase.scala | 4 ++--
.../execution/auron/plan/NativeShuffleExchangeBase.scala | 6 +++---
.../execution/auron/plan/NativeShuffledHashJoinBase.scala | 4 ++--
.../spark/sql/execution/auron/plan/NativeSortBase.scala | 4 ++--
.../execution/auron/plan/NativeSortMergeJoinBase.scala | 4 ++--
.../sql/execution/auron/plan/NativeTakeOrderedBase.scala | 6 +++---
.../spark/sql/execution/auron/plan/NativeUnionBase.scala | 5 +++--
.../spark/sql/execution/auron/plan/NativeWindowBase.scala | 4 ++--
.../execution/auron/plan/NativePaimonTableScanExec.scala | 4 ++--
30 files changed, 78 insertions(+), 66 deletions(-)
diff --git a/auron-core/src/main/java/org/apache/auron/metric/MetricNode.java
b/auron-core/src/main/java/org/apache/auron/metric/MetricNode.java
index 06e517fb..47edc04d 100644
--- a/auron-core/src/main/java/org/apache/auron/metric/MetricNode.java
+++ b/auron-core/src/main/java/org/apache/auron/metric/MetricNode.java
@@ -16,6 +16,7 @@
*/
package org.apache.auron.metric;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -23,7 +24,7 @@ import java.util.List;
* Abstract class representing a metric node in the Auron system.
* This class provides functionality for hierarchical metrics tracking.
*/
-public abstract class MetricNode {
+public abstract class MetricNode implements Serializable {
private final List<MetricNode> children = new ArrayList<>();
public MetricNode(List<MetricNode> children) {
diff --git a/native-engine/auron-jni-bridge/src/jni_bridge.rs
b/native-engine/auron-jni-bridge/src/jni_bridge.rs
index ee178a49..85cb4d0c 100644
--- a/native-engine/auron-jni-bridge/src/jni_bridge.rs
+++ b/native-engine/auron-jni-bridge/src/jni_bridge.rs
@@ -1111,7 +1111,7 @@ pub struct SparkMetricNode<'a> {
pub method_add_ret: ReturnType,
}
impl<'a> SparkMetricNode<'a> {
- pub const SIG_TYPE: &'static str = "org/apache/spark/sql/auron/MetricNode";
+ pub const SIG_TYPE: &'static str =
"org/apache/auron/metric/SparkMetricNode";
pub fn new(env: &JNIEnv<'a>) -> JniResult<SparkMetricNode<'a>> {
let class = get_global_jclass(env, Self::SIG_TYPE)?;
@@ -1120,7 +1120,7 @@ impl<'a> SparkMetricNode<'a> {
method_getChild: env.get_method_id(
class,
"getChild",
- "(I)Lorg/apache/spark/sql/auron/MetricNode;",
+ "(I)Lorg/apache/auron/metric/MetricNode;",
)?,
method_getChild_ret: ReturnType::Object,
method_add: env.get_method_id(class, "add",
"(Ljava/lang/String;J)V")?,
@@ -1435,7 +1435,7 @@ impl<'a> AuronCallNativeWrapper<'a> {
method_getMetrics: env.get_method_id(
class,
"getMetrics",
- "()Lorg/apache/spark/sql/auron/MetricNode;",
+ "()Lorg/apache/auron/metric/MetricNode;",
)?,
method_getMetrics_ret: ReturnType::Object,
method_importSchema: env.get_method_id(class, "importSchema",
"(J)V")?,
diff --git
a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
index 1e2533f1..68cf3698 100644
---
a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
+++
b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
@@ -111,6 +111,7 @@ import org.apache.spark.storage.FileSegment
import org.apache.auron.{protobuf => pb, sparkver}
import org.apache.auron.common.AuronBuildInfo
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.spark.ui.AuronBuildInfoEvent
class ShimsImpl extends Shims with Logging {
@@ -616,7 +617,7 @@ class ShimsImpl extends Shims with Logging {
val requiredMetrics = nativeShuffle.readMetrics ++
nativeShuffle.metrics.filterKeys(_ == "shuffle_read_total_time")
- val metrics = MetricNode(
+ val metrics = SparkMetricNode(
requiredMetrics,
inputRDD.metrics :: Nil,
Some({
@@ -719,7 +720,7 @@ class ShimsImpl extends Shims with Logging {
val requiredMetrics = nativeShuffle.readMetrics ++
nativeShuffle.metrics.filterKeys(_ == "shuffle_read_total_time")
- val metrics = MetricNode(
+ val metrics = SparkMetricNode(
requiredMetrics,
inputRDD.metrics :: Nil,
Some({
@@ -812,7 +813,7 @@ class ShimsImpl extends Shims with Logging {
val requiredMetrics = nativeShuffle.readMetrics ++
nativeShuffle.metrics.filterKeys(_ == "shuffle_read_total_time")
- val metrics = MetricNode(
+ val metrics = SparkMetricNode(
requiredMetrics,
inputRDD.metrics :: Nil,
Some({
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/MetricNode.scala
b/spark-extension/src/main/scala/org/apache/auron/metric/SparkMetricNode.scala
similarity index 79%
rename from
spark-extension/src/main/scala/org/apache/spark/sql/auron/MetricNode.scala
rename to
spark-extension/src/main/scala/org/apache/auron/metric/SparkMetricNode.scala
index 4198bff3..8c3925ae 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/MetricNode.scala
+++
b/spark-extension/src/main/scala/org/apache/auron/metric/SparkMetricNode.scala
@@ -14,18 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.auron
+package org.apache.auron.metric
+
+import scala.collection.JavaConverters._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.metric.SQLMetric
-case class MetricNode(
+case class SparkMetricNode(
metrics: Map[String, SQLMetric],
children: Seq[MetricNode],
metricValueHandler: Option[(String, Long) => Unit] = None)
- extends Logging {
+ extends MetricNode(children.asJava)
+ with Logging {
- def getChild(i: Int): MetricNode = {
+ override def getChild(i: Int): MetricNode = {
if (i < children.length) {
children(i)
} else {
@@ -40,8 +43,8 @@ case class MetricNode(
}
}
- def foreach(fn: MetricNode => Unit): Unit = {
+ def foreach(fn: SparkMetricNode => Unit): Unit = {
fn(this)
- this.children.foreach(_.foreach(fn))
+ this.children.foreach(_.asInstanceOf[SparkMetricNode].foreach(fn))
}
}
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
index c827db32..5cf2131b 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronCallNativeWrapper.scala
@@ -44,6 +44,7 @@ import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.ShutdownHookManager
import org.apache.spark.util.Utils
+import org.apache.auron.metric.{MetricNode, SparkMetricNode}
import org.apache.auron.protobuf.PartitionId
import org.apache.auron.protobuf.PhysicalPlanNode
import org.apache.auron.protobuf.TaskDefinition
@@ -52,7 +53,7 @@ case class AuronCallNativeWrapper(
nativePlan: PhysicalPlanNode,
partition: Partition,
context: Option[TaskContext],
- metrics: MetricNode)
+ metrics: SparkMetricNode)
extends Logging {
AuronCallNativeWrapper.initNative()
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index 556ac7f3..db3b710b 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -86,6 +86,7 @@ import
org.apache.spark.sql.hive.execution.auron.plan.NativeHiveTableScanBase
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.LongType
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.EmptyPartitionsExecNode
import org.apache.auron.protobuf.PhysicalPlanNode
import org.apache.auron.sparkver
@@ -1089,7 +1090,7 @@ object AuronConverters extends Logging {
new NativeRDD(
sparkContext,
- MetricNode(Map(), Nil),
+ SparkMetricNode(Map(), Nil),
rddPartitions = partitions,
rddPartitioner = None,
rddDependencies = Nil,
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala
index 64a37f6d..eabe7ff6 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.PhysicalPlanNode
object NativeHelper extends Logging {
@@ -78,7 +79,7 @@ object NativeHelper extends Logging {
def executeNativePlan(
nativePlan: PhysicalPlanNode,
- metrics: MetricNode,
+ metrics: SparkMetricNode,
partition: Partition,
context: Option[TaskContext]): Iterator[InternalRow] = {
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala
index 5da12a44..4000035e 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala
@@ -29,11 +29,12 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.PhysicalPlanNode
class NativeRDD(
@transient private val rddSparkContext: SparkContext,
- val metrics: MetricNode,
+ val metrics: SparkMetricNode,
private val rddPartitions: Array[Partition],
private val rddPartitioner: Option[Partitioner],
private val rddDependencies: Seq[Dependency[_]],
@@ -71,7 +72,7 @@ class NativeRDD(
class EmptyNativeRDD(@transient private val rddSparkContext: SparkContext)
extends NativeRDD(
rddSparkContext = rddSparkContext,
- metrics = MetricNode(Map.empty, Seq(), None),
+ metrics = SparkMetricNode(Map.empty, Seq(), None),
rddPartitions = Array.empty,
rddPartitioner = None,
rddDependencies = Seq.empty,
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeBase.scala
index 1e741160..6ea291a2 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeBase.scala
@@ -22,7 +22,6 @@ import scala.collection.immutable.SortedMap
import org.apache.spark.OneToOneDependency
import org.apache.spark.sql.auron.JniBridge
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
@@ -38,6 +37,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.StructType
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.FFIReaderExecNode
import org.apache.auron.protobuf.PhysicalPlanNode
import org.apache.auron.protobuf.Schema
@@ -63,7 +63,7 @@ abstract class ConvertToNativeBase(override val child:
SparkPlan)
override def doExecuteNative(): NativeRDD = {
val inputRDD = child.execute()
val numInputPartitions = inputRDD.getNumPartitions
- val nativeMetrics = MetricNode(metrics, Nil)
+ val nativeMetrics = SparkMetricNode(metrics, Nil)
new NativeRDD(
sparkContext,
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
index c5c382ac..e470a0f4 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
@@ -23,7 +23,6 @@ import scala.collection.immutable.SortedMap
import org.apache.spark.OneToOneDependency
import org.apache.spark.internal.Logging
import org.apache.spark.sql.auron.AuronConf
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
@@ -58,6 +57,7 @@ import org.apache.spark.sql.types.BinaryType
import org.apache.spark.sql.types.DataType
import org.apache.auron.{protobuf => pb}
+import org.apache.auron.metric.SparkMetricNode
abstract class NativeAggBase(
execMode: AggExecMode,
@@ -162,7 +162,7 @@ abstract class NativeAggBase(
override def doExecuteNative(): NativeRDD = {
val inputRDD = NativeHelper.executeNative(child)
- val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+ val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
val nativeExecMode = this.nativeExecMode
val nativeAggrNames = this.nativeAggrNames
val nativeGroupingNames = this.nativeGroupingNames
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala
index 7dafd564..b727a929 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala
@@ -37,7 +37,6 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.auron.JniBridge
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
@@ -66,6 +65,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.BinaryType
import org.apache.auron.{protobuf => pb}
+import org.apache.auron.metric.SparkMetricNode
abstract class NativeBroadcastExchangeBase(mode: BroadcastMode, override val
child: SparkPlan)
extends BroadcastExchangeLike
@@ -110,7 +110,7 @@ abstract class NativeBroadcastExchangeBase(mode:
BroadcastMode, override val chi
val broadcastReadNativePlan =
doExecuteNative().nativePlan(singlePartition, null)
val rowsIter = NativeHelper.executeNativePlan(
broadcastReadNativePlan,
- MetricNode(Map(), Nil, None),
+ SparkMetricNode(Map(), Nil, None),
singlePartition,
None)
val pruneKeyField = new InterpretedUnsafeProjection(
@@ -155,7 +155,7 @@ abstract class NativeBroadcastExchangeBase(mode:
BroadcastMode, override val chi
override def doExecuteNative(): NativeRDD = {
val broadcast = doExecuteBroadcastNative[Array[Array[Byte]]]()
- val nativeMetrics = MetricNode(metrics, Nil)
+ val nativeMetrics = SparkMetricNode(metrics, Nil)
val partitions = Array(new Partition() {
override def index: Int = 0
})
@@ -199,7 +199,7 @@ abstract class NativeBroadcastExchangeBase(mode:
BroadcastMode, override val chi
case child => Shims.get.createConvertToNativeExec(child)
})
val modifiedMetrics = metrics ++ Map("output_rows" ->
metrics("numOutputRows"))
- val nativeMetrics = MetricNode(modifiedMetrics, inputRDD.metrics :: Nil)
+ val nativeMetrics = SparkMetricNode(modifiedMetrics, inputRDD.metrics ::
Nil)
val ipcRDD =
new RDD[Array[Byte]](sparkContext, new OneToOneDependency(inputRDD) ::
Nil) {
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala
index e41e5676..dabeba3f 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala
@@ -21,7 +21,6 @@ import scala.collection.immutable.SortedMap
import org.apache.spark.OneToOneDependency
import org.apache.spark.Partition
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
@@ -43,6 +42,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.LongType
import org.apache.auron.{protobuf => pb}
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.JoinOn
abstract class NativeBroadcastJoinBase(
@@ -126,7 +126,7 @@ abstract class NativeBroadcastJoinBase(
override def doExecuteNative(): NativeRDD = {
val leftRDD = NativeHelper.executeNative(left)
val rightRDD = NativeHelper.executeNative(right)
- val nativeMetrics = MetricNode(metrics, leftRDD.metrics ::
rightRDD.metrics :: Nil)
+ val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics ::
rightRDD.metrics :: Nil)
val nativeSchema = this.nativeSchema
val nativeJoinType = this.nativeJoinType
val nativeJoinOn = this.nativeJoinOn
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandBase.scala
index 9b7a3b66..217ed140 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandBase.scala
@@ -20,7 +20,6 @@ import scala.collection.JavaConverters._
import scala.collection.immutable.SortedMap
import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
@@ -32,6 +31,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.UnaryExecNode
import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.ExpandExecNode
import org.apache.auron.protobuf.ExpandProjection
import org.apache.auron.protobuf.PhysicalPlanNode
@@ -72,7 +72,7 @@ abstract class NativeExpandBase(
override def doExecuteNative(): NativeRDD = {
val inputRDD = NativeHelper.executeNative(child)
- val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+ val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
val nativeSchema = this.nativeSchema
val nativeProjections = this.nativeProjections
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterBase.scala
index a18b3d58..56d46c46 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterBase.scala
@@ -21,7 +21,6 @@ import scala.collection.immutable.SortedMap
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
@@ -38,6 +37,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.UnaryExecNode
import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.FilterExecNode
import org.apache.auron.protobuf.PhysicalExprNode
import org.apache.auron.protobuf.PhysicalPlanNode
@@ -91,7 +91,7 @@ abstract class NativeFilterBase(condition: Expression,
override val child: Spark
override def doExecuteNative(): NativeRDD = {
val inputRDD = NativeHelper.executeNative(child)
- val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+ val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
val nativeFilterExprs = this.nativeFilterExprs
new NativeRDD(
sparkContext,
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala
index fbdcaa4e..76e500aa 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala
@@ -22,7 +22,6 @@ import scala.collection.mutable.ArrayBuffer
import com.google.protobuf.ByteString
import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
@@ -44,6 +43,7 @@ import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.auron.{protobuf => pb}
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.PhysicalPlanNode
abstract class NativeGenerateBase(
@@ -130,7 +130,7 @@ abstract class NativeGenerateBase(
override def doExecuteNative(): NativeRDD = {
val inputRDD = NativeHelper.executeNative(child)
- val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+ val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
val nativeGenerator = this.nativeGenerator
val nativeGeneratorOutput = this.nativeGeneratorOutput
val nativeRequiredChildOutput = this.nativeRequiredChildOutput
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala
index 1d48005a..83c1f7d8 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.auron.plan
import scala.collection.immutable.SortedMap
import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
import org.apache.spark.sql.auron.NativeSupports
@@ -32,6 +31,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.UnaryExecNode
import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.LimitExecNode
import org.apache.auron.protobuf.PhysicalPlanNode
@@ -52,7 +52,7 @@ abstract class NativeGlobalLimitBase(limit: Long, override
val child: SparkPlan)
override def doExecuteNative(): NativeRDD = {
val inputRDD = NativeHelper.executeNative(child)
- val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+ val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
new NativeRDD(
sparkContext,
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala
index 0d1b0054..f0ba9e3b 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.auron.plan
import scala.collection.immutable.SortedMap
import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
import org.apache.spark.sql.auron.NativeSupports
@@ -30,6 +29,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.UnaryExecNode
import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.LimitExecNode
import org.apache.auron.protobuf.PhysicalPlanNode
@@ -49,7 +49,7 @@ abstract class NativeLocalLimitBase(limit: Long, override val
child: SparkPlan)
override def doExecuteNative(): NativeRDD = {
val inputRDD = NativeHelper.executeNative(child)
- val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+ val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
new NativeRDD(
sparkContext,
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala
index b1f5e8c4..36132e44 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala
@@ -22,11 +22,12 @@ import scala.collection.JavaConverters._
import org.apache.spark.Partition
import org.apache.spark.TaskContext
-import org.apache.spark.sql.auron.{EmptyNativeRDD, MetricNode, NativeRDD}
+import org.apache.spark.sql.auron.{EmptyNativeRDD, NativeRDD}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.auron.{protobuf => pb}
+import org.apache.auron.metric.SparkMetricNode
abstract class NativeOrcScanBase(basedFileScan: FileSourceScanExec)
extends NativeFileSourceScanBase(basedFileScan) {
@@ -34,7 +35,7 @@ abstract class NativeOrcScanBase(basedFileScan:
FileSourceScanExec)
override def doExecuteNative(): NativeRDD = {
val partitions = inputFileScanRDD.filePartitions.toArray
if (partitions.length > 0) {
- val nativeMetrics = MetricNode(
+ val nativeMetrics = SparkMetricNode(
metrics,
Nil,
Some({
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala
index f4129d0e..591e31a1 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala
@@ -22,11 +22,12 @@ import scala.collection.JavaConverters._
import org.apache.spark.Partition
import org.apache.spark.TaskContext
-import org.apache.spark.sql.auron.{EmptyNativeRDD, MetricNode, NativeRDD}
+import org.apache.spark.sql.auron.{EmptyNativeRDD, NativeRDD}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.auron.{protobuf => pb}
+import org.apache.auron.metric.SparkMetricNode
abstract class NativeParquetScanBase(basedFileScan: FileSourceScanExec)
extends NativeFileSourceScanBase(basedFileScan) {
@@ -34,7 +35,7 @@ abstract class NativeParquetScanBase(basedFileScan:
FileSourceScanExec)
override def doExecuteNative(): NativeRDD = {
val partitions = inputFileScanRDD.filePartitions.toArray
if (partitions.length > 0) {
- val nativeMetrics = MetricNode(
+ val nativeMetrics = SparkMetricNode(
metrics,
Nil,
Some({
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkBase.scala
index aa1b8dbf..c2bc49ca 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkBase.scala
@@ -38,7 +38,6 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.spark.OneToOneDependency
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.auron.JniBridge
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
import org.apache.spark.sql.auron.NativeSupports
@@ -53,6 +52,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.hive.auron.HiveClientHelper
import org.apache.spark.util.SerializableConfiguration
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.ParquetProp
import org.apache.auron.protobuf.ParquetSinkExecNode
import org.apache.auron.protobuf.PhysicalPlanNode
@@ -88,7 +88,7 @@ abstract class NativeParquetSinkBase(
val numDynParts = partition.count(_._2.isEmpty)
val inputRDD = NativeHelper.executeNative(child)
- val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+ val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
val nativeDependencies = new OneToOneDependency(inputRDD) :: Nil
new NativeRDD(
sparkSession.sparkContext,
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectBase.scala
index 5edf60a2..4023ae68 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectBase.scala
@@ -21,7 +21,6 @@ import scala.collection.immutable.SortedMap
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
@@ -37,6 +36,7 @@ import org.apache.spark.sql.execution.UnaryExecNode
import
org.apache.spark.sql.execution.auron.plan.NativeProjectBase.getNativeProjectBuilder
import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.ArrowType
import org.apache.auron.protobuf.PhysicalExprNode
import org.apache.auron.protobuf.PhysicalPlanNode
@@ -70,7 +70,7 @@ abstract class NativeProjectBase(projectList:
Seq[NamedExpression], override val
override def doExecuteNative(): NativeRDD = {
val inputRDD = NativeHelper.executeNative(child)
- val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+ val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
val nativeProject = this.nativeProject
new NativeRDD(
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsBase.scala
index 487cb199..f8bf6cb9 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsBase.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.auron.plan
import scala.collection.JavaConverters._
import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
import org.apache.spark.sql.auron.NativeSupports
@@ -31,6 +30,7 @@ import org.apache.spark.sql.execution.UnaryExecNode
import
org.apache.spark.sql.execution.auron.plan.NativeRenameColumnsBase.buildRenameColumnsExec
import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.PhysicalPlanNode
import org.apache.auron.protobuf.RenameColumnsExecNode
@@ -53,7 +53,7 @@ abstract class NativeRenameColumnsBase(
override def doExecuteNative(): NativeRDD = {
val inputRDD = NativeHelper.executeNative(child)
- val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+ val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
new NativeRDD(
sparkContext,
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala
index 789c7310..47457a41 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala
@@ -29,7 +29,6 @@ import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleWriteProcessor
import org.apache.spark.sql.auron.JniBridge
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
@@ -50,6 +49,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.ArrayType
import org.apache.spark.util.{CompletionIterator, MutablePair}
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.{IpcReaderExecNode, PhysicalExprNode,
PhysicalHashRepartition, PhysicalPlanNode, PhysicalRangeRepartition,
PhysicalRepartition, PhysicalRoundRobinRepartition, PhysicalSingleRepartition,
PhysicalSortExprNode, Schema, SortExecNode}
abstract class NativeShuffleExchangeBase(
@@ -125,7 +125,7 @@ abstract class NativeShuffleExchangeBase(
val shuffleHandle = shuffleDependency.shuffleHandle
val rdd = doExecuteNonNative()
- val nativeMetrics = MetricNode(
+ val nativeMetrics = SparkMetricNode(
Map(),
Nil,
Some({
@@ -192,7 +192,7 @@ abstract class NativeShuffleExchangeBase(
val nativeInputRDD = rdd.asInstanceOf[NativeRDD]
val numPartitions = outputPartitioning.numPartitions
- val nativeMetrics = MetricNode(
+ val nativeMetrics = SparkMetricNode(
metrics,
nativeInputRDD.metrics :: Nil,
Some({
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffledHashJoinBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffledHashJoinBase.scala
index 1a4ceec4..b9fc8de8 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffledHashJoinBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffledHashJoinBase.scala
@@ -20,7 +20,6 @@ import scala.collection.JavaConverters._
import scala.collection.immutable.SortedMap
import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
@@ -33,6 +32,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.auron.{protobuf => pb}
+import org.apache.auron.metric.SparkMetricNode
abstract class NativeShuffledHashJoinBase(
override val left: SparkPlan,
@@ -94,7 +94,7 @@ abstract class NativeShuffledHashJoinBase(
override def doExecuteNative(): NativeRDD = {
val leftRDD = NativeHelper.executeNative(left)
val rightRDD = NativeHelper.executeNative(right)
- val nativeMetrics = MetricNode(metrics, leftRDD.metrics ::
rightRDD.metrics :: Nil)
+ val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics ::
rightRDD.metrics :: Nil)
val nativeJoinOn = this.nativeJoinOn
val nativeJoinType = this.nativeJoinType
val nativeBuildSide = this.nativeBuildSide
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala
index 7d80186d..eea92cdb 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala
@@ -20,7 +20,6 @@ import scala.collection.JavaConverters._
import scala.collection.immutable.SortedMap
import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
@@ -37,6 +36,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.UnaryExecNode
import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.PhysicalExprNode
import org.apache.auron.protobuf.PhysicalPlanNode
import org.apache.auron.protobuf.PhysicalSortExprNode
@@ -95,7 +95,7 @@ abstract class NativeSortBase(
override def doExecuteNative(): NativeRDD = {
val inputRDD = NativeHelper.executeNative(child)
- val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+ val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
val nativeSortExprs = this.nativeSortExprs
new NativeRDD(
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortMergeJoinBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortMergeJoinBase.scala
index 42a5c492..1122093c 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortMergeJoinBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortMergeJoinBase.scala
@@ -20,7 +20,6 @@ import scala.collection.JavaConverters._
import scala.collection.immutable.SortedMap
import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
@@ -38,6 +37,7 @@ import org.apache.spark.sql.execution.BinaryExecNode
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.JoinOn
import org.apache.auron.protobuf.PhysicalPlanNode
import org.apache.auron.protobuf.SortMergeJoinExecNode
@@ -104,7 +104,7 @@ abstract class NativeSortMergeJoinBase(
override def doExecuteNative(): NativeRDD = {
val leftRDD = NativeHelper.executeNative(left)
val rightRDD = NativeHelper.executeNative(right)
- val nativeMetrics = MetricNode(metrics, leftRDD.metrics ::
rightRDD.metrics :: Nil)
+ val nativeMetrics = SparkMetricNode(metrics, leftRDD.metrics ::
rightRDD.metrics :: Nil)
val nativeSortOptions = this.nativeSortOptions
val nativeJoinOn = this.nativeJoinOn
val nativeJoinType = this.nativeJoinType
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
index f8886510..b3a5b7fe 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
@@ -21,7 +21,6 @@ import scala.collection.immutable.SortedMap
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
@@ -40,6 +39,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.UnaryExecNode
import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.FetchLimit
import org.apache.auron.protobuf.PhysicalExprNode
import org.apache.auron.protobuf.PhysicalPlanNode
@@ -130,7 +130,7 @@ abstract class NativeTakeOrderedBase(
// take top-K from the final partition
new NativeRDD(
sparkContext,
- metrics = MetricNode(metrics, shuffledRDD.metrics :: Nil),
+ metrics = SparkMetricNode(metrics, shuffledRDD.metrics :: Nil),
shuffledRDD.partitions,
shuffledRDD.partitioner,
new OneToOneDependency(shuffledRDD) :: Nil,
@@ -182,7 +182,7 @@ abstract class NativePartialTakeOrderedBase(
new NativeRDD(
sparkContext,
- metrics = MetricNode(metrics, inputRDD.metrics :: Nil),
+ metrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil),
inputRDD.partitions,
inputRDD.partitioner,
new OneToOneDependency(inputRDD) :: Nil,
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionBase.scala
index aad5d876..a62ea6f1 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionBase.scala
@@ -24,7 +24,6 @@ import org.apache.spark.rdd.PartitionerAwareUnionRDD
import org.apache.spark.rdd.PartitionerAwareUnionRDDPartition
import org.apache.spark.rdd.UnionPartition
import org.apache.spark.rdd.UnionRDD
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
import org.apache.spark.sql.auron.NativeSupports
@@ -32,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.EmptyPartitionsExecNode
import org.apache.auron.protobuf.PhysicalPlanNode
import org.apache.auron.protobuf.Schema
@@ -52,7 +52,8 @@ abstract class NativeUnionBase(
override def doExecuteNative(): NativeRDD = {
val rdds = children.map(c => NativeHelper.executeNative(c))
- val nativeMetrics = MetricNode(metrics,
rdds.filter(_.partitions.nonEmpty).map(_.metrics))
+ val nativeMetrics =
+ SparkMetricNode(metrics,
rdds.filter(_.partitions.nonEmpty).map(_.metrics))
val unionRDD = sparkContext.union(rdds)
val unionedPartitions = unionRDD.partitions
val unionedPartitioner = unionRDD.partitioner
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala
index 44c786da..2e849536 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala
@@ -20,7 +20,6 @@ import scala.collection.JavaConverters._
import scala.collection.immutable.SortedMap
import org.apache.spark.OneToOneDependency
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
@@ -49,6 +48,7 @@ import org.apache.spark.sql.execution.UnaryExecNode
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.auron.{protobuf => pb}
+import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.WindowGroupLimit
abstract class NativeWindowBase(
@@ -191,7 +191,7 @@ abstract class NativeWindowBase(
override def doExecuteNative(): NativeRDD = {
val inputRDD = NativeHelper.executeNative(child)
- val nativeMetrics = MetricNode(metrics, inputRDD.metrics :: Nil)
+ val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
val nativeWindowExprs = this.nativeWindowExprs
val nativeOrderSpecExprs = this.nativeOrderSpecExprs
val nativePartitionSpecExprs = this.nativePartitionSpecExprs
diff --git
a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativePaimonTableScanExec.scala
b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativePaimonTableScanExec.scala
index 8ce3a62f..2ae7efef 100644
---
a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativePaimonTableScanExec.scala
+++
b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativePaimonTableScanExec.scala
@@ -29,7 +29,6 @@ import org.apache.spark.Partition
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.auron.MetricNode
import org.apache.spark.sql.auron.NativeRDD
import org.apache.spark.sql.auron.Shims
import org.apache.spark.sql.catalyst.InternalRow
@@ -45,6 +44,7 @@ import org.apache.spark.sql.hive.execution.HiveTableScanExec
import org.apache.spark.sql.types.StructType
import org.apache.auron.{protobuf => pb}
+import org.apache.auron.metric.SparkMetricNode
case class NativePaimonTableScanExec(basedHiveScan: HiveTableScanExec)
extends NativeHiveTableScanBase(basedHiveScan)
@@ -55,7 +55,7 @@ case class NativePaimonTableScanExec(basedHiveScan:
HiveTableScanExec)
private lazy val fileFormat = PaimonUtil.paimonFileFormat(table)
override def doExecuteNative(): NativeRDD = {
- val nativeMetrics = MetricNode(
+ val nativeMetrics = SparkMetricNode(
metrics,
Nil,
Some({