This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 1ed924544 [CELEBORN-1500] Filter out empty InputStreams
1ed924544 is described below
commit 1ed92454421ed7c1606f5bb915266a7ec37d8042
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Thu Jul 11 17:40:55 2024 +0800
[CELEBORN-1500] Filter out empty InputStreams
### What changes were proposed in this pull request?
Filter out empty InputStreams in CelebornShuffleReader before they reach
serializerInstance.deserializeStream, avoiding the possible overhead of
creating a deserialization stream for a partition that has no data.
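A minimal sketch of the filter-before-deserialize pattern follows. It uses hypothetical stand-ins (`FakeCelebornInputStream`, `FilterEmptySketch.deserialize`) rather than the real Celeborn and Spark classes. It also assumes, as the `!=` comparison in the diff implies, that `empty()` returns one shared instance, so the check is a reference comparison.

```scala
import java.io.{ByteArrayInputStream, InputStream}

// Hypothetical stand-in for CelebornInputStream. Assumption: empty() always
// returns the same shared instance, so `!=` (equals, which InputStream does not
// override) compares by reference.
object FakeCelebornInputStream {
  private val emptyInstance: InputStream =
    new ByteArrayInputStream(Array.emptyByteArray)
  def empty(): InputStream = emptyInstance
}

object FilterEmptySketch {
  // Counts how often the (hypothetical) deserializer is invoked.
  var deserializeCalls = 0

  // Hypothetical stand-in for
  // serializerInstance.deserializeStream(in).asKeyValueIterator:
  // reads every byte and exposes it as a (key, value) pair.
  def deserialize(in: InputStream): Iterator[(Int, Int)] = {
    deserializeCalls += 1
    Iterator.continually(in.read()).takeWhile(_ != -1).map(b => (b, b))
  }

  def read(streams: Iterator[(Int, InputStream)]): Seq[(Int, (Int, Int))] =
    streams
      // Drop partitions whose stream is the shared empty instance,
      // so deserialize is never called for them.
      .filter { case (_, in) => in != FakeCelebornInputStream.empty() }
      .map { case (partitionId, in) => (partitionId, deserialize(in)) }
      .flatMap { case (partitionId, iter) => iter.map(kv => (partitionId, kv)) }
      .toList
}
```

Given three partitions where only partition 1 has data, `read` returns `List((1,(7,7)), (1,(8,8)))` for the bytes `7, 8`, and `deserializeCalls` ends at 1; without the `filter`, the deserializer would have been invoked for all three partitions.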
### Why are the changes needed?
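Same as above: calling serializerInstance.deserializeStream on an empty InputStream adds possible overhead without producing any records.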
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passes GitHub Actions (GA) CI.
Closes #2614 from waitinfuture/1500.
Authored-by: zky.zhoukeyong <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | 4 +++-
.../org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | 4 +++-
2 files changed, 6 insertions(+), 2 deletions(-)
diff --git a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 512d8f41c..9cabfebc9 100644
--- a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -147,7 +147,9 @@ class CelebornShuffleReader[K, C](
} else {
(partitionId, CelebornInputStream.empty())
}
- }).map { case (partitionId, inputStream) =>
+ }).filter {
+ case (_, inputStream) => inputStream != CelebornInputStream.empty()
+ }.map { case (partitionId, inputStream) =>
(partitionId,
serializerInstance.deserializeStream(inputStream).asKeyValueIterator)
}.flatMap { case (partitionId, iter) =>
try {
diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index b18733e77..b95499d0e 100644
--- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -258,7 +258,9 @@ class CelebornShuffleReader[K, C](
} else {
(partitionId, CelebornInputStream.empty())
}
- }).map { case (partitionId, inputStream) =>
+ }).filter {
+ case (_, inputStream) => inputStream != CelebornInputStream.empty()
+ }.map { case (partitionId, inputStream) =>
(partitionId,
serializerInstance.deserializeStream(inputStream).asKeyValueIterator)
}.flatMap { case (partitionId, iter) =>
try {