Repository: tez Updated Branches: refs/heads/master 2e52635d9 -> 74d04a48a
http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 8071e52..7becaad 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -68,8 +68,10 @@ import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.RootInputLeafOutput; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; @@ -436,9 +438,11 @@ public class TestVertexImpl { .setType(PlanVertexType.NORMAL) .addInputs( RootInputLeafOutputProto.newBuilder() - .setInitializerClassName(initializerClassName) + .setControllerDescriptor( + TezEntityDescriptorProto.newBuilder().setClassName( + initializerClassName)) .setName("input1") - .setEntityDescriptor( + .setIODescriptor( TezEntityDescriptorProto.newBuilder() .setClassName("InputClazz") .build() @@ -462,9 +466,11 @@ public class TestVertexImpl { .setType(PlanVertexType.NORMAL) .addInputs( RootInputLeafOutputProto.newBuilder() - .setInitializerClassName(initializerClassName) + .setControllerDescriptor( + TezEntityDescriptorProto.newBuilder().setClassName( + initializerClassName)) .setName("input2") - .setEntityDescriptor( + .setIODescriptor( TezEntityDescriptorProto.newBuilder() .setClassName("InputClazz") .build() @@ -489,9 +495,11 @@ public class TestVertexImpl { .setType(PlanVertexType.NORMAL) .addInputs( RootInputLeafOutputProto.newBuilder() - .setInitializerClassName(initializerClassName) + .setControllerDescriptor( + TezEntityDescriptorProto.newBuilder().setClassName( + initializerClassName)) .setName("input3") - .setEntityDescriptor( + .setIODescriptor( TezEntityDescriptorProto.newBuilder() .setClassName("InputClazz") .build() @@ -518,9 +526,11 @@ public class TestVertexImpl { .setType(PlanVertexType.NORMAL) .addInputs( RootInputLeafOutputProto.newBuilder() - .setInitializerClassName(initializerClassName) + .setControllerDescriptor( + TezEntityDescriptorProto.newBuilder().setClassName( + initializerClassName)) .setName("input4") - .setEntityDescriptor( + .setIODescriptor( TezEntityDescriptorProto.newBuilder() .setClassName("InputClazz") .build() @@ -583,9 +593,11 @@ public class TestVertexImpl { .setType(PlanVertexType.NORMAL) .addInputs( RootInputLeafOutputProto.newBuilder() - .setInitializerClassName("IrrelevantInitializerClassName") + .setControllerDescriptor( + TezEntityDescriptorProto.newBuilder().setClassName( + "IrrelevantInitializerClassName")) .setName("input1") - .setEntityDescriptor( + .setIODescriptor( TezEntityDescriptorProto.newBuilder() .setClassName("InputClazz") .build() @@ -630,9 +642,11 @@ public class TestVertexImpl { .setType(PlanVertexType.NORMAL) .addInputs( RootInputLeafOutputProto.newBuilder() - .setInitializerClassName(initializerClassName) + .setControllerDescriptor( + TezEntityDescriptorProto.newBuilder().setClassName( + initializerClassName)) .setName("input1") - .setEntityDescriptor( + .setIODescriptor( TezEntityDescriptorProto.newBuilder() .setClassName("InputClazz") .build() @@ -697,9 +711,11 @@ public class TestVertexImpl { numTasks = -1; v1Builder.addInputs( RootInputLeafOutputProto.newBuilder() - .setInitializerClassName(initializerClassName) + .setControllerDescriptor( + TezEntityDescriptorProto.newBuilder().setClassName( + initializerClassName)) .setName("input1") - .setEntityDescriptor( + .setIODescriptor( TezEntityDescriptorProto.newBuilder() .setClassName("InputClazz") .build() @@ -1026,11 +1042,13 @@ public class TestVertexImpl { ) .addOutputs( DAGProtos.RootInputLeafOutputProto.newBuilder() - .setEntityDescriptor( + .setIODescriptor( TezEntityDescriptorProto.newBuilder().setClassName("output").build() ) .setName("outputx") - .setInitializerClassName(CountingOutputCommitter.class.getName()) + .setControllerDescriptor( + TezEntityDescriptorProto.newBuilder().setClassName( + CountingOutputCommitter.class.getName())) ) .setTaskConfig( PlanTaskConfiguration.newBuilder() @@ -2340,13 +2358,14 @@ public class TestVertexImpl { List<RootInputLeafOutputProto> outputs = new ArrayList<RootInputLeafOutputProto>(); outputs.add(RootInputLeafOutputProto.newBuilder() - .setInitializerClassName(CountingOutputCommitter.class.getName()) - .setName("output_v2") - .setEntityDescriptor( - TezEntityDescriptorProto.newBuilder() - .setUserPayload(ByteString.copyFrom( + .setControllerDescriptor( + TezEntityDescriptorProto.newBuilder().setClassName( + CountingOutputCommitter.class.getName()).setUserPayload(ByteString.copyFrom( new CountingOutputCommitter.CountingOutputCommitterConfig() - .toUserPayload())).build()) + .toUserPayload())).build()) + .setName("output_v2") + .setIODescriptor( + TezEntityDescriptorProto.newBuilder().setClassName("output.class")) .build()); v.setAdditionalOutputs(outputs); @@ -2452,13 +2471,14 @@ public class TestVertexImpl { List<RootInputLeafOutputProto> outputs = new ArrayList<RootInputLeafOutputProto>(); outputs.add(RootInputLeafOutputProto.newBuilder() - .setInitializerClassName(CountingOutputCommitter.class.getName()) - .setName("output_v2") - .setEntityDescriptor( - TezEntityDescriptorProto.newBuilder() - .setUserPayload(ByteString.copyFrom( + .setControllerDescriptor( + TezEntityDescriptorProto.newBuilder().setClassName( + CountingOutputCommitter.class.getName()).setUserPayload(ByteString.copyFrom( new CountingOutputCommitter.CountingOutputCommitterConfig( true, true, false).toUserPayload())).build()) + .setName("output_v2") + .setIODescriptor( + TezEntityDescriptorProto.newBuilder().setClassName("output.class")) .build()); v.setAdditionalOutputs(outputs); @@ -2495,13 +2515,14 @@ public class TestVertexImpl { List<RootInputLeafOutputProto> outputs = new ArrayList<RootInputLeafOutputProto>(); outputs.add(RootInputLeafOutputProto.newBuilder() - .setInitializerClassName(CountingOutputCommitter.class.getName()) - .setName("output_v2") - .setEntityDescriptor( - TezEntityDescriptorProto.newBuilder() - .setUserPayload(ByteString.copyFrom( + .setControllerDescriptor( + TezEntityDescriptorProto.newBuilder().setClassName( + CountingOutputCommitter.class.getName()).setUserPayload(ByteString.copyFrom( new CountingOutputCommitter.CountingOutputCommitterConfig( true, true, true).toUserPayload())).build()) + .setName("output_v2") + .setIODescriptor( + TezEntityDescriptorProto.newBuilder().setClassName("output.class")) .build()); v.setAdditionalOutputs(outputs); @@ -2793,6 +2814,7 @@ public class TestVertexImpl { Assert.assertEquals(true, initializerManager2.hasShutDown); } + @SuppressWarnings("unchecked") @Test(timeout = 10000) public void testRootInputInitializerEvent() throws Exception { useCustomInitializer = true; @@ -2814,7 +2836,6 @@ public class TestVertexImpl { Assert.assertEquals(VertexState.INITIALIZING, v2.getState()); dispatcher.await(); - RootInputInitializerManagerWithRunningInitializer manager2 = v2.getRootInputInitializerManager(); // Wait for the initializer to be invoked - which may be a separate thread. while (!initializer.initStarted.get()) { Thread.sleep(10); @@ -3052,6 +3073,7 @@ public class TestVertexImpl { } } + @SuppressWarnings("rawtypes") private static class VertexImplWithRunningInputInitializer extends VertexImpl { private RootInputInitializerManagerWithRunningInitializer rootInputInitializerManager; @@ -3087,10 +3109,6 @@ public class TestVertexImpl { } return rootInputInitializerManager; } - - RootInputInitializerManagerWithRunningInitializer getRootInputInitializerManager() { - return rootInputInitializerManager; - } } @SuppressWarnings("rawtypes") @@ -3150,7 +3168,7 @@ public class TestVertexImpl { @Override protected TezRootInputInitializer createInitializer( - RootInputLeafOutputDescriptor<InputDescriptor> input) { + RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input) { return presetInitializer; } } @@ -3159,7 +3177,7 @@ public class TestVertexImpl { private static class RootInputInitializerManagerControlled extends RootInputInitializerManager { - private List<RootInputLeafOutputDescriptor<InputDescriptor>> inputs; + private List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs; private final EventHandler eventHandler; private final DrainDispatcher dispatcher; private final TezVertexID vertexID; @@ -3177,13 +3195,13 @@ public class TestVertexImpl { @Override public void runInputInitializers( - List<RootInputLeafOutputDescriptor<InputDescriptor>> inputs) { + List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs) { this.inputs = inputs; } @Override protected TezRootInputInitializer createInitializer( - RootInputLeafOutputDescriptor<InputDescriptor> input) { + RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input) { return new TezRootInputInitializer() { @Override @@ -3207,14 +3225,14 @@ public class TestVertexImpl { public void failInputInitialization() { super.runInputInitializers(inputs); eventHandler.handle(new VertexEventRootInputFailed(vertexID, inputs - .get(0).getEntityName(), + .get(0).getName(), new RuntimeException("MockInitializerFailed"))); dispatcher.await(); } public void completeInputInitialization() { eventHandler.handle(new VertexEventRootInputInitialized(vertexID, inputs.get(0) - .getEntityName(), null)); + .getName(), null)); dispatcher.await(); } @@ -3223,7 +3241,7 @@ public class TestVertexImpl { RootInputUpdatePayloadEvent event = new RootInputUpdatePayloadEvent(payload); events.add(event); eventHandler.handle(new VertexEventRootInputInitialized(vertexID, inputs - .get(0).getEntityName(), events)); + .get(0).getName(), events)); dispatcher.await(); } @@ -3240,7 +3258,7 @@ public class TestVertexImpl { events.add(diEvent); } eventHandler.handle(new VertexEventRootInputInitialized(vertexID, inputs - .get(initializerIndex).getEntityName(), events)); + .get(initializerIndex).getName(), events)); dispatcher.await(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java index f926471..0b7b395 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java @@ -33,12 +33,12 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.GroupInputEdge; import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.OutputCommitterDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.runtime.api.OutputCommitter; import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; import org.junit.Assert; import org.junit.Test; @@ -68,8 +68,9 @@ public class TestDAGUtils { org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2); OutputDescriptor outDesc = new OutputDescriptor("output.class") .setHistoryText("uvOut HistoryText"); - uv12.addOutput("uvOut", outDesc, OutputCommitter.class); - v3.addOutput("uvOut", outDesc, OutputCommitter.class); + OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(OutputCommitter.class.getName()); + uv12.addOutput("uvOut", outDesc, ocd); + v3.addOutput("uvOut", outDesc, ocd); GroupInputEdge e1 = new GroupInputEdge(uv12, v3, new EdgeProperty(DataMovementType.SCATTER_GATHER, @@ -86,6 +87,7 @@ public class TestDAGUtils { } @Test + @SuppressWarnings("unchecked") public void testConvertDAGPlanToATSMap() throws IOException, JSONException { DAGPlan dagPlan = createDAG(); Map<String, Object> atsMap = DAGUtils.convertDAGPlanToATSMap(dagPlan); http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java index 78f8a0f..3a26171 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java @@ -56,6 +56,8 @@ import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.InputInitializerDescriptor; +import org.apache.tez.dag.api.OutputCommitterDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; @@ -200,7 +202,7 @@ public class FilterLinesByWord extends Configured implements Tool { stage1Vertex.addInput("MRInput", new InputDescriptor(MRInputLegacy.class.getName()) .setUserPayload(MRHelpers.createMRInputPayload(stage1Payload, null)), - initializerClazz); + (initializerClazz==null ? null : new InputInitializerDescriptor(initializerClazz.getName()))); // Setup stage2 Vertex Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor( @@ -212,7 +214,8 @@ public class FilterLinesByWord extends Configured implements Tool { // Configure the Output for stage2 OutputDescriptor od = new OutputDescriptor(MROutput.class.getName()) .setUserPayload(MRHelpers.createUserPayloadFromConf(stage2Conf)); - stage2Vertex.addOutput("MROutput", od, MROutputCommitter.class); + OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(MROutputCommitter.class.getName()); + stage2Vertex.addOutput("MROutput", od, ocd); UnorderedUnpartitionedKVEdgeConfigurer edgeConf = UnorderedUnpartitionedKVEdgeConfigurer .newBuilder(Text.class.getName(), TextLongPair.class.getName()).build(); http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java index 1289b58..53eb590 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java @@ -48,6 +48,8 @@ import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.InputInitializerDescriptor; +import org.apache.tez.dag.api.OutputCommitterDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; @@ -186,7 +188,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool { stage1Vertex.addInput("MRInput", new InputDescriptor(MRInputLegacy.class.getName()) .setUserPayload(MRHelpers.createMRInputPayload(stage1Payload, null)), - initializerClazz); + (initializerClazz==null ? null : new InputInitializerDescriptor(initializerClazz.getName()))); // Setup stage2 Vertex Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor( @@ -199,7 +201,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool { stage2Vertex.addOutput("MROutput", new OutputDescriptor(MROutput.class.getName()).setUserPayload(MRHelpers .createUserPayloadFromConf(stage2Conf)), - MROutputCommitter.class); + new OutputCommitterDescriptor(MROutputCommitter.class.getName())); UnorderedUnpartitionedKVEdgeConfigurer edgeConf = UnorderedUnpartitionedKVEdgeConfigurer .newBuilder(Text.class.getName(), TextLongPair.class.getName()).build(); http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java index e5ab546..58b952b 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java @@ -44,6 +44,7 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.OutputCommitterDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; @@ -214,13 +215,13 @@ public class IntersectDataGen extends Configured implements Tool { largeOutSizePerTask, smallOutSizePerTask)), numTasks, MRHelpers.getMapResource(tezConf)); genDataVertex.addOutput(STREAM_OUTPUT_NAME, new OutputDescriptor(MROutput.class.getName()).setUserPayload(streamOutputPayload), - MROutputCommitter.class); + new OutputCommitterDescriptor(MROutputCommitter.class.getName())); genDataVertex.addOutput(HASH_OUTPUT_NAME, new OutputDescriptor(MROutput.class.getName()).setUserPayload(hashOutputPayload), - MROutputCommitter.class); + new OutputCommitterDescriptor(MROutputCommitter.class.getName())); genDataVertex.addOutput(EXPECTED_OUTPUT_NAME, new OutputDescriptor(MROutput.class.getName()).setUserPayload(expectedOutputPayload), - MROutputCommitter.class); + new OutputCommitterDescriptor(MROutputCommitter.class.getName())); dag.addVertex(genDataVertex); http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java index 6c76354..1353080 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java @@ -45,6 +45,8 @@ import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.InputInitializerDescriptor; +import org.apache.tez.dag.api.OutputCommitterDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; @@ -212,19 +214,22 @@ public class IntersectExample extends Configured implements Tool { new ProcessorDescriptor(ForwardingProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf)).addInput("streamfile", new InputDescriptor(MRInput.class.getName()) - .setUserPayload(streamInputPayload), MRInputAMSplitGenerator.class); + .setUserPayload(streamInputPayload), + new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName())); Vertex hashFileVertex = new Vertex("partitioner2", new ProcessorDescriptor( ForwardingProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf)).addInput("hashfile", new InputDescriptor(MRInput.class.getName()) - .setUserPayload(hashInputPayload), MRInputAMSplitGenerator.class); + .setUserPayload(hashInputPayload), + new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName())); Vertex intersectVertex = new Vertex("intersect", new ProcessorDescriptor( IntersectProcessor.class.getName()), numPartitions, MRHelpers.getReduceResource(tezConf)).addOutput("finalOutput", new OutputDescriptor(MROutput.class.getName()) - .setUserPayload(finalOutputPayload), MROutputCommitter.class); + .setUserPayload(finalOutputPayload), + new OutputCommitterDescriptor(MROutputCommitter.class.getName())); Edge e1 = new Edge(streamFileVertex, intersectVertex, edgeConf.createDefaultEdgeProperty()); http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java index 3a74429..0b91efb 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java @@ -42,6 +42,7 @@ import org.apache.tez.common.counters.TezCounter; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; @@ -213,13 +214,13 @@ public class IntersectValidate extends Configured implements Tool { ForwardingProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf)).addInput("lhs", new InputDescriptor( MRInput.class.getName()).setUserPayload(streamInputPayload), - MRInputAMSplitGenerator.class); + new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName())); Vertex rhsVertex = new Vertex(RHS_INPUT_NAME, new ProcessorDescriptor( ForwardingProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf)).addInput("rhs", new InputDescriptor( MRInput.class.getName()).setUserPayload(hashInputPayload), - MRInputAMSplitGenerator.class); + new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName())); Vertex intersectValidateVertex = new Vertex("intersectvalidate", new ProcessorDescriptor(IntersectValidateProcessor.class.getName()), http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java index 015e8e3..14fe441 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java @@ -26,7 +26,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -65,6 +64,7 @@ import org.apache.tez.client.TezClientUtils; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; +import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; @@ -575,12 +575,14 @@ public class MRRSleepJob extends Configured implements Tool { } if (generateSplitsInAM) { - MRHelpers.addMRInput(mapVertex, mapInputPayload, MRInputAMSplitGenerator.class); + MRHelpers.addMRInput(mapVertex, mapInputPayload, + new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName())); } else { if (writeSplitsToDFS) { MRHelpers.addMRInput(mapVertex, mapInputPayload, null); } else { - MRHelpers.addMRInput(mapVertex, mapInputPayload, MRInputSplitDistributor.class); + MRHelpers.addMRInput(mapVertex, mapInputPayload, + new InputInitializerDescriptor(MRInputSplitDistributor.class.getName())); } } vertices.add(mapVertex); http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java index 051bfee..f66e60f 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java @@ -59,6 +59,7 @@ import org.apache.tez.client.TezClientUtils; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; +import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; @@ -224,7 +225,9 @@ public class OrderedWordCount extends Configured implements Tool { Class<? extends TezRootInputInitializer> initializerClazz = generateSplitsInClient ? null : MRInputAMSplitGenerator.class; - MRHelpers.addMRInput(mapVertex, mapInputPayload, initializerClazz); + MRHelpers.addMRInput(mapVertex, mapInputPayload, + (initializerClazz==null) ? null : + new InputInitializerDescriptor(initializerClazz.getName())); vertices.add(mapVertex); ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096); http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java index 74bf570..e2f073f 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java @@ -38,6 +38,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.GroupInputEdge; +import org.apache.tez.dag.api.InputInitializerDescriptor; +import org.apache.tez.dag.api.OutputCommitterDescriptor; import org.apache.tez.dag.api.VertexGroup; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; @@ -180,17 +182,19 @@ public class UnionExample { Vertex mapVertex1 = new Vertex("map1", new ProcessorDescriptor( TokenProcessor.class.getName()), numMaps, MRHelpers.getMapResource(tezConf)); - mapVertex1.addInput("MRInput", id, MRInputAMSplitGenerator.class); + InputInitializerDescriptor iid = + new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()); + mapVertex1.addInput("MRInput", id, iid); Vertex mapVertex2 = new Vertex("map2", new ProcessorDescriptor( TokenProcessor.class.getName()), numMaps, MRHelpers.getMapResource(tezConf)); - mapVertex2.addInput("MRInput", id, MRInputAMSplitGenerator.class); + mapVertex2.addInput("MRInput", id, iid); Vertex mapVertex3 = new Vertex("map3", new ProcessorDescriptor( TokenProcessor.class.getName()), numMaps, MRHelpers.getMapResource(tezConf)); - mapVertex3.addInput("MRInput", id, MRInputAMSplitGenerator.class); + mapVertex3.addInput("MRInput", id, iid); Vertex checkerVertex = new Vertex("checker", new ProcessorDescriptor( @@ -202,14 +206,15 @@ public class UnionExample { OutputDescriptor od = new OutputDescriptor(MROutput.class.getName()) .setUserPayload(MROutput.createUserPayload( outputConf, TextOutputFormat.class.getName(), true)); - checkerVertex.addOutput("union", od, MROutputCommitter.class); + OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(MROutputCommitter.class.getName()); + checkerVertex.addOutput("union", od, ocd); Configuration allPartsConf = new Configuration(tezConf); allPartsConf.set(FileOutputFormat.OUTDIR, outputPath+"-all-parts"); OutputDescriptor od2 = new OutputDescriptor(MROutput.class.getName()) .setUserPayload(MROutput.createUserPayload( allPartsConf, TextOutputFormat.class.getName(), true)); - checkerVertex.addOutput("all-parts", od2, MROutputCommitter.class); + checkerVertex.addOutput("all-parts", od2, ocd); Configuration partsConf = new Configuration(tezConf); partsConf.set(FileOutputFormat.OUTDIR, outputPath+"-parts"); @@ -218,7 +223,7 @@ public class UnionExample { OutputDescriptor od1 = new OutputDescriptor(MROutput.class.getName()) .setUserPayload(MROutput.createUserPayload( partsConf, TextOutputFormat.class.getName(), true)); - unionVertex.addOutput("parts", od1, MROutputCommitter.class); + unionVertex.addOutput("parts", od1, ocd); OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer .newBuilder(Text.class.getName(), IntWritable.class.getName(), http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java index d5e6154..61f5cd9 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java @@ -42,6 +42,8 @@ import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.InputInitializerDescriptor; +import org.apache.tez.dag.api.OutputCommitterDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; @@ -61,6 +63,7 @@ import org.apache.tez.runtime.library.api.KeyValuesReader; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer; import com.google.common.base.Preconditions; + import org.apache.tez.runtime.library.partitioner.HashPartitioner; @@ -116,21 +119,24 @@ public class WordCount extends Configured implements Tool { InputDescriptor id = new InputDescriptor(MRInput.class.getName()) .setUserPayload(MRInput.createUserPayload(inputConf, TextInputFormat.class.getName(), true, true)); + InputInitializerDescriptor iid = new InputInitializerDescriptor( + MRInputAMSplitGenerator.class.getName()); Configuration outputConf = new Configuration(tezConf); outputConf.set(FileOutputFormat.OUTDIR, outputPath); OutputDescriptor od = new OutputDescriptor(MROutput.class.getName()) .setUserPayload(MROutput.createUserPayload( outputConf, TextOutputFormat.class.getName(), true)); + OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(MROutputCommitter.class.getName()); Vertex tokenizerVertex = new Vertex("tokenizer", new ProcessorDescriptor( TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf)); - tokenizerVertex.addInput("MRInput", id, MRInputAMSplitGenerator.class); + tokenizerVertex.addInput("MRInput", id, iid); Vertex summerVertex = new Vertex("summer", new ProcessorDescriptor( SumProcessor.class.getName()), 1, MRHelpers.getReduceResource(tezConf)); - summerVertex.addOutput("MROutput", od, MROutputCommitter.class); + summerVertex.addOutput("MROutput", od, ocd); OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer .newBuilder(Text.class.getName(), IntWritable.class.getName(), http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java index e919516..9311754 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java @@ -56,12 +56,12 @@ public class MROutputCommitter extends OutputCommitter { @Override public void initialize(OutputCommitterContext context) throws IOException { - byte[] userPayload = context.getUserPayload(); + byte[] userPayload = context.getOutputUserPayload(); if (userPayload == null) { jobConf = new JobConf(); } else { jobConf = new JobConf( - MRHelpers.createConfFromUserPayload(context.getUserPayload())); + MRHelpers.createConfFromUserPayload(context.getOutputUserPayload())); } // Read all credentials into the credentials instance stored in JobConf. http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java index 87a98b9..4e1e0b6 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java @@ -66,7 +66,7 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer { sw = new Stopwatch().start(); } MRInputUserPayloadProto userPayloadProto = MRHelpers - .parseMRInputPayload(rootInputContext.getUserPayload()); + .parseMRInputPayload(rootInputContext.getInputUserPayload()); if (LOG.isDebugEnabled()) { sw.stop(); LOG.debug("Time to parse MRInput payload into prot: " http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java index 7d11ab3..a2aa5d8 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java @@ -59,7 +59,7 @@ public class MRInputSplitDistributor implements TezRootInputInitializer { if (LOG.isDebugEnabled()) { sw = new Stopwatch().start(); } - MRInputUserPayloadProto userPayloadProto = MRHelpers.parseMRInputPayload(rootInputContext.getUserPayload()); + MRInputUserPayloadProto userPayloadProto = MRHelpers.parseMRInputPayload(rootInputContext.getInputUserPayload()); if (LOG.isDebugEnabled()) { sw.stop(); LOG.debug("Time to parse MRInput payload into prot: " http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java index 38d0711..29ea6a6 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java @@ -64,6 +64,8 @@ import org.apache.tez.common.TezUtils; import org.apache.tez.common.TezYARNUtils; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.InputInitializerDescriptor; +import org.apache.tez.dag.api.OutputCommitterDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.Vertex; @@ -77,7 +79,6 @@ import org.apache.tez.mapreduce.partition.MRPartitioner; import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto; import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto; import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto; -import org.apache.tez.runtime.api.TezRootInputInitializer; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import com.google.common.base.Function; @@ -979,7 +980,7 @@ public class MRHelpers { * @param initClazz class to init the input in the AM */ public static void addMRInput(Vertex vertex, byte[] userPayload, - Class<? extends TezRootInputInitializer> initClazz) { + InputInitializerDescriptor initClazz) { InputDescriptor id = new InputDescriptor(MRInputLegacy.class.getName()) .setUserPayload(userPayload); vertex.addInput("MRInput", id, initClazz); @@ -997,14 +998,14 @@ public class MRHelpers { public static void addMROutput(Vertex vertex, byte[] userPayload) { OutputDescriptor od = new OutputDescriptor(MROutput.class.getName()) .setUserPayload(userPayload); - vertex.addOutput("MROutput", od, MROutputCommitter.class); + vertex.addOutput("MROutput", od, new OutputCommitterDescriptor(MROutputCommitter.class.getName())); } @Private public static void addMROutputLegacy(Vertex vertex, byte[] userPayload) { OutputDescriptor od = new OutputDescriptor(MROutputLegacy.class.getName()) .setUserPayload(userPayload); - vertex.addOutput("MROutput", od, MROutputCommitter.class); + vertex.addOutput("MROutput", od, new OutputCommitterDescriptor(MROutputCommitter.class.getName())); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index 4e3e5e7..eb2ba89 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -94,8 +94,10 @@ public class MRInput extends MRInputBase { * the InputFormat will be grouped in the AM based on available * resources, locality etc. This option may be set to true only when * using MRInputAMSplitGenerator as the initializer class in - * {@link Vertex#addInput(String, org.apache.tez.dag.api.InputDescriptor, Class)} - * @return returns the user payload to be set on the InputDescriptor of MRInput + * {@link Vertex#addInput(String, org.apache.tez.dag.api.InputDescriptor, + * org.apache.tez.dag.api.InputInitializerDescriptor)} + * @return returns the user payload to be set on the InputDescriptor of + * MRInput * @throws IOException */ public static byte[] createUserPayload(Configuration conf, http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java index 5d6ec0d..84c945e 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java @@ -168,7 +168,7 @@ public class TestMRInputSplitDistributor { } @Override - public byte[] getUserPayload() { + public byte[] getInputUserPayload() { return payload; } @@ -202,6 +202,11 @@ public class TestMRInputSplitDistributor { throw new UnsupportedOperationException("getVertexNumTasks not implemented in this mock"); } + @Override + public byte[] getUserPayload() { + throw new UnsupportedOperationException("getUserPayload not implemented in this mock"); + } + } @Private http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java index ea0378d..67fc6a5 100644 --- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java +++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java @@ -75,6 +75,7 @@ import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; @@ -472,7 +473,9 @@ public class TestMRRJobsDAGApi { Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor( MapProcessor.class.getName()).setUserPayload(stage1Payload), stage1NumTasks, Resource.newInstance(256, 1)); - MRHelpers.addMRInput(stage1Vertex, stage1InputPayload, inputInitializerClazz); + MRHelpers.addMRInput(stage1Vertex, stage1InputPayload, + (inputInitializerClazz==null) ? null : + new InputInitializerDescriptor(inputInitializerClazz.getName())); Vertex stage2Vertex = new Vertex("ireduce", new ProcessorDescriptor( ReduceProcessor.class.getName()).setUserPayload(stage2Payload), 1, Resource.newInstance(256, 1)); @@ -666,7 +669,7 @@ public class TestMRRJobsDAGApi { public static class MRInputAMSplitGeneratorRelocalizationTest extends MRInputAMSplitGenerator { public List<Event> initialize(TezRootInputInitializerContext rootInputContext) throws Exception { MRInputUserPayloadProto userPayloadProto = MRHelpers - .parseMRInputPayload(rootInputContext.getUserPayload()); + .parseMRInputPayload(rootInputContext.getInputUserPayload()); Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto .getConfigurationBytes()); http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java index 87e6472..b0f937a 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java @@ -29,6 +29,7 @@ import org.apache.tez.client.TezClientUtils; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; @@ -174,7 +175,7 @@ public class TestDAGRecovery { DAG dag = SimpleVTestDAG.createDAG("DelayedInitDAG", null); dag.getVertex("v1").addInput("i1", new InputDescriptor(NoOpInput.class.getName()), - FailingInputInitializer.class); + new InputInitializerDescriptor(FailingInputInitializer.class.getName())); runDAGAndVerify(dag, State.SUCCEEDED); } http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java index 06e4a87..6b3727a 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.client.TezClientUtils; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.OutputCommitterDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.client.DAGClient; @@ -177,8 +178,9 @@ public class TestDAGRecovery2 { od.setUserPayload(new MultiAttemptDAG.FailingOutputCommitter.FailingOutputCommitterConfig(true) .toUserPayload()); - dag.getVertex("v3").addOutput("FailingOutput", od, - MultiAttemptDAG.FailingOutputCommitter.class); + OutputCommitterDescriptor ocd = new OutputCommitterDescriptor( + MultiAttemptDAG.FailingOutputCommitter.class.getName()); + dag.getVertex("v3").addOutput("FailingOutput", od, ocd); runDAGAndVerify(dag, State.FAILED); } http://git-wip-us.apache.org/repos/asf/tez/blob/74d04a48/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java index 51d3d9e..1dc8879 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java +++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java @@ -154,7 +154,7 @@ public class MultiAttemptDAG { public void initialize(OutputCommitterContext context) throws Exception { FailingOutputCommitterConfig config = new FailingOutputCommitterConfig(); - config.fromUserPayload(context.getUserPayload()); + config.fromUserPayload(context.getOutputUserPayload()); failOnCommit = config.failOnCommit; }
