This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 7d150c0aa [CELEBORN-1500] Filter out empty InputStreams
7d150c0aa is described below

commit 7d150c0aa78885d63cf97a49359dfdf10a5a220b
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Thu Jul 11 17:40:55 2024 +0800

    [CELEBORN-1500] Filter out empty InputStreams
    
    ### What changes were proposed in this pull request?
    Filter out empty InputStreams in CelebornShuffleReader to avoid possible
    overhead in serializerInstance.deserializeStream.
    
    ### Why are the changes needed?
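    Same as above: empty InputStreams are filtered out so they are never passed
    to serializerInstance.deserializeStream, avoiding possible overhead there.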
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Passes GA (GitHub Actions CI).
    
    Closes #2614 from waitinfuture/1500.
    
    Authored-by: zky.zhoukeyong <[email protected]>
    Signed-off-by: mingji <[email protected]>
    (cherry picked from commit 1ed92454421ed7c1606f5bb915266a7ec37d8042)
---
 .../org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala     | 4 +++-
 .../org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala     | 4 +++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index ac74f92d6..4a3092275 100644
--- a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -148,7 +148,9 @@ class CelebornShuffleReader[K, C](
       } else {
         (partitionId, CelebornInputStream.empty())
       }
-    }).map { case (partitionId, inputStream) =>
+    }).filter {
+      case (_, inputStream) => inputStream != CelebornInputStream.empty()
+    }.map { case (partitionId, inputStream) =>
       (partitionId, serializerInstance.deserializeStream(inputStream).asKeyValueIterator)
     }.flatMap { case (partitionId, iter) =>
       try {
diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 0b5bfb401..745cc0770 100644
--- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -280,7 +280,9 @@ class CelebornShuffleReader[K, C](
       } else {
         (partitionId, CelebornInputStream.empty())
       }
-    }).map { case (partitionId, inputStream) =>
+    }).filter {
+      case (_, inputStream) => inputStream != CelebornInputStream.empty()
+    }.map { case (partitionId, inputStream) =>
       (partitionId, serializerInstance.deserializeStream(inputStream).asKeyValueIterator)
     }.flatMap { case (partitionId, iter) =>
       try {

Reply via email to