TEZ-2162. org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat is not recognized (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2960fd1c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2960fd1c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2960fd1c

Branch: refs/heads/TEZ-2003
Commit: 2960fd1c2a6823f5fe03fbf993e925fff0de0eb0
Parents: f2e8e01
Author: Jeff Zhang <[email protected]>
Authored: Tue Mar 17 10:19:07 2015 +0800
Committer: Jeff Zhang <[email protected]>
Committed: Tue Mar 17 10:19:07 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/mapreduce/output/MROutput.java   |  34 +++--
 .../tez/mapreduce/output/TestMROutput.java      | 147 +++++++++++++++++++
 .../output/TestMROutputConfigBuilder.java       | 141 ++++++++++++++++++
 4 files changed, 313 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/2960fd1c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0afd2b3..bc37de0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -244,6 +244,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 ALL CHANGES:
+ TEZ-2162. org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat is not recognized
  TEZ-2193. Check returned value from EdgeManagerPlugin before using it
  TEZ-2133. Secured Impersonation: Failed to delete tez scratch data dir
  TEZ-2058. Flaky test: TestTezJobs::testInvalidQueueSubmission.
http://git-wip-us.apache.org/repos/asf/tez/blob/2960fd1c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java index 94a3c1f..88c192d 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java @@ -28,8 +28,10 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -45,7 +47,6 @@ import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.tez.client.TezClientUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; @@ -114,13 +115,23 @@ public class MROutput extends AbstractLogicalOutput { useNewApi = conf.getBoolean(MRJobConfig.NEW_API_REDUCER_CONFIG, true); try { if (useNewApi) { - this.outputFormat = conf.getClassByName(conf.get(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR)); + String outputClass = conf.get(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR); + if (StringUtils.isEmpty(outputClass)) { + throw new TezUncheckedException("no outputFormat setting on Configuration, useNewAPI:" + useNewApi); + } + this.outputFormat = conf.getClassByName(outputClass); Preconditions.checkState(org.apache.hadoop.mapreduce.OutputFormat.class - 
.isAssignableFrom(this.outputFormat)); + .isAssignableFrom(this.outputFormat), "outputFormat must be assignable from " + + "org.apache.hadoop.mapreduce.OutputFormat"); } else { - this.outputFormat = conf.getClassByName(conf.get("mapred.output.format.class")); + String outputClass = conf.get("mapred.output.format.class"); + if (StringUtils.isEmpty(outputClass)) { + throw new TezUncheckedException("no outputFormat setting on Configuration, useNewAPI:" + useNewApi); + } + this.outputFormat = conf.getClassByName(outputClass); Preconditions.checkState(org.apache.hadoop.mapred.OutputFormat.class - .isAssignableFrom(this.outputFormat)); + .isAssignableFrom(this.outputFormat), "outputFormat must be assignable from " + + "org.apache.hadoop.mapred.OutputFormat"); } } catch (ClassNotFoundException e) { throw new TezUncheckedException(e); @@ -132,7 +143,7 @@ public class MROutput extends AbstractLogicalOutput { private MROutputConfigBuilder setOutputPath(String outputPath) { if (!(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.class.isAssignableFrom(outputFormat) || FileOutputFormat.class.isAssignableFrom(outputFormat))) { - throw new TezUncheckedException("When setting outputPath the outputFormat must " + + throw new TezUncheckedException("When setting outputPath the outputFormat must " + "be assignable from either org.apache.hadoop.mapred.FileOutputFormat or " + "org.apache.hadoop.mapreduce.lib.output.FileOutputFormat. " + "Otherwise use the non-path config builder." 
+ @@ -311,10 +322,13 @@ public class MROutput extends AbstractLogicalOutput { private TezCounter outputRecordCounter; - private TaskAttemptContext newApiTaskAttemptContext; - private org.apache.hadoop.mapred.TaskAttemptContext oldApiTaskAttemptContext; + @VisibleForTesting + TaskAttemptContext newApiTaskAttemptContext; + @VisibleForTesting + org.apache.hadoop.mapred.TaskAttemptContext oldApiTaskAttemptContext; - private boolean isMapperOutput; + @VisibleForTesting + boolean isMapperOutput; protected OutputCommitter committer; @@ -334,7 +348,7 @@ public class MROutput extends AbstractLogicalOutput { this.jobConf = new JobConf(conf); // Add tokens to the jobConf - in case they are accessed within the RW / OF jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials()); - this.useNewApi = this.jobConf.getUseNewMapper(); + this.useNewApi = this.jobConf.getUseNewReducer(); this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false); jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, http://git-wip-us.apache.org/repos/asf/tez/blob/2960fd1c/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java new file mode 100644 index 0000000..b898fe0 --- /dev/null +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.mapreduce.output; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.DataSinkDescriptor; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.mapreduce.hadoop.MRConfig; +import org.apache.tez.runtime.api.OutputContext; +import org.junit.Test; + + +public class TestMROutput { + + @Test(timeout = 5000) + public void testNewAPI_TextOutputFormat() throws Exception { + String outputPath = "/tmp/output"; + Configuration conf = new Configuration(); + conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, true); + DataSinkDescriptor dataSink = MROutput + .createConfigBuilder(conf, TextOutputFormat.class, outputPath) + .build(); + + OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload()); + MROutput output = new MROutput(outputContext, 2); + output.initialize(); + + assertEquals(true, output.isMapperOutput); + assertEquals(true, output.useNewApi); + assertEquals(TextOutputFormat.class, output.newOutputFormat.getClass()); + 
assertNull(output.oldOutputFormat); + assertNotNull(output.newApiTaskAttemptContext); + assertNull(output.oldApiTaskAttemptContext); + assertNotNull(output.newRecordWriter); + assertNull(output.oldRecordWriter); + assertEquals(FileOutputCommitter.class, output.committer.getClass()); + } + + @Test(timeout = 5000) + public void testOldAPI_TextOutputFormat() throws Exception { + String outputPath = "/tmp/output"; + Configuration conf = new Configuration(); + conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, false); + DataSinkDescriptor dataSink = MROutput + .createConfigBuilder(conf, org.apache.hadoop.mapred.TextOutputFormat.class, outputPath) + .build(); + + OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload()); + MROutput output = new MROutput(outputContext, 2); + output.initialize(); + + assertEquals(false, output.isMapperOutput); + assertEquals(false, output.useNewApi); + assertEquals(org.apache.hadoop.mapred.TextOutputFormat.class, output.oldOutputFormat.getClass()); + assertNull(output.newOutputFormat); + assertNotNull(output.oldApiTaskAttemptContext); + assertNull(output.newApiTaskAttemptContext); + assertNotNull(output.oldRecordWriter); + assertNull(output.newRecordWriter); + assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass()); + } + + @Test(timeout = 5000) + public void testNewAPI_SequenceFileOutputFormat() throws Exception { + String outputPath = "/tmp/output"; + JobConf conf = new JobConf(); + conf.setOutputKeyClass(NullWritable.class); + conf.setOutputValueClass(Text.class); + DataSinkDescriptor dataSink = MROutput + .createConfigBuilder(conf, SequenceFileOutputFormat.class, outputPath) + .build(); + + OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload()); + MROutput output = new MROutput(outputContext, 2); + output.initialize(); + assertEquals(true, output.useNewApi); + assertEquals(SequenceFileOutputFormat.class, 
output.newOutputFormat.getClass()); + assertNull(output.oldOutputFormat); + assertEquals(NullWritable.class, output.newApiTaskAttemptContext.getOutputKeyClass()); + assertEquals(Text.class, output.newApiTaskAttemptContext.getOutputValueClass()); + assertNull(output.oldApiTaskAttemptContext); + assertNotNull(output.newRecordWriter); + assertNull(output.oldRecordWriter); + assertEquals(FileOutputCommitter.class, output.committer.getClass()); + } + + @Test(timeout = 5000) + public void testOldAPI_SequenceFileOutputFormat() throws Exception { + String outputPath = "/tmp/output"; + JobConf conf = new JobConf(); + conf.setOutputKeyClass(NullWritable.class); + conf.setOutputValueClass(Text.class); + DataSinkDescriptor dataSink = MROutput + .createConfigBuilder(conf, org.apache.hadoop.mapred.SequenceFileOutputFormat.class, outputPath) + .build(); + + OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload()); + MROutput output = new MROutput(outputContext, 2); + output.initialize(); + assertEquals(false, output.useNewApi); + assertEquals(org.apache.hadoop.mapred.SequenceFileOutputFormat.class, output.oldOutputFormat.getClass()); + assertNull(output.newOutputFormat); + assertEquals(NullWritable.class, output.oldApiTaskAttemptContext.getOutputKeyClass()); + assertEquals(Text.class, output.oldApiTaskAttemptContext.getOutputValueClass()); + assertNull(output.newApiTaskAttemptContext); + assertNotNull(output.oldRecordWriter); + assertNull(output.newRecordWriter); + assertEquals(org.apache.hadoop.mapred.FileOutputCommitter.class, output.committer.getClass()); + } + + private OutputContext createMockOutputContext(UserPayload payload) { + OutputContext outputContext = mock(OutputContext.class); + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + when(outputContext.getUserPayload()).thenReturn(payload); + when(outputContext.getApplicationId()).thenReturn(appId); + 
when(outputContext.getTaskVertexIndex()).thenReturn(1); + when(outputContext.getTaskAttemptNumber()).thenReturn(1); + when(outputContext.getCounters()).thenReturn(new TezCounters()); + return outputContext; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/2960fd1c/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputConfigBuilder.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputConfigBuilder.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputConfigBuilder.java new file mode 100644 index 0000000..b54ec3a --- /dev/null +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutputConfigBuilder.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tez.mapreduce.output; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.mapreduce.hadoop.MRJobConfig; +import org.junit.Test; + +import static org.junit.Assert.*; + + +public class TestMROutputConfigBuilder { + + @Test(timeout = 5000) + public void testNewAPI() { + Configuration conf = new Configuration(); + try { + MROutput.createConfigBuilder(conf, TextOutputFormat.class).build(); + fail(); + } catch (TezUncheckedException e) { + assertEquals("OutputPaths must be specified for OutputFormats based " + + "on org.apache.hadoop.mapreduce.lib.output.FileOutputFormat " + +"or org.apache.hadoop.mapred.FileOutputFormat", e.getMessage()); + } + MROutput.createConfigBuilder(conf, TextOutputFormat.class, "/tmp/output").build(); + + // no outputPaths needs to be specified + MROutput.createConfigBuilder(conf, DBOutputFormat.class).build(); + } + + @Test(timeout = 5000) + public void testNewAPI_ThroughConf() { + Configuration conf = new Configuration(); + try { + MROutput.createConfigBuilder(conf, null).build(); + fail(); + } catch (TezUncheckedException e) { + assertEquals("no outputFormat setting on Configuration, useNewAPI:true", e.getMessage()); + } + + // wrong output_format class + conf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, + org.apache.hadoop.mapred.TextOutputFormat.class.getName()); + try { + MROutput.createConfigBuilder(conf, null).build(); + fail(); + } catch (IllegalStateException e) { + assertEquals("outputFormat must be assignable from org.apache.hadoop.mapreduce.OutputFormat", + e.getMessage()); + } + + // correct output_format class, but no output_dir specified + conf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, + TextOutputFormat.class.getName()); + try { + MROutput.createConfigBuilder(conf, null).build(); + fail(); + } catch 
(TezUncheckedException e) { + assertEquals("OutputPaths must be specified for OutputFormats based " + + "on org.apache.hadoop.mapreduce.lib.output.FileOutputFormat " + +"or org.apache.hadoop.mapred.FileOutputFormat", e.getMessage()); + } + + conf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR, "/tmp/output"); + MROutput.createConfigBuilder(conf, null).build(); + } + + @Test(timeout = 5000 ) + public void testOldAPI() { + Configuration conf = new Configuration(); + try { + MROutput.createConfigBuilder(conf, org.apache.hadoop.mapred.TextOutputFormat.class).build(); + fail(); + } catch (TezUncheckedException e) { + assertEquals("OutputPaths must be specified for OutputFormats based " + + "on org.apache.hadoop.mapreduce.lib.output.FileOutputFormat " + +"or org.apache.hadoop.mapred.FileOutputFormat", e.getMessage()); + } + MROutput.createConfigBuilder(conf, org.apache.hadoop.mapred.TextOutputFormat.class, + "/tmp/output").build(); + + // no outputPaths needs to be specified + MROutput.createConfigBuilder(conf, org.apache.hadoop.mapred.lib.db.DBOutputFormat.class).build(); + } + + @Test(timeout = 5000) + public void testOldAPI_ThroughConf() { + Configuration conf = new Configuration(); + conf.setBoolean(MRJobConfig.NEW_API_REDUCER_CONFIG, false); + try { + MROutput.createConfigBuilder(conf, null).build(); + fail(); + } catch (TezUncheckedException e) { + assertEquals("no outputFormat setting on Configuration, useNewAPI:false", e.getMessage()); + } + + // wrong output_format class + conf.set("mapred.output.format.class", + TextOutputFormat.class.getName()); + try { + MROutput.createConfigBuilder(conf, null).build(); + fail(); + } catch (IllegalStateException e) { + assertEquals("outputFormat must be assignable from org.apache.hadoop.mapred.OutputFormat", + e.getMessage()); + } + + // correct output_format class, but no output_dir specified + conf.set("mapred.output.format.class", + org.apache.hadoop.mapred.TextOutputFormat.class.getName()); + try { + 
MROutput.createConfigBuilder(conf, null).build(); + fail(); + } catch (TezUncheckedException e) { + assertEquals("OutputPaths must be specified for OutputFormats based " + + "on org.apache.hadoop.mapreduce.lib.output.FileOutputFormat " + +"or org.apache.hadoop.mapred.FileOutputFormat", e.getMessage()); + } + + conf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR, "/tmp/output"); + MROutput.createConfigBuilder(conf, null).build(); + } +}
