This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 944c92623 [GLUTEN-6695][CH] Introduce shuffleWallTime in CHMetricsApi to calculate the overall shuffle write time (#6696)
944c92623 is described below

commit 944c92623ae07430d48b7569ae52ec14eefcfb0a
Author: Nicholas Jiang <[email protected]>
AuthorDate: Sun Aug 4 14:37:03 2024 +0800

    [GLUTEN-6695][CH] Introduce shuffleWallTime in CHMetricsApi to calculate the overall shuffle write time (#6696)
---
 .../scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala | 1 +
 .../org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala    | 4 ++++
 2 files changed, 5 insertions(+)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index 85b298fa4..0ff53e1c5 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -222,6 +222,7 @@ class CHMetricsApi extends MetricsApi with Logging with 
LogLevelUtil {
       "spillTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to 
spill"),
       "compressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time 
to compress"),
       "prepareTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time 
to prepare"),
+      "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, 
"shuffle wall time"),
       "avgReadBatchNumRows" -> SQLMetrics
         .createAverageMetric(sparkContext, "avg read batch num rows"),
       "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
diff --git 
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
 
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
index ae22a0890..c7d7957c1 100644
--- 
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
+++ 
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
@@ -63,7 +63,9 @@ class CHCelebornColumnarShuffleWriter[K, V](
       } else {
         initShuffleWriter(cb)
         val col = cb.column(0).asInstanceOf[CHColumnVector]
+        val startTime = System.nanoTime()
         jniWrapper.split(nativeShuffleWriter, col.getBlockAddress)
+        dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
         dep.metrics("numInputRows").add(cb.numRows)
         dep.metrics("inputBatches").add(1)
         // This metric is important, AQE use it to decide if EliminateLimit
@@ -77,8 +79,10 @@ class CHCelebornColumnarShuffleWriter[K, V](
       return
     }
 
+    val startTime = System.nanoTime()
     splitResult = jniWrapper.stop(nativeShuffleWriter)
 
+    dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
     dep.metrics("splitTime").add(splitResult.getSplitTime)
     dep.metrics("IOTime").add(splitResult.getDiskWriteTime)
     dep.metrics("serializeTime").add(splitResult.getSerializationTime)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to