Repository: tez Updated Branches: refs/heads/master 74d04a48a -> 842abc17e
TEZ-1312. rename vertex.addInput/Output() to vertex.addDataSource/Sink() (Chen He via bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/842abc17 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/842abc17 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/842abc17 Branch: refs/heads/master Commit: 842abc17e43c464b35576400230f96757ae503c7 Parents: 74d04a4 Author: Bikas Saha <[email protected]> Authored: Thu Jul 24 16:22:50 2014 -0700 Committer: Bikas Saha <[email protected]> Committed: Thu Jul 24 16:22:50 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../java/org/apache/tez/dag/api/Vertex.java | 43 ++++++++++---------- .../org/apache/tez/dag/api/VertexGroup.java | 8 ++-- .../org/apache/tez/dag/api/TestDAGVerify.java | 18 ++++---- .../tez/dag/app/dag/impl/TestDAGImpl.java | 4 +- .../tez/dag/history/utils/TestDAGUtils.java | 6 +-- .../mapreduce/examples/FilterLinesByWord.java | 4 +- .../examples/FilterLinesByWordOneToOne.java | 4 +- .../mapreduce/examples/IntersectDataGen.java | 6 +-- .../mapreduce/examples/IntersectExample.java | 6 +-- .../mapreduce/examples/IntersectValidate.java | 4 +- .../tez/mapreduce/examples/UnionExample.java | 12 +++--- .../tez/mapreduce/examples/WordCount.java | 4 +- .../apache/tez/mapreduce/hadoop/MRHelpers.java | 6 +-- .../org/apache/tez/mapreduce/input/MRInput.java | 2 +- .../org/apache/tez/test/TestDAGRecovery.java | 2 +- .../org/apache/tez/test/TestDAGRecovery2.java | 2 +- 17 files changed, 68 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e7a67db..5ebb28d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -30,6 +30,8 @@ INCOMPATIBLE CHANGES TezRuntimeConfiguration (bikas) TEZ-1134. InputInitializer and OutputCommitter implicitly use payloads of the input and output (bikas) + TEZ-1312. rename vertex.addInput/Output() to vertex.addDataSource/Sink() + (Chen He via bikas) Release 0.4.0-incubating: 2014-04-05 http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java index 4dca9ef..664df46 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java @@ -213,8 +213,8 @@ public class Vertex { } /** - * Specifies an Input for a Vertex. This is meant to be used when a Vertex - * reads Input directly from an external source </p> + * Specifies an external data source for a Vertex. This is meant to be used + * when a Vertex reads Input directly from an external source </p> * * For vertices which read data generated by another vertex - use the * {@link DAG addEdge} method. @@ -223,10 +223,10 @@ public class Vertex { * also from an external source, a combination of this API and the DAG.addEdge * API can be used. </p> * - * Note: If more than one RootInput exists on a vertex, which generates events which need to be - * routed, or generates information to set parallelism, a custom vertex manager should be setup - * to handle this. Not using a custom vertex manager for such a scenario will lead to a - * runtime failure. + * Note: If more than one RootInput exists on a vertex, which generates events + * which need to be routed, or generates information to set parallelism, a + * custom vertex manager should be setup to handle this. Not using a custom + * vertex manager for such a scenario will lead to a runtime failure. * * @param inputName * the name of the input. This will be used when accessing the input @@ -244,7 +244,7 @@ public class Vertex { * vertex parallelism should be set to -1. Can be null. * @return this Vertex */ - public Vertex addInput(String inputName, InputDescriptor inputDescriptor, + public Vertex addDataSource(String inputName, InputDescriptor inputDescriptor, @Nullable InputInitializerDescriptor inputInitializerDescriptor) { additionalInputs .add(new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>( @@ -253,8 +253,8 @@ public class Vertex { } /** - * Specifies an Output for a Vertex. This is meant to be used when a Vertex - * writes Output directly to an external destination. </p> + * Specifies an external data sink for a Vertex. This is meant to be used when + * a Vertex writes Output directly to an external destination. </p> * * If an output of the vertex is meant to be consumed by another Vertex in the * DAG - use the {@link DAG addEdge} method. @@ -267,19 +267,20 @@ public class Vertex { * the name of the output. This will be used when accessing the * output in the {@link LogicalIOProcessor} * @param outputDescriptor - * @param outputCommitterDescriptor Specify committer to be used for the output - * Can be null. After all tasks in the vertex (or in the DAG) have - * completed, the committer (if specified) is invoked to commit the - * outputs. Commit is a data sink specific operation that usually - * determines the visibility of the output to external observers. - * E.g. moving output files from temporary dirs to the real output - * dir. When there are multiple executions of a task, the commit - * process also helps decide which execution will be included in the - * final output. Users should consider whether their application or - * data sink need a commit operation. + * @param outputCommitterDescriptor + * Specify committer to be used for the output Can be null. After all + * tasks in the vertex (or in the DAG) have completed, the committer + * (if specified) is invoked to commit the outputs. Commit is a data + * sink specific operation that usually determines the visibility of + * the output to external observers. E.g. moving output files from + * temporary dirs to the real output dir. When there are multiple + * executions of a task, the commit process also helps decide which + * execution will be included in the final output. Users should + * consider whether their application or data sink need a commit + * operation. * @return this Vertex */ - public Vertex addOutput(String outputName, OutputDescriptor outputDescriptor, + public Vertex addDataSink(String outputName, OutputDescriptor outputDescriptor, @Nullable OutputCommitterDescriptor outputCommitterDescriptor) { additionalOutputs .add(new RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>( @@ -287,7 +288,7 @@ public class Vertex { return this; } - Vertex addAdditionalOutput(RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output) { + Vertex addAdditionalDataSink(RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output) { additionalOutputs.add(output); return this; } http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java index 0952ab3..991350b 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java @@ -87,10 +87,10 @@ public class VertexGroup { } /** - * Add an common output to the group of vertices. - * Refer to {@link Vertex#addOutput(String, OutputDescriptor, OutputCommitterDescriptor)} + * Add an common data sink to the group of vertices. + * Refer to {@link Vertex#addDataSink(String, OutputDescriptor, OutputCommitterDescriptor)} */ - public VertexGroup addOutput(String outputName, OutputDescriptor outputDescriptor, + public VertexGroup addDataSink(String outputName, OutputDescriptor outputDescriptor, @Nullable OutputCommitterDescriptor committerDescriptor) { RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> leafOutput = new RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>(outputName, @@ -99,7 +99,7 @@ public class VertexGroup { // also add output to its members for (Vertex member : getMembers()) { - member.addAdditionalOutput(leafOutput); + member.addAdditionalDataSink(leafOutput); } return this; http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java index cd3414b..91c04fc 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java @@ -482,7 +482,7 @@ public class TestDAGVerify { new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); - v2.addInput("v1", new InputDescriptor(), null); + v2.addDataSource("v1", new InputDescriptor(), null); Edge e1 = new Edge(v1, v2, new EdgeProperty(DataMovementType.SCATTER_GATHER, @@ -506,7 +506,7 @@ public class TestDAGVerify { new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); - v1.addOutput("v2", new OutputDescriptor(), null); + v1.addDataSink("v2", new OutputDescriptor(), null); Edge e1 = new Edge(v1, v2, new EdgeProperty(DataMovementType.SCATTER_GATHER, @@ -530,7 +530,7 @@ public class TestDAGVerify { new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); - v1.addOutput("v2", new OutputDescriptor(), null); + v1.addDataSink("v2", new OutputDescriptor(), null); DAG dag = new DAG("testDag"); dag.addVertex(v1); @@ -547,7 +547,7 @@ public class TestDAGVerify { new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); - v1.addInput("v2", new InputDescriptor(), null); + v1.addDataSource("v2", new InputDescriptor(), null); DAG dag = new DAG("testDag"); dag.addVertex(v1); @@ -606,7 +606,7 @@ public class TestDAGVerify { DAG dag = new DAG("testDag"); VertexGroup uv12 = dag.createVertexGroup("uv12", v1, v2); OutputDescriptor outDesc = new OutputDescriptor(); - uv12.addOutput("uvOut", outDesc, null); + uv12.addDataSink("uvOut", outDesc, null); GroupInputEdge e1 = new GroupInputEdge(uv12, v3, new EdgeProperty(DataMovementType.SCATTER_GATHER, @@ -663,7 +663,7 @@ public class TestDAGVerify { String groupName1 = "uv12"; VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2); OutputDescriptor outDesc = new OutputDescriptor(); - uv12.addOutput("uvOut", outDesc, null); + uv12.addDataSink("uvOut", outDesc, null); String groupName2 = "uv23"; VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3); @@ -745,7 +745,7 @@ public class TestDAGVerify { String groupName1 = "uv12"; VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2); OutputDescriptor outDesc = new OutputDescriptor(); - uv12.addOutput("uvOut", outDesc, null); + uv12.addDataSink("uvOut", outDesc, null); String groupName2 = "uv23"; VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3); @@ -880,8 +880,8 @@ public class TestDAGVerify { .getBytes()); InputDescriptor inputDescriptor2 = new InputDescriptor("input2").setUserPayload("inputBytes" .getBytes()); - v1.addInput("input1", inputDescriptor1, null); - v1.addInput("input2", inputDescriptor2, null); + v1.addDataSource("input1", inputDescriptor1, null); + v1.addDataSource("input2", inputDescriptor2, null); dag.addVertex(v1); http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index 0bc42c1..a50771b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -360,8 +360,8 @@ public class TestDAGImpl { TotalCountingOutputCommitter.class.getName()); org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2); OutputDescriptor outDesc = new OutputDescriptor("output.class"); - uv12.addOutput("uvOut", outDesc, ocd); - v3.addOutput("uvOut", outDesc, ocd); + uv12.addDataSink("uvOut", outDesc, ocd); + v3.addDataSink("uvOut", outDesc, ocd); GroupInputEdge e1 = new GroupInputEdge(uv12, v3, new EdgeProperty(DataMovementType.SCATTER_GATHER, http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java index 0b7b395..5f0f1c9 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java @@ -54,7 +54,7 @@ public class TestDAGUtils { org.apache.tez.dag.api.Vertex v1 = new org.apache.tez.dag.api.Vertex("vertex1", new ProcessorDescriptor("Processor").setHistoryText("vertex1 Processor HistoryText"), dummyTaskCount, dummyTaskResource); - v1.addInput("input1", new InputDescriptor("input.class").setHistoryText("input HistoryText"), + v1.addDataSource("input1", new InputDescriptor("input.class").setHistoryText("input HistoryText"), null); org.apache.tez.dag.api.Vertex v2 = new org.apache.tez.dag.api.Vertex("vertex2", new ProcessorDescriptor("Processor").setHistoryText("vertex2 Processor HistoryText"), @@ -69,8 +69,8 @@ public class TestDAGUtils { OutputDescriptor outDesc = new OutputDescriptor("output.class") .setHistoryText("uvOut HistoryText"); OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(OutputCommitter.class.getName()); - uv12.addOutput("uvOut", outDesc, ocd); - v3.addOutput("uvOut", outDesc, ocd); + uv12.addDataSink("uvOut", outDesc, ocd); + v3.addDataSink("uvOut", outDesc, ocd); GroupInputEdge e1 = new GroupInputEdge(uv12, v3, new EdgeProperty(DataMovementType.SCATTER_GATHER, http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java index 3a26171..9588c72 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java @@ -199,7 +199,7 @@ public class FilterLinesByWord extends Configured implements Tool { // Configure the Input for stage1 Class<? extends TezRootInputInitializer> initializerClazz = generateSplitsInClient ? null : MRInputAMSplitGenerator.class; - stage1Vertex.addInput("MRInput", + stage1Vertex.addDataSource("MRInput", new InputDescriptor(MRInputLegacy.class.getName()) .setUserPayload(MRHelpers.createMRInputPayload(stage1Payload, null)), (initializerClazz==null ? null : new InputInitializerDescriptor(initializerClazz.getName()))); @@ -215,7 +215,7 @@ public class FilterLinesByWord extends Configured implements Tool { OutputDescriptor od = new OutputDescriptor(MROutput.class.getName()) .setUserPayload(MRHelpers.createUserPayloadFromConf(stage2Conf)); OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(MROutputCommitter.class.getName()); - stage2Vertex.addOutput("MROutput", od, ocd); + stage2Vertex.addDataSink("MROutput", od, ocd); UnorderedUnpartitionedKVEdgeConfigurer edgeConf = UnorderedUnpartitionedKVEdgeConfigurer .newBuilder(Text.class.getName(), TextLongPair.class.getName()).build(); http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java index 53eb590..9143351 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java @@ -185,7 +185,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool { // Configure the Input for stage1 Class<? extends TezRootInputInitializer> initializerClazz = generateSplitsInClient ? null : MRInputAMSplitGenerator.class; - stage1Vertex.addInput("MRInput", + stage1Vertex.addDataSource("MRInput", new InputDescriptor(MRInputLegacy.class.getName()) .setUserPayload(MRHelpers.createMRInputPayload(stage1Payload, null)), (initializerClazz==null ? null : new InputInitializerDescriptor(initializerClazz.getName()))); @@ -198,7 +198,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool { stage2Vertex.setTaskLocalFiles(commonLocalResources); // Configure the Output for stage2 - stage2Vertex.addOutput("MROutput", + stage2Vertex.addDataSink("MROutput", new OutputDescriptor(MROutput.class.getName()).setUserPayload(MRHelpers .createUserPayloadFromConf(stage2Conf)), new OutputCommitterDescriptor(MROutputCommitter.class.getName())); http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java index 58b952b..9ed33f9 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java @@ -213,13 +213,13 @@ public class IntersectDataGen extends Configured implements Tool { Vertex genDataVertex = new Vertex("datagen", new ProcessorDescriptor( GenDataProcessor.class.getName()).setUserPayload(GenDataProcessor.createConfiguration( largeOutSizePerTask, smallOutSizePerTask)), numTasks, MRHelpers.getMapResource(tezConf)); - genDataVertex.addOutput(STREAM_OUTPUT_NAME, + genDataVertex.addDataSink(STREAM_OUTPUT_NAME, new OutputDescriptor(MROutput.class.getName()).setUserPayload(streamOutputPayload), new OutputCommitterDescriptor(MROutputCommitter.class.getName())); - genDataVertex.addOutput(HASH_OUTPUT_NAME, + genDataVertex.addDataSink(HASH_OUTPUT_NAME, new OutputDescriptor(MROutput.class.getName()).setUserPayload(hashOutputPayload), new OutputCommitterDescriptor(MROutputCommitter.class.getName())); - genDataVertex.addOutput(EXPECTED_OUTPUT_NAME, + genDataVertex.addDataSink(EXPECTED_OUTPUT_NAME, new OutputDescriptor(MROutput.class.getName()).setUserPayload(expectedOutputPayload), new OutputCommitterDescriptor(MROutputCommitter.class.getName())); http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java index 1353080..c5bf792 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java @@ -212,21 +212,21 @@ public class IntersectExample extends Configured implements Tool { // Change the way resources are setup - no MRHelpers Vertex streamFileVertex = new Vertex("partitioner1", new ProcessorDescriptor(ForwardingProcessor.class.getName()), -1, - MRHelpers.getMapResource(tezConf)).addInput("streamfile", + MRHelpers.getMapResource(tezConf)).addDataSource("streamfile", new InputDescriptor(MRInput.class.getName()) .setUserPayload(streamInputPayload), new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName())); Vertex hashFileVertex = new Vertex("partitioner2", new ProcessorDescriptor( ForwardingProcessor.class.getName()), -1, - MRHelpers.getMapResource(tezConf)).addInput("hashfile", + MRHelpers.getMapResource(tezConf)).addDataSource("hashfile", new InputDescriptor(MRInput.class.getName()) .setUserPayload(hashInputPayload), new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName())); Vertex intersectVertex = new Vertex("intersect", new ProcessorDescriptor( IntersectProcessor.class.getName()), numPartitions, - MRHelpers.getReduceResource(tezConf)).addOutput("finalOutput", + MRHelpers.getReduceResource(tezConf)).addDataSink("finalOutput", new OutputDescriptor(MROutput.class.getName()) .setUserPayload(finalOutputPayload), new OutputCommitterDescriptor(MROutputCommitter.class.getName())); http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java index 0b91efb..585ee63 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java @@ -212,13 +212,13 @@ public class IntersectValidate extends Configured implements Tool { // Change the way resources are setup - no MRHelpers Vertex lhsVertex = new Vertex(LHS_INPUT_NAME, new ProcessorDescriptor( ForwardingProcessor.class.getName()), -1, - MRHelpers.getMapResource(tezConf)).addInput("lhs", new InputDescriptor( + MRHelpers.getMapResource(tezConf)).addDataSource("lhs", new InputDescriptor( MRInput.class.getName()).setUserPayload(streamInputPayload), new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName())); Vertex rhsVertex = new Vertex(RHS_INPUT_NAME, new ProcessorDescriptor( ForwardingProcessor.class.getName()), -1, - MRHelpers.getMapResource(tezConf)).addInput("rhs", new InputDescriptor( + MRHelpers.getMapResource(tezConf)).addDataSource("rhs", new InputDescriptor( MRInput.class.getName()).setUserPayload(hashInputPayload), new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName())); http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java index e2f073f..fdbe187 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java @@ -184,17 +184,17 @@ public class UnionExample { numMaps, MRHelpers.getMapResource(tezConf)); InputInitializerDescriptor iid = new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()); - mapVertex1.addInput("MRInput", id, iid); + mapVertex1.addDataSource("MRInput", id, iid); Vertex mapVertex2 = new Vertex("map2", new ProcessorDescriptor( TokenProcessor.class.getName()), numMaps, MRHelpers.getMapResource(tezConf)); - mapVertex2.addInput("MRInput", id, iid); + mapVertex2.addDataSource("MRInput", id, iid); Vertex mapVertex3 = new Vertex("map3", new ProcessorDescriptor( TokenProcessor.class.getName()), numMaps, MRHelpers.getMapResource(tezConf)); - mapVertex3.addInput("MRInput", id, iid); + mapVertex3.addDataSource("MRInput", id, iid); Vertex checkerVertex = new Vertex("checker", new ProcessorDescriptor( @@ -207,14 +207,14 @@ public class UnionExample { .setUserPayload(MROutput.createUserPayload( outputConf, TextOutputFormat.class.getName(), true)); OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(MROutputCommitter.class.getName()); - checkerVertex.addOutput("union", od, ocd); + checkerVertex.addDataSink("union", od, ocd); Configuration allPartsConf = new Configuration(tezConf); allPartsConf.set(FileOutputFormat.OUTDIR, outputPath+"-all-parts"); OutputDescriptor od2 = new OutputDescriptor(MROutput.class.getName()) .setUserPayload(MROutput.createUserPayload( allPartsConf, TextOutputFormat.class.getName(), true)); - checkerVertex.addOutput("all-parts", od2, ocd); + checkerVertex.addDataSink("all-parts", od2, ocd); Configuration partsConf = new Configuration(tezConf); partsConf.set(FileOutputFormat.OUTDIR, outputPath+"-parts"); @@ -223,7 +223,7 @@ public class UnionExample { OutputDescriptor od1 = new OutputDescriptor(MROutput.class.getName()) .setUserPayload(MROutput.createUserPayload( partsConf, TextOutputFormat.class.getName(), true)); - unionVertex.addOutput("parts", od1, ocd); + unionVertex.addDataSink("parts", od1, ocd); OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer .newBuilder(Text.class.getName(), IntWritable.class.getName(), http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java index 61f5cd9..00fc326 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java @@ -131,12 +131,12 @@ public class WordCount extends Configured implements Tool { Vertex tokenizerVertex = new Vertex("tokenizer", new ProcessorDescriptor( TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf)); - tokenizerVertex.addInput("MRInput", id, iid); + tokenizerVertex.addDataSource("MRInput", id, iid); Vertex summerVertex = new Vertex("summer", new ProcessorDescriptor( SumProcessor.class.getName()), 1, MRHelpers.getReduceResource(tezConf)); - summerVertex.addOutput("MROutput", od, ocd); + summerVertex.addDataSink("MROutput", od, ocd); OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer .newBuilder(Text.class.getName(), IntWritable.class.getName(), http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java index 29ea6a6..92dc0c5 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java @@ -983,7 +983,7 @@ public class MRHelpers { InputInitializerDescriptor initClazz) { InputDescriptor id = new InputDescriptor(MRInputLegacy.class.getName()) .setUserPayload(userPayload); - vertex.addInput("MRInput", id, initClazz); + vertex.addDataSource("MRInput", id, initClazz); } /** @@ -998,14 +998,14 @@ public class MRHelpers { public static void addMROutput(Vertex vertex, byte[] userPayload) { OutputDescriptor od = new OutputDescriptor(MROutput.class.getName()) .setUserPayload(userPayload); - vertex.addOutput("MROutput", od, new OutputCommitterDescriptor(MROutputCommitter.class.getName())); + vertex.addDataSink("MROutput", od, new OutputCommitterDescriptor(MROutputCommitter.class.getName())); } @Private public static void addMROutputLegacy(Vertex vertex, byte[] userPayload) { OutputDescriptor od = new OutputDescriptor(MROutputLegacy.class.getName()) .setUserPayload(userPayload); - vertex.addOutput("MROutput", od, new OutputCommitterDescriptor(MROutputCommitter.class.getName())); + vertex.addDataSink("MROutput", od, new OutputCommitterDescriptor(MROutputCommitter.class.getName())); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index eb2ba89..2f7c13e 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -94,7 +94,7 @@ public class MRInput extends MRInputBase { * the InputFormat will be grouped in the AM based on available * resources, locality etc. This option may be set to true only when * using MRInputAMSplitGenerator as the initializer class in - * {@link Vertex#addInput(String, org.apache.tez.dag.api.InputDescriptor, + * {@link Vertex#addDataSource(String, org.apache.tez.dag.api.InputDescriptor, * org.apache.tez.dag.api.InputInitializerDescriptor)} * @return returns the user payload to be set on the InputDescriptor of * MRInput http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java index b0f937a..ddbec28 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java @@ -173,7 +173,7 @@ public class TestDAGRecovery { @Test(timeout=120000) public void testDelayedInit() throws Exception { DAG dag = SimpleVTestDAG.createDAG("DelayedInitDAG", null); - dag.getVertex("v1").addInput("i1", + dag.getVertex("v1").addDataSource("i1", new InputDescriptor(NoOpInput.class.getName()), new InputInitializerDescriptor(FailingInputInitializer.class.getName())); runDAGAndVerify(dag, State.SUCCEEDED); http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java index 6b3727a..ca8f00b 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java @@ -180,7 +180,7 @@ public class TestDAGRecovery2 { .toUserPayload()); OutputCommitterDescriptor ocd = new OutputCommitterDescriptor( MultiAttemptDAG.FailingOutputCommitter.class.getName()); - dag.getVertex("v3").addOutput("FailingOutput", od, ocd); + dag.getVertex("v3").addDataSink("FailingOutput", od, ocd); runDAGAndVerify(dag, State.FAILED); }
