Repository: tez
Updated Branches:
refs/heads/branch-0.8 71ec8564f -> 48f9f4c2a
TEZ-3348. NullPointerException in Tez MROutput while trying to write using
Parquet's DeprecatedParquetOutputFormat. (Piyush Narang via hitesh)
(cherry picked from commit 280a98ed431aeb15890de1cca5397f63f0455905)
Conflicts:
CHANGES.txt
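
For context: the NPE occurs because old-API OutputFormats that resolve their
task work directory inside getRecordWriter() see a null path when the output
committer has not been initialized yet, since committer setup is what
populates "mapreduce.task.output.dir". A minimal sketch of the failure mode
(hypothetical class name; the real trigger is Parquet's
DeprecatedParquetOutputFormat):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordWriter;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.util.Progressable;

    // Hypothetical format, for illustration only.
    public class WorkPathOutputFormat extends FileOutputFormat<String, String> {
      @Override
      public RecordWriter<String, String> getRecordWriter(
          FileSystem ignored, JobConf job, String name, Progressable progress)
          throws IOException {
        // Returns null unless the committer has already set up the task work
        // directory ("mapreduce.task.output.dir").
        Path workDir = FileOutputFormat.getWorkOutputPath(job);
        // Before this fix, MROutput created the record writer before calling
        // initCommitter(), so workDir was null here and the Path constructor
        // below threw a NullPointerException.
        Path file = new Path(workDir, name);
        return new RecordWriter<String, String>() {
          @Override public void write(String key, String value) throws IOException {}
          @Override public void close(Reporter reporter) throws IOException {}
        };
      }
    }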
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/48f9f4c2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/48f9f4c2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/48f9f4c2
Branch: refs/heads/branch-0.8
Commit: 48f9f4c2afeb7d0dc1c521920115f1d047e25aa8
Parents: 71ec856
Author: Hitesh Shah <[email protected]>
Authored: Tue Jul 19 15:00:03 2016 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Tue Jul 19 15:01:40 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/mapreduce/output/MROutput.java | 5 +-
.../tez/mapreduce/output/TestMROutput.java | 99 +++++++++++++++++++-
3 files changed, 102 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/48f9f4c2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1419aa6..098b525 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3348. NullPointerException in Tez MROutput while trying to write using
+ Parquet's DeprecatedParquetOutputFormat.
TEZ-3356. Fix initializing of stats when custom ShuffleVertexManager is used.
TEZ-3329. Tez ATS data is incomplete for a vertex which fails or gets killed
before initialization.
TEZ-3235. Modify Example TestOrderedWordCount job to test the IPC limit for
large dag plans.
http://git-wip-us.apache.org/repos/asf/tez/blob/48f9f4c2/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index ec83bf5..043085d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -395,6 +395,8 @@ public class MROutput extends AbstractLogicalOutput {
throw new IOException(cnfe);
}
+ initCommitter(jobConf, useNewApi);
+
try {
newRecordWriter =
newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
@@ -409,6 +411,8 @@ public class MROutput extends AbstractLogicalOutput {
oldOutputFormat = jobConf.getOutputFormat();
outputFormatClassName = oldOutputFormat.getClass().getName();
+ initCommitter(jobConf, useNewApi);
+
FileSystem fs = FileSystem.get(jobConf);
String finalName = getOutputName();
@@ -416,7 +420,6 @@ public class MROutput extends AbstractLogicalOutput {
oldOutputFormat.getRecordWriter(
fs, jobConf, finalName, new MRReporter(getContext().getCounters()));
}
- initCommitter(jobConf, useNewApi);
LOG.info(getContext().getDestinationVertexName() + ": "
+ "outputFormat=" + outputFormatClassName
http://git-wip-us.apache.org/repos/asf/tez/blob/48f9f4c2/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
index 05bcd98..8b52cc9 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
@@ -34,14 +34,17 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DataSinkDescriptor;
@@ -166,6 +169,58 @@ public class TestMROutput {
assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class,
output.committer.getClass());
}
+ // test to try and use the WorkOutputPathOutputFormat - this checks that the getDefaultWorkFile is
+ // set while creating recordWriters
+ @Test(timeout = 5000)
+ public void testNewAPI_WorkOutputPathOutputFormat() throws Exception {
+ String outputPath = "/tmp/output";
+ Configuration conf = new Configuration();
+ conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, true);
+ DataSinkDescriptor dataSink = MROutput
+ .createConfigBuilder(conf, NewAPI_WorkOutputPathReadingOutputFormat.class, outputPath)
+ .build();
+
+ OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload());
+ MROutput output = new MROutput(outputContext, 2);
+ output.initialize();
+
+ assertEquals(true, output.isMapperOutput);
+ assertEquals(true, output.useNewApi);
+ assertEquals(NewAPI_WorkOutputPathReadingOutputFormat.class, output.newOutputFormat.getClass());
+ assertNull(output.oldOutputFormat);
+ assertNotNull(output.newApiTaskAttemptContext);
+ assertNull(output.oldApiTaskAttemptContext);
+ assertNotNull(output.newRecordWriter);
+ assertNull(output.oldRecordWriter);
+ assertEquals(FileOutputCommitter.class, output.committer.getClass());
+ }
+
+ // test to try and use the WorkOutputPathOutputFormat - this checks that the workOutput path is
+ // set while creating recordWriters
+ @Test(timeout = 5000)
+ public void testOldAPI_WorkOutputPathOutputFormat() throws Exception {
+ String outputPath = "/tmp/output";
+ Configuration conf = new Configuration();
+ conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, false);
+ DataSinkDescriptor dataSink = MROutput
+ .createConfigBuilder(conf, OldAPI_WorkOutputPathReadingOutputFormat.class, outputPath)
+ .build();
+
+ OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload());
+ MROutput output = new MROutput(outputContext, 2);
+ output.initialize();
+
+ assertEquals(false, output.isMapperOutput);
+ assertEquals(false, output.useNewApi);
+ assertEquals(OldAPI_WorkOutputPathReadingOutputFormat.class, output.oldOutputFormat.getClass());
+ assertNull(output.newOutputFormat);
+ assertNotNull(output.oldApiTaskAttemptContext);
+ assertNull(output.newApiTaskAttemptContext);
+ assertNotNull(output.oldRecordWriter);
+ assertNull(output.newRecordWriter);
+ assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass());
+ }
+
private OutputContext createMockOutputContext(UserPayload payload) {
OutputContext outputContext = mock(OutputContext.class);
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
@@ -199,7 +254,7 @@ public class TestMROutput {
new Path(new Path(System.getProperty("test.build.data", "/tmp")),
"TestMapOutput").makeQualified(fs.getUri(),
fs.getWorkingDirectory());
- LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(
+ return new LogicalIOProcessorRuntimeTask(
taskSpec,
0,
conf,
@@ -209,7 +264,6 @@ public class TestMROutput {
new HashMap<String, String>(),
HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim());
- return task;
}
public static class TestOutputCommitter extends OutputCommitter {
@@ -282,6 +336,47 @@ public class TestMROutput {
}
}
+ // OldAPI OutputFormat class that reads the workoutput path while creating recordWriters
+ public static class OldAPI_WorkOutputPathReadingOutputFormat extends org.apache.hadoop.mapred.FileOutputFormat<String, String> {
+ public static class NoOpRecordWriter implements org.apache.hadoop.mapred.RecordWriter<String, String> {
+ @Override
+ public void write(String key, String value) throws IOException {}
+
+ @Override
+ public void close(Reporter reporter) throws IOException {}
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.RecordWriter<String, String> getRecordWriter(
+ FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
+ // check work output path is not null
+ Path workOutputPath = org.apache.hadoop.mapred.FileOutputFormat.getWorkOutputPath(job);
+ assertNotNull(workOutputPath);
+ return new NoOpRecordWriter();
+ }
+ }
+
+ // NewAPI OutputFormat class that reads the default work file while creating recordWriters
+ public static class NewAPI_WorkOutputPathReadingOutputFormat extends FileOutputFormat<String, String> {
+ public static class NoOpRecordWriter extends RecordWriter<String, String> {
+ @Override
+ public void write(String key, String value) throws IOException, InterruptedException {
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ }
+ }
+
+ @Override
+ public RecordWriter<String, String> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
+ // check default work file is not null
+ Path workOutputPath = getDefaultWorkFile(job, ".foo");
+ assertNotNull(workOutputPath);
+ return new NoOpRecordWriter();
+ }
+ }
+
public static class TestProcessor extends SimpleProcessor {
public TestProcessor(ProcessorContext context) {
super(context);