This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new e399dc3b8 [GLUTEN-7008][VL] Report spill metrics from Velox operators to Spark task (#7009)
e399dc3b8 is described below

commit e399dc3b84a368830c3e71e0f3472be96978ca78
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon Aug 26 13:25:42 2024 +0800

    [GLUTEN-7008][VL] Report spill metrics from Velox operators to Spark task (#7009)
    
    Closes #7008
---
 cpp/core/jni/JniWrapper.cc                         |  3 ++-
 cpp/core/utils/metrics.h                           |  1 +
 cpp/velox/compute/WholeStageResultIterator.cc      |  1 +
 .../java/org/apache/gluten/metrics/Metrics.java    |  4 ++++
 .../org/apache/gluten/metrics/OperatorMetrics.java |  3 +++
 .../metrics/HashAggregateMetricsUpdater.scala      | 10 +++++++++
 .../apache/gluten/metrics/JoinMetricsUpdater.scala | 16 +++++++++++++++
 .../org/apache/gluten/metrics/MetricsUtil.scala    |  3 +++
 .../apache/gluten/metrics/SortMetricsUpdater.scala | 10 +++++++++
 .../apache/spark/sql/utils/SparkMetricsUtil.scala  | 24 ++++++++++++++++++++++
 10 files changed, 74 insertions(+), 1 deletion(-)

diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 4be5e9142..1662b200b 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -175,7 +175,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
   metricsBuilderClass = createGlobalClassReferenceOrError(env, 
"Lorg/apache/gluten/metrics/Metrics;");
 
   metricsBuilderConstructor = getMethodIdOrError(
-      env, metricsBuilderClass, "<init>", 
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");
+      env, metricsBuilderClass, "<init>", 
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");
 
   nativeColumnarToRowInfoClass =
       createGlobalClassReferenceOrError(env, 
"Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
@@ -478,6 +478,7 @@ JNIEXPORT jobject JNICALL 
Java_org_apache_gluten_vectorized_ColumnarBatchOutIter
       metrics ? metrics->veloxToArrow : -1,
       longArray[Metrics::kPeakMemoryBytes],
       longArray[Metrics::kNumMemoryAllocations],
+      longArray[Metrics::kSpilledInputBytes],
       longArray[Metrics::kSpilledBytes],
       longArray[Metrics::kSpilledRows],
       longArray[Metrics::kSpilledPartitions],
diff --git a/cpp/core/utils/metrics.h b/cpp/core/utils/metrics.h
index 5b3167b82..bda72b070 100644
--- a/cpp/core/utils/metrics.h
+++ b/cpp/core/utils/metrics.h
@@ -54,6 +54,7 @@ struct Metrics {
     kNumMemoryAllocations,
 
     // Spill.
+    kSpilledInputBytes,
     kSpilledBytes,
     kSpilledRows,
     kSpilledPartitions,
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc
index 2edf9a573..59181aaea 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -364,6 +364,7 @@ void WholeStageResultIterator::collectMetrics() {
       metrics_->get(Metrics::kWallNanos)[metricIndex] = 
second->cpuWallTiming.wallNanos;
       metrics_->get(Metrics::kPeakMemoryBytes)[metricIndex] = 
second->peakMemoryBytes;
       metrics_->get(Metrics::kNumMemoryAllocations)[metricIndex] = 
second->numMemoryAllocations;
+      metrics_->get(Metrics::kSpilledInputBytes)[metricIndex] = 
second->spilledInputBytes;
       metrics_->get(Metrics::kSpilledBytes)[metricIndex] = 
second->spilledBytes;
       metrics_->get(Metrics::kSpilledRows)[metricIndex] = second->spilledRows;
       metrics_->get(Metrics::kSpilledPartitions)[metricIndex] = 
second->spilledPartitions;
diff --git a/gluten-data/src/main/java/org/apache/gluten/metrics/Metrics.java 
b/gluten-data/src/main/java/org/apache/gluten/metrics/Metrics.java
index 4e91823e5..c4dcbb65a 100644
--- a/gluten-data/src/main/java/org/apache/gluten/metrics/Metrics.java
+++ b/gluten-data/src/main/java/org/apache/gluten/metrics/Metrics.java
@@ -32,6 +32,7 @@ public class Metrics implements IMetrics {
   public long[] scanTime;
   public long[] peakMemoryBytes;
   public long[] numMemoryAllocations;
+  public long[] spilledInputBytes;
   public long[] spilledBytes;
   public long[] spilledRows;
   public long[] spilledPartitions;
@@ -69,6 +70,7 @@ public class Metrics implements IMetrics {
       long veloxToArrow,
       long[] peakMemoryBytes,
       long[] numMemoryAllocations,
+      long[] spilledInputBytes,
       long[] spilledBytes,
       long[] spilledRows,
       long[] spilledPartitions,
@@ -101,6 +103,7 @@ public class Metrics implements IMetrics {
     this.singleMetric.veloxToArrow = veloxToArrow;
     this.peakMemoryBytes = peakMemoryBytes;
     this.numMemoryAllocations = numMemoryAllocations;
+    this.spilledInputBytes = spilledInputBytes;
     this.spilledBytes = spilledBytes;
     this.spilledRows = spilledRows;
     this.spilledPartitions = spilledPartitions;
@@ -138,6 +141,7 @@ public class Metrics implements IMetrics {
         wallNanos[index],
         peakMemoryBytes[index],
         numMemoryAllocations[index],
+        spilledInputBytes[index],
         spilledBytes[index],
         spilledRows[index],
         spilledPartitions[index],
diff --git 
a/gluten-data/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java 
b/gluten-data/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
index cb155afed..cad04987e 100644
--- a/gluten-data/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
+++ b/gluten-data/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
@@ -30,6 +30,7 @@ public class OperatorMetrics implements IOperatorMetrics {
   public long scanTime;
   public long peakMemoryBytes;
   public long numMemoryAllocations;
+  public long spilledInputBytes;
   public long spilledBytes;
   public long spilledRows;
   public long spilledPartitions;
@@ -64,6 +65,7 @@ public class OperatorMetrics implements IOperatorMetrics {
       long wallNanos,
       long peakMemoryBytes,
       long numMemoryAllocations,
+      long spilledInputBytes,
       long spilledBytes,
       long spilledRows,
       long spilledPartitions,
@@ -95,6 +97,7 @@ public class OperatorMetrics implements IOperatorMetrics {
     this.scanTime = scanTime;
     this.peakMemoryBytes = peakMemoryBytes;
     this.numMemoryAllocations = numMemoryAllocations;
+    this.spilledInputBytes = spilledInputBytes;
     this.spilledBytes = spilledBytes;
     this.spilledRows = spilledRows;
     this.spilledPartitions = spilledPartitions;
diff --git 
a/gluten-data/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
 
b/gluten-data/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
index 5337af9e6..5053cc2ba 100644
--- 
a/gluten-data/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
+++ 
b/gluten-data/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
@@ -19,6 +19,8 @@ package org.apache.gluten.metrics
 import org.apache.gluten.substrait.AggregationParams
 
 import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.utils.SparkMetricsUtil
+import org.apache.spark.util.TaskResources
 
 trait HashAggregateMetricsUpdater extends MetricsUpdater {
   def updateAggregationMetrics(
@@ -81,5 +83,13 @@ class HashAggregateMetricsUpdaterImpl(val metrics: 
Map[String, SQLMetric])
       rowConstructionWallNanos += aggregationMetrics.get(idx).wallNanos
       idx += 1
     }
+    if (TaskResources.inSparkTask()) {
+      SparkMetricsUtil.incMemoryBytesSpilled(
+        TaskResources.getLocalTaskContext().taskMetrics(),
+        aggMetrics.spilledInputBytes)
+      SparkMetricsUtil.incDiskBytesSpilled(
+        TaskResources.getLocalTaskContext().taskMetrics(),
+        aggMetrics.spilledBytes)
+    }
   }
 }
diff --git 
a/gluten-data/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala 
b/gluten-data/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
index 60be8418a..fe1fa2ad6 100644
--- 
a/gluten-data/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
+++ 
b/gluten-data/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
@@ -20,6 +20,8 @@ import org.apache.gluten.metrics.Metrics.SingleMetric
 import org.apache.gluten.substrait.JoinParams
 
 import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.utils.SparkMetricsUtil
+import org.apache.spark.util.TaskResources
 
 import java.util
 
@@ -150,6 +152,20 @@ class HashJoinMetricsUpdater(override val metrics: 
Map[String, SQLMetric])
       streamPreProjectionWallNanos += joinMetrics.get(idx).wallNanos
       idx += 1
     }
+    if (TaskResources.inSparkTask()) {
+      SparkMetricsUtil.incMemoryBytesSpilled(
+        TaskResources.getLocalTaskContext().taskMetrics(),
+        hashProbeMetrics.spilledInputBytes)
+      SparkMetricsUtil.incDiskBytesSpilled(
+        TaskResources.getLocalTaskContext().taskMetrics(),
+        hashProbeMetrics.spilledBytes)
+      SparkMetricsUtil.incMemoryBytesSpilled(
+        TaskResources.getLocalTaskContext().taskMetrics(),
+        hashBuildMetrics.spilledInputBytes)
+      SparkMetricsUtil.incDiskBytesSpilled(
+        TaskResources.getLocalTaskContext().taskMetrics(),
+        hashBuildMetrics.spilledBytes)
+    }
   }
 }
 
diff --git 
a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala 
b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index eff4245d5..8eea58272 100644
--- a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++ b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -105,6 +105,7 @@ object MetricsUtil extends Logging {
     var wallNanos: Long = 0
     var peakMemoryBytes: Long = 0
     var numMemoryAllocations: Long = 0
+    var spilledInputBytes: Long = 0
     var spilledBytes: Long = 0
     var spilledRows: Long = 0
     var spilledPartitions: Long = 0
@@ -130,6 +131,7 @@ object MetricsUtil extends Logging {
       wallNanos += metrics.wallNanos
       peakMemoryBytes = peakMemoryBytes.max(metrics.peakMemoryBytes)
       numMemoryAllocations += metrics.numMemoryAllocations
+      spilledInputBytes += metrics.spilledInputBytes
       spilledBytes += metrics.spilledBytes
       spilledRows += metrics.spilledRows
       spilledPartitions += metrics.spilledPartitions
@@ -162,6 +164,7 @@ object MetricsUtil extends Logging {
       wallNanos,
       peakMemoryBytes,
       numMemoryAllocations,
+      spilledInputBytes,
       spilledBytes,
       spilledRows,
       spilledPartitions,
diff --git 
a/gluten-data/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala 
b/gluten-data/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala
index 138740350..0ba48c62c 100644
--- 
a/gluten-data/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala
+++ 
b/gluten-data/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala
@@ -17,6 +17,8 @@
 package org.apache.gluten.metrics
 
 import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.utils.SparkMetricsUtil
+import org.apache.spark.util.TaskResources
 
 class SortMetricsUpdater(val metrics: Map[String, SQLMetric]) extends 
MetricsUpdater {
 
@@ -34,6 +36,14 @@ class SortMetricsUpdater(val metrics: Map[String, 
SQLMetric]) extends MetricsUpd
       metrics("spilledRows") += operatorMetrics.spilledRows
       metrics("spilledPartitions") += operatorMetrics.spilledPartitions
       metrics("spilledFiles") += operatorMetrics.spilledFiles
+      if (TaskResources.inSparkTask()) {
+        SparkMetricsUtil.incMemoryBytesSpilled(
+          TaskResources.getLocalTaskContext().taskMetrics(),
+          operatorMetrics.spilledInputBytes)
+        SparkMetricsUtil.incDiskBytesSpilled(
+          TaskResources.getLocalTaskContext().taskMetrics(),
+          operatorMetrics.spilledBytes)
+      }
     }
   }
 }
diff --git 
a/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkMetricsUtil.scala 
b/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkMetricsUtil.scala
new file mode 100644
index 000000000..af8bbb08e
--- /dev/null
+++ 
b/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkMetricsUtil.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.utils
+
+import org.apache.spark.executor.TaskMetrics
+
+object SparkMetricsUtil {
+  def incMemoryBytesSpilled(task: TaskMetrics, v: Long): Unit = 
task.incMemoryBytesSpilled(v)
+  def incDiskBytesSpilled(task: TaskMetrics, v: Long): Unit = 
task.incDiskBytesSpilled(v)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to