Repository: tez
Updated Branches:
  refs/heads/master 8bfbdfefa -> 280a98ed4
TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat. (Piyush Narang via hitesh)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/280a98ed
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/280a98ed
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/280a98ed

Branch: refs/heads/master
Commit: 280a98ed431aeb15890de1cca5397f63f0455905
Parents: 8bfbdfe
Author: Hitesh Shah <[email protected]>
Authored: Tue Jul 19 15:00:03 2016 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Tue Jul 19 15:00:03 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/tez/mapreduce/output/MROutput.java   |  5 +-
 .../tez/mapreduce/output/TestMROutput.java      | 99 +++++++++++++++++++-
 3 files changed, 103 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/280a98ed/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6e29110..c400512 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES

 ALL CHANGES:

+  TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat.
   TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.
   TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans.
   TEZ-3337. Do not log empty fields of TaskAttemptFinishedEvent to avoid confusion.
@@ -83,6 +84,7 @@ INCOMPATIBLE CHANGES

 ALL CHANGES:

+  TEZ-3348. NullPointerException in Tez MROutput while trying to write using Parquet's DeprecatedParquetOutputFormat.
   TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.
   TEZ-3329. Tez ATS data is incomplete for a vertex which fails or gets killed before initialization
   TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for large dag plans.
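
For context on the bug itself: old-API output formats such as Parquet's
DeprecatedParquetOutputFormat resolve the file they should write to from the
task's work output path inside getRecordWriter(). MROutput, however, only
published that path after it had already asked the OutputFormat for a record
writer; initCommitter() is what makes the work path visible, as the new tests
below assert. A minimal sketch of such a format follows, with hypothetical
names (this is not Parquet's actual source, and the exact expression that
threw is an assumption):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordWriter;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.util.Progressable;

    // Hypothetical stand-in for an old-API format that depends on the work path.
    public class WorkPathDependentOutputFormat extends FileOutputFormat<Text, Text> {
      @Override
      public RecordWriter<Text, Text> getRecordWriter(
          FileSystem ignored, JobConf job, String name, Progressable progress)
          throws IOException {
        // The work output path is only populated once the OutputCommitter has
        // been set up. Before this fix, MROutput called initCommitter() *after*
        // getRecordWriter(), so this returned null for formats that need it here.
        Path workPath = FileOutputFormat.getWorkOutputPath(job);

        // Building the task's file from a null parent path is the kind of spot
        // where the reported NullPointerException surfaced.
        final Path file = new Path(workPath, name);

        return new RecordWriter<Text, Text>() {
          public void write(Text key, Text value) throws IOException {
            // a real format would write key/value to 'file'
          }
          public void close(Reporter reporter) throws IOException {}
        };
      }
    }
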
http://git-wip-us.apache.org/repos/asf/tez/blob/280a98ed/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index ec83bf5..043085d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -395,6 +395,8 @@ public class MROutput extends AbstractLogicalOutput {
         throw new IOException(cnfe);
       }

+      initCommitter(jobConf, useNewApi);
+
       try {
         newRecordWriter =
             newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
@@ -409,6 +411,8 @@ public class MROutput extends AbstractLogicalOutput {
       oldOutputFormat = jobConf.getOutputFormat();
       outputFormatClassName = oldOutputFormat.getClass().getName();

+      initCommitter(jobConf, useNewApi);
+
       FileSystem fs = FileSystem.get(jobConf);
       String finalName = getOutputName();

@@ -416,7 +420,6 @@ public class MROutput extends AbstractLogicalOutput {
           oldOutputFormat.getRecordWriter(
               fs, jobConf, finalName, new MRReporter(getContext().getCounters()));
     }
-    initCommitter(jobConf, useNewApi);

     LOG.info(getContext().getDestinationVertexName() + ": "
         + "outputFormat=" + outputFormatClassName


http://git-wip-us.apache.org/repos/asf/tez/blob/280a98ed/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
index 05bcd98..8b52cc9 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
@@ -34,14 +34,17 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DataSinkDescriptor;
@@ -166,6 +169,58 @@ public class TestMROutput {
     assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class,
         output.committer.getClass());
   }

+  // test to try and use the WorkOutputPathOutputFormat - this checks that the getDefaultWorkFile is
+  // set while creating recordWriters
+  @Test(timeout = 5000)
+  public void testNewAPI_WorkOutputPathOutputFormat() throws Exception {
+    String outputPath = "/tmp/output";
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, true);
+    DataSinkDescriptor dataSink = MROutput
+        .createConfigBuilder(conf, NewAPI_WorkOutputPathReadingOutputFormat.class, outputPath)
+        .build();
+
+    OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload());
+    MROutput output = new MROutput(outputContext, 2);
+    output.initialize();
+
+    assertEquals(true, output.isMapperOutput);
+    assertEquals(true, output.useNewApi);
+    assertEquals(NewAPI_WorkOutputPathReadingOutputFormat.class, output.newOutputFormat.getClass());
+    assertNull(output.oldOutputFormat);
+    assertNotNull(output.newApiTaskAttemptContext);
+    assertNull(output.oldApiTaskAttemptContext);
+    assertNotNull(output.newRecordWriter);
+    assertNull(output.oldRecordWriter);
+    assertEquals(FileOutputCommitter.class, output.committer.getClass());
+  }
+
+  // test to try and use the WorkOutputPathOutputFormat - this checks that the workOutput path is
+  // set while creating recordWriters
+  @Test(timeout = 5000)
+  public void testOldAPI_WorkOutputPathOutputFormat() throws Exception {
+    String outputPath = "/tmp/output";
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, false);
+    DataSinkDescriptor dataSink = MROutput
+        .createConfigBuilder(conf, OldAPI_WorkOutputPathReadingOutputFormat.class, outputPath)
+        .build();
+
+    OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload());
+    MROutput output = new MROutput(outputContext, 2);
+    output.initialize();
+
+    assertEquals(false, output.isMapperOutput);
+    assertEquals(false, output.useNewApi);
+    assertEquals(OldAPI_WorkOutputPathReadingOutputFormat.class, output.oldOutputFormat.getClass());
+    assertNull(output.newOutputFormat);
+    assertNotNull(output.oldApiTaskAttemptContext);
+    assertNull(output.newApiTaskAttemptContext);
+    assertNotNull(output.oldRecordWriter);
+    assertNull(output.newRecordWriter);
+    assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass());
+  }
+
   private OutputContext createMockOutputContext(UserPayload payload) {
     OutputContext outputContext = mock(OutputContext.class);
     ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
@@ -199,7 +254,7 @@ public class TestMROutput {
         new Path(new Path(System.getProperty("test.build.data", "/tmp")),
             "TestMapOutput").makeQualified(fs.getUri(), fs.getWorkingDirectory());

-    LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(
+    return new LogicalIOProcessorRuntimeTask(
         taskSpec,
         0,
         conf,
@@ -209,7 +264,6 @@ public class TestMROutput {
         new HashMap<String, String>(),
         HashMultimap.<String, String>create(), null, "",
         new ExecutionContextImpl("localhost"),
         Runtime.getRuntime().maxMemory(), true,
         new DefaultHadoopShim());
-    return task;
   }

   public static class TestOutputCommitter extends OutputCommitter {
@@ -282,6 +336,47 @@ public class TestMROutput {
     }
   }

+  // OldAPI OutputFormat class that reads the workoutput path while creating recordWriters
+  public static class OldAPI_WorkOutputPathReadingOutputFormat extends org.apache.hadoop.mapred.FileOutputFormat<String, String> {
+    public static class NoOpRecordWriter implements org.apache.hadoop.mapred.RecordWriter<String, String> {
+      @Override
+      public void write(String key, String value) throws IOException {}
+
+      @Override
+      public void close(Reporter reporter) throws IOException {}
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.RecordWriter<String, String> getRecordWriter(
+        FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
+      // check work output path is not null
+      Path workOutputPath =
+          org.apache.hadoop.mapred.FileOutputFormat.getWorkOutputPath(job);
+      assertNotNull(workOutputPath);
+      return new NoOpRecordWriter();
+    }
+  }
+
+  // NewAPI OutputFormat class that reads the default work file while creating recordWriters
+  public static class NewAPI_WorkOutputPathReadingOutputFormat extends FileOutputFormat<String, String> {
+    public static class NoOpRecordWriter extends RecordWriter<String, String> {
+      @Override
+      public void write(String key, String value) throws IOException, InterruptedException {
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+      }
+    }
+
+    @Override
+    public RecordWriter<String, String> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
+      // check default work file is not null
+      Path workOutputPath = getDefaultWorkFile(job, ".foo");
+      assertNotNull(workOutputPath);
+      return new NoOpRecordWriter();
+    }
+  }
+
   public static class TestProcessor extends SimpleProcessor {
     public TestProcessor(ProcessorContext context) {
       super(context);
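
For completeness, a rough sketch of how such a sink is wired into a Tez DAG,
mirroring the configuration the new tests use. WorkPathDependentOutputFormat
is the hypothetical format sketched earlier and com.example.MyProcessor is a
placeholder processor class name; neither is part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.dag.api.DataSinkDescriptor;
    import org.apache.tez.dag.api.ProcessorDescriptor;
    import org.apache.tez.dag.api.Vertex;
    import org.apache.tez.mapreduce.hadoop.MRConfig;
    import org.apache.tez.mapreduce.output.MROutput;

    public class WorkPathSinkExample {
      public static Vertex buildWriterVertex() {
        Configuration conf = new Configuration();
        conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, false);

        // Same builder call the new tests use, pointed at the hypothetical
        // old-API format sketched above.
        DataSinkDescriptor sink = MROutput
            .createConfigBuilder(conf, WorkPathDependentOutputFormat.class, "/tmp/output")
            .build();

        // Placeholder processor producing the records to be written.
        Vertex writer = Vertex.create("writer",
            ProcessorDescriptor.create("com.example.MyProcessor"), 1);

        // As of this commit, MROutput.initialize() runs initCommitter() before
        // asking the OutputFormat for a record writer, so getWorkOutputPath()
        // and getDefaultWorkFile() see a non-null work path there.
        return writer.addDataSink("out", sink);
      }
    }
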
