This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 944c92623 [GLUTEN-6695][CH] Introduce shuffleWallTime in CHMetricsApi
to calculate the overall shuffle write time (#6696)
944c92623 is described below
commit 944c92623ae07430d48b7569ae52ec14eefcfb0a
Author: Nicholas Jiang <[email protected]>
AuthorDate: Sun Aug 4 14:37:03 2024 +0800
[GLUTEN-6695][CH] Introduce shuffleWallTime in CHMetricsApi to calculate
the overall shuffle write time (#6696)
---
.../scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala | 1 +
.../org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala | 4 ++++
2 files changed, 5 insertions(+)
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index 85b298fa4..0ff53e1c5 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -222,6 +222,7 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
"spillTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to spill"),
"compressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to compress"),
"prepareTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to prepare"),
+ "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle wall time"),
"avgReadBatchNumRows" -> SQLMetrics
.createAverageMetric(sparkContext, "avg read batch num rows"),
"numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
index ae22a0890..c7d7957c1 100644
--- a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
+++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
@@ -63,7 +63,9 @@ class CHCelebornColumnarShuffleWriter[K, V](
} else {
initShuffleWriter(cb)
val col = cb.column(0).asInstanceOf[CHColumnVector]
+ val startTime = System.nanoTime()
jniWrapper.split(nativeShuffleWriter, col.getBlockAddress)
+ dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
dep.metrics("numInputRows").add(cb.numRows)
dep.metrics("inputBatches").add(1)
// This metric is important, AQE use it to decide if EliminateLimit
@@ -77,8 +79,10 @@ class CHCelebornColumnarShuffleWriter[K, V](
return
}
+ val startTime = System.nanoTime()
splitResult = jniWrapper.stop(nativeShuffleWriter)
+ dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
dep.metrics("splitTime").add(splitResult.getSplitTime)
dep.metrics("IOTime").add(splitResult.getDiskWriteTime)
dep.metrics("serializeTime").add(splitResult.getSerializationTime)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]