This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 820c17ad7 [CELEBORN-1140] Use try-with-resources to avoid FSDataInputStream not being closed
820c17ad7 is described below

commit 820c17ad7da5e2ed861b93b2552373faedebcf92
Author: jiaoqingbo <[email protected]>
AuthorDate: Fri Nov 24 17:55:32 2023 +0800

    [CELEBORN-1140] Use try-with-resources to avoid FSDataInputStream not being closed
    
    ### What changes were proposed in this pull request?
    
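    In DfsPartitionReader, open the index-file FSDataInputStream inside a
    try-with-resources statement in both index readers (the unsorted-index
    reader getChunkOffsetsFromUnsortedIndex and the sorted-index reader),
    replacing the explicit indexInputStream.close() call at the end of each
    method.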
    
    ### Why are the changes needed?
    
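    Previously the stream was closed only by an explicit close() call after
    the index had been read. If reading the index threw an exception (for
    example from readInt, readLong, or readFully), that close() call was
    skipped and the FSDataInputStream was left open. With try-with-resources
    the stream is closed on both the normal and the exceptional path.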
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    PASS GA
    
    Closes #2113 from jiaoqingbo/1140.
    
    Authored-by: jiaoqingbo <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../celeborn/client/read/DfsPartitionReader.java   | 45 ++++++++++++----------
 1 file changed, 24 insertions(+), 21 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 7acc4fc8c..d379a1139 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -178,15 +178,16 @@ public class DfsPartitionReader implements 
PartitionReader {
 
   private List<Long> getChunkOffsetsFromUnsortedIndex(CelebornConf conf, 
PartitionLocation location)
       throws IOException {
-    FSDataInputStream indexInputStream =
+    List<Long> offsets;
+    try (FSDataInputStream indexInputStream =
         ShuffleClient.getHdfsFs(conf)
-            .open(new 
Path(Utils.getIndexFilePath(location.getStorageInfo().getFilePath())));
-    List<Long> offsets = new ArrayList<>();
-    int offsetCount = indexInputStream.readInt();
-    for (int i = 0; i < offsetCount; i++) {
-      offsets.add(indexInputStream.readLong());
+            .open(new 
Path(Utils.getIndexFilePath(location.getStorageInfo().getFilePath())))) {
+      offsets = new ArrayList<>();
+      int offsetCount = indexInputStream.readInt();
+      for (int i = 0; i < offsetCount; i++) {
+        offsets.add(indexInputStream.readLong());
+      }
     }
-    indexInputStream.close();
     return offsets;
   }
 
@@ -194,20 +195,22 @@ public class DfsPartitionReader implements 
PartitionReader {
       CelebornConf conf, PartitionLocation location, int startMapIndex, int 
endMapIndex)
       throws IOException {
     String indexPath = 
Utils.getIndexFilePath(location.getStorageInfo().getFilePath());
-    FSDataInputStream indexInputStream = 
ShuffleClient.getHdfsFs(conf).open(new Path(indexPath));
-    logger.debug("read sorted index {}", indexPath);
-    long indexSize = ShuffleClient.getHdfsFs(conf).getFileStatus(new 
Path(indexPath)).getLen();
-    // Index size won't be large, so it's safe to do the conversion.
-    byte[] indexBuffer = new byte[(int) indexSize];
-    indexInputStream.readFully(0L, indexBuffer);
-    List<Long> offsets =
-        new ArrayList<>(
-            ShuffleBlockInfoUtils.getChunkOffsetsFromShuffleBlockInfos(
-                startMapIndex,
-                endMapIndex,
-                shuffleChunkSize,
-                
ShuffleBlockInfoUtils.parseShuffleBlockInfosFromByteBuffer(indexBuffer)));
-    indexInputStream.close();
+    List<Long> offsets;
+    try (FSDataInputStream indexInputStream =
+        ShuffleClient.getHdfsFs(conf).open(new Path(indexPath))) {
+      logger.debug("read sorted index {}", indexPath);
+      long indexSize = ShuffleClient.getHdfsFs(conf).getFileStatus(new 
Path(indexPath)).getLen();
+      // Index size won't be large, so it's safe to do the conversion.
+      byte[] indexBuffer = new byte[(int) indexSize];
+      indexInputStream.readFully(0L, indexBuffer);
+      offsets =
+          new ArrayList<>(
+              ShuffleBlockInfoUtils.getChunkOffsetsFromShuffleBlockInfos(
+                  startMapIndex,
+                  endMapIndex,
+                  shuffleChunkSize,
+                  
ShuffleBlockInfoUtils.parseShuffleBlockInfosFromByteBuffer(indexBuffer)));
+    }
     return offsets;
   }
 

Reply via email to