This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 27150c87c4 [VL] Celeborn shuffle reader OOM with many empty input stream (#9221)
27150c87c4 is described below

commit 27150c87c447ef6e23a70c07a496429122954dde
Author: Rong Ma <[email protected]>
AuthorDate: Fri Apr 4 09:19:31 2025 +0100

    [VL] Celeborn shuffle reader OOM with many empty input stream (#9221)
---
 .../VeloxCelebornColumnarBatchSerializer.scala     | 30 ++++++++++++++--------
 1 file changed, 20 insertions(+), 10 deletions(-)

diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
index dc314ba44a..fbf1c67303 100644
--- a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
+++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala
@@ -16,8 +16,9 @@
  */
 package org.apache.spark.shuffle
 
-import org.apache.gluten.config.ReservedKeys.{GLUTEN_RSS_SORT_SHUFFLE_WRITER, 
GLUTEN_SORT_SHUFFLE_WRITER}
 import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.ReservedKeys.{GLUTEN_RSS_SORT_SHUFFLE_WRITER, 
GLUTEN_SORT_SHUFFLE_WRITER}
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.runtime.Runtimes
 import org.apache.gluten.utils.ArrowAbiUtil
@@ -37,7 +38,6 @@ import org.apache.spark.task.{TaskResource, TaskResources}
 import org.apache.arrow.c.ArrowSchema
 import org.apache.arrow.memory.BufferAllocator
 import org.apache.celeborn.client.read.CelebornInputStream
-import org.apache.gluten.config.GlutenConfig
 
 import java.io._
 import java.nio.ByteBuffer
@@ -120,12 +120,8 @@ private class CelebornColumnarBatchSerializerInstance(
   private class TaskDeserializationStream(in: InputStream)
     extends DeserializationStream
     with TaskResource {
-    private val byteIn: JniByteInputStream = JniByteInputStreams.create(in)
-    private val wrappedOut: ColumnarBatchOutIterator = new 
ColumnarBatchOutIterator(
-      runtime,
-      ShuffleReaderJniWrapper
-        .create(runtime)
-        .readStream(shuffleReaderHandle, byteIn))
+    private var byteIn: JniByteInputStream = _
+    private var wrappedOut: ColumnarBatchOutIterator = _
 
     private var cb: ColumnarBatch = _
 
@@ -191,6 +187,7 @@ private class CelebornColumnarBatchSerializerInstance(
 
     @throws(classOf[EOFException])
     override def readValue[T: ClassTag](): T = {
+      initStream();
       if (cb != null) {
         cb.close()
         cb = null
@@ -245,13 +242,26 @@ private class CelebornColumnarBatchSerializerInstance(
         readBatchNumRows.set(numRowsTotal.toDouble / numBatchesTotal)
       }
       numOutputRows += numRowsTotal
-      wrappedOut.close()
-      byteIn.close()
+      if (byteIn != null) {
+        wrappedOut.close()
+        byteIn.close()
+      }
       if (cb != null) {
         cb.close()
       }
     }
 
+    private def initStream(): Unit = {
+      if (byteIn == null) {
+        byteIn = JniByteInputStreams.create(in)
+        wrappedOut = new ColumnarBatchOutIterator(
+          runtime,
+          ShuffleReaderJniWrapper
+            .create(runtime)
+            .readStream(shuffleReaderHandle, byteIn))
+      }
+    }
+
     override def resourceName(): String = getClass.getName
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to