This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 2c94033 TEZ-4185 : Tez may skip file permission update on
intermediate output (Attila Magyar via Ashutosh Chauhan)
2c94033 is described below
commit 2c94033ea3222f78b6e2439b32d39da906127734
Author: Attila Magyar <[email protected]>
AuthorDate: Thu Jun 4 18:15:53 2020 -0700
TEZ-4185 : Tez may skip file permission update on intermediate output
(Attila Magyar via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <[email protected]>
---
.../library/common/sort/impl/PipelinedSorter.java | 14 +++-----
.../library/common/sort/impl/TezSpillRecord.java | 9 +++--
.../common/sort/impl/dflt/DefaultSorter.java | 14 +++-----
.../writers/UnorderedPartitionedKVWriter.java | 22 ++++--------
.../writers/TestUnorderedPartitionedKVWriter.java | 39 +++++++++++++---------
5 files changed, 43 insertions(+), 55 deletions(-)
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 610cae9..bc68a5f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -69,7 +69,7 @@ import org.apache.tez.util.StopWatch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.SPILL_FILE_PERMS;
+import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.ensureSpillFilePermissions;
@SuppressWarnings({"unchecked", "rawtypes"})
public class PipelinedSorter extends ExternalSorter {
@@ -488,9 +488,7 @@ public class PipelinedSorter extends ExternalSorter {
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillFilePaths.put(numSpills, filename);
FSDataOutputStream out = rfs.create(filename, true, 4096);
- if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
- rfs.setPermission(filename, SPILL_FILE_PERMS);
- }
+ ensureSpillFilePermissions(filename, rfs);
try {
LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " +
filename.toString() +
@@ -576,9 +574,7 @@ public class PipelinedSorter extends ExternalSorter {
mapOutputFile.getSpillFileForWrite(numSpills, size);
spillFilePaths.put(numSpills, filename);
out = rfs.create(filename, true, 4096);
- if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
- rfs.setPermission(filename, SPILL_FILE_PERMS);
- }
+ ensureSpillFilePermissions(filename, rfs);
LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " +
filename.toString());
for (int i = 0; i < partitions; ++i) {
if (isThreadInterrupted()) {
@@ -761,9 +757,7 @@ public class PipelinedSorter extends ExternalSorter {
}
//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
- if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
- rfs.setPermission(finalOutputFile, SPILL_FILE_PERMS);
- }
+ ensureSpillFilePermissions(finalOutputFile, rfs);
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
index e16b7a0..1c9edee 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
@@ -143,10 +143,13 @@ public class TezSpillRecord {
} else {
out.close();
}
- if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(job)))) {
- rfs.setPermission(loc, SPILL_FILE_PERMS);
- }
+ ensureSpillFilePermissions(loc, rfs);
}
}
+ public static void ensureSpillFilePermissions(Path loc, FileSystem rfs) throws IOException {
+ if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(rfs.getConf())))) {
+ rfs.setPermission(loc, SPILL_FILE_PERMS);
+ }
+ }
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 124f078..d0a18b4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -64,7 +64,7 @@ import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
import org.apache.tez.common.Preconditions;
-import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.SPILL_FILE_PERMS;
+import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.ensureSpillFilePermissions;
@SuppressWarnings({"unchecked", "rawtypes"})
public final class DefaultSorter extends ExternalSorter implements
IndexedSortable {
@@ -896,9 +896,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
mapOutputFile.getSpillFileForWrite(numSpills, size);
spillFilePaths.put(numSpills, filename);
out = rfs.create(filename);
- if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
- rfs.setPermission(filename, SPILL_FILE_PERMS);
- }
+ ensureSpillFilePermissions(filename, rfs);
int spindex = mstart;
final InMemValBytes value = createInMemValBytes();
@@ -1007,9 +1005,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
mapOutputFile.getSpillFileForWrite(numSpills, size);
spillFilePaths.put(numSpills, filename);
out = rfs.create(filename);
- if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
- rfs.setPermission(filename, SPILL_FILE_PERMS);
- }
+ ensureSpillFilePermissions(filename, rfs);
// we don't run the combiner for a single record
for (int i = 0; i < partitions; ++i) {
@@ -1283,9 +1279,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
- if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
- rfs.setPermission(finalOutputFile, SPILL_FILE_PERMS);
- }
+ ensureSpillFilePermissions(finalOutputFile, rfs);
if (numSpills == 0) {
// TODO Change event generation to say there is no data rather than
generating a dummy file
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 62170d9..ffce5c6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -87,7 +87,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
-import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.SPILL_FILE_PERMS;
+import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.ensureSpillFilePermissions;
public class UnorderedPartitionedKVWriter extends
BaseUnorderedPartitionedKVWriter {
@@ -308,9 +308,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
finalOutPath = outputFileHandler.getOutputFileForWrite();
writer = new IFile.Writer(conf, rfs, finalOutPath, keyClass, valClass,
codec, outputRecordsCounter, outputRecordBytesCounter);
- if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
- rfs.setPermission(finalOutPath, SPILL_FILE_PERMS);
- }
+ ensureSpillFilePermissions(finalOutPath, rfs);
}
} else {
skipBuffers = false;
@@ -628,9 +626,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
}
LOG.info("Writing spill " + spillNumber + " to " +
spillPathDetails.outputFilePath.toString());
FSDataOutputStream out = rfs.create(spillPathDetails.outputFilePath);
- if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
- rfs.setPermission(spillPathDetails.outputFilePath, SPILL_FILE_PERMS);
- }
+ ensureSpillFilePermissions(spillPathDetails.outputFilePath, rfs);
TezSpillRecord spillRecord = new TezSpillRecord(numPartitions);
DataInputBuffer key = new DataInputBuffer();
DataInputBuffer val = new DataInputBuffer();
@@ -728,9 +724,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
if (((IFile.FileBackedInMemIFileWriter) writer).isDataFlushedToDisk()) {
this.finalOutPath =
((IFile.FileBackedInMemIFileWriter) writer).getOutputPath();
- if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
- rfs.setPermission(finalOutPath, SPILL_FILE_PERMS);
- }
+ ensureSpillFilePermissions(finalOutPath, rfs);
additionalSpillBytesWritternCounter.increment(writer.getCompressedLength());
}
}
@@ -1080,9 +1074,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
FSDataOutputStream out = null;
try {
out = rfs.create(finalOutPath);
- if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
- rfs.setPermission(finalOutPath, SPILL_FILE_PERMS);
- }
+ ensureSpillFilePermissions(finalOutPath, rfs);
Writer writer = null;
for (int i = 0; i < numPartitions; i++) {
@@ -1171,9 +1163,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
final TezSpillRecord spillRecord = new TezSpillRecord(numPartitions);
final Path outPath = spillPathDetails.outputFilePath;
out = rfs.create(outPath);
- if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
- rfs.setPermission(outPath, SPILL_FILE_PERMS);
- }
+ ensureSpillFilePermissions(outPath, rfs);
BitSet emptyPartitions = null;
if (pipelinedShuffle || !isFinalMergeEnabled) {
emptyPartitions = new BitSet(numPartitions);
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 3692392..f4e99ec 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -53,6 +53,7 @@ import java.util.regex.Pattern;
import com.google.protobuf.ByteString;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.permission.FsAction;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
@@ -520,10 +521,7 @@ public class TestUnorderedPartitionedKVWriter {
if (numRecordsWritten > 0) {
assertTrue(localFs.exists(outputFilePath));
assertTrue(localFs.exists(spillFilePath));
- assertEquals("Incorrect output permissions", (short)0640,
- localFs.getFileStatus(outputFilePath).getPermission().toShort());
- assertEquals("Incorrect index permissions", (short)0640,
- localFs.getFileStatus(spillFilePath).getPermission().toShort());
+ checkPermissions(outputFilePath, spillFilePath);
} else {
return;
}
@@ -812,16 +810,24 @@ public class TestUnorderedPartitionedKVWriter {
Path indexFile = taskOutput.getSpillIndexFileForWrite(i, 10);
assertTrue(localFs.exists(outputFile));
assertTrue(localFs.exists(indexFile));
- assertEquals("Incorrect output permissions", (short)0640,
- localFs.getFileStatus(outputFile).getPermission().toShort());
- assertEquals("Incorrect index permissions", (short)0640,
- localFs.getFileStatus(indexFile).getPermission().toShort());
+ checkPermissions(outputFile, indexFile);
}
} else {
return;
}
}
+ private void checkPermissions(Path outputFile, Path indexFile) throws IOException {
+ assertEquals("Incorrect output permissions (user)", FsAction.READ_WRITE,
+ localFs.getFileStatus(outputFile).getPermission().getUserAction());
+ assertEquals("Incorrect output permissions (group)", FsAction.READ,
+ localFs.getFileStatus(outputFile).getPermission().getGroupAction());
+ assertEquals("Incorrect index permissions (user)", FsAction.READ_WRITE,
+ localFs.getFileStatus(indexFile).getPermission().getUserAction());
+ assertEquals("Incorrect index permissions (group)", FsAction.READ,
+ localFs.getFileStatus(indexFile).getPermission().getGroupAction());
+ }
+
private void verifyEmptyPartitions(DataMovementEventPayloadProto eventProto,
int numRecordsWritten, int numPartitions, Set<Integer> skippedPartitions)
throws IOException {
@@ -1065,10 +1071,7 @@ public class TestUnorderedPartitionedKVWriter {
Path outputPath = new Path(outputContext.getWorkDirs()[0],
"output/" + eventProto.getPathComponent() + "/" +
Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
Path indexPath =
outputPath.suffix(Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
- assertEquals("Incorrect output permissions", (short)0640,
- localFs.getFileStatus(outputPath).getPermission().toShort());
- assertEquals("Incorrect index permissions", (short)0640,
- localFs.getFileStatus(indexPath).getPermission().toShort());
+ checkPermissions(outputPath, indexPath);
} else {
assertEquals(0, eventProto.getSpillId());
if (outputRecordsCounter.getValue() > 0) {
@@ -1289,12 +1292,16 @@ public class TestUnorderedPartitionedKVWriter {
boolean isInMem= eventProto.getData().hasData();
assertTrue(localFs.exists(outputFilePath));
- assertEquals("Incorrect output permissions", (short) 0640,
- localFs.getFileStatus(outputFilePath).getPermission().toShort());
+ assertEquals("Incorrect output permissions (user)", FsAction.READ_WRITE,
+ localFs.getFileStatus(outputFilePath).getPermission().getUserAction());
+ assertEquals("Incorrect output permissions (group)", FsAction.READ,
+ localFs.getFileStatus(outputFilePath).getPermission().getGroupAction());
if( !isInMem ) {
assertTrue(localFs.exists(spillFilePath));
- assertEquals("Incorrect index permissions", (short) 0640,
- localFs.getFileStatus(spillFilePath).getPermission().toShort());
+ assertEquals("Incorrect index permissions (user)", FsAction.READ_WRITE,
+ localFs.getFileStatus(spillFilePath).getPermission().getUserAction());
+ assertEquals("Incorrect index permissions (group)", FsAction.READ,
+ localFs.getFileStatus(spillFilePath).getPermission().getGroupAction());
// verify no intermediate spill files have been left around
synchronized (kvWriter.spillInfoList) {