Repository: tez Updated Branches: refs/heads/master fbca9f4c0 -> b5497d7c1
http://git-wip-us.apache.org/repos/asf/tez/blob/b5497d7c/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java index 52cff91..ba69077 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java @@ -26,6 +26,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -59,6 +60,11 @@ import org.apache.tez.runtime.library.processor.SimpleProcessor; * of occurrences of a word in a distributed text data set. */ public class WordCount extends Configured implements Tool { + + static String INPUT = "Input"; + static String OUTPUT = "Output"; + static String TOKENIZER = "Tokenizer"; + static String SUMMATION = "Summation"; /* * Example code to write a processor in Tez. @@ -84,8 +90,9 @@ public class WordCount extends Configured implements Tool { // of casting the input/output. This allows the actual input/output type to be replaced // without affecting the semantic guarantees of the data type that are represented by // the reader and writer. - KeyValueReader kvReader = (KeyValueReader) getInputs().values().iterator().next().getReader(); - KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().values().iterator().next().getWriter(); + // The inputs/outputs are referenced via the names assigned in the DAG. + KeyValueReader kvReader = (KeyValueReader) getInputs().get(INPUT).getReader(); + KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(SUMMATION).getWriter(); while (kvReader.next()) { StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString()); while (itr.hasMoreTokens()) { @@ -116,11 +123,11 @@ public class WordCount extends Configured implements Tool { public void run() throws Exception { Preconditions.checkArgument(getInputs().size() == 1); Preconditions.checkArgument(getOutputs().size() == 1); - KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().values().iterator().next().getWriter(); + KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter(); // The KeyValues reader provides all values for a given key. The aggregation of values per key // is done by the LogicalInput. Since the key is the word and the values are its counts in // the different TokenProcessors, summing all values per key provides the sum for that word. - KeyValuesReader kvReader = (KeyValuesReader) getInputs().values().iterator().next().getReader(); + KeyValuesReader kvReader = (KeyValuesReader) getInputs().get(TOKENIZER).getReader(); while (kvReader.next()) { Text word = (Text) kvReader.getCurrentKey(); int sum = 0; @@ -150,8 +157,8 @@ public class WordCount extends Configured implements Tool { // Create a vertex that reads the data from the data source and tokenizes it using the // TokenProcessor. The number of tasks that will do the work for this vertex will be decided // using the information provided by the data source descriptor. - Vertex tokenizerVertex = Vertex.create("Tokenizer", ProcessorDescriptor.create( - TokenProcessor.class.getName())).addDataSource("Input", dataSource); + Vertex tokenizerVertex = Vertex.create(TOKENIZER, ProcessorDescriptor.create( + TokenProcessor.class.getName())).addDataSource(INPUT, dataSource); // Create the edge that represents the movement and semantics of data between the producer // Tokenizer vertex and the consumer Summation vertex. In order to perform the summation in @@ -172,9 +179,9 @@ public class WordCount extends Configured implements Tool { // The number of tasks that do the work of this vertex depends on the number of partitions used // to distribute the sum processing. In this case, its been made configurable via the // numPartitions parameter. - Vertex summationVertex = Vertex.create("Summation", + Vertex summationVertex = Vertex.create(SUMMATION, ProcessorDescriptor.create(SumProcessor.class.getName()), numPartitions) - .addDataSink("Output", dataSink); + .addDataSink(OUTPUT, dataSink); // No need to add jar containing this class as assumed to be part of the Tez jars. Otherwise // we would have to add the jars for this code as local files to the vertices. @@ -202,6 +209,8 @@ public class WordCount extends Configured implements Tool { } else { tezConf = new TezConfiguration(); } + + UserGroupInformation.setConfiguration(tezConf); // Create the TezClient to submit the DAG. Pass the tezConf that has all necessary global and // dag specific configurations http://git-wip-us.apache.org/repos/asf/tez/blob/b5497d7c/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index 736c54b..1fbacdf 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -51,9 +51,9 @@ import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.examples.OrderedWordCount; import org.apache.tez.examples.SimpleSessionExample; -import org.apache.tez.examples.IntersectDataGen; -import org.apache.tez.examples.IntersectExample; -import org.apache.tez.examples.IntersectValidate; +import org.apache.tez.examples.JoinDataGen; +import org.apache.tez.examples.JoinExample; +import org.apache.tez.examples.JoinValidate; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -118,7 +118,7 @@ public class TestTezJobs { @Test(timeout = 60000) public void testIntersectExample() throws Exception { - IntersectExample intersectExample = new IntersectExample(); + JoinExample intersectExample = new JoinExample(); intersectExample.setConf(new Configuration(mrrTezCluster.getConfig())); Path stagingDirPath = new Path("/tmp/tez-staging-dir"); Path inPath1 = new Path("/tmp/inPath1"); @@ -192,18 +192,18 @@ public class TestTezJobs { tezSession = TezClient.create("IntersectExampleSession", tezConf); tezSession.start(); - IntersectDataGen dataGen = new IntersectDataGen(); + JoinDataGen dataGen = new JoinDataGen(); String[] dataGenArgs = new String[] { dataPath1.toString(), "1048576", dataPath2.toString(), "524288", expectedOutputPath.toString(), "2" }; assertEquals(0, dataGen.run(tezConf, dataGenArgs, tezSession)); - IntersectExample intersect = new IntersectExample(); + JoinExample intersect = new JoinExample(); String[] intersectArgs = new String[] { dataPath1.toString(), dataPath2.toString(), "2", outPath.toString() }; assertEquals(0, intersect.run(tezConf, intersectArgs, tezSession)); - IntersectValidate intersectValidate = new IntersectValidate(); + JoinValidate intersectValidate = new JoinValidate(); String[] intersectValidateArgs = new String[] { expectedOutputPath.toString(), outPath.toString(), "3" }; assertEquals(0, intersectValidate.run(tezConf, intersectValidateArgs, tezSession));
