Repository: tez Updated Branches: refs/heads/master 5de0fb123 -> d932579b0
http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MultipleCommitsExample.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MultipleCommitsExample.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MultipleCommitsExample.java new file mode 100644 index 0000000..fe7984b --- /dev/null +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MultipleCommitsExample.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.mapreduce.examples; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.TextOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.DataSinkDescriptor; +import org.apache.tez.dag.api.GroupInputEdge; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.VertexGroup; +import org.apache.tez.examples.TezExampleBase; +import org.apache.tez.mapreduce.output.MROutput; +import org.apache.tez.mapreduce.processor.SimpleMRProcessor; +import org.apache.tez.runtime.api.ProcessorContext; +import org.apache.tez.runtime.library.api.KeyValueWriter; +import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig; +import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput; +import org.apache.tez.runtime.library.partitioner.HashPartitioner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * v1 -> v3 <br/> + * v2 -> v3 <br/> + * (v1,v2) is connected to v3 as vertex group. <br/> + * (v1,v2) have multiple shared outputs, each of them have its own multiple outputs. + * And v3 also has multiple outputs. </br> + */ +public class MultipleCommitsExample extends TezExampleBase { + + private static final Logger LOG = LoggerFactory.getLogger(MultipleCommitsExample.class); + private static final String UV12OutputNamePrefix = "uv12Output"; + private static final String V1OutputNamePrefix = "v1Output"; + private static final String V2OutputNamePrefix = "v2Output"; + private static final String V3OutputNamePrefix = "v3Output"; + + public static final String CommitOnVertexSuccessOption = "commitOnVertexSuccess"; + + @Override + protected void printUsage() { + System.err.println("Usage: " + + " multiplecommitsExample v1OutputPrefix v1OutputNum v2OutputPrefix v2OutputNum" + + " uv12OutputPrefix uv12OutputNum v3OutputPrefix v3OutputNum" + + " [" + CommitOnVertexSuccessOption + "]" + "(default false)"); + } + + @Override + protected int validateArgs(String[] otherArgs) { + if (otherArgs.length != 8 && otherArgs.length != 9) { + return 2; + } + if (otherArgs.length == 9 && !otherArgs[8].equals(CommitOnVertexSuccessOption)) { + return 2; + } + return 0; + } + + public static class MultipleOutputProcessor extends SimpleMRProcessor { + + MultipleOutputProcessorConfig config; + + public MultipleOutputProcessor(ProcessorContext context) { + super(context); + } + + @Override + public void initialize() throws Exception { + super.initialize(); + config = MultipleOutputProcessorConfig.fromUserPayload(getContext().getUserPayload()); + } + + @Override + public void run() throws Exception { + for (int i=0;i < config.outputNum;++i) { + KeyValueWriter writer = (KeyValueWriter) + getOutputs().get(config.outputNamePrefix+"_" + i).getWriter(); + writer.write(NullWritable.get(), new Text("dummy")); + } + for (int i=0;i < config.sharedOutputNum; ++i) { + KeyValueWriter writer = (KeyValueWriter) + getOutputs().get(config.sharedOutputNamePrefix +"_" + i).getWriter(); + writer.write(NullWritable.get(), new Text("dummy")); + } + } + + public static class MultipleOutputProcessorConfig implements Writable { + + String outputNamePrefix; + int outputNum; + String sharedOutputNamePrefix = null; + int sharedOutputNum; + + public MultipleOutputProcessorConfig(){ + + } + + public MultipleOutputProcessorConfig(String outputNamePrefix, int outputNum) { + this.outputNamePrefix = outputNamePrefix; + this.outputNum = outputNum; + } + + public MultipleOutputProcessorConfig(String outputNamePrefix, int outputNum, + String sharedOutputNamePrefix, int sharedOutputNum) { + this.outputNamePrefix = outputNamePrefix; + this.outputNum = outputNum; + this.sharedOutputNamePrefix = sharedOutputNamePrefix; + this.sharedOutputNum = sharedOutputNum; + } + + @Override + public void write(DataOutput out) throws IOException { + new Text(outputNamePrefix).write(out); + out.writeInt(outputNum); + if (sharedOutputNamePrefix != null) { + new BooleanWritable(true).write(out); + new Text(sharedOutputNamePrefix).write(out); + out.writeInt(sharedOutputNum); + } else { + new BooleanWritable(false).write(out); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + Text outputNameText = new Text(); + outputNameText.readFields(in); + outputNamePrefix = outputNameText.toString(); + outputNum = in.readInt(); + BooleanWritable hasSharedOutputs = new BooleanWritable(); + hasSharedOutputs.readFields(in); + if (hasSharedOutputs.get()) { + Text sharedOutputNamePrefixText = new Text(); + sharedOutputNamePrefixText.readFields(in); + sharedOutputNamePrefix = sharedOutputNamePrefixText.toString(); + sharedOutputNum = in.readInt(); + } + } + + public UserPayload toUserPayload() throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + this.write(new DataOutputStream(out)); + return UserPayload.create(ByteBuffer.wrap(out.toByteArray())); + } + + public static MultipleOutputProcessorConfig fromUserPayload(UserPayload payload) + throws IOException { + MultipleOutputProcessorConfig config = new MultipleOutputProcessorConfig(); + config.readFields(new DataInputStream( + new ByteArrayInputStream(payload.deepCopyAsArray()))); + return config; + } + } + } + + @Override + protected int runJob(String[] args, TezConfiguration tezConf, + TezClient tezClient) throws Exception { + boolean commitOnVertexSuccess = + args.length == 5 && args[4].equals(CommitOnVertexSuccessOption) ? true : false; + DAG dag = createDAG(tezConf, args[0], Integer.parseInt(args[1]), + args[2], Integer.parseInt(args[3]), + args[4], Integer.parseInt(args[5]), + args[6], Integer.parseInt(args[7]), + commitOnVertexSuccess); + LOG.info("Running MultipleCommitsExample"); + return runDag(dag, false, LOG); + } + + private DAG createDAG(TezConfiguration tezConf, + String v1OutputPathPrefix, int v1OutputNum, String v2OutputPathPrefix, int v2OutputNum, + String uv12OutputPathPrefix, int uv12OutputNum, + String v3OutputPathPrefix, int v3OutputNum, boolean commitOnVertexSuccess) throws IOException { + DAG dag = DAG.create("multipleCommitsDAG"); + dag.setConf(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, !commitOnVertexSuccess + ""); + Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create(MultipleOutputProcessor.class.getName()) + .setUserPayload( + new MultipleOutputProcessor.MultipleOutputProcessorConfig( + V1OutputNamePrefix, v1OutputNum, UV12OutputNamePrefix, uv12OutputNum) + .toUserPayload()), 2); + Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create(MultipleOutputProcessor.class.getName()) + .setUserPayload( + new MultipleOutputProcessor.MultipleOutputProcessorConfig( + V2OutputNamePrefix, v2OutputNum, UV12OutputNamePrefix, uv12OutputNum) + .toUserPayload()), 2); + // add data sinks for v1 + for (int i=0;i<v1OutputNum;++i) { + DataSinkDescriptor sink = MROutput.createConfigBuilder( + new Configuration(tezConf), TextOutputFormat.class, v1OutputPathPrefix + "_" + i).build(); + v1.addDataSink(V1OutputNamePrefix + "_" + i, sink); + } + // add data sinks for v2 + for (int i=0;i<v2OutputNum;++i) { + DataSinkDescriptor sink = MROutput.createConfigBuilder( + new Configuration(tezConf), TextOutputFormat.class, v2OutputPathPrefix + "_" + i).build(); + v2.addDataSink(V2OutputNamePrefix + "_" + i, sink); + } + // add data sinks for (v1,v2) + VertexGroup uv12 = dag.createVertexGroup("uv12", v1,v2); + for (int i=0;i<uv12OutputNum;++i) { + DataSinkDescriptor sink = MROutput.createConfigBuilder( + new Configuration(tezConf), TextOutputFormat.class, uv12OutputPathPrefix + "_" + i).build(); + uv12.addDataSink(UV12OutputNamePrefix + "_" + i, sink); + } + + Vertex v3 = Vertex.create("v3", ProcessorDescriptor.create(MultipleOutputProcessor.class.getName()) + .setUserPayload( + new MultipleOutputProcessor.MultipleOutputProcessorConfig(V3OutputNamePrefix, v3OutputNum) + .toUserPayload()), 2); + // add data sinks for v3 + for (int i=0;i<v3OutputNum;++i) { + DataSinkDescriptor sink = MROutput.createConfigBuilder( + new Configuration(tezConf), TextOutputFormat.class, v3OutputPathPrefix + "_" + i).build(); + v3.addDataSink(V3OutputNamePrefix + "_" + i, sink); + } + + OrderedPartitionedKVEdgeConfig edgeConfig = + OrderedPartitionedKVEdgeConfig.newBuilder( + NullWritable.class.getName(), Text.class.getName(), HashPartitioner.class.getName()) + .setFromConfiguration(tezConf) + .build(); + GroupInputEdge edge = GroupInputEdge.create(uv12, v3, edgeConfig.createDefaultEdgeProperty(), + InputDescriptor.create( + ConcatenatedMergedKeyValuesInput.class.getName())); + dag.addVertex(v1) + .addVertex(v2) + .addVertex(v3) + .addEdge(edge); + return dag; + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new MultipleCommitsExample(), args); + System.exit(res); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/d932579b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index 3fe249c..db212fa 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -33,12 +33,14 @@ import java.util.BitSet; import java.util.EnumSet; import java.util.HashSet; import java.util.List; +import java.util.Random; import java.util.Set; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -72,6 +74,7 @@ import org.apache.tez.examples.JoinDataGen; import org.apache.tez.examples.HashJoinExample; import org.apache.tez.examples.JoinValidate; import org.apache.tez.examples.SortMergeJoinExample; +import org.apache.tez.mapreduce.examples.MultipleCommitsExample; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.InputInitializer; import org.apache.tez.runtime.api.InputInitializerContext; @@ -820,6 +823,81 @@ public class TestTezJobs { } } + @Test(timeout = 60000) + public void testMultipleCommits_OnDAGSuccess() throws Exception { + Path stagingDirPath = new Path("/tmp/commit-staging-dir"); + Random rand = new Random(); + String v1OutputPathPrefix = "/tmp/commit-output-v1"; + int v1OutputNum = rand.nextInt(10) + 1; + String v2OutputPathPrefix = "/tmp/commit-output-v2"; + int v2OutputNum = rand.nextInt(10) + 1; + String uv12OutputPathPrefix = "/tmp/commit-output-uv12"; + int uv12OutputNum = rand.nextInt(10) + 1; + String v3OutputPathPrefix = "/tmp/commit-output-v3"; + int v3OutputNum = rand.nextInt(10) + 1; + TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString()); + TezClient tezSession = null; + + try { + MultipleCommitsExample job = new MultipleCommitsExample(); + Assert.assertTrue("MultipleCommitsExample failed", job.run(tezConf, + new String[]{ v1OutputPathPrefix, v1OutputNum + "", v2OutputPathPrefix, v2OutputNum + "", + uv12OutputPathPrefix, uv12OutputNum + "", v3OutputPathPrefix, v3OutputNum + ""}, null)==0); + verifyCommits(v1OutputPathPrefix, v1OutputNum); + verifyCommits(v2OutputPathPrefix, v2OutputNum); + verifyCommits(uv12OutputPathPrefix, uv12OutputNum); + verifyCommits(v3OutputPathPrefix, v3OutputNum); + } finally { + remoteFs.delete(stagingDirPath, true); + if (tezSession != null) { + tezSession.stop(); + } + } + } + + @Test(timeout = 60000) + public void testMultipleCommits_OnVertexSuccess() throws Exception { + Path stagingDirPath = new Path("/tmp/commit-staging-dir"); + Random rand = new Random(); + String v1OutputPathPrefix = "/tmp/commit-output-v1"; + int v1OutputNum = rand.nextInt(10) + 1; + String v2OutputPathPrefix = "/tmp/commit-output-v2"; + int v2OutputNum = rand.nextInt(10) + 1; + String uv12OutputPathPrefix = "/tmp/commit-output-uv12"; + int uv12OutputNum = rand.nextInt(10) + 1; + String v3OutputPathPrefix = "/tmp/commit-output-v3"; + int v3OutputNum = rand.nextInt(10) + 1; + TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString()); + TezClient tezSession = null; + + try { + MultipleCommitsExample job = new MultipleCommitsExample(); + Assert.assertTrue("MultipleCommitsExample failed", job.run(tezConf, + new String[]{ v1OutputPathPrefix, v1OutputNum + "", v2OutputPathPrefix, v2OutputNum + "", + uv12OutputPathPrefix, uv12OutputNum + "", v3OutputPathPrefix, v3OutputNum + "", + MultipleCommitsExample.CommitOnVertexSuccessOption}, null)==0); + verifyCommits(v1OutputPathPrefix, v1OutputNum); + verifyCommits(v2OutputPathPrefix, v2OutputNum); + verifyCommits(uv12OutputPathPrefix, uv12OutputNum); + verifyCommits(v3OutputPathPrefix, v3OutputNum); + } finally { + remoteFs.delete(stagingDirPath, true); + if (tezSession != null) { + tezSession.stop(); + } + } + } + + private void verifyCommits(String outputPrefix, int outputNum) throws IllegalArgumentException, IOException { + for (int i=0; i< outputNum; ++i) { + String outputDir = outputPrefix + "_" + i; + Assert.assertTrue("Output of " + outputDir + " is not succeeded", + remoteFs.exists(new Path( outputDir + "/_SUCCESS"))); + } + } + private static final String VERTEX_WITH_INITIALIZER_NAME = "VertexWithInitializer"; private static final String EVENT_GENERATING_VERTEX_NAME = "EventGeneratingVertex"; private static final String INPUT1_NAME = "Input1";
