TEZ-3215. Support for MultipleOutputs. (mingma)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a328d469 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a328d469 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a328d469 Branch: refs/heads/TEZ-3334 Commit: a328d469d3de53eae087aee62b10140531a87722 Parents: a2f8cc3 Author: Ming Ma <[email protected]> Authored: Sun Oct 30 08:34:48 2016 -0700 Committer: Ming Ma <[email protected]> Committed: Sun Oct 30 08:34:48 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/mapreduce/hadoop/MRJobConfig.java | 6 + .../apache/tez/mapreduce/output/MROutput.java | 138 +++++++++---- .../tez/mapreduce/output/MultiMROutput.java | 203 +++++++++++++++++++ .../tez/mapreduce/output/TestMultiMROutput.java | 193 ++++++++++++++++++ .../library/api/KeyValueWriterWithBasePath.java | 49 +++++ 6 files changed, 546 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a328d469/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 83e0b59..b4beb80 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3215. Support for MultipleOutputs. TEZ-3097. Flaky test: TestCommit.testDAGCommitStartedEventFail_OnDAGSuccess. TEZ-3487. Improvements in travis yml file to get builds to work. TEZ-3405. Support ability for AM to kill itself if there is no client heartbeating to it. 
http://git-wip-us.apache.org/repos/asf/tez/blob/a328d469/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java index 7db98bc..02c74b2 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java @@ -132,6 +132,12 @@ public interface MRJobConfig { public static final String MAPREDUCE_JOB_USER_CLASSPATH_FIRST = "mapreduce.job.user.classpath.first"; + public static String LAZY_OUTPUTFORMAT_OUTPUTFORMAT = + "mapreduce.output.lazyoutputformat.outputformat"; + + public static String FILEOUTPUTFORMAT_BASE_OUTPUT_NAME = + "mapreduce.output.basename"; + public static final String IO_SORT_FACTOR = "mapreduce.task.io.sort.factor"; public static final String IO_SORT_MB = "mapreduce.task.io.sort.mb"; http://git-wip-us.apache.org/repos/asf/tez/blob/a328d469/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java index 043085d..6ed70c5 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; @@ -44,6 +45,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -73,14 +75,14 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; * {@link MROutput} is an {@link Output} which allows key/values pairs * to be written by a processor. * - * It is compatible with all standard Apache Hadoop MapReduce + * It is compatible with all standard Apache Hadoop MapReduce * OutputFormat implementations. - * + * * This class is not meant to be extended by external projects. */ @Public public class MROutput extends AbstractLogicalOutput { - + /** * Helper class to configure {@link MROutput} * @@ -94,18 +96,36 @@ public class MROutput extends AbstractLogicalOutput { String outputClassName = MROutput.class.getName(); String outputPath; boolean doCommit = true; - - private MROutputConfigBuilder(Configuration conf, Class<?> outputFormatParam) { + + private MROutputConfigBuilder(Configuration conf, + Class<?> outputFormatParam, boolean useLazyOutputFormat) { this.conf = conf; if (outputFormatParam != null) { outputFormatProvided = true; - this.outputFormat = outputFormatParam; - if (org.apache.hadoop.mapred.OutputFormat.class.isAssignableFrom(outputFormatParam)) { + if (org.apache.hadoop.mapred.OutputFormat.class.isAssignableFrom( + outputFormatParam)) { useNewApi = false; - } else if (org.apache.hadoop.mapreduce.OutputFormat.class.isAssignableFrom(outputFormatParam)) { + if (!useLazyOutputFormat) { + this.outputFormat = outputFormatParam; + } else { + conf.setClass(MRJobConfig.LAZY_OUTPUTFORMAT_OUTPUTFORMAT, + outputFormatParam, + org.apache.hadoop.mapred.OutputFormat.class); + 
this.outputFormat = + org.apache.hadoop.mapred.lib.LazyOutputFormat.class; + } + } else if (OutputFormat.class.isAssignableFrom(outputFormatParam)) { useNewApi = true; + if (!useLazyOutputFormat) { + this.outputFormat = outputFormatParam; + } else { + conf.setClass(MRJobConfig.LAZY_OUTPUTFORMAT_OUTPUTFORMAT, + outputFormatParam, OutputFormat.class); + this.outputFormat = LazyOutputFormat.class; + } } else { - throw new TezUncheckedException("outputFormat must be assignable from either " + + throw new TezUncheckedException( + "outputFormat must be assignable from either " + "org.apache.hadoop.mapred.OutputFormat or " + "org.apache.hadoop.mapreduce.OutputFormat" + " Given: " + outputFormatParam.getName()); @@ -145,8 +165,21 @@ public class MROutput extends AbstractLogicalOutput { } private MROutputConfigBuilder setOutputPath(String outputPath) { - if (!(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.class.isAssignableFrom(outputFormat) || - FileOutputFormat.class.isAssignableFrom(outputFormat))) { + boolean passNewLazyOutputFormatCheck = + (LazyOutputFormat.class.isAssignableFrom(outputFormat)) && + org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.class. + isAssignableFrom(conf.getClass( + MRJobConfig.LAZY_OUTPUTFORMAT_OUTPUTFORMAT, null)); + boolean passOldLazyOutputFormatCheck = + (org.apache.hadoop.mapred.lib.LazyOutputFormat.class. + isAssignableFrom(outputFormat)) && + FileOutputFormat.class.isAssignableFrom(conf.getClass( + MRJobConfig.LAZY_OUTPUTFORMAT_OUTPUTFORMAT, null)); + + if (!(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.class. + isAssignableFrom(outputFormat) || + FileOutputFormat.class.isAssignableFrom(outputFormat) || + passNewLazyOutputFormatCheck || passOldLazyOutputFormatCheck)) { throw new TezUncheckedException("When setting outputPath the outputFormat must " + "be assignable from either org.apache.hadoop.mapred.FileOutputFormat or " + "org.apache.hadoop.mapreduce.lib.output.FileOutputFormat. 
" + @@ -277,7 +310,12 @@ public class MROutput extends AbstractLogicalOutput { */ public static MROutputConfigBuilder createConfigBuilder(Configuration conf, @Nullable Class<?> outputFormat) { - return new MROutputConfigBuilder(conf, outputFormat); + return createConfigBuilder(conf, outputFormat, false); + } + + public static MROutputConfigBuilder createConfigBuilder(Configuration conf, + @Nullable Class<?> outputFormat, boolean useLazyOutputFormat) { + return new MROutputConfigBuilder(conf, outputFormat, useLazyOutputFormat); } /** @@ -298,9 +336,14 @@ public class MROutput extends AbstractLogicalOutput { * @return {@link org.apache.tez.mapreduce.output.MROutput.MROutputConfigBuilder} */ public static MROutputConfigBuilder createConfigBuilder(Configuration conf, - @Nullable Class<?> outputFormat, - @Nullable String outputPath) { - MROutputConfigBuilder configurer = new MROutputConfigBuilder(conf, outputFormat); + @Nullable Class<?> outputFormat, @Nullable String outputPath) { + return createConfigBuilder(conf, outputFormat, outputPath, false); + } + + public static MROutputConfigBuilder createConfigBuilder(Configuration conf, + @Nullable Class<?> outputFormat, @Nullable String outputPath, + boolean useLazyOutputFormat) { + MROutputConfigBuilder configurer = createConfigBuilder(conf, outputFormat, useLazyOutputFormat); if (outputPath != null) { configurer.setOutputPath(outputPath); } @@ -312,9 +355,9 @@ public class MROutput extends AbstractLogicalOutput { private final NumberFormat taskNumberFormat = NumberFormat.getInstance(); private final NumberFormat nonTaskNumberFormat = NumberFormat.getInstance(); - private JobConf jobConf; + protected JobConf jobConf; boolean useNewApi; - private AtomicBoolean flushed = new AtomicBoolean(false); + protected AtomicBoolean flushed = new AtomicBoolean(false); @SuppressWarnings("rawtypes") org.apache.hadoop.mapreduce.OutputFormat newOutputFormat; @@ -326,7 +369,7 @@ public class MROutput extends AbstractLogicalOutput { 
@SuppressWarnings("rawtypes") org.apache.hadoop.mapred.RecordWriter oldRecordWriter; - private TezCounter outputRecordCounter; + protected TezCounter outputRecordCounter; @VisibleForTesting TaskAttemptContext newApiTaskAttemptContext; @@ -344,6 +387,12 @@ public class MROutput extends AbstractLogicalOutput { @Override public List<Event> initialize() throws IOException, InterruptedException { + List<Event> events = initializeBase(); + initWriter(); + return events; + } + + protected List<Event> initializeBase() throws IOException, InterruptedException { getContext().requestInitialMemory(0l, null); //mandatory call taskNumberFormat.setMinimumIntegerDigits(5); taskNumberFormat.setGroupingUsed(false); @@ -373,18 +422,18 @@ public class MROutput extends AbstractLogicalOutput { taskAttemptId.getTaskID().getId()); jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString()); - if (useNewApi) { - // set the output part name to have a unique prefix - if (jobConf.get("mapreduce.output.basename") == null) { - jobConf.set("mapreduce.output.basename", getOutputFileNamePrefix()); - } - } - String outputFormatClassName; - outputRecordCounter = getContext().getCounters().findCounter(TaskCounter.OUTPUT_RECORDS); + outputRecordCounter = getContext().getCounters().findCounter( + TaskCounter.OUTPUT_RECORDS); if (useNewApi) { + // set the output part name to have a unique prefix + if (jobConf.get(MRJobConfig.FILEOUTPUTFORMAT_BASE_OUTPUT_NAME) == null) { + jobConf.set(MRJobConfig.FILEOUTPUTFORMAT_BASE_OUTPUT_NAME, + getOutputFileNamePrefix()); + } + newApiTaskAttemptContext = createTaskAttemptContext(taskAttemptId); try { newOutputFormat = @@ -396,13 +445,6 @@ public class MROutput extends AbstractLogicalOutput { } initCommitter(jobConf, useNewApi); - - try { - newRecordWriter = - newOutputFormat.getRecordWriter(newApiTaskAttemptContext); - } catch (InterruptedException e) { - throw new IOException("Interrupted while creating record writer", e); - } } else { oldApiTaskAttemptContext 
= new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl( @@ -412,13 +454,6 @@ public class MROutput extends AbstractLogicalOutput { outputFormatClassName = oldOutputFormat.getClass().getName(); initCommitter(jobConf, useNewApi); - - FileSystem fs = FileSystem.get(jobConf); - String finalName = getOutputName(); - - oldRecordWriter = - oldOutputFormat.getRecordWriter( - fs, jobConf, finalName, new MRReporter(getContext().getCounters())); } LOG.info(getContext().getDestinationVertexName() + ": " @@ -427,6 +462,22 @@ public class MROutput extends AbstractLogicalOutput { return null; } + private void initWriter() throws IOException { + if (useNewApi) { + try { + newRecordWriter = + newOutputFormat.getRecordWriter(newApiTaskAttemptContext); + } catch (InterruptedException e) { + throw new IOException("Interrupted while creating record writer", e); + } + } else { + FileSystem fs = FileSystem.get(jobConf); + String finalName = getOutputName(getOutputFileNamePrefix()); + oldRecordWriter = oldOutputFormat.getRecordWriter( + fs, jobConf, finalName, new MRReporter(getContext().getCounters())); + } + } + @Override public void start() { } @@ -475,7 +526,7 @@ public class MROutput extends AbstractLogicalOutput { isMapperOutput, null); } - private String getOutputFileNamePrefix() { + protected String getOutputFileNamePrefix() { String prefix = jobConf.get(MRJobConfig.MROUTPUT_FILE_NAME_PREFIX); if (prefix == null) { prefix = "part-v" + @@ -485,10 +536,9 @@ public class MROutput extends AbstractLogicalOutput { return prefix; } - private String getOutputName() { + protected String getOutputName(String prefix) { // give a unique prefix to the output name - return getOutputFileNamePrefix() + - "-" + taskNumberFormat.format(getContext().getTaskIndex()); + return prefix + "-" + taskNumberFormat.format(getContext().getTaskIndex()); } /** http://git-wip-us.apache.org/repos/asf/tez/blob/a328d469/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MultiMROutput.java 
---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MultiMROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MultiMROutput.java new file mode 100644 index 0000000..0bd573b --- /dev/null +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MultiMROutput.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.mapreduce.output; + + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.tez.mapreduce.hadoop.MRJobConfig; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.Output; +import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.library.api.IOInterruptedException; +import org.apache.tez.runtime.library.api.KeyValueWriterWithBasePath; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.tez.mapreduce.hadoop.mapred.MRReporter; + +/** + * {@link MultiMROutput} is an {@link Output} which allows key/values pairs + * to be written by a processor to different output files. + * + * It is compatible with all standard Apache Hadoop MapReduce + * OutputFormat implementations. 
+ * + */ +@Public +public class MultiMROutput extends MROutput { + + Map<String, org.apache.hadoop.mapreduce.RecordWriter<?, ?>> + newRecordWriters; + + Map<String, org.apache.hadoop.mapred.RecordWriter<?, ?>> + oldRecordWriters; + + public MultiMROutput(OutputContext outputContext, int numPhysicalOutputs) { + super(outputContext, numPhysicalOutputs); + } + + @Override + public List<Event> initialize() throws IOException, InterruptedException { + List<Event> events = super.initializeBase(); + if (useNewApi) { + newRecordWriters = new HashMap<>(); + } else { + oldRecordWriters = new HashMap<>(); + } + return events; + } + + /** + * Create an + * {@link org.apache.tez.mapreduce.output.MROutput.MROutputConfigBuilder} + * + * @param conf Configuration for the {@link MROutput} + * @param outputFormat FileOutputFormat derived class (old or new MapReduce API) + * @param outputPath Output path + * @return {@link org.apache.tez.mapreduce.output.MROutput.MROutputConfigBuilder} + */ + public static MROutputConfigBuilder createConfigBuilder(Configuration conf, + Class<?> outputFormat, String outputPath, boolean useLazyOutputFormat) { + return MROutput.createConfigBuilder(conf, outputFormat, outputPath, useLazyOutputFormat) + .setOutputClassName(MultiMROutput.class.getName()); + } + + @Override + public KeyValueWriterWithBasePath getWriter() throws IOException { + return new KeyValueWriterWithBasePath() { + + @SuppressWarnings("unchecked") + @Override + public void write(Object key, Object value) throws IOException { + throw new UnsupportedOperationException( + "Write without basePath isn't supported."); + } + + @SuppressWarnings("unchecked") + @Override + public void write(Object key, Object value, String basePath) + throws IOException { + if (basePath == null) { + throw new UnsupportedOperationException( + "Write without basePath isn't supported."); + } + if (basePath.length() > 0 && basePath.charAt(0) == '/' ) { + // The base path can't be an absolute path starting with "/".
+ // Otherwise, it would cause the task's temporary files to be + // written outside the output committer's task work path. + throw new UnsupportedOperationException( + "Write with absolute basePath isn't supported."); + } + if (useNewApi) { + try { + getNewRecordWriter(newApiTaskAttemptContext, basePath).write( + key, value); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOInterruptedException( + "Interrupted while writing next key-value",e); + } + } else { + getOldRecordWriter(basePath).write(key, value); + } + outputRecordCounter.increment(1); + getContext().notifyProgress(); + } + }; + } + + /** + * Call this in the processor before finishing to ensure that all + * outputs have been flushed. Must be called before commit. + * @throws IOException if closing any record writer fails or is interrupted + */ + @Override + public void flush() throws IOException { + if (flushed.getAndSet(true)) { + return; + } + try { + if (useNewApi) { + for (RecordWriter writer : newRecordWriters.values()) { + writer.close(newApiTaskAttemptContext); + } + } else { + for (org.apache.hadoop.mapred.RecordWriter writer : + oldRecordWriters.values()) { + writer.close(null); + } + } + } catch (InterruptedException e) { + throw new IOException("Interrupted while closing record writer", e); + } + } + + @SuppressWarnings("unchecked") + private synchronized RecordWriter getNewRecordWriter( + TaskAttemptContext taskContext, String baseFileName) + throws IOException, InterruptedException { + + // look for record-writer in the cache + RecordWriter writer = newRecordWriters.get(baseFileName); + + // If not in cache, create a new one + if (writer == null) { + // get the record writer from context output format + taskContext.getConfiguration().set( + MRJobConfig.FILEOUTPUTFORMAT_BASE_OUTPUT_NAME, baseFileName); + try { + writer = ((OutputFormat) ReflectionUtils.newInstance( + taskContext.getOutputFormatClass(), taskContext.getConfiguration())) + .getRecordWriter(taskContext); + } catch (ClassNotFoundException
e) { + throw new IOException(e); + } + // add the record-writer to the cache + newRecordWriters.put(baseFileName, writer); + } + return writer; + } + + @SuppressWarnings("unchecked") + private synchronized org.apache.hadoop.mapred.RecordWriter + getOldRecordWriter(String baseFileName) throws IOException { + + // look for record-writer in the cache + org.apache.hadoop.mapred.RecordWriter writer = + oldRecordWriters.get(baseFileName); + + // If not in cache, create a new one + if (writer == null) { + FileSystem fs = FileSystem.get(jobConf); + String finalName = getOutputName(baseFileName); + writer = oldOutputFormat.getRecordWriter(fs, jobConf, + finalName, new MRReporter(getContext().getCounters())); + // add the record-writer to the cache + oldRecordWriters.put(baseFileName, writer); + } + return writer; + } +}; http://git-wip-us.apache.org/repos/asf/tez/blob/a328d469/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMultiMROutput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMultiMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMultiMROutput.java new file mode 100644 index 0000000..3618e40 --- /dev/null +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMultiMROutput.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.mapreduce.output; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.DataSinkDescriptor; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.mapreduce.hadoop.MRConfig; +import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.api.OutputStatisticsReporter; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + + +public class TestMultiMROutput { + + @Test(timeout = 5000) + public void testNewAPI_TextOutputFormat() throws Exception { + validate(true, TextOutputFormat.class, true, FileOutputCommitter.class, + false); + } + + @Test(timeout = 5000) + public void testOldAPI_TextOutputFormat() throws Exception { + validate(false, org.apache.hadoop.mapred.TextOutputFormat.class, false, + org.apache.hadoop.mapred.FileOutputCommitter.class, false); + } + + @Test(timeout = 5000) + public void 
testNewAPI_SequenceFileOutputFormat() throws Exception { + validate(true, SequenceFileOutputFormat.class, false, + FileOutputCommitter.class, false); + } + + @Test(timeout = 5000) + public void testOldAPI_SequenceFileOutputFormat() throws Exception { + validate(false, org.apache.hadoop.mapred.SequenceFileOutputFormat.class, + false, org.apache.hadoop.mapred.FileOutputCommitter.class, false); + } + + @Test(timeout = 5000) + public void testNewAPI_LazySequenceFileOutputFormat() throws Exception { + validate(true, SequenceFileOutputFormat.class, false, + FileOutputCommitter.class, true); + } + + @Test(timeout = 5000) + public void testOldAPI_LazySequenceFileOutputFormat() throws Exception { + validate(false, org.apache.hadoop.mapred.SequenceFileOutputFormat.class, + false, org.apache.hadoop.mapred.FileOutputCommitter.class, true); + } + + @Test(timeout = 5000) + public void testNewAPI_LazyTextOutputFormat() throws Exception { + validate(true, TextOutputFormat.class, false, + FileOutputCommitter.class, true); + } + + @Test(timeout = 5000) + public void testOldAPI_LazyTextOutputFormat() throws Exception { + validate(false, org.apache.hadoop.mapred.TextOutputFormat.class, false, + org.apache.hadoop.mapred.FileOutputCommitter.class, true); + } + + @Test(timeout = 5000) + public void testInvalidBasePath() throws Exception { + MultiMROutput outputs = createMROutputs(SequenceFileOutputFormat.class, + false, true); + try { + outputs.getWriter().write(new Text(Integer.toString(0)), + new Text("foo"), "/tmp"); + Assert.assertTrue(false); // should not come here + } catch (UnsupportedOperationException uoe) { + } + } + + private OutputContext createMockOutputContext(UserPayload payload) { + OutputContext outputContext = mock(OutputContext.class); + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + when(outputContext.getUserPayload()).thenReturn(payload); + when(outputContext.getApplicationId()).thenReturn(appId); + 
when(outputContext.getTaskVertexIndex()).thenReturn(1); + when(outputContext.getTaskAttemptNumber()).thenReturn(1); + when(outputContext.getCounters()).thenReturn(new TezCounters()); + when(outputContext.getStatisticsReporter()).thenReturn( + mock(OutputStatisticsReporter.class)); + return outputContext; + } + + private void validate(boolean expectedUseNewAPIValue, Class outputFormat, + boolean isMapper, Class committerClass, boolean useLazyOutputFormat) + throws InterruptedException, IOException { + MultiMROutput output = createMROutputs(outputFormat, isMapper, + useLazyOutputFormat); + + assertEquals(isMapper, output.isMapperOutput); + assertEquals(expectedUseNewAPIValue, output.useNewApi); + if (expectedUseNewAPIValue) { + if (useLazyOutputFormat) { + assertEquals(LazyOutputFormat.class, + output.newOutputFormat.getClass()); + } else { + assertEquals(outputFormat, output.newOutputFormat.getClass()); + } + assertNotNull(output.newApiTaskAttemptContext); + assertNull(output.oldOutputFormat); + assertEquals(Text.class, + output.newApiTaskAttemptContext.getOutputValueClass()); + assertEquals(Text.class, + output.newApiTaskAttemptContext.getOutputKeyClass()); + assertNull(output.oldApiTaskAttemptContext); + assertNotNull(output.newRecordWriters); + assertNull(output.oldRecordWriters); + } else { + if (!useLazyOutputFormat) { + assertEquals(outputFormat, output.oldOutputFormat.getClass()); + } else { + assertEquals(org.apache.hadoop.mapred.lib.LazyOutputFormat.class, + output.oldOutputFormat.getClass()); + } + assertNull(output.newOutputFormat); + assertNotNull(output.oldApiTaskAttemptContext); + assertNull(output.newApiTaskAttemptContext); + assertEquals(Text.class, + output.oldApiTaskAttemptContext.getOutputValueClass()); + assertEquals(Text.class, + output.oldApiTaskAttemptContext.getOutputKeyClass()); + assertNotNull(output.oldRecordWriters); + assertNull(output.newRecordWriters); + } + + assertEquals(committerClass, output.committer.getClass()); + int 
numOfUniqueKeys = 3; + for (int i=0; i<numOfUniqueKeys; i++) { + output.getWriter().write(new Text(Integer.toString(i)), + new Text("foo"), Integer.toString(i)); + } + output.close(); + if (expectedUseNewAPIValue) { + assertEquals(numOfUniqueKeys, output.newRecordWriters.size()); + } else { + assertEquals(numOfUniqueKeys, output.oldRecordWriters.size()); + } + } + + private MultiMROutput createMROutputs(Class outputFormat, + boolean isMapper, boolean useLazyOutputFormat) + throws InterruptedException, IOException { + String outputPath = "/tmp/output"; + JobConf conf = new JobConf(); + conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, isMapper); + conf.setOutputKeyClass(Text.class); + conf.setOutputValueClass(Text.class); + DataSinkDescriptor dataSink = MultiMROutput.createConfigBuilder( + conf, outputFormat, outputPath, useLazyOutputFormat).build(); + + OutputContext outputContext = createMockOutputContext( + dataSink.getOutputDescriptor().getUserPayload()); + MultiMROutput output = new MultiMROutput(outputContext, 2); + output.initialize(); + return output; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/a328d469/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriterWithBasePath.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriterWithBasePath.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriterWithBasePath.java new file mode 100644 index 0000000..5446ca6 --- /dev/null +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriterWithBasePath.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.library.api; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.tez.runtime.api.Writer; + +/** + * A key/value pair based {@link Writer} that can direct each pair + * to a different output file, selected by a base path. + */ +@Public +@Evolving +public abstract class KeyValueWriterWithBasePath extends KeyValueWriter { + /** + * Writes a key/value pair. + * + * @param key + * the key to write + * @param value + * the value to write + * @param basePath + * the base path of the output file. + * @throws IOException + * if an error occurs + * @throws IOInterruptedException if IO was interrupted + */ + public abstract void write(Object key, Object value, String basePath) + throws IOException; +}
