This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 30cb7edecbf0 [SPARK-47521][CORE] Use `Utils.tryWithResource` during
reading shuffle data from external storage
30cb7edecbf0 is described below
commit 30cb7edecbf0ef7aed1e216ad147ebb318aea09c
Author: maheshbehera <[email protected]>
AuthorDate: Fri Mar 22 10:44:55 2024 -0700
[SPARK-47521][CORE] Use `Utils.tryWithResource` during reading shuffle data
from external storage
### What changes were proposed in this pull request?
In method FallbackStorage.open, file open is guarded by
Utils.tryWithResource to avoid file handle leakage incase of failure during
read.
### Why are the changes needed?
To avoid file handle leakage in case of read failure.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing UTs
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #45663 from maheshk114/SPARK-47521.
Authored-by: maheshbehera <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 245669053a34cb1d4a84689230e5bd1d163be5c6)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../main/scala/org/apache/spark/storage/FallbackStorage.scala | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
index eb23fb4b1c84..161120393490 100644
--- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -188,15 +188,15 @@ private[spark] object FallbackStorage extends Logging {
val name = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
val hash = JavaUtils.nonNegativeHash(name)
val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
- val f = fallbackFileSystem.open(dataFile)
val size = nextOffset - offset
logDebug(s"To byte array $size")
val array = new Array[Byte](size.toInt)
val startTimeNs = System.nanoTime()
- f.seek(offset)
- f.readFully(array)
- logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 *
1000)}ms")
- f.close()
+ Utils.tryWithResource(fallbackFileSystem.open(dataFile)) { f =>
+ f.seek(offset)
+ f.readFully(array)
+ logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 *
1000)}ms")
+ }
new NioManagedBuffer(ByteBuffer.wrap(array))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]