This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new d53b6e53c [CELEBORN-946][GLUTEN] Record read metric should be 
compatible with Gluten shuffle dependency
d53b6e53c is described below

commit d53b6e53c719d588da7f68185a2c5b8c453daa48
Author: xiyu.zk <[email protected]>
AuthorDate: Tue Sep 5 18:34:12 2023 +0800

    [CELEBORN-946][GLUTEN] Record read metric should be compatible with Gluten shuffle dependency
    
    ### What changes were proposed in this pull request?
    Currently, a Gluten shuffle is detected by checking the serializer (serde)
    class name, which only works for the Velox backend. To also support the
    ClickHouse backend, it is more generic to detect a Gluten shuffle by checking
    whether the shuffle dependency is a ColumnarShuffleDependency.
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1878 from kerwin-zk/gluten.
    
    Authored-by: xiyu.zk <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | 4 ++--
 ...BatchSerdeHelper.scala => GlutenShuffleDependencyHelper.scala} | 8 ++++----
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index f07ed4989..43cb22efd 100644
--- 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -131,8 +131,8 @@ class CelebornShuffleReader[K, C](
       serializerInstance.deserializeStream(_).asKeyValueIterator)
 
     val iterWithUpdatedRecordsRead =
-      if 
(GlutenColumnarBatchSerdeHelper.isGlutenSerde(serializerInstance.getClass.getName))
 {
-        GlutenColumnarBatchSerdeHelper.withUpdatedRecordsRead(recordIter, 
metrics)
+      if (GlutenShuffleDependencyHelper.isGlutenDep(dep.getClass.getName)) {
+        GlutenShuffleDependencyHelper.withUpdatedRecordsRead(recordIter, 
metrics)
       } else {
         recordIter.map { record =>
           metrics.incRecordsRead(1)
diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala
similarity index 80%
rename from 
client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala
rename to 
client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala
index 259bb954d..4d743cda8 100644
--- 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenColumnarBatchSerdeHelper.scala
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/GlutenShuffleDependencyHelper.scala
@@ -23,14 +23,14 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 /**
  * A helper class to be compatible with Gluten Celeborn.
  */
-object GlutenColumnarBatchSerdeHelper {
+object GlutenShuffleDependencyHelper {
 
-  def isGlutenSerde(serdeName: String): Boolean = {
+  def isGlutenDep(depName: String): Boolean = {
     // scalastyle:off
     // see Gluten
-    // 
https://github.com/oap-project/gluten/blob/main/gluten-celeborn/src/main/scala/org/apache/spark/shuffle/CelebornColumnarBatchSerializer.scala
+    // 
https://github.com/oap-project/gluten/blob/main/gluten-core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleDependency.scala
     // scalastyle:on
-    
"org.apache.spark.shuffle.CelebornColumnarBatchSerializer".equals(serdeName)
+    "org.apache.spark.shuffle.ColumnarShuffleDependency".equals(depName)
   }
 
   def withUpdatedRecordsRead(

Reply via email to