This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new cba61a4e6e [GLUTEN-9769][CELEBORN] Use the correct shuffle time metrics for celeborn columnar shuffle (#9770)
cba61a4e6e is described below
commit cba61a4e6eb9fea9947c8e36d87240c4f20dcf29
Author: wankun <[email protected]>
AuthorDate: Sat May 31 03:15:54 2025 +0800
[GLUTEN-9769][CELEBORN] Use the correct shuffle time metrics for celeborn columnar shuffle (#9770)
---
.../VeloxCelebornColumnarShuffleWriter.scala | 21 ++++++++++-----------
.../spark/shuffle/ColumnarShuffleWriter.scala | 4 ++--
.../apache/spark/shuffle/utils/ShuffleUtil.scala | 3 +--
.../spark/shuffle/GlutenShuffleWriterWrapper.scala | 12 ++----------
4 files changed, 15 insertions(+), 25 deletions(-)
diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
index b7af7c02a8..547e2322fb 100644
--- a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
+++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
@@ -19,7 +19,6 @@ package org.apache.spark.shuffle
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.config.ReservedKeys
import org.apache.gluten.memory.memtarget.{MemoryTarget, Spiller, Spillers}
import org.apache.gluten.runtime.Runtimes
import org.apache.gluten.vectorized._
@@ -29,6 +28,7 @@ import org.apache.spark.internal.config.{SHUFFLE_DISK_WRITE_BUFFER_SIZE, SHUFFLE
import org.apache.spark.memory.SparkMemoryUtil
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.celeborn.CelebornShuffleHandle
+import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SparkResourceUtil
@@ -51,8 +51,6 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
celebornConf,
client,
writeMetrics) {
- private val isSort = !ReservedKeys.GLUTEN_HASH_SHUFFLE_WRITER.equals(shuffleWriterType)
-
private val runtime =
Runtimes.contextInstance(BackendsApiManager.getBackendName, "CelebornShuffleWriter")
@@ -61,9 +59,15 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
private var splitResult: GlutenSplitResult = _
private def availableOffHeapPerTask(): Long = {
- val perTask =
- SparkMemoryUtil.getCurrentAvailableOffHeapMemory / SparkResourceUtil.getTaskSlots(conf)
- perTask
+ SparkMemoryUtil.getCurrentAvailableOffHeapMemory / SparkResourceUtil.getTaskSlots(conf)
+ }
+
+ private val nativeMetrics: SQLMetric = {
+ if (dep.isSort) {
+ dep.metrics("sortTime")
+ } else {
+ dep.metrics("splitTime")
+ }
}
@throws[IOException]
@@ -99,11 +103,6 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
splitResult = jniWrapper.stop(nativeShuffleWriter)
dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
- val nativeMetrics = if (isSort) {
- dep.metrics("sortTime")
- } else {
- dep.metrics("splitTime")
- }
nativeMetrics
.add(
dep.metrics("shuffleWallTime").value - splitResult.getTotalPushTime -
diff --git
a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index 3f24205a9d..4478140780 100644
---
a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -38,12 +38,12 @@ class ColumnarShuffleWriter[K, V](
shuffleBlockResolver: IndexShuffleBlockResolver,
handle: BaseShuffleHandle[K, V, V],
mapId: Long,
- writeMetrics: ShuffleWriteMetricsReporter,
- isSort: Boolean)
+ writeMetrics: ShuffleWriteMetricsReporter)
extends ShuffleWriter[K, V]
with Logging {
private val dep = handle.dependency.asInstanceOf[ColumnarShuffleDependency[K, V, V]]
+ protected val isSort: Boolean = dep.isSort
private val conf = SparkEnv.get.conf
diff --git
a/backends-velox/src/main/scala/org/apache/spark/shuffle/utils/ShuffleUtil.scala
b/backends-velox/src/main/scala/org/apache/spark/shuffle/utils/ShuffleUtil.scala
index 7e3bf50fc8..d0589c90d6 100644
---
a/backends-velox/src/main/scala/org/apache/spark/shuffle/utils/ShuffleUtil.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/shuffle/utils/ShuffleUtil.scala
@@ -27,7 +27,6 @@ object ShuffleUtil {
parameters.shuffleBlockResolver,
parameters.columnarShuffleHandle,
parameters.mapId,
- parameters.metrics,
- parameters.isSort))
+ parameters.metrics))
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleWriterWrapper.scala
b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleWriterWrapper.scala
index 5170bd2b2a..c5560df25a 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleWriterWrapper.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleWriterWrapper.scala
@@ -26,8 +26,7 @@ case class GenShuffleWriterParameters[K, V](
shuffleBlockResolver: IndexShuffleBlockResolver,
columnarShuffleHandle: ColumnarShuffleHandle[K, V],
mapId: Long,
- metrics: ShuffleWriteMetricsReporter,
- isSort: Boolean = false)
+ metrics: ShuffleWriteMetricsReporter)
object GlutenShuffleWriterWrapper {
@@ -36,16 +35,9 @@ object GlutenShuffleWriterWrapper {
columnarShuffleHandle: ColumnarShuffleHandle[K, V],
mapId: Long,
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
- val isSort =
- columnarShuffleHandle.dependency.asInstanceOf[ColumnarShuffleDependency[K, V, V]].isSort
BackendsApiManager.getSparkPlanExecApiInstance
.genColumnarShuffleWriter(
- GenShuffleWriterParameters(
- shuffleBlockResolver,
- columnarShuffleHandle,
- mapId,
- metrics,
- isSort))
+ GenShuffleWriterParameters(shuffleBlockResolver, columnarShuffleHandle, mapId, metrics))
.shuffleWriter
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]