This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new e399dc3b8 [GLUTEN-7008][VL] Report spill metrics from Velox operators to Spark task (#7009)
e399dc3b8 is described below
commit e399dc3b84a368830c3e71e0f3472be96978ca78
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon Aug 26 13:25:42 2024 +0800
[GLUTEN-7008][VL] Report spill metrics from Velox operators to Spark task (#7009)
Closes #7008
---
cpp/core/jni/JniWrapper.cc | 3 ++-
cpp/core/utils/metrics.h | 1 +
cpp/velox/compute/WholeStageResultIterator.cc | 1 +
.../java/org/apache/gluten/metrics/Metrics.java | 4 ++++
.../org/apache/gluten/metrics/OperatorMetrics.java | 3 +++
.../metrics/HashAggregateMetricsUpdater.scala | 10 +++++++++
.../apache/gluten/metrics/JoinMetricsUpdater.scala | 16 +++++++++++++++
.../org/apache/gluten/metrics/MetricsUtil.scala | 3 +++
.../apache/gluten/metrics/SortMetricsUpdater.scala | 10 +++++++++
.../apache/spark/sql/utils/SparkMetricsUtil.scala | 24 ++++++++++++++++++++++
10 files changed, 74 insertions(+), 1 deletion(-)
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 4be5e9142..1662b200b 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -175,7 +175,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
metricsBuilderClass = createGlobalClassReferenceOrError(env,
"Lorg/apache/gluten/metrics/Metrics;");
metricsBuilderConstructor = getMethodIdOrError(
- env, metricsBuilderClass, "<init>", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");
+ env, metricsBuilderClass, "<init>", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");
nativeColumnarToRowInfoClass =
createGlobalClassReferenceOrError(env,
"Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
@@ -478,6 +478,7 @@ JNIEXPORT jobject JNICALL
Java_org_apache_gluten_vectorized_ColumnarBatchOutIter
metrics ? metrics->veloxToArrow : -1,
longArray[Metrics::kPeakMemoryBytes],
longArray[Metrics::kNumMemoryAllocations],
+ longArray[Metrics::kSpilledInputBytes],
longArray[Metrics::kSpilledBytes],
longArray[Metrics::kSpilledRows],
longArray[Metrics::kSpilledPartitions],
diff --git a/cpp/core/utils/metrics.h b/cpp/core/utils/metrics.h
index 5b3167b82..bda72b070 100644
--- a/cpp/core/utils/metrics.h
+++ b/cpp/core/utils/metrics.h
@@ -54,6 +54,7 @@ struct Metrics {
kNumMemoryAllocations,
// Spill.
+ kSpilledInputBytes,
kSpilledBytes,
kSpilledRows,
kSpilledPartitions,
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index 2edf9a573..59181aaea 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -364,6 +364,7 @@ void WholeStageResultIterator::collectMetrics() {
metrics_->get(Metrics::kWallNanos)[metricIndex] =
second->cpuWallTiming.wallNanos;
metrics_->get(Metrics::kPeakMemoryBytes)[metricIndex] =
second->peakMemoryBytes;
metrics_->get(Metrics::kNumMemoryAllocations)[metricIndex] =
second->numMemoryAllocations;
+ metrics_->get(Metrics::kSpilledInputBytes)[metricIndex] =
second->spilledInputBytes;
metrics_->get(Metrics::kSpilledBytes)[metricIndex] =
second->spilledBytes;
metrics_->get(Metrics::kSpilledRows)[metricIndex] = second->spilledRows;
metrics_->get(Metrics::kSpilledPartitions)[metricIndex] =
second->spilledPartitions;
diff --git a/gluten-data/src/main/java/org/apache/gluten/metrics/Metrics.java
b/gluten-data/src/main/java/org/apache/gluten/metrics/Metrics.java
index 4e91823e5..c4dcbb65a 100644
--- a/gluten-data/src/main/java/org/apache/gluten/metrics/Metrics.java
+++ b/gluten-data/src/main/java/org/apache/gluten/metrics/Metrics.java
@@ -32,6 +32,7 @@ public class Metrics implements IMetrics {
public long[] scanTime;
public long[] peakMemoryBytes;
public long[] numMemoryAllocations;
+ public long[] spilledInputBytes;
public long[] spilledBytes;
public long[] spilledRows;
public long[] spilledPartitions;
@@ -69,6 +70,7 @@ public class Metrics implements IMetrics {
long veloxToArrow,
long[] peakMemoryBytes,
long[] numMemoryAllocations,
+ long[] spilledInputBytes,
long[] spilledBytes,
long[] spilledRows,
long[] spilledPartitions,
@@ -101,6 +103,7 @@ public class Metrics implements IMetrics {
this.singleMetric.veloxToArrow = veloxToArrow;
this.peakMemoryBytes = peakMemoryBytes;
this.numMemoryAllocations = numMemoryAllocations;
+ this.spilledInputBytes = spilledInputBytes;
this.spilledBytes = spilledBytes;
this.spilledRows = spilledRows;
this.spilledPartitions = spilledPartitions;
@@ -138,6 +141,7 @@ public class Metrics implements IMetrics {
wallNanos[index],
peakMemoryBytes[index],
numMemoryAllocations[index],
+ spilledInputBytes[index],
spilledBytes[index],
spilledRows[index],
spilledPartitions[index],
diff --git
a/gluten-data/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
b/gluten-data/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
index cb155afed..cad04987e 100644
--- a/gluten-data/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
+++ b/gluten-data/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java
@@ -30,6 +30,7 @@ public class OperatorMetrics implements IOperatorMetrics {
public long scanTime;
public long peakMemoryBytes;
public long numMemoryAllocations;
+ public long spilledInputBytes;
public long spilledBytes;
public long spilledRows;
public long spilledPartitions;
@@ -64,6 +65,7 @@ public class OperatorMetrics implements IOperatorMetrics {
long wallNanos,
long peakMemoryBytes,
long numMemoryAllocations,
+ long spilledInputBytes,
long spilledBytes,
long spilledRows,
long spilledPartitions,
@@ -95,6 +97,7 @@ public class OperatorMetrics implements IOperatorMetrics {
this.scanTime = scanTime;
this.peakMemoryBytes = peakMemoryBytes;
this.numMemoryAllocations = numMemoryAllocations;
+ this.spilledInputBytes = spilledInputBytes;
this.spilledBytes = spilledBytes;
this.spilledRows = spilledRows;
this.spilledPartitions = spilledPartitions;
diff --git
a/gluten-data/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
b/gluten-data/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
index 5337af9e6..5053cc2ba 100644
---
a/gluten-data/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
+++
b/gluten-data/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala
@@ -19,6 +19,8 @@ package org.apache.gluten.metrics
import org.apache.gluten.substrait.AggregationParams
import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.utils.SparkMetricsUtil
+import org.apache.spark.util.TaskResources
trait HashAggregateMetricsUpdater extends MetricsUpdater {
def updateAggregationMetrics(
@@ -81,5 +83,13 @@ class HashAggregateMetricsUpdaterImpl(val metrics:
Map[String, SQLMetric])
rowConstructionWallNanos += aggregationMetrics.get(idx).wallNanos
idx += 1
}
+ if (TaskResources.inSparkTask()) {
+ SparkMetricsUtil.incMemoryBytesSpilled(
+ TaskResources.getLocalTaskContext().taskMetrics(),
+ aggMetrics.spilledInputBytes)
+ SparkMetricsUtil.incDiskBytesSpilled(
+ TaskResources.getLocalTaskContext().taskMetrics(),
+ aggMetrics.spilledBytes)
+ }
}
}
diff --git
a/gluten-data/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
b/gluten-data/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
index 60be8418a..fe1fa2ad6 100644
---
a/gluten-data/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
+++
b/gluten-data/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala
@@ -20,6 +20,8 @@ import org.apache.gluten.metrics.Metrics.SingleMetric
import org.apache.gluten.substrait.JoinParams
import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.utils.SparkMetricsUtil
+import org.apache.spark.util.TaskResources
import java.util
@@ -150,6 +152,20 @@ class HashJoinMetricsUpdater(override val metrics:
Map[String, SQLMetric])
streamPreProjectionWallNanos += joinMetrics.get(idx).wallNanos
idx += 1
}
+ if (TaskResources.inSparkTask()) {
+ SparkMetricsUtil.incMemoryBytesSpilled(
+ TaskResources.getLocalTaskContext().taskMetrics(),
+ hashProbeMetrics.spilledInputBytes)
+ SparkMetricsUtil.incDiskBytesSpilled(
+ TaskResources.getLocalTaskContext().taskMetrics(),
+ hashProbeMetrics.spilledBytes)
+ SparkMetricsUtil.incMemoryBytesSpilled(
+ TaskResources.getLocalTaskContext().taskMetrics(),
+ hashBuildMetrics.spilledInputBytes)
+ SparkMetricsUtil.incDiskBytesSpilled(
+ TaskResources.getLocalTaskContext().taskMetrics(),
+ hashBuildMetrics.spilledBytes)
+ }
}
}
diff --git
a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
index eff4245d5..8eea58272 100644
--- a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
+++ b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala
@@ -105,6 +105,7 @@ object MetricsUtil extends Logging {
var wallNanos: Long = 0
var peakMemoryBytes: Long = 0
var numMemoryAllocations: Long = 0
+ var spilledInputBytes: Long = 0
var spilledBytes: Long = 0
var spilledRows: Long = 0
var spilledPartitions: Long = 0
@@ -130,6 +131,7 @@ object MetricsUtil extends Logging {
wallNanos += metrics.wallNanos
peakMemoryBytes = peakMemoryBytes.max(metrics.peakMemoryBytes)
numMemoryAllocations += metrics.numMemoryAllocations
+ spilledInputBytes += metrics.spilledInputBytes
spilledBytes += metrics.spilledBytes
spilledRows += metrics.spilledRows
spilledPartitions += metrics.spilledPartitions
@@ -162,6 +164,7 @@ object MetricsUtil extends Logging {
wallNanos,
peakMemoryBytes,
numMemoryAllocations,
+ spilledInputBytes,
spilledBytes,
spilledRows,
spilledPartitions,
diff --git
a/gluten-data/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala
b/gluten-data/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala
index 138740350..0ba48c62c 100644
---
a/gluten-data/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala
+++
b/gluten-data/src/main/scala/org/apache/gluten/metrics/SortMetricsUpdater.scala
@@ -17,6 +17,8 @@
package org.apache.gluten.metrics
import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.utils.SparkMetricsUtil
+import org.apache.spark.util.TaskResources
class SortMetricsUpdater(val metrics: Map[String, SQLMetric]) extends
MetricsUpdater {
@@ -34,6 +36,14 @@ class SortMetricsUpdater(val metrics: Map[String,
SQLMetric]) extends MetricsUpd
metrics("spilledRows") += operatorMetrics.spilledRows
metrics("spilledPartitions") += operatorMetrics.spilledPartitions
metrics("spilledFiles") += operatorMetrics.spilledFiles
+ if (TaskResources.inSparkTask()) {
+ SparkMetricsUtil.incMemoryBytesSpilled(
+ TaskResources.getLocalTaskContext().taskMetrics(),
+ operatorMetrics.spilledInputBytes)
+ SparkMetricsUtil.incDiskBytesSpilled(
+ TaskResources.getLocalTaskContext().taskMetrics(),
+ operatorMetrics.spilledBytes)
+ }
}
}
}
diff --git
a/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkMetricsUtil.scala
b/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkMetricsUtil.scala
new file mode 100644
index 000000000..af8bbb08e
--- /dev/null
+++
b/gluten-data/src/main/scala/org/apache/spark/sql/utils/SparkMetricsUtil.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.utils
+
+import org.apache.spark.executor.TaskMetrics
+
+object SparkMetricsUtil {
+ def incMemoryBytesSpilled(task: TaskMetrics, v: Long): Unit = task.incMemoryBytesSpilled(v)
+ def incDiskBytesSpilled(task: TaskMetrics, v: Long): Unit = task.incDiskBytesSpilled(v)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]