This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 7b2b9f732 [CELEBORN-1265] Fix batches read metric for gluten columnar shuffle
7b2b9f732 is described below

commit 7b2b9f7327293eeab0d418c1a4b9cdc30c7e1c08
Author: xiyu.zk <[email protected]>
AuthorDate: Wed Feb 7 17:41:10 2024 +0800

    [CELEBORN-1265] Fix batches read metric for gluten columnar shuffle
    
    ### What changes were proposed in this pull request?
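    Fix the batches read metric for Gluten columnar shuffle by removing the Gluten-specific `withUpdatedRecordsRead` branch from `CelebornShuffleReader`, so that `metrics.incRecordsRead(1)` is called once per record for every shuffle dependency type.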
    
    ### Why are the changes needed?
    
![image](https://github.com/apache/incubator-celeborn/assets/107825064/c862e83b-8e3e-4705-a151-41e5b6675d7a)
    
    [Gluten-4025](https://github.com/oap-project/gluten/pull/4051) fixed the records read metric issue, so the read metric in CelebornShuffleReader no longer needs additional processing. If that extra processing is kept, the batches read metric shows the issue seen in the image above.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    CI
    
    Closes #2289 from kerwin-zk/batches-read.
    
    Authored-by: xiyu.zk <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | 11 ++++-------
 .../shuffle/celeborn/GlutenShuffleDependencyHelper.scala      |  9 ---------
 2 files changed, 4 insertions(+), 16 deletions(-)

diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 663983b58..dd8cba1a7 100644
--- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -173,14 +173,11 @@ class CelebornShuffleReader[K, C](
     }
 
     val iterWithUpdatedRecordsRead =
-      if (GlutenShuffleDependencyHelper.isGlutenDep(dep.getClass.getName)) {
-        GlutenShuffleDependencyHelper.withUpdatedRecordsRead(recordIter, metrics)
-      } else {
-        recordIter.map { record =>
-          metrics.incRecordsRead(1)
-          record
-        }
+      recordIter.map { record =>
+        metrics.incRecordsRead(1)
+        record
       }
+
     val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
       iterWithUpdatedRecordsRead,
       context.taskMetrics().mergeShuffleReadMetrics())
diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala
index 4d743cda8..7e0fc85e2 100644
--- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala
@@ -32,13 +32,4 @@ object GlutenShuffleDependencyHelper {
     // scalastyle:on
     "org.apache.spark.shuffle.ColumnarShuffleDependency".equals(depName)
   }
-
-  def withUpdatedRecordsRead(
-      input: Iterator[(Any, Any)],
-      metrics: ShuffleReadMetricsReporter): Iterator[(Any, Any)] = {
-    input.map { record =>
-      metrics.incRecordsRead(record._2.asInstanceOf[ColumnarBatch].numRows())
-      record
-    }
-  }
 }

Reply via email to