This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c5cf6886 TEZ-4522: Use OpenFile where FileStatus is available. (#318) 
(Ayush Saxena reviewed by Laszlo Bodor)
0c5cf6886 is described below

commit 0c5cf688690bacc662a015b1bff0ecb556314b90
Author: Ayush Saxena <ayushsax...@apache.org>
AuthorDate: Mon Dec 4 20:29:55 2023 +0530

    TEZ-4522: Use OpenFile where FileStatus is available. (#318) (Ayush Saxena 
reviewed by Laszlo Bodor)
---
 .../main/java/org/apache/tez/common/TezCommonUtils.java  |  6 ++++--
 .../main/java/org/apache/tez/dag/app/RecoveryParser.java |  8 ++++----
 .../hadoop/mapreduce/split/SplitMetaInfoReaderTez.java   |  3 ++-
 .../runtime/library/common/sort/impl/TezSpillRecord.java | 16 +++++++---------
 .../java/org/apache/tez/tools/TFileRecordReader.java     |  8 +++++---
 5 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java 
b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index dd28eed39..28799c119 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -293,9 +294,10 @@ public final class TezCommonUtils {
   public static void mkDirForAM(FileSystem fs, Path dir) throws IOException {
     FsPermission perm = new FsPermission(TEZ_AM_DIR_PERMISSION);
     fs.mkdirs(dir, perm);
-    if (!fs.getFileStatus(dir).getPermission().equals(perm)) {
+    FileStatus fileStatus = fs.getFileStatus(dir);
+    if (!fileStatus.getPermission().equals(perm)) {
       LOG.warn("Directory " + dir.toString() + " created with unexpected 
permissions : "
-            + fs.getFileStatus(dir).getPermission() + ". Fixing permissions to 
correct value : "
+            + fileStatus.getPermission() + ". Fixing permissions to correct 
value : "
             + perm.toString());
       fs.setPermission(dir, perm);
     }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 656f38fb1..0f40700cf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.functional.FutureIO;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.common.TezCommonUtils;
@@ -430,12 +431,11 @@ public class RecoveryParser {
     return TezCommonUtils.getSummaryRecoveryPath(attemptRecoveryDataDir);
   }
 
-  private FSDataInputStream getSummaryStream(Path summaryPath) throws 
IOException {
+  private FSDataInputStream getSummaryStream(Path summaryPath, FileStatus 
summaryFileStatus) throws IOException {
     try {
-      return recoveryFS.open(summaryPath, recoveryBufferSize);
+      return 
FutureIO.awaitFuture(recoveryFS.openFile(summaryPath).withFileStatus(summaryFileStatus).build());
     } catch (FileNotFoundException fnf) {
       return null;
-
     }
   }
 
@@ -667,7 +667,7 @@ public class RecoveryParser {
           + ", len=" + summaryFileStatus.getLen()
           + ", lastModTime=" + summaryFileStatus.getModificationTime());
       FSDataInputStream summaryStream = getSummaryStream(
-          summaryFile);
+          summaryFile, summaryFileStatus);
       while (true) {
         RecoveryProtos.SummaryEventProto proto;
         try {
diff --git 
a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
 
b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
index 394c871ab..d69d21127 100644
--- 
a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
+++ 
b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.util.functional.FutureIO;
 import org.apache.tez.common.MRFrameworkConfigs;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 
@@ -78,7 +79,7 @@ public final class SplitMetaInfoReaderTez {
         throw new IOException("Split metadata size exceeded " + maxMetaInfoSize
             + ". Aborting job ");
       }
-      in = fs.open(metaSplitFile);
+      in = 
FutureIO.awaitFuture(fs.openFile(metaSplitFile).withFileStatus(fStatus).build());
       byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
       in.readFully(header);
       if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
index 1c9edeead..feed70f49 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
@@ -28,11 +28,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.functional.FutureIO;
 import org.apache.tez.runtime.library.common.Constants;
 
 public class TezSpillRecord {
@@ -66,11 +68,10 @@ public class TezSpillRecord {
                      String expectedIndexOwner)
       throws IOException {
 
-    final FSDataInputStream in = rfs.open(indexFileName);
-    try {
-      final long length = rfs.getFileStatus(indexFileName).getLen();
-      final int partitions = 
-          (int) length / Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+    FileStatus fileStatus = rfs.getFileStatus(indexFileName);
+    final long length = fileStatus.getLen();
+    try (FSDataInputStream in = 
FutureIO.awaitFuture(rfs.openFile(indexFileName).withFileStatus(fileStatus).build()))
 {
+      final int partitions = (int) length / 
Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
       final int size = partitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
 
       buf = ByteBuffer.allocate(size);
@@ -79,15 +80,12 @@ public class TezSpillRecord {
         CheckedInputStream chk = new CheckedInputStream(in, crc);
         IOUtils.readFully(chk, buf.array(), 0, size);
         if (chk.getChecksum().getValue() != in.readLong()) {
-          throw new ChecksumException("Checksum error reading spill index: " +
-                                indexFileName, -1);
+          throw new ChecksumException("Checksum error reading spill index: " + 
indexFileName, -1);
         }
       } else {
         IOUtils.readFully(in, buf.array(), 0, size);
       }
       entries = buf.asLongBuffer();
-    } finally {
-      in.close();
     }
   }
 
diff --git 
a/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java
 
b/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java
index 41744676e..90dc72962 100644
--- 
a/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java
+++ 
b/tez-tools/tez-tfile-parser/src/main/java/org/apache/tez/tools/TFileRecordReader.java
@@ -23,6 +23,7 @@ import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -32,6 +33,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.util.functional.FutureIO;
 
 import java.io.BufferedReader;
 import java.io.EOFException;
@@ -74,9 +76,9 @@ public class TFileRecordReader extends RecordReader<Text, 
Text> {
 
     FileSystem fs = 
fileSplit.getPath().getFileSystem(context.getConfiguration());
     splitPath = fileSplit.getPath();
-    fin = fs.open(splitPath);
-    reader = new TFile.Reader(fin, fs.getFileStatus(splitPath).getLen(),
-        context.getConfiguration());
+    FileStatus fileStatus = fs.getFileStatus(splitPath);
+    fin = 
FutureIO.awaitFuture(fs.openFile(splitPath).withFileStatus(fileStatus).build());
+    reader = new TFile.Reader(fin, fileStatus.getLen(), 
context.getConfiguration());
     scanner = reader.createScannerByByteRange(start, fileSplit.getLength());
   }
 

Reply via email to