This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 7b2b9f732 [CELEBORN-1265] Fix batches read metric for gluten columnar
shuffle
7b2b9f732 is described below
commit 7b2b9f7327293eeab0d418c1a4b9cdc30c7e1c08
Author: xiyu.zk <[email protected]>
AuthorDate: Wed Feb 7 17:41:10 2024 +0800
[CELEBORN-1265] Fix batches read metric for gluten columnar shuffle
### What changes were proposed in this pull request?
Fix batches read metric for gluten columnar shuffle
### Why are the changes needed?

Due to the fix in
[Gluten-4051](https://github.com/oap-project/gluten/pull/4051) for the records
read metric issue, the read metric of CelebornShuffleReader does not need
additional processing, otherwise the batches read metric will have the issue
shown in the graph.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
CI
Closes #2289 from kerwin-zk/batches-read.
Authored-by: xiyu.zk <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | 11 ++++-------
.../shuffle/celeborn/GlutenShuffleDependencyHelper.scala | 9 ---------
2 files changed, 4 insertions(+), 16 deletions(-)
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 663983b58..dd8cba1a7 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -173,14 +173,11 @@ class CelebornShuffleReader[K, C](
}
val iterWithUpdatedRecordsRead =
- if (GlutenShuffleDependencyHelper.isGlutenDep(dep.getClass.getName)) {
- GlutenShuffleDependencyHelper.withUpdatedRecordsRead(recordIter,
metrics)
- } else {
- recordIter.map { record =>
- metrics.incRecordsRead(1)
- record
- }
+ recordIter.map { record =>
+ metrics.incRecordsRead(1)
+ record
}
+
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
iterWithUpdatedRecordsRead,
context.taskMetrics().mergeShuffleReadMetrics())
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala
index 4d743cda8..7e0fc85e2 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala
@@ -32,13 +32,4 @@ object GlutenShuffleDependencyHelper {
// scalastyle:on
"org.apache.spark.shuffle.ColumnarShuffleDependency".equals(depName)
}
-
- def withUpdatedRecordsRead(
- input: Iterator[(Any, Any)],
- metrics: ShuffleReadMetricsReporter): Iterator[(Any, Any)] = {
- input.map { record =>
- metrics.incRecordsRead(record._2.asInstanceOf[ColumnarBatch].numRows())
- record
- }
- }
}