This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 820c17ad7 [CELEBORN-1140] Use try-with-resources to avoid FSDataInputStream not being closed
820c17ad7 is described below
commit 820c17ad7da5e2ed861b93b2552373faedebcf92
Author: jiaoqingbo <[email protected]>
AuthorDate: Fri Nov 24 17:55:32 2023 +0800
[CELEBORN-1140] Use try-with-resources to avoid FSDataInputStream not being closed
### What changes were proposed in this pull request?
As Title
### Why are the changes needed?
As Title
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
PASS GA
Closes #2113 from jiaoqingbo/1140.
Authored-by: jiaoqingbo <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../celeborn/client/read/DfsPartitionReader.java | 45 ++++++++++++----------
1 file changed, 24 insertions(+), 21 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 7acc4fc8c..d379a1139 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -178,15 +178,16 @@ public class DfsPartitionReader implements PartitionReader {
private List<Long> getChunkOffsetsFromUnsortedIndex(CelebornConf conf, PartitionLocation location)
throws IOException {
- FSDataInputStream indexInputStream =
+ List<Long> offsets;
+ try (FSDataInputStream indexInputStream =
ShuffleClient.getHdfsFs(conf)
- .open(new Path(Utils.getIndexFilePath(location.getStorageInfo().getFilePath())));
- List<Long> offsets = new ArrayList<>();
- int offsetCount = indexInputStream.readInt();
- for (int i = 0; i < offsetCount; i++) {
- offsets.add(indexInputStream.readLong());
+ .open(new Path(Utils.getIndexFilePath(location.getStorageInfo().getFilePath())))) {
+ offsets = new ArrayList<>();
+ int offsetCount = indexInputStream.readInt();
+ for (int i = 0; i < offsetCount; i++) {
+ offsets.add(indexInputStream.readLong());
+ }
}
- indexInputStream.close();
return offsets;
}
@@ -194,20 +195,22 @@ public class DfsPartitionReader implements PartitionReader {
CelebornConf conf, PartitionLocation location, int startMapIndex, int endMapIndex)
throws IOException {
String indexPath = Utils.getIndexFilePath(location.getStorageInfo().getFilePath());
- FSDataInputStream indexInputStream = ShuffleClient.getHdfsFs(conf).open(new Path(indexPath));
- logger.debug("read sorted index {}", indexPath);
- long indexSize = ShuffleClient.getHdfsFs(conf).getFileStatus(new Path(indexPath)).getLen();
- // Index size won't be large, so it's safe to do the conversion.
- byte[] indexBuffer = new byte[(int) indexSize];
- indexInputStream.readFully(0L, indexBuffer);
- List<Long> offsets =
- new ArrayList<>(
- ShuffleBlockInfoUtils.getChunkOffsetsFromShuffleBlockInfos(
- startMapIndex,
- endMapIndex,
- shuffleChunkSize,
- ShuffleBlockInfoUtils.parseShuffleBlockInfosFromByteBuffer(indexBuffer)));
- indexInputStream.close();
+ List<Long> offsets;
+ try (FSDataInputStream indexInputStream =
+ ShuffleClient.getHdfsFs(conf).open(new Path(indexPath))) {
+ logger.debug("read sorted index {}", indexPath);
+ long indexSize = ShuffleClient.getHdfsFs(conf).getFileStatus(new Path(indexPath)).getLen();
+ // Index size won't be large, so it's safe to do the conversion.
+ byte[] indexBuffer = new byte[(int) indexSize];
+ indexInputStream.readFully(0L, indexBuffer);
+ offsets =
+ new ArrayList<>(
+ ShuffleBlockInfoUtils.getChunkOffsetsFromShuffleBlockInfos(
+ startMapIndex,
+ endMapIndex,
+ shuffleChunkSize,
+ ShuffleBlockInfoUtils.parseShuffleBlockInfosFromByteBuffer(indexBuffer)));
+ }
return offsets;
}