This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new f9cf5f3f6 [CELEBORN-917][GLUTEN] Record read metric should be 
compatible with Gluten shuffle serde
f9cf5f3f6 is described below

commit f9cf5f3f61651796853eafe5ab5fc6006b1fdb9d
Author: youxiduo <[email protected]>
AuthorDate: Fri Aug 25 15:01:09 2023 +0800

    [CELEBORN-917][GLUTEN] Record read metric should be compatible with Gluten 
shuffle serde
    
    ### What changes were proposed in this pull request?
    
    When updating the record read metric, we should consider whether the input record is a `ColumnarBatch`. If the serde is the Gluten columnar batch serializer, each record holds a whole batch, so we should increment the metric by `ColumnarBatch.numRows` instead of by 1.
    
    ### Why are the changes needed?
    
    Make the shuffle record read metric correct.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, the shuffle record read metric changes: with the Gluten columnar batch serde it now reports the number of rows read.
    
    ### How was this patch tested?
    Manually tested.
    
    before:
    <img width="415" alt="image" src="https://github.com/apache/incubator-celeborn/assets/12025282/221ab814-4b02-4688-80ab-31f21cd900a4">
    
    after:
    <img width="415" alt="image" src="https://github.com/apache/incubator-celeborn/assets/12025282/1c7257c0-2f30-41c3-9ea8-6bc5cda3de85">
    
    Closes #1838 from ulysses-you/gluten.
    
    Lead-authored-by: youxiduo <[email protected]>
    Co-authored-by: Xiduo You <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit b3700dd169198af3982ff3525624eec2c1a59597)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../shuffle/celeborn/CelebornShuffleReader.scala   | 14 +++++--
 .../celeborn/GlutenColumnarBatchSerdeHelper.scala  | 44 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 4 deletions(-)

diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 5ce891907..36ee31448 100644
--- 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -97,11 +97,17 @@ class CelebornShuffleReader[K, C](
     }).flatMap(
       serializerInstance.deserializeStream(_).asKeyValueIterator)
 
+    val iterWithUpdatedRecordsRead =
+      if 
(GlutenColumnarBatchSerdeHelper.isGlutenSerde(serializerInstance.getClass.getName))
 {
+        GlutenColumnarBatchSerdeHelper.withUpdatedRecordsRead(recordIter, 
metrics)
+      } else {
+        recordIter.map { record =>
+          metrics.incRecordsRead(1)
+          record
+        }
+      }
     val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
-      recordIter.map { record =>
-        metrics.incRecordsRead(1)
-        record
-      },
+      iterWithUpdatedRecordsRead,
       context.taskMetrics().mergeShuffleReadMetrics())
 
     // An interruptible iterator must be used here in order to support task 
cancellation
diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala
new file mode 100644
index 000000000..259bb954d
--- /dev/null
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.celeborn
+
+import org.apache.spark.shuffle.ShuffleReadMetricsReporter
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * A helper class to be compatible with Gluten Celeborn.
+ */
+object GlutenColumnarBatchSerdeHelper {
+
+  def isGlutenSerde(serdeName: String): Boolean = {
+    // scalastyle:off
+    // see Gluten
+    // 
https://github.com/oap-project/gluten/blob/main/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarBatchSerializer.scala
+    // scalastyle:on
+    
"org.apache.spark.shuffle.CelebornColumnarBatchSerializer".equals(serdeName)
+  }
+
+  def withUpdatedRecordsRead(
+      input: Iterator[(Any, Any)],
+      metrics: ShuffleReadMetricsReporter): Iterator[(Any, Any)] = {
+    input.map { record =>
+      metrics.incRecordsRead(record._2.asInstanceOf[ColumnarBatch].numRows())
+      record
+    }
+  }
+}

Reply via email to