This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c94033  TEZ-4185 : Tez may skip file permission update on intermediate output (Attila Magyar via Ashutosh Chauhan)
2c94033 is described below

commit 2c94033ea3222f78b6e2439b32d39da906127734
Author: Attila Magyar <[email protected]>
AuthorDate: Thu Jun 4 18:15:53 2020 -0700

    TEZ-4185 : Tez may skip file permission update on intermediate output (Attila Magyar via Ashutosh Chauhan)
    
    Signed-off-by: Ashutosh Chauhan <[email protected]>
---
 .../library/common/sort/impl/PipelinedSorter.java  | 14 +++-----
 .../library/common/sort/impl/TezSpillRecord.java   |  9 +++--
 .../common/sort/impl/dflt/DefaultSorter.java       | 14 +++-----
 .../writers/UnorderedPartitionedKVWriter.java      | 22 ++++--------
 .../writers/TestUnorderedPartitionedKVWriter.java  | 39 +++++++++++++---------
 5 files changed, 43 insertions(+), 55 deletions(-)

diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 610cae9..bc68a5f 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -69,7 +69,7 @@ import org.apache.tez.util.StopWatch;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-import static 
org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.SPILL_FILE_PERMS;
+import static 
org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.ensureSpillFilePermissions;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class PipelinedSorter extends ExternalSorter {
@@ -488,9 +488,7 @@ public class PipelinedSorter extends ExternalSorter {
             * MAP_OUTPUT_INDEX_RECORD_LENGTH);
     spillFilePaths.put(numSpills, filename);
     FSDataOutputStream out = rfs.create(filename, true, 4096);
-    if 
(!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf))))
 {
-      rfs.setPermission(filename, SPILL_FILE_PERMS);
-    }
+    ensureSpillFilePermissions(filename, rfs);
 
     try {
       LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + 
filename.toString() +
@@ -576,9 +574,7 @@ public class PipelinedSorter extends ExternalSorter {
         mapOutputFile.getSpillFileForWrite(numSpills, size);
       spillFilePaths.put(numSpills, filename);
       out = rfs.create(filename, true, 4096);
-      if 
(!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf))))
 {
-        rfs.setPermission(filename, SPILL_FILE_PERMS);
-      }
+      ensureSpillFilePermissions(filename, rfs);
       LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + 
filename.toString());
       for (int i = 0; i < partitions; ++i) {
         if (isThreadInterrupted()) {
@@ -761,9 +757,7 @@ public class PipelinedSorter extends ExternalSorter {
       }
       //The output stream for the final single output file
       FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
-      if 
(!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf))))
 {
-        rfs.setPermission(finalOutputFile, SPILL_FILE_PERMS);
-      }
+      ensureSpillFilePermissions(finalOutputFile, rfs);
 
       final TezSpillRecord spillRec = new TezSpillRecord(partitions);
 
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
index e16b7a0..1c9edee 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
@@ -143,10 +143,13 @@ public class TezSpillRecord {
       } else {
         out.close();
       }
-      if 
(!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(job))))
 {
-        rfs.setPermission(loc, SPILL_FILE_PERMS);
-      }
+      ensureSpillFilePermissions(loc, rfs);
     }
   }
 
+  public static void ensureSpillFilePermissions(Path loc, FileSystem rfs) 
throws IOException {
+    if 
(!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(rfs.getConf()))))
 {
+      rfs.setPermission(loc, SPILL_FILE_PERMS);
+    }
+  }
 }
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 124f078..d0a18b4 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -64,7 +64,7 @@ import 
org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
 
 import org.apache.tez.common.Preconditions;
 
-import static 
org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.SPILL_FILE_PERMS;
+import static 
org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.ensureSpillFilePermissions;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public final class DefaultSorter extends ExternalSorter implements 
IndexedSortable {
@@ -896,9 +896,7 @@ public final class DefaultSorter extends ExternalSorter 
implements IndexedSortab
           mapOutputFile.getSpillFileForWrite(numSpills, size);
       spillFilePaths.put(numSpills, filename);
       out = rfs.create(filename);
-      if 
(!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf))))
 {
-        rfs.setPermission(filename, SPILL_FILE_PERMS);
-      }
+      ensureSpillFilePermissions(filename, rfs);
 
       int spindex = mstart;
       final InMemValBytes value = createInMemValBytes();
@@ -1007,9 +1005,7 @@ public final class DefaultSorter extends ExternalSorter 
implements IndexedSortab
           mapOutputFile.getSpillFileForWrite(numSpills, size);
       spillFilePaths.put(numSpills, filename);
       out = rfs.create(filename);
-      if 
(!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf))))
 {
-        rfs.setPermission(filename, SPILL_FILE_PERMS);
-      }
+      ensureSpillFilePermissions(filename, rfs);
 
       // we don't run the combiner for a single record
       for (int i = 0; i < partitions; ++i) {
@@ -1283,9 +1279,7 @@ public final class DefaultSorter extends ExternalSorter 
implements IndexedSortab
 
     //The output stream for the final single output file
     FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
-    if 
(!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf))))
 {
-      rfs.setPermission(finalOutputFile, SPILL_FILE_PERMS);
-    }
+    ensureSpillFilePermissions(finalOutputFile, rfs);
 
     if (numSpills == 0) {
       // TODO Change event generation to say there is no data rather than 
generating a dummy file
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 62170d9..ffce5c6 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -87,7 +87,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ByteString;
 
-import static 
org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.SPILL_FILE_PERMS;
+import static 
org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.ensureSpillFilePermissions;
 
 public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWriter {
 
@@ -308,9 +308,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
         finalOutPath = outputFileHandler.getOutputFileForWrite();
         writer = new IFile.Writer(conf, rfs, finalOutPath, keyClass, valClass,
             codec, outputRecordsCounter, outputRecordBytesCounter);
-        if 
(!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf))))
 {
-          rfs.setPermission(finalOutPath, SPILL_FILE_PERMS);
-        }
+        ensureSpillFilePermissions(finalOutPath, rfs);
       }
     } else {
       skipBuffers = false;
@@ -628,9 +626,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
       }
       LOG.info("Writing spill " + spillNumber + " to " + 
spillPathDetails.outputFilePath.toString());
       FSDataOutputStream out = rfs.create(spillPathDetails.outputFilePath);
-      if 
(!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf))))
 {
-        rfs.setPermission(spillPathDetails.outputFilePath, SPILL_FILE_PERMS);
-      }
+      ensureSpillFilePermissions(spillPathDetails.outputFilePath, rfs);
       TezSpillRecord spillRecord = new TezSpillRecord(numPartitions);
       DataInputBuffer key = new DataInputBuffer();
       DataInputBuffer val = new DataInputBuffer();
@@ -728,9 +724,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
       if (((IFile.FileBackedInMemIFileWriter) writer).isDataFlushedToDisk()) {
         this.finalOutPath =
             ((IFile.FileBackedInMemIFileWriter) writer).getOutputPath();
-        if 
(!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf))))
 {
-          rfs.setPermission(finalOutPath, SPILL_FILE_PERMS);
-        }
+        ensureSpillFilePermissions(finalOutPath, rfs);
         
additionalSpillBytesWritternCounter.increment(writer.getCompressedLength());
       }
     }
@@ -1080,9 +1074,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
     FSDataOutputStream out = null;
     try {
       out = rfs.create(finalOutPath);
-      if 
(!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf))))
 {
-        rfs.setPermission(finalOutPath, SPILL_FILE_PERMS);
-      }
+      ensureSpillFilePermissions(finalOutPath, rfs);
       Writer writer = null;
 
       for (int i = 0; i < numPartitions; i++) {
@@ -1171,9 +1163,7 @@ public class UnorderedPartitionedKVWriter extends 
BaseUnorderedPartitionedKVWrit
       final TezSpillRecord spillRecord = new TezSpillRecord(numPartitions);
       final Path outPath = spillPathDetails.outputFilePath;
       out = rfs.create(outPath);
-      if 
(!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf))))
 {
-        rfs.setPermission(outPath, SPILL_FILE_PERMS);
-      }
+      ensureSpillFilePermissions(outPath, rfs);
       BitSet emptyPartitions = null;
       if (pipelinedShuffle || !isFinalMergeEnabled) {
         emptyPartitions = new BitSet(numPartitions);
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 3692392..f4e99ec 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -53,6 +53,7 @@ import java.util.regex.Pattern;
 
 import com.google.protobuf.ByteString;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
@@ -520,10 +521,7 @@ public class TestUnorderedPartitionedKVWriter {
     if (numRecordsWritten > 0) {
       assertTrue(localFs.exists(outputFilePath));
       assertTrue(localFs.exists(spillFilePath));
-      assertEquals("Incorrect output permissions", (short)0640,
-          localFs.getFileStatus(outputFilePath).getPermission().toShort());
-      assertEquals("Incorrect index permissions", (short)0640,
-          localFs.getFileStatus(spillFilePath).getPermission().toShort());
+      checkPermissions(outputFilePath, spillFilePath);
     } else {
       return;
     }
@@ -812,16 +810,24 @@ public class TestUnorderedPartitionedKVWriter {
         Path indexFile = taskOutput.getSpillIndexFileForWrite(i, 10);
         assertTrue(localFs.exists(outputFile));
         assertTrue(localFs.exists(indexFile));
-        assertEquals("Incorrect output permissions", (short)0640,
-            localFs.getFileStatus(outputFile).getPermission().toShort());
-        assertEquals("Incorrect index permissions", (short)0640,
-            localFs.getFileStatus(indexFile).getPermission().toShort());
+        checkPermissions(outputFile, indexFile);
       }
     } else {
       return;
     }
   }
 
+  private void checkPermissions(Path outputFile, Path indexFile) throws 
IOException {
+    assertEquals("Incorrect output permissions (user)", FsAction.READ_WRITE,
+        localFs.getFileStatus(outputFile).getPermission().getUserAction());
+    assertEquals("Incorrect output permissions (group)", FsAction.READ,
+        localFs.getFileStatus(outputFile).getPermission().getGroupAction());
+    assertEquals("Incorrect index permissions (user)", FsAction.READ_WRITE,
+        localFs.getFileStatus(indexFile).getPermission().getUserAction());
+    assertEquals("Incorrect index permissions (group)", FsAction.READ,
+        localFs.getFileStatus(indexFile).getPermission().getGroupAction());
+  }
+
   private void verifyEmptyPartitions(DataMovementEventPayloadProto eventProto,
       int numRecordsWritten, int numPartitions, Set<Integer> skippedPartitions)
       throws IOException {
@@ -1065,10 +1071,7 @@ public class TestUnorderedPartitionedKVWriter {
         Path outputPath = new Path(outputContext.getWorkDirs()[0],
             "output/" + eventProto.getPathComponent() + "/" + 
Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
         Path indexPath = 
outputPath.suffix(Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
-        assertEquals("Incorrect output permissions", (short)0640,
-            localFs.getFileStatus(outputPath).getPermission().toShort());
-        assertEquals("Incorrect index permissions", (short)0640,
-            localFs.getFileStatus(indexPath).getPermission().toShort());
+        checkPermissions(outputPath, indexPath);
       } else {
         assertEquals(0, eventProto.getSpillId());
         if (outputRecordsCounter.getValue() > 0) {
@@ -1289,12 +1292,16 @@ public class TestUnorderedPartitionedKVWriter {
 
     boolean isInMem= eventProto.getData().hasData();
     assertTrue(localFs.exists(outputFilePath));
-    assertEquals("Incorrect output permissions", (short) 0640,
-            localFs.getFileStatus(outputFilePath).getPermission().toShort());
+    assertEquals("Incorrect output permissions (user)", FsAction.READ_WRITE,
+            
localFs.getFileStatus(outputFilePath).getPermission().getUserAction());
+    assertEquals("Incorrect output permissions (group)", FsAction.READ,
+            
localFs.getFileStatus(outputFilePath).getPermission().getGroupAction());
     if( !isInMem ) {
       assertTrue(localFs.exists(spillFilePath));
-      assertEquals("Incorrect index permissions", (short) 0640,
-              localFs.getFileStatus(spillFilePath).getPermission().toShort());
+      assertEquals("Incorrect index permissions (user)", FsAction.READ_WRITE,
+              
localFs.getFileStatus(spillFilePath).getPermission().getUserAction());
+      assertEquals("Incorrect index permissions (group)", FsAction.READ,
+              
localFs.getFileStatus(spillFilePath).getPermission().getGroupAction());
 
       // verify no intermediate spill files have been left around
       synchronized (kvWriter.spillInfoList) {

Reply via email to