This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch dev-v6.0.0-parallel-scan-kdev-build
in repository https://gitbox.apache.org/repos/asf/auron.git

commit 60328bc9e8899dd814714670d7b717f767362432
Author: zhangli20 <[email protected]>
AuthorDate: Mon Dec 8 09:19:57 2025 +0000

    Fix: keep backward compatibility with celeborn-054

    This reverts commit 437180b7506bb56da6ad0a77ba6f6e
    
    KDev_MR_link: https://ksurl.cn/aCmpUcJb
---
 .../celeborn/BlazeCelebornShuffleReader.scala      | 144 +++++++++++++++------
 1 file changed, 104 insertions(+), 40 deletions(-)

diff --git 
a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/BlazeCelebornShuffleReader.scala
 
b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/BlazeCelebornShuffleReader.scala
index 569061d3..34f681cc 100644
--- 
a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/BlazeCelebornShuffleReader.scala
+++ 
b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/BlazeCelebornShuffleReader.scala
@@ -25,6 +25,7 @@ import scala.reflect.ClassTag
 
 import org.apache.celeborn.client.read.CelebornInputStream
 import org.apache.celeborn.common.CelebornConf
+import org.apache.commons.lang3.reflect.FieldUtils
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.shuffle.ShuffleReadMetricsReporter
@@ -51,52 +52,115 @@ class BlazeCelebornShuffleReader[K, C](
     with Logging {
 
   override protected def readBlocks(): Iterator[InputStream] = {
-    // force disable decompression because compression is skipped in shuffle 
writer
-    val reader = new CelebornShuffleReader[K, C](
-      handle,
-      startPartition,
-      endPartition,
-      startMapIndex.getOrElse(0),
-      endMapIndex.getOrElse(Int.MaxValue),
-      context,
-      conf,
-      BlazeCelebornShuffleReader.createBypassingIncRecordsReadMetrics(metrics),
-      shuffleIdTracker,
-      false) {
-
-      override def newSerializerInstance(dep: ShuffleDependency[K, _, C]): 
SerializerInstance = {
-        new SerializerInstance {
-          override def serialize[T: ClassTag](t: T): ByteBuffer =
-            throw new UnsupportedOperationException(
-              "BlazeCelebornShuffleReader.newSerializerInstance")
-
-          override def deserialize[T: ClassTag](bytes: ByteBuffer): T =
-            throw new UnsupportedOperationException(
-              "BlazeCelebornShuffleReader.newSerializerInstance")
-
-          override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: 
ClassLoader): T =
-            throw new UnsupportedOperationException(
-              "BlazeCelebornShuffleReader.newSerializerInstance")
-
-          override def serializeStream(s: OutputStream): SerializationStream =
-            throw new UnsupportedOperationException(
-              "BlazeCelebornShuffleReader.newSerializerInstance")
-
-          override def deserializeStream(s: InputStream): 
DeserializationStream = {
-            new DeserializationStream {
-              override def asKeyValueIterator: Iterator[(Any, Any)] = 
Iterator.single((null, s))
-
-              override def readObject[T: ClassTag](): T =
-                throw new UnsupportedOperationException()
-
-              override def close(): Unit = s.close()
+    val reader = {
+      try {
+        // for celeborn-060
+        new CelebornShuffleReader[K, C](
+          handle,
+          startPartition,
+          endPartition,
+          startMapIndex.getOrElse(0),
+          endMapIndex.getOrElse(Int.MaxValue),
+          context,
+          conf,
+          
BlazeCelebornShuffleReader.createBypassingIncRecordsReadMetrics(metrics),
+          shuffleIdTracker, false) {
+
+          override def newSerializerInstance(dep: ShuffleDependency[K, _, C]): 
SerializerInstance = {
+            new SerializerInstance {
+              override def serialize[T: ClassTag](t: T): ByteBuffer =
+                throw new UnsupportedOperationException(
+                  "BlazeCelebornShuffleReader.newSerializerInstance")
+
+              override def deserialize[T: ClassTag](bytes: ByteBuffer): T =
+                throw new UnsupportedOperationException(
+                  "BlazeCelebornShuffleReader.newSerializerInstance")
+
+              override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: 
ClassLoader): T =
+                throw new UnsupportedOperationException(
+                  "BlazeCelebornShuffleReader.newSerializerInstance")
+
+              override def serializeStream(s: OutputStream): 
SerializationStream =
+                throw new UnsupportedOperationException(
+                  "BlazeCelebornShuffleReader.newSerializerInstance")
+
+              override def deserializeStream(s: InputStream): 
DeserializationStream = {
+                new DeserializationStream {
+                  override def asKeyValueIterator: Iterator[(Any, Any)] = 
Iterator.single((null, s))
+
+                  override def readObject[T: ClassTag](): T =
+                    throw new UnsupportedOperationException()
+
+                  override def close(): Unit = s.close()
+                }
+              }
             }
           }
         }
+      } catch {
+        case _: NoSuchMethodError =>
+          // for celeborn-054
+          new CelebornShuffleReader[K, C](
+            handle,
+            startPartition,
+            endPartition,
+            startMapIndex.getOrElse(0),
+            endMapIndex.getOrElse(Int.MaxValue),
+            context,
+            conf,
+            
BlazeCelebornShuffleReader.createBypassingIncRecordsReadMetrics(metrics),
+            shuffleIdTracker) {
+
+            override def newSerializerInstance(dep: ShuffleDependency[K, _, 
C]): SerializerInstance = {
+              new SerializerInstance {
+                override def serialize[T: ClassTag](t: T): ByteBuffer =
+                  throw new UnsupportedOperationException(
+                    "BlazeCelebornShuffleReader.newSerializerInstance")
+
+                override def deserialize[T: ClassTag](bytes: ByteBuffer): T =
+                  throw new UnsupportedOperationException(
+                    "BlazeCelebornShuffleReader.newSerializerInstance")
+
+                override def deserialize[T: ClassTag](bytes: ByteBuffer, 
loader: ClassLoader): T =
+                  throw new UnsupportedOperationException(
+                    "BlazeCelebornShuffleReader.newSerializerInstance")
+
+                override def serializeStream(s: OutputStream): 
SerializationStream =
+                  throw new UnsupportedOperationException(
+                    "BlazeCelebornShuffleReader.newSerializerInstance")
+
+                override def deserializeStream(s: InputStream): 
DeserializationStream = {
+                  new DeserializationStream {
+                    override def asKeyValueIterator: Iterator[(Any, Any)] = 
Iterator.single((null, s))
+
+                    override def readObject[T: ClassTag](): T =
+                      throw new UnsupportedOperationException()
+
+                    override def close(): Unit = s.close()
+                  }
+                }
+              }
+            }
+          }
       }
     }
 
-    reader.read().map { kv => kv._2.asInstanceOf[CelebornInputStream] }
+    reader.read().map { kv =>
+      val celebornInputStream = kv._2.asInstanceOf[CelebornInputStream]
+
+      // force disable decompression because compression is skipped in shuffle 
writer
+      try {
+        FieldUtils.writeField(
+          celebornInputStream,
+          "shuffleCompressionEnabled",
+          Boolean.box(false).asInstanceOf[Object],
+          true)
+      } catch {
+        case _: IllegalAccessException =>
+          // ignore if field not found (likely in celeborn-0.6.0)
+      }
+      celebornInputStream
+    }
   }
 }
 

Reply via email to