This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 30cb7edecbf0 [SPARK-47521][CORE] Use `Utils.tryWithResource` during reading shuffle data from external storage
30cb7edecbf0 is described below

commit 30cb7edecbf0ef7aed1e216ad147ebb318aea09c
Author: maheshbehera <maheshbeh...@microsoft.com>
AuthorDate: Fri Mar 22 10:44:55 2024 -0700

    [SPARK-47521][CORE] Use `Utils.tryWithResource` during reading shuffle data from external storage

    ### What changes were proposed in this pull request?

    In method FallbackStorage.open, file open is guarded by Utils.tryWithResource to avoid file handle leakage in case of failure during read.

    ### Why are the changes needed?

    To avoid file handle leakage in case of read failure.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing UTs

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #45663 from maheshk114/SPARK-47521.

    Authored-by: maheshbehera <maheshbeh...@microsoft.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 245669053a34cb1d4a84689230e5bd1d163be5c6)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../main/scala/org/apache/spark/storage/FallbackStorage.scala  | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
index eb23fb4b1c84..161120393490 100644
--- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -188,15 +188,15 @@ private[spark] object FallbackStorage extends Logging {
         val name = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
         val hash = JavaUtils.nonNegativeHash(name)
         val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
-        val f = fallbackFileSystem.open(dataFile)
         val size = nextOffset - offset
         logDebug(s"To byte array $size")
         val array = new Array[Byte](size.toInt)
         val startTimeNs = System.nanoTime()
-        f.seek(offset)
-        f.readFully(array)
-        logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
-        f.close()
+        Utils.tryWithResource(fallbackFileSystem.open(dataFile)) { f =>
+          f.seek(offset)
+          f.readFully(array)
+          logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
+        }
         new NioManagedBuffer(ByteBuffer.wrap(array))
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
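
Illustrative note (not part of the commit): the leak described in the commit message arises because, in the old code, an exception from `seek` or `readFully` skips the trailing `f.close()`. The sketch below shows the loan pattern that closes the resource on both the success and the failure path. The `tryWithResource` helper here is a local stand-in written for this example, assumed to behave like Spark's `Utils.tryWithResource`, and `TrackedStream` is a hypothetical resource used only to observe whether `close()` ran.

```scala
import java.io.{Closeable, IOException}

object TryWithResourceSketch {
  // Local stand-in for the helper named in the commit; an assumption, not Spark's source.
  // The resource is created once, handed to `f`, and closed in `finally`.
  def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
    val resource = createResource
    try f(resource) finally resource.close()
  }

  // Hypothetical stream that records whether close() was called.
  final class TrackedStream(failOnRead: Boolean) extends Closeable {
    var closed: Boolean = false

    def readFully(array: Array[Byte]): Unit = {
      if (failOnRead) throw new IOException("simulated read failure")
      java.util.Arrays.fill(array, 1.toByte)
    }

    override def close(): Unit = { closed = true }
  }

  def main(args: Array[String]): Unit = {
    val array = new Array[Byte](4)

    // Success path: the read completes and the stream is closed afterwards.
    val ok = new TrackedStream(failOnRead = false)
    tryWithResource(ok)(_.readFully(array))
    assert(ok.closed)

    // Failure path: the exception still propagates, but close() runs first.
    val bad = new TrackedStream(failOnRead = true)
    val failed =
      try { tryWithResource(bad)(_.readFully(array)); false }
      catch { case _: IOException => true }
    assert(failed && bad.closed)
  }
}
```

With a manual `f.close()` placed after the read, as in the removed lines of the diff, the failure path in this sketch would leave `closed` set to `false`.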