This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new cba61a4e6e [GLUTEN-9769][CELEBORN]  Use the correct shuffle time metrics for celeborn columnar shuffle (#9770)
cba61a4e6e is described below

commit cba61a4e6eb9fea9947c8e36d87240c4f20dcf29
Author: wankun <[email protected]>
AuthorDate: Sat May 31 03:15:54 2025 +0800

    [GLUTEN-9769][CELEBORN]  Use the correct shuffle time metrics for celeborn columnar shuffle (#9770)
---
 .../VeloxCelebornColumnarShuffleWriter.scala        | 21 ++++++++++-----------
 .../spark/shuffle/ColumnarShuffleWriter.scala       |  4 ++--
 .../apache/spark/shuffle/utils/ShuffleUtil.scala    |  3 +--
 .../spark/shuffle/GlutenShuffleWriterWrapper.scala  | 12 ++----------
 4 files changed, 15 insertions(+), 25 deletions(-)

diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
index b7af7c02a8..547e2322fb 100644
--- a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
+++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
@@ -19,7 +19,6 @@ package org.apache.spark.shuffle
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.columnarbatch.ColumnarBatches
 import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.config.ReservedKeys
 import org.apache.gluten.memory.memtarget.{MemoryTarget, Spiller, Spillers}
 import org.apache.gluten.runtime.Runtimes
 import org.apache.gluten.vectorized._
@@ -29,6 +28,7 @@ import org.apache.spark.internal.config.{SHUFFLE_DISK_WRITE_BUFFER_SIZE, SHUFFLE
 import org.apache.spark.memory.SparkMemoryUtil
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.shuffle.celeborn.CelebornShuffleHandle
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.SparkResourceUtil
 
@@ -51,8 +51,6 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
     celebornConf,
     client,
     writeMetrics) {
-  private val isSort = !ReservedKeys.GLUTEN_HASH_SHUFFLE_WRITER.equals(shuffleWriterType)
-
   private val runtime =
     Runtimes.contextInstance(BackendsApiManager.getBackendName, "CelebornShuffleWriter")
 
@@ -61,9 +59,15 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
   private var splitResult: GlutenSplitResult = _
 
   private def availableOffHeapPerTask(): Long = {
-    val perTask =
-      SparkMemoryUtil.getCurrentAvailableOffHeapMemory / SparkResourceUtil.getTaskSlots(conf)
-    perTask
+    SparkMemoryUtil.getCurrentAvailableOffHeapMemory / SparkResourceUtil.getTaskSlots(conf)
+  }
+
+  private val nativeMetrics: SQLMetric = {
+    if (dep.isSort) {
+      dep.metrics("sortTime")
+    } else {
+      dep.metrics("splitTime")
+    }
   }
 
   @throws[IOException]
@@ -99,11 +103,6 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
     splitResult = jniWrapper.stop(nativeShuffleWriter)
 
     dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
-    val nativeMetrics = if (isSort) {
-      dep.metrics("sortTime")
-    } else {
-      dep.metrics("splitTime")
-    }
     nativeMetrics
       .add(
         dep.metrics("shuffleWallTime").value - splitResult.getTotalPushTime -
diff --git a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index 3f24205a9d..4478140780 100644
--- a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -38,12 +38,12 @@ class ColumnarShuffleWriter[K, V](
     shuffleBlockResolver: IndexShuffleBlockResolver,
     handle: BaseShuffleHandle[K, V, V],
     mapId: Long,
-    writeMetrics: ShuffleWriteMetricsReporter,
-    isSort: Boolean)
+    writeMetrics: ShuffleWriteMetricsReporter)
   extends ShuffleWriter[K, V]
   with Logging {
 
   private val dep = handle.dependency.asInstanceOf[ColumnarShuffleDependency[K, V, V]]
+  protected val isSort: Boolean = dep.isSort
 
   private val conf = SparkEnv.get.conf
 
diff --git a/backends-velox/src/main/scala/org/apache/spark/shuffle/utils/ShuffleUtil.scala b/backends-velox/src/main/scala/org/apache/spark/shuffle/utils/ShuffleUtil.scala
index 7e3bf50fc8..d0589c90d6 100644
--- a/backends-velox/src/main/scala/org/apache/spark/shuffle/utils/ShuffleUtil.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/shuffle/utils/ShuffleUtil.scala
@@ -27,7 +27,6 @@ object ShuffleUtil {
         parameters.shuffleBlockResolver,
         parameters.columnarShuffleHandle,
         parameters.mapId,
-        parameters.metrics,
-        parameters.isSort))
+        parameters.metrics))
   }
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleWriterWrapper.scala b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleWriterWrapper.scala
index 5170bd2b2a..c5560df25a 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleWriterWrapper.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleWriterWrapper.scala
@@ -26,8 +26,7 @@ case class GenShuffleWriterParameters[K, V](
     shuffleBlockResolver: IndexShuffleBlockResolver,
     columnarShuffleHandle: ColumnarShuffleHandle[K, V],
     mapId: Long,
-    metrics: ShuffleWriteMetricsReporter,
-    isSort: Boolean = false)
+    metrics: ShuffleWriteMetricsReporter)
 
 object GlutenShuffleWriterWrapper {
 
@@ -36,16 +35,9 @@ object GlutenShuffleWriterWrapper {
       columnarShuffleHandle: ColumnarShuffleHandle[K, V],
       mapId: Long,
       metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
-    val isSort =
-      columnarShuffleHandle.dependency.asInstanceOf[ColumnarShuffleDependency[K, V, V]].isSort
     BackendsApiManager.getSparkPlanExecApiInstance
       .genColumnarShuffleWriter(
-        GenShuffleWriterParameters(
-          shuffleBlockResolver,
-          columnarShuffleHandle,
-          mapId,
-          metrics,
-          isSort))
+        GenShuffleWriterParameters(shuffleBlockResolver, columnarShuffleHandle, mapId, metrics))
       .shuffleWriter
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to