This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 30cb7edecbf0 [SPARK-47521][CORE] Use `Utils.tryWithResource` during reading shuffle data from external storage
30cb7edecbf0 is described below

commit 30cb7edecbf0ef7aed1e216ad147ebb318aea09c
Author: maheshbehera <maheshbeh...@microsoft.com>
AuthorDate: Fri Mar 22 10:44:55 2024 -0700

    [SPARK-47521][CORE] Use `Utils.tryWithResource` during reading shuffle data from external storage
    
    ### What changes were proposed in this pull request?
    
    In method FallbackStorage.open, the file open is guarded by Utils.tryWithResource to avoid a file handle leak in case of a failure during read.
    
    ### Why are the changes needed?
    
    To avoid file handle leakage in case of read failure.
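    For context, Utils.tryWithResource is Spark's loan-pattern helper: it closes the resource in a finally block so an exception raised while using it cannot leak the handle. A minimal sketch of the pattern (not the exact Spark source) looks like:

    ```scala
    import java.io.Closeable

    // Loan pattern: acquire the resource, hand it to the body, and close it
    // in `finally` so it is released even if the body throws.
    def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
      val resource = createResource
      try f(resource) finally resource.close()
    }
    ```

    In the diff below, fallbackFileSystem.open(dataFile) is wrapped this way, so the stream is closed even when seek or readFully fails, replacing the previous explicit f.close() that was skipped on error.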
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing UTs
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #45663 from maheshk114/SPARK-47521.
    
    Authored-by: maheshbehera <maheshbeh...@microsoft.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 245669053a34cb1d4a84689230e5bd1d163be5c6)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../main/scala/org/apache/spark/storage/FallbackStorage.scala  | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
index eb23fb4b1c84..161120393490 100644
--- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -188,15 +188,15 @@ private[spark] object FallbackStorage extends Logging {
         val name = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
         val hash = JavaUtils.nonNegativeHash(name)
         val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
-        val f = fallbackFileSystem.open(dataFile)
         val size = nextOffset - offset
         logDebug(s"To byte array $size")
         val array = new Array[Byte](size.toInt)
         val startTimeNs = System.nanoTime()
-        f.seek(offset)
-        f.readFully(array)
-        logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
-        f.close()
+        Utils.tryWithResource(fallbackFileSystem.open(dataFile)) { f =>
+          f.seek(offset)
+          f.readFully(array)
+          logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
+        }
         new NioManagedBuffer(ByteBuffer.wrap(array))
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
