This is an automated email from the ASF dual-hosted git repository. kerwinzhang pushed a commit to branch batches-read in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit 72475b110ffad7670bd3b73cf0bde579923c3cd9 Author: xiyu.zk <[email protected]> AuthorDate: Thu Oct 12 15:29:51 2023 +0800 [CELEBORN-1265] Fix batches read metric for gluten columnar shuffle --- .../apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | 11 ++++------- .../shuffle/celeborn/GlutenShuffleDependencyHelper.scala | 9 --------- 2 files changed, 4 insertions(+), 16 deletions(-) diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala index 663983b58..dd8cba1a7 100644 --- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala +++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala @@ -173,14 +173,11 @@ class CelebornShuffleReader[K, C]( } val iterWithUpdatedRecordsRead = - if (GlutenShuffleDependencyHelper.isGlutenDep(dep.getClass.getName)) { - GlutenShuffleDependencyHelper.withUpdatedRecordsRead(recordIter, metrics) - } else { - recordIter.map { record => - metrics.incRecordsRead(1) - record - } + recordIter.map { record => + metrics.incRecordsRead(1) + record } + val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]]( iterWithUpdatedRecordsRead, context.taskMetrics().mergeShuffleReadMetrics()) diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala index 4d743cda8..7e0fc85e2 100644 --- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala +++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala @@ -32,13 +32,4 @@ object GlutenShuffleDependencyHelper { // scalastyle:on "org.apache.spark.shuffle.ColumnarShuffleDependency".equals(depName) } - - def withUpdatedRecordsRead( - input: Iterator[(Any, Any)], - metrics: ShuffleReadMetricsReporter): Iterator[(Any, Any)] = { - input.map { record => - metrics.incRecordsRead(record._2.asInstanceOf[ColumnarBatch].numRows()) - record - } - } }
