Repository: tez
Updated Branches:
  refs/heads/master 74d04a48a -> 842abc17e


TEZ-1312. rename vertex.addInput/Output() to vertex.addDataSource/Sink() (Chen He via bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/842abc17
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/842abc17
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/842abc17

Branch: refs/heads/master
Commit: 842abc17e43c464b35576400230f96757ae503c7
Parents: 74d04a4
Author: Bikas Saha <[email protected]>
Authored: Thu Jul 24 16:22:50 2014 -0700
Committer: Bikas Saha <[email protected]>
Committed: Thu Jul 24 16:22:50 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../java/org/apache/tez/dag/api/Vertex.java     | 43 ++++++++++----------
 .../org/apache/tez/dag/api/VertexGroup.java     |  8 ++--
 .../org/apache/tez/dag/api/TestDAGVerify.java   | 18 ++++----
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |  4 +-
 .../tez/dag/history/utils/TestDAGUtils.java     |  6 +--
 .../mapreduce/examples/FilterLinesByWord.java   |  4 +-
 .../examples/FilterLinesByWordOneToOne.java     |  4 +-
 .../mapreduce/examples/IntersectDataGen.java    |  6 +--
 .../mapreduce/examples/IntersectExample.java    |  6 +--
 .../mapreduce/examples/IntersectValidate.java   |  4 +-
 .../tez/mapreduce/examples/UnionExample.java    | 12 +++---
 .../tez/mapreduce/examples/WordCount.java       |  4 +-
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  |  6 +--
 .../org/apache/tez/mapreduce/input/MRInput.java |  2 +-
 .../org/apache/tez/test/TestDAGRecovery.java    |  2 +-
 .../org/apache/tez/test/TestDAGRecovery2.java   |  2 +-
 17 files changed, 68 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e7a67db..5ebb28d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -30,6 +30,8 @@ INCOMPATIBLE CHANGES
   TezRuntimeConfiguration (bikas)
   TEZ-1134. InputInitializer and OutputCommitter implicitly use payloads of
   the input and output (bikas)
+  TEZ-1312. rename vertex.addInput/Output() to vertex.addDataSource/Sink()
+  (Chen He via bikas)
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 4dca9ef..664df46 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -213,8 +213,8 @@ public class Vertex {
   }
   
   /**
-   * Specifies an Input for a Vertex. This is meant to be used when a Vertex
-   * reads Input directly from an external source </p>
+   * Specifies an external data source for a Vertex. This is meant to be used
+   * when a Vertex reads Input directly from an external source </p>
    * 
    * For vertices which read data generated by another vertex - use the
    * {@link DAG addEdge} method.
@@ -223,10 +223,10 @@ public class Vertex {
    * also from an external source, a combination of this API and the DAG.addEdge
    * API can be used. </p>
    * 
-   * Note: If more than one RootInput exists on a vertex, which generates events which need to be
-   * routed, or generates information to set parallelism, a custom vertex manager should be setup
-   * to handle this. Not using a custom vertex manager for such a scenario will lead to a
-   * runtime failure. 
+   * Note: If more than one RootInput exists on a vertex, which generates events
+   * which need to be routed, or generates information to set parallelism, a
+   * custom vertex manager should be setup to handle this. Not using a custom
+   * vertex manager for such a scenario will lead to a runtime failure.
    * 
    * @param inputName
    *          the name of the input. This will be used when accessing the input
@@ -244,7 +244,7 @@ public class Vertex {
    *          vertex parallelism should be set to -1. Can be null.
    * @return this Vertex
    */
-  public Vertex addInput(String inputName, InputDescriptor inputDescriptor,
+  public Vertex addDataSource(String inputName, InputDescriptor inputDescriptor,
       @Nullable InputInitializerDescriptor inputInitializerDescriptor) {
     additionalInputs
         .add(new RootInputLeafOutput<InputDescriptor, 
InputInitializerDescriptor>(
@@ -253,8 +253,8 @@ public class Vertex {
   }
 
   /**
-   * Specifies an Output for a Vertex. This is meant to be used when a Vertex
-   * writes Output directly to an external destination. </p>
+   * Specifies an external data sink for a Vertex. This is meant to be used when
+   * a Vertex writes Output directly to an external destination. </p>
    * 
    * If an output of the vertex is meant to be consumed by another Vertex in 
the
    * DAG - use the {@link DAG addEdge} method.
@@ -267,19 +267,20 @@ public class Vertex {
    *          the name of the output. This will be used when accessing the
    *          output in the {@link LogicalIOProcessor}
    * @param outputDescriptor
-   * @param outputCommitterDescriptor Specify committer to be used for the 
output
-   *           Can be null. After all tasks in the vertex (or in the DAG) have 
-   *           completed, the committer (if specified) is invoked to commit 
the 
-   *           outputs. Commit is a data sink specific operation that usually 
-   *           determines the visibility of the output to external observers.
-   *           E.g. moving output files from temporary dirs to the real output 
-   *           dir. When there are multiple executions of a task, the commit 
-   *           process also helps decide which execution will be included in 
the 
-   *           final output. Users should consider whether their application 
or 
-   *           data sink need a commit operation.
+   * @param outputCommitterDescriptor
+   *          Specify committer to be used for the output Can be null. After 
all
+   *          tasks in the vertex (or in the DAG) have completed, the committer
+   *          (if specified) is invoked to commit the outputs. Commit is a data
+   *          sink specific operation that usually determines the visibility of
+   *          the output to external observers. E.g. moving output files from
+   *          temporary dirs to the real output dir. When there are multiple
+   *          executions of a task, the commit process also helps decide which
+   *          execution will be included in the final output. Users should
+   *          consider whether their application or data sink need a commit
+   *          operation.
    * @return this Vertex
    */
-  public Vertex addOutput(String outputName, OutputDescriptor outputDescriptor,
+  public Vertex addDataSink(String outputName, OutputDescriptor outputDescriptor,
       @Nullable OutputCommitterDescriptor outputCommitterDescriptor) {
     additionalOutputs
         .add(new RootInputLeafOutput<OutputDescriptor, 
OutputCommitterDescriptor>(
@@ -287,7 +288,7 @@ public class Vertex {
     return this;
   }
   
-  Vertex addAdditionalOutput(RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output) {
+  Vertex addAdditionalDataSink(RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output) {
     additionalOutputs.add(output);
     return this;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java 
b/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
index 0952ab3..991350b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
@@ -87,10 +87,10 @@ public class VertexGroup {
   }
   
   /**
-   * Add an common output to the group of vertices.
-   * Refer to {@link Vertex#addOutput(String, OutputDescriptor, OutputCommitterDescriptor)}
+   * Add an common data sink to the group of vertices.
+   * Refer to {@link Vertex#addDataSink(String, OutputDescriptor, OutputCommitterDescriptor)}
    */
-  public VertexGroup addOutput(String outputName, OutputDescriptor outputDescriptor,
+  public VertexGroup addDataSink(String outputName, OutputDescriptor outputDescriptor,
       @Nullable OutputCommitterDescriptor committerDescriptor) {
     RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> 
leafOutput = 
         new RootInputLeafOutput<OutputDescriptor, 
OutputCommitterDescriptor>(outputName,
@@ -99,7 +99,7 @@ public class VertexGroup {
     
     // also add output to its members
     for (Vertex member : getMembers()) {
-      member.addAdditionalOutput(leafOutput);
+      member.addAdditionalDataSink(leafOutput);
     }
     
     return this;

http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java 
b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index cd3414b..91c04fc 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -482,7 +482,7 @@ public class TestDAGVerify {
         new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     
-    v2.addInput("v1", new InputDescriptor(), null);
+    v2.addDataSource("v1", new InputDescriptor(), null);
     
     Edge e1 = new Edge(v1, v2,
         new EdgeProperty(DataMovementType.SCATTER_GATHER, 
@@ -506,7 +506,7 @@ public class TestDAGVerify {
         new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     
-    v1.addOutput("v2", new OutputDescriptor(), null);
+    v1.addDataSink("v2", new OutputDescriptor(), null);
     
     Edge e1 = new Edge(v1, v2,
         new EdgeProperty(DataMovementType.SCATTER_GATHER, 
@@ -530,7 +530,7 @@ public class TestDAGVerify {
         new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     
-    v1.addOutput("v2", new OutputDescriptor(), null);
+    v1.addDataSink("v2", new OutputDescriptor(), null);
     
     DAG dag = new DAG("testDag");
     dag.addVertex(v1);
@@ -547,7 +547,7 @@ public class TestDAGVerify {
         new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     
-    v1.addInput("v2", new InputDescriptor(), null);
+    v1.addDataSource("v2", new InputDescriptor(), null);
     
     DAG dag = new DAG("testDag");
     dag.addVertex(v1);
@@ -606,7 +606,7 @@ public class TestDAGVerify {
     DAG dag = new DAG("testDag");
     VertexGroup uv12 = dag.createVertexGroup("uv12", v1, v2);
     OutputDescriptor outDesc = new OutputDescriptor();
-    uv12.addOutput("uvOut", outDesc, null);
+    uv12.addDataSink("uvOut", outDesc, null);
     
     GroupInputEdge e1 = new GroupInputEdge(uv12, v3,
         new EdgeProperty(DataMovementType.SCATTER_GATHER, 
@@ -663,7 +663,7 @@ public class TestDAGVerify {
     String groupName1 = "uv12";
     VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
     OutputDescriptor outDesc = new OutputDescriptor();
-    uv12.addOutput("uvOut", outDesc, null);
+    uv12.addDataSink("uvOut", outDesc, null);
     
     String groupName2 = "uv23";
     VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3);
@@ -745,7 +745,7 @@ public class TestDAGVerify {
     String groupName1 = "uv12";
     VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
     OutputDescriptor outDesc = new OutputDescriptor();
-    uv12.addOutput("uvOut", outDesc, null);
+    uv12.addDataSink("uvOut", outDesc, null);
     
     String groupName2 = "uv23";
     VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3);
@@ -880,8 +880,8 @@ public class TestDAGVerify {
         .getBytes());
     InputDescriptor inputDescriptor2 = new 
InputDescriptor("input2").setUserPayload("inputBytes"
         .getBytes());
-    v1.addInput("input1", inputDescriptor1, null);
-    v1.addInput("input2", inputDescriptor2, null);
+    v1.addDataSource("input1", inputDescriptor1, null);
+    v1.addDataSource("input2", inputDescriptor2, null);
 
     dag.addVertex(v1);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 0bc42c1..a50771b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -360,8 +360,8 @@ public class TestDAGImpl {
         TotalCountingOutputCommitter.class.getName());
     org.apache.tez.dag.api.VertexGroup uv12 = 
dag.createVertexGroup(groupName1, v1, v2);
     OutputDescriptor outDesc = new OutputDescriptor("output.class");
-    uv12.addOutput("uvOut", outDesc, ocd);
-    v3.addOutput("uvOut", outDesc, ocd);
+    uv12.addDataSink("uvOut", outDesc, ocd);
+    v3.addDataSink("uvOut", outDesc, ocd);
     
     GroupInputEdge e1 = new GroupInputEdge(uv12, v3,
         new EdgeProperty(DataMovementType.SCATTER_GATHER, 

http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java 
b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
index 0b7b395..5f0f1c9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
@@ -54,7 +54,7 @@ public class TestDAGUtils {
     org.apache.tez.dag.api.Vertex v1 = new 
org.apache.tez.dag.api.Vertex("vertex1",
         new ProcessorDescriptor("Processor").setHistoryText("vertex1 Processor 
HistoryText"),
         dummyTaskCount, dummyTaskResource);
-    v1.addInput("input1", new 
InputDescriptor("input.class").setHistoryText("input HistoryText"),
+    v1.addDataSource("input1", new 
InputDescriptor("input.class").setHistoryText("input HistoryText"),
         null);
     org.apache.tez.dag.api.Vertex v2 = new 
org.apache.tez.dag.api.Vertex("vertex2",
         new ProcessorDescriptor("Processor").setHistoryText("vertex2 Processor 
HistoryText"),
@@ -69,8 +69,8 @@ public class TestDAGUtils {
     OutputDescriptor outDesc = new OutputDescriptor("output.class")
         .setHistoryText("uvOut HistoryText");
     OutputCommitterDescriptor ocd = new 
OutputCommitterDescriptor(OutputCommitter.class.getName());
-    uv12.addOutput("uvOut", outDesc, ocd);
-    v3.addOutput("uvOut", outDesc, ocd);
+    uv12.addDataSink("uvOut", outDesc, ocd);
+    v3.addDataSink("uvOut", outDesc, ocd);
 
     GroupInputEdge e1 = new GroupInputEdge(uv12, v3,
         new EdgeProperty(DataMovementType.SCATTER_GATHER,

http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index 3a26171..9588c72 100644
--- 
a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -199,7 +199,7 @@ public class FilterLinesByWord extends Configured 
implements Tool {
     // Configure the Input for stage1
     Class<? extends TezRootInputInitializer> initializerClazz = 
generateSplitsInClient ? null
         : MRInputAMSplitGenerator.class;
-    stage1Vertex.addInput("MRInput",
+    stage1Vertex.addDataSource("MRInput",
         new InputDescriptor(MRInputLegacy.class.getName())
             .setUserPayload(MRHelpers.createMRInputPayload(stage1Payload, 
null)),
         (initializerClazz==null ? null : new 
InputInitializerDescriptor(initializerClazz.getName())));
@@ -215,7 +215,7 @@ public class FilterLinesByWord extends Configured 
implements Tool {
     OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
         .setUserPayload(MRHelpers.createUserPayloadFromConf(stage2Conf));
     OutputCommitterDescriptor ocd = new 
OutputCommitterDescriptor(MROutputCommitter.class.getName());
-    stage2Vertex.addOutput("MROutput", od, ocd);
+    stage2Vertex.addDataSink("MROutput", od, ocd);
 
     UnorderedUnpartitionedKVEdgeConfigurer edgeConf = 
UnorderedUnpartitionedKVEdgeConfigurer
         .newBuilder(Text.class.getName(), 
TextLongPair.class.getName()).build();

http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
index 53eb590..9143351 100644
--- 
a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -185,7 +185,7 @@ public class FilterLinesByWordOneToOne extends Configured 
implements Tool {
     // Configure the Input for stage1
     Class<? extends TezRootInputInitializer> initializerClazz = 
generateSplitsInClient ? null
         : MRInputAMSplitGenerator.class;
-    stage1Vertex.addInput("MRInput",
+    stage1Vertex.addDataSource("MRInput",
         new InputDescriptor(MRInputLegacy.class.getName())
             .setUserPayload(MRHelpers.createMRInputPayload(stage1Payload, 
null)),
             (initializerClazz==null ? null : new 
InputInitializerDescriptor(initializerClazz.getName())));
@@ -198,7 +198,7 @@ public class FilterLinesByWordOneToOne extends Configured 
implements Tool {
     stage2Vertex.setTaskLocalFiles(commonLocalResources);
 
     // Configure the Output for stage2
-    stage2Vertex.addOutput("MROutput",
+    stage2Vertex.addDataSink("MROutput",
         new OutputDescriptor(MROutput.class.getName()).setUserPayload(MRHelpers
             .createUserPayloadFromConf(stage2Conf)),
             new OutputCommitterDescriptor(MROutputCommitter.class.getName()));

http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
index 58b952b..9ed33f9 100644
--- 
a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
+++ 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
@@ -213,13 +213,13 @@ public class IntersectDataGen extends Configured 
implements Tool {
     Vertex genDataVertex = new Vertex("datagen", new ProcessorDescriptor(
         
GenDataProcessor.class.getName()).setUserPayload(GenDataProcessor.createConfiguration(
         largeOutSizePerTask, smallOutSizePerTask)), numTasks, 
MRHelpers.getMapResource(tezConf));
-    genDataVertex.addOutput(STREAM_OUTPUT_NAME,
+    genDataVertex.addDataSink(STREAM_OUTPUT_NAME,
         new 
OutputDescriptor(MROutput.class.getName()).setUserPayload(streamOutputPayload),
         new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
-    genDataVertex.addOutput(HASH_OUTPUT_NAME,
+    genDataVertex.addDataSink(HASH_OUTPUT_NAME,
         new 
OutputDescriptor(MROutput.class.getName()).setUserPayload(hashOutputPayload),
         new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
-    genDataVertex.addOutput(EXPECTED_OUTPUT_NAME,
+    genDataVertex.addDataSink(EXPECTED_OUTPUT_NAME,
         new 
OutputDescriptor(MROutput.class.getName()).setUserPayload(expectedOutputPayload),
         new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
 

http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
index 1353080..c5bf792 100644
--- 
a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
+++ 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
@@ -212,21 +212,21 @@ public class IntersectExample extends Configured 
implements Tool {
     // Change the way resources are setup - no MRHelpers
     Vertex streamFileVertex = new Vertex("partitioner1",
         new ProcessorDescriptor(ForwardingProcessor.class.getName()), -1,
-        MRHelpers.getMapResource(tezConf)).addInput("streamfile",
+        MRHelpers.getMapResource(tezConf)).addDataSource("streamfile",
         new InputDescriptor(MRInput.class.getName())
             .setUserPayload(streamInputPayload), 
             new 
InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
 
     Vertex hashFileVertex = new Vertex("partitioner2", new ProcessorDescriptor(
         ForwardingProcessor.class.getName()), -1,
-        MRHelpers.getMapResource(tezConf)).addInput("hashfile",
+        MRHelpers.getMapResource(tezConf)).addDataSource("hashfile",
         new InputDescriptor(MRInput.class.getName())
             .setUserPayload(hashInputPayload), 
             new 
InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
 
     Vertex intersectVertex = new Vertex("intersect", new ProcessorDescriptor(
         IntersectProcessor.class.getName()), numPartitions,
-        MRHelpers.getReduceResource(tezConf)).addOutput("finalOutput",
+        MRHelpers.getReduceResource(tezConf)).addDataSink("finalOutput",
         new OutputDescriptor(MROutput.class.getName())
             .setUserPayload(finalOutputPayload), 
         new OutputCommitterDescriptor(MROutputCommitter.class.getName()));

http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
index 0b91efb..585ee63 100644
--- 
a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
+++ 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
@@ -212,13 +212,13 @@ public class IntersectValidate extends Configured 
implements Tool {
     // Change the way resources are setup - no MRHelpers
     Vertex lhsVertex = new Vertex(LHS_INPUT_NAME, new ProcessorDescriptor(
         ForwardingProcessor.class.getName()), -1,
-        MRHelpers.getMapResource(tezConf)).addInput("lhs", new InputDescriptor(
+        MRHelpers.getMapResource(tezConf)).addDataSource("lhs", new 
InputDescriptor(
         MRInput.class.getName()).setUserPayload(streamInputPayload),
         new 
InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
 
     Vertex rhsVertex = new Vertex(RHS_INPUT_NAME, new ProcessorDescriptor(
         ForwardingProcessor.class.getName()), -1,
-        MRHelpers.getMapResource(tezConf)).addInput("rhs", new InputDescriptor(
+        MRHelpers.getMapResource(tezConf)).addDataSource("rhs", new 
InputDescriptor(
         MRInput.class.getName()).setUserPayload(hashInputPayload),
         new 
InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
 

http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
index e2f073f..fdbe187 100644
--- 
a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -184,17 +184,17 @@ public class UnionExample {
         numMaps, MRHelpers.getMapResource(tezConf));
     InputInitializerDescriptor iid = 
         new 
InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName());
-    mapVertex1.addInput("MRInput", id, iid);
+    mapVertex1.addDataSource("MRInput", id, iid);
 
     Vertex mapVertex2 = new Vertex("map2", new ProcessorDescriptor(
         TokenProcessor.class.getName()),
         numMaps, MRHelpers.getMapResource(tezConf));
-    mapVertex2.addInput("MRInput", id, iid);
+    mapVertex2.addDataSource("MRInput", id, iid);
 
     Vertex mapVertex3 = new Vertex("map3", new ProcessorDescriptor(
         TokenProcessor.class.getName()),
         numMaps, MRHelpers.getMapResource(tezConf));
-    mapVertex3.addInput("MRInput", id, iid);
+    mapVertex3.addDataSource("MRInput", id, iid);
 
     Vertex checkerVertex = new Vertex("checker",
         new ProcessorDescriptor(
@@ -207,14 +207,14 @@ public class UnionExample {
       .setUserPayload(MROutput.createUserPayload(
           outputConf, TextOutputFormat.class.getName(), true));
     OutputCommitterDescriptor ocd = new 
OutputCommitterDescriptor(MROutputCommitter.class.getName());
-    checkerVertex.addOutput("union", od, ocd);
+    checkerVertex.addDataSink("union", od, ocd);
 
     Configuration allPartsConf = new Configuration(tezConf);
     allPartsConf.set(FileOutputFormat.OUTDIR, outputPath+"-all-parts");
     OutputDescriptor od2 = new OutputDescriptor(MROutput.class.getName())
       .setUserPayload(MROutput.createUserPayload(
           allPartsConf, TextOutputFormat.class.getName(), true));
-    checkerVertex.addOutput("all-parts", od2, ocd);
+    checkerVertex.addDataSink("all-parts", od2, ocd);
 
     Configuration partsConf = new Configuration(tezConf);
     partsConf.set(FileOutputFormat.OUTDIR, outputPath+"-parts");
@@ -223,7 +223,7 @@ public class UnionExample {
     OutputDescriptor od1 = new OutputDescriptor(MROutput.class.getName())
       .setUserPayload(MROutput.createUserPayload(
           partsConf, TextOutputFormat.class.getName(), true));
-    unionVertex.addOutput("parts", od1, ocd);
+    unionVertex.addDataSink("parts", od1, ocd);
 
     OrderedPartitionedKVEdgeConfigurer edgeConf = 
OrderedPartitionedKVEdgeConfigurer
         .newBuilder(Text.class.getName(), IntWritable.class.getName(),

http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index 61f5cd9..00fc326 100644
--- 
a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -131,12 +131,12 @@ public class WordCount extends Configured implements Tool 
{
 
     Vertex tokenizerVertex = new Vertex("tokenizer", new ProcessorDescriptor(
         TokenProcessor.class.getName()), -1, 
MRHelpers.getMapResource(tezConf));
-    tokenizerVertex.addInput("MRInput", id, iid);
+    tokenizerVertex.addDataSource("MRInput", id, iid);
 
     Vertex summerVertex = new Vertex("summer",
         new ProcessorDescriptor(
             SumProcessor.class.getName()), 1, 
MRHelpers.getReduceResource(tezConf));
-    summerVertex.addOutput("MROutput", od, ocd);
+    summerVertex.addDataSink("MROutput", od, ocd);
 
     OrderedPartitionedKVEdgeConfigurer edgeConf = 
OrderedPartitionedKVEdgeConfigurer
         .newBuilder(Text.class.getName(), IntWritable.class.getName(),

http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java 
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 29ea6a6..92dc0c5 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -983,7 +983,7 @@ public class MRHelpers {
       InputInitializerDescriptor initClazz) {
     InputDescriptor id = new InputDescriptor(MRInputLegacy.class.getName())
         .setUserPayload(userPayload);
-    vertex.addInput("MRInput", id, initClazz);
+    vertex.addDataSource("MRInput", id, initClazz);
   }
 
   /**
@@ -998,14 +998,14 @@ public class MRHelpers {
   public static void addMROutput(Vertex vertex, byte[] userPayload) {
     OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
         .setUserPayload(userPayload);
-    vertex.addOutput("MROutput", od, new 
OutputCommitterDescriptor(MROutputCommitter.class.getName()));
+    vertex.addDataSink("MROutput", od, new 
OutputCommitterDescriptor(MROutputCommitter.class.getName()));
   }
 
   @Private
   public static void addMROutputLegacy(Vertex vertex, byte[] userPayload) {
     OutputDescriptor od = new OutputDescriptor(MROutputLegacy.class.getName())
         .setUserPayload(userPayload);
-    vertex.addOutput("MROutput", od, new 
OutputCommitterDescriptor(MROutputCommitter.class.getName()));
+    vertex.addDataSink("MROutput", od, new 
OutputCommitterDescriptor(MROutputCommitter.class.getName()));
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java 
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index eb2ba89..2f7c13e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -94,7 +94,7 @@ public class MRInput extends MRInputBase {
    *          the InputFormat will be grouped in the AM based on available
    *          resources, locality etc. This option may be set to true only when
    *          using MRInputAMSplitGenerator as the initializer class in
-   *          {@link Vertex#addInput(String, 
org.apache.tez.dag.api.InputDescriptor, 
+   *          {@link Vertex#addDataSource(String, 
org.apache.tez.dag.api.InputDescriptor, 
    *          org.apache.tez.dag.api.InputInitializerDescriptor)}
    * @return returns the user payload to be set on the InputDescriptor of
    *         MRInput

http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java 
b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index b0f937a..ddbec28 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -173,7 +173,7 @@ public class TestDAGRecovery {
   @Test(timeout=120000)
   public void testDelayedInit() throws Exception {
     DAG dag = SimpleVTestDAG.createDAG("DelayedInitDAG", null);
-    dag.getVertex("v1").addInput("i1",
+    dag.getVertex("v1").addDataSource("i1",
         new InputDescriptor(NoOpInput.class.getName()),
         new 
InputInitializerDescriptor(FailingInputInitializer.class.getName()));
     runDAGAndVerify(dag, State.SUCCEEDED);

http://git-wip-us.apache.org/repos/asf/tez/blob/842abc17/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java 
b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
index 6b3727a..ca8f00b 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
@@ -180,7 +180,7 @@ public class TestDAGRecovery2 {
             .toUserPayload());
     OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(
         MultiAttemptDAG.FailingOutputCommitter.class.getName());
-    dag.getVertex("v3").addOutput("FailingOutput", od, ocd);
+    dag.getVertex("v3").addDataSink("FailingOutput", od, ocd);
     runDAGAndVerify(dag, State.FAILED);
   }
 

Reply via email to