This is an automated email from the ASF dual-hosted git repository.

kerwinzhang pushed a commit to branch batches-read
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit 72475b110ffad7670bd3b73cf0bde579923c3cd9
Author: xiyu.zk <[email protected]>
AuthorDate: Thu Oct 12 15:29:51 2023 +0800

    [CELEBORN-1265] Fix batches read metric for Gluten columnar shuffle
---
 .../apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | 11 ++++-------
 .../shuffle/celeborn/GlutenShuffleDependencyHelper.scala      |  9 ---------
 2 files changed, 4 insertions(+), 16 deletions(-)

diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 663983b58..dd8cba1a7 100644
--- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -173,14 +173,11 @@ class CelebornShuffleReader[K, C](
     }
 
     val iterWithUpdatedRecordsRead =
-      if (GlutenShuffleDependencyHelper.isGlutenDep(dep.getClass.getName)) {
-        GlutenShuffleDependencyHelper.withUpdatedRecordsRead(recordIter, metrics)
-      } else {
-        recordIter.map { record =>
-          metrics.incRecordsRead(1)
-          record
-        }
+      recordIter.map { record =>
+        metrics.incRecordsRead(1)
+        record
       }
+
     val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
       iterWithUpdatedRecordsRead,
       context.taskMetrics().mergeShuffleReadMetrics())
diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala
index 4d743cda8..7e0fc85e2 100644
--- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala
@@ -32,13 +32,4 @@ object GlutenShuffleDependencyHelper {
     // scalastyle:on
     "org.apache.spark.shuffle.ColumnarShuffleDependency".equals(depName)
   }
-
-  def withUpdatedRecordsRead(
-      input: Iterator[(Any, Any)],
-      metrics: ShuffleReadMetricsReporter): Iterator[(Any, Any)] = {
-    input.map { record =>
-      metrics.incRecordsRead(record._2.asInstanceOf[ColumnarBatch].numRows())
-      record
-    }
-  }
 }

Reply via email to