This is an automated email from the ASF dual-hosted git repository. abstractdog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push: new 154fd341d TEZ-4472: Use unique DAG names in tests. (#293) (Ayush Saxena reviewed by Laszlo Bodor) 154fd341d is described below commit 154fd341ddf9e53fc4e475d7a1db70a57d8914c3 Author: Ayush Saxena <ayushsax...@apache.org> AuthorDate: Fri Jun 23 17:34:40 2023 +0530 TEZ-4472: Use unique DAG names in tests. (#293) (Ayush Saxena reviewed by Laszlo Bodor) --- .../java/org/apache/tez/client/TestTezClient.java | 27 ++--- .../org/apache/tez/client/TestTezClientUtils.java | 6 +- .../test/java/org/apache/tez/dag/api/TestDAG.java | 16 +-- .../java/org/apache/tez/dag/api/TestDAGPlan.java | 16 +-- .../java/org/apache/tez/dag/api/TestDAGVerify.java | 72 ++++++------- ...estDAGClientAMProtocolBlockingPBServerImpl.java | 2 +- .../apache/tez/dag/app/TestMockDAGAppMaster.java | 4 +- .../org/apache/tez/dag/app/TestSpeculation.java | 8 +- .../apache/tez/dag/app/dag/impl/TestCommit.java | 116 +++++++++++---------- .../apache/tez/dag/app/dag/impl/TestDAGImpl.java | 11 +- .../tez/dag/app/dag/impl/TestVertexImpl.java | 6 +- .../apache/tez/dag/history/utils/TestDAGUtils.java | 9 +- .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 4 +- .../src/test/java/org/apache/tez/test/TestAM.java | 2 +- .../org/apache/tez/test/TestFaultTolerance.java | 2 +- .../java/org/apache/tez/test/TestLocalMode.java | 19 ++-- 16 files changed, 170 insertions(+), 150 deletions(-) diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java index 728076cbe..e4d03cc60 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java @@ -202,13 +202,13 @@ public class TestTezClient { } @Test (timeout = 5000) - public void testTezclientApp() throws Exception { - testTezClient(false, true); + public void testTezClientApp() throws Exception { + testTezClient(false, true, "testTezClientApp"); } @Test (timeout = 5000) - public void testTezclientSession() throws Exception { - testTezClient(true, true); + public void testTezClientSession() throws Exception { + testTezClient(true, true, "testTezClientSession"); } @Test (timeout = 5000) @@ -246,7 +246,7 @@ public class TestTezClient { ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create("P"); processorDescriptor.setUserPayload(UserPayload.create(ByteBuffer.allocate(payloadSize))); Vertex vertex = Vertex.create("Vertex", processorDescriptor, 1, Resource.newInstance(1, 1)); - DAG dag = DAG.create("DAG").addVertex(vertex); + DAG dag = DAG.create("DAG-testTezClientSessionLargeDAGPlan").addVertex(vertex); client.start(); client.addAppMasterLocalFiles(localResourceMap); @@ -277,7 +277,7 @@ public class TestTezClient { @Test (timeout = 5000) public void testGetClient() throws Exception { /* BEGIN first TezClient usage without calling stop() */ - TezClientForTest client = testTezClient(true, false); + TezClientForTest client = testTezClient(true, false, "testGetClient"); /* END first TezClient usage without calling stop() */ /* BEGIN reuse of AM from new TezClient */ @@ -295,7 +295,7 @@ public class TestTezClient { LocalResourceVisibility.PUBLIC, 1, 1)); Vertex vertex = Vertex.create("Vertex", ProcessorDescriptor.create("P"), 1, Resource.newInstance(1, 1)); - DAG dag = DAG.create("DAG").addVertex(vertex).addTaskLocalFiles(lrDAG); + DAG dag = DAG.create("DAG-testGetClient").addVertex(vertex).addTaskLocalFiles(lrDAG); //Bind TezClient to existing app and submit a dag DAGClient dagClient = client2.getClient(existingAppId).submitDAG(dag); @@ -317,7 +317,7 @@ public class TestTezClient { /* END reuse of AM from new TezClient */ } - public TezClientForTest testTezClient(boolean isSession, boolean shouldStop) throws Exception { + public TezClientForTest testTezClient(boolean isSession, boolean shouldStop, String dagName) throws Exception { Map<String, LocalResource> lrs = Maps.newHashMap(); String lrName1 = "LR1"; lrs.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"), @@ -351,7 +351,7 @@ public class TestTezClient { LocalResourceVisibility.PUBLIC, 1, 1)); Vertex vertex = Vertex.create("Vertex", ProcessorDescriptor.create("P"), 1, Resource.newInstance(1, 1)); - DAG dag = DAG.create("DAG").addVertex(vertex).addTaskLocalFiles(lrDAG); + DAG dag = DAG.create("DAG-" + dagName).addVertex(vertex).addTaskLocalFiles(lrDAG); if (!isSession) { when(client.sessionAmProxy.getAMStatus(any(), any())) .thenReturn(GetAMStatusResponseProto.newBuilder().setStatus(TezAppMasterStatusProto.SHUTDOWN).build()); @@ -391,7 +391,7 @@ public class TestTezClient { when(client.mockYarnClient.getApplicationReport(appId2).getYarnApplicationState()) .thenReturn(YarnApplicationState.RUNNING); - dag = DAG.create("DAG").addVertex( + dag = DAG.create("DAG-2-" + dagName).addVertex( Vertex.create("Vertex", ProcessorDescriptor.create("P"), 1, Resource.newInstance(1, 1))); dagClient = client.submitDAG(dag); @@ -603,7 +603,8 @@ public class TestTezClient { LocalResourceVisibility.PUBLIC, 1, 1)); Vertex vertex = Vertex.create("Vertex", ProcessorDescriptor.create("P"), 1, Resource.newInstance(1, 1)).addTaskLocalFiles(lrVertex); - DAG dag = DAG.create("DAG").addVertex(vertex).addTaskLocalFiles(lrDAG); + DAG dag = + DAG.create("DAG-testMultipleSubmissionsJob-session-" + isSession).addVertex(vertex).addTaskLocalFiles(lrDAG); // the dag resource will be added to the vertex once client1.submitDAG(dag); @@ -694,7 +695,7 @@ public class TestTezClient { Vertex vertex = Vertex.create("Vertex", ProcessorDescriptor.create("P"), 1, Resource.newInstance(1, 1)); - DAG dag = DAG.create("DAG").addVertex(vertex); + DAG dag = DAG.create("DAG-testSubmitDAGAppFailed").addVertex(vertex); try { client.submitDAG(dag); @@ -884,7 +885,7 @@ public class TestTezClient { Vertex vertex2 = Vertex.create("Vertex2", ProcessorDescriptor.create("P2"), 1, Resource.newInstance(1, 1)); vertex2.setTaskLaunchCmdOpts("-XX:+UseParallelGC -XX:+UseG1GC"); - DAG dag = DAG.create("DAG").addVertex(vertex1).addVertex(vertex2).addTaskLocalFiles(lrDAG); + DAG dag = DAG.create("DAG-testClientResubmit").addVertex(vertex1).addVertex(vertex2).addTaskLocalFiles(lrDAG); for (int i = 0; i < 3; ++i) { try { client.submitDAG(dag); diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java index 0341f27f6..a2f1ce117 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java @@ -408,7 +408,7 @@ public class TestTezClientUtils { tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, STAGING_DIR.getAbsolutePath()); ApplicationId appId = ApplicationId.newInstance(1000, 1); - DAG dag = DAG.create("testdag"); + DAG dag = DAG.create("testdag-testSessionTokenInAmClc"); dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1) .setTaskLaunchCmdOpts("initialLaunchOpts")); @@ -449,7 +449,7 @@ public class TestTezClientUtils { Credentials credentials = new Credentials(); JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(); TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials); - DAG dag = DAG.create("testdag"); + DAG dag = DAG.create("DAG-testAMLoggingOptsSimple"); dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1) .setTaskLaunchCmdOpts("initialLaunchOpts")); AMConfiguration amConf = @@ -490,7 +490,7 @@ public class TestTezClientUtils { Credentials credentials = new Credentials(); JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(); TezClientUtils.createSessionToken(appId.toString(), jobTokenSecretManager, credentials); - DAG dag = DAG.create("testdag"); + DAG dag = DAG.create("DAG-testAMLoggingOptsPerLogger"); dag.addVertex(Vertex.create("testVertex", ProcessorDescriptor.create("processorClassname"), 1) .setTaskLaunchCmdOpts("initialLaunchOpts")); AMConfiguration amConf = diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java index 05c4e30cf..5ec57c414 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java @@ -47,7 +47,7 @@ public class TestDAG { dummyTaskCount, dummyTaskResource); Vertex v2 = Vertex.create("v1", ProcessorDescriptor.create("Processor"), dummyTaskCount, dummyTaskResource); - DAG dag = DAG.create("testDAG"); + DAG dag = DAG.create("DAG-testDuplicatedVertices"); dag.addVertex(v1); try { dag.addVertex(v2); @@ -74,7 +74,7 @@ public class TestDAG { SchedulingType.CONCURRENT, OutputDescriptor.create("output"), InputDescriptor.create("input"))); - DAG dag = DAG.create("testDAG"); + DAG dag = DAG.create("DAG-testDuplicatedEdges"); dag.addVertex(v1); dag.addVertex(v2); dag.addEdge(edge1); @@ -96,7 +96,7 @@ public class TestDAG { Vertex v3 = Vertex.create("v3", ProcessorDescriptor.create("Processor"), dummyTaskCount, dummyTaskResource); - DAG dag = DAG.create("testDAG"); + DAG dag = DAG.create("DAG-testDuplicatedVertexGroup"); dag.createVertexGroup("group_1", v1, v2); try { @@ -123,7 +123,7 @@ public class TestDAG { ProcessorDescriptor.create("Processor"), dummyTaskCount, dummyTaskResource); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testDuplicatedGroupInputEdge"); String groupName1 = "uv12"; VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2); @@ -156,7 +156,7 @@ public class TestDAG { @Test(timeout = 5000) public void testDAGConf() { - DAG dag = DAG.create("dag1"); + DAG dag = DAG.create("DAG-testDAGConf"); // it's OK to set custom configuration dag.setConf("unknown_conf", "value"); @@ -281,7 +281,7 @@ public class TestDAG { @Test(timeout = 5000) public void testDuplicatedOutput_2() { - DAG dag = DAG.create("dag1"); + DAG dag = DAG.create("DAG-testDuplicatedOutput_2"); Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor")); DataSinkDescriptor dataSink = DataSinkDescriptor.create(OutputDescriptor.create("dummyOutput"), null, null); @@ -354,7 +354,7 @@ public class TestDAG { Resource.newInstance(1, 1)); Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("dummyProcessor2"), 1, Resource.newInstance(1, 1)); - DAG dag = DAG.create("dag1").addVertex(v1).addVertex(v2).addTaskLocalFiles(lrDAG); + DAG dag = DAG.create("DAG-testRecreateDAG").addVertex(v1).addVertex(v2).addTaskLocalFiles(lrDAG); TezConfiguration tezConf = new TezConfiguration(); DAGPlan firstPlan = dag.createDag(tezConf, null, null, null, false); @@ -375,7 +375,7 @@ public class TestDAG { Resource.newInstance(1, 1)); Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("dummyProcessor2"), 1, Resource.newInstance(1, 1)); - DAG dag = DAG.create("dag1").addVertex(v1).addVertex(v2).addTaskLocalFiles(lrDAG); + DAG dag = DAG.create("DAG-testCreateDAGForHistoryLogLevel").addVertex(v1).addVertex(v2).addTaskLocalFiles(lrDAG); TezConfiguration tezConf = new TezConfiguration(); diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java index 8e1011feb..d5f6b0af9 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java @@ -107,7 +107,7 @@ public class TestDAGPlan { @Test(timeout = 5000) public void testEdgeManagerSerde() { - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testEdgeManagerSerde"); ProcessorDescriptor pd1 = ProcessorDescriptor.create("processor1") .setUserPayload(UserPayload.create(ByteBuffer.wrap("processor1Bytes".getBytes()))); ProcessorDescriptor pd2 = ProcessorDescriptor.create("processor2") @@ -144,7 +144,7 @@ public class TestDAGPlan { @Test(timeout = 5000) public void testUserPayloadSerde() { - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testUserPayloadSerde"); ProcessorDescriptor pd1 = ProcessorDescriptor.create("processor1"). setUserPayload(UserPayload.create(ByteBuffer.wrap("processor1Bytes".getBytes()))); ProcessorDescriptor pd2 = ProcessorDescriptor.create("processor2"). @@ -205,7 +205,7 @@ public class TestDAGPlan { @Test(timeout = 5000) public void userVertexOrderingIsMaintained() { - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-userVertexOrderingIsMaintained"); ProcessorDescriptor pd1 = ProcessorDescriptor.create("processor1"). setUserPayload(UserPayload.create(ByteBuffer.wrap("processor1Bytes".getBytes()))); ProcessorDescriptor pd2 = ProcessorDescriptor.create("processor2"). @@ -278,7 +278,7 @@ public class TestDAGPlan { @Test (timeout=5000) public void testCredentialsSerde() { - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testCredentialsSerde"); ProcessorDescriptor pd1 = ProcessorDescriptor.create("processor1"). setUserPayload(UserPayload.create(ByteBuffer.wrap("processor1Bytes".getBytes()))); ProcessorDescriptor pd2 = ProcessorDescriptor.create("processor2"). @@ -322,7 +322,7 @@ public class TestDAGPlan { @Test(timeout = 5000) public void testInvalidExecContext_1() { - DAG dag = DAG.create("dag1"); + DAG dag = DAG.create("DAG-testInvalidExecContext_1"); dag.setExecutionContext(VertexExecutionContext.createExecuteInAm(true)); Vertex v1 = Vertex.create("testvertex", ProcessorDescriptor.create("processor1"), 1); dag.addVertex(v1); @@ -364,7 +364,7 @@ public class TestDAGPlan { VertexExecutionContext.create("plugin", "plugin", "invalidplugin"); - DAG dag = DAG.create("dag1"); + DAG dag = DAG.create("DAG-testInvalidExecContext_2"); dag.setExecutionContext(VertexExecutionContext.createExecuteInContainers(true)); Vertex v1 = Vertex.create("testvertex", ProcessorDescriptor.create("processor1"), 1); dag.addVertex(v1); @@ -429,7 +429,7 @@ public class TestDAGPlan { @Test(timeout = 5000) public void testServiceDescriptorPropagation() { - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testServiceDescriptorPropagation"); ProcessorDescriptor pd1 = ProcessorDescriptor.create("processor1"). setUserPayload(UserPayload.create(ByteBuffer.wrap("processor1Bytes".getBytes()))); ProcessorDescriptor pd2 = ProcessorDescriptor.create("processor2"). @@ -492,7 +492,7 @@ public class TestDAGPlan { @Test(timeout = 5000) public void testInvalidJavaOpts() { - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testInvalidJavaOpts"); ProcessorDescriptor pd1 = ProcessorDescriptor.create("processor1") .setUserPayload(UserPayload.create(ByteBuffer.wrap("processor1Bytes".getBytes()))); Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1)); diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java index bde4622d9..1ba877769 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java @@ -73,7 +73,7 @@ public class TestDAGVerify { DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create(dummyOutputClassName), InputDescriptor.create(dummyInputClassName))); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testVerifyScatterGather"); dag.addVertex(v1); dag.addVertex(v2); dag.addEdge(e1); @@ -94,7 +94,7 @@ public class TestDAGVerify { SchedulingType.SEQUENTIAL, OutputDescriptor.create(dummyOutputClassName), InputDescriptor.create(dummyInputClassName))); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testVerifyCustomEdge"); dag.addVertex(v1); dag.addVertex(v2); dag.addEdge(e1); @@ -114,7 +114,7 @@ public class TestDAGVerify { DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create(dummyOutputClassName), InputDescriptor.create(dummyInputClassName))); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testVerifyOneToOne"); dag.addVertex(v1); dag.addVertex(v2); dag.addEdge(e1); @@ -143,7 +143,7 @@ public class TestDAGVerify { DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create(dummyOutputClassName), InputDescriptor.create(dummyInputClassName))); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testVerifyOneToOneInferParallelism"); dag.addVertex(v1); dag.addVertex(v2); dag.addVertex(v3); @@ -177,7 +177,7 @@ public class TestDAGVerify { DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create(dummyOutputClassName), InputDescriptor.create(dummyInputClassName))); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testVerifyOneToOneInferParallelismReverseOrder"); dag.addVertex(v3); dag.addVertex(v1); dag.addVertex(v2); @@ -204,7 +204,7 @@ public class TestDAGVerify { DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create(dummyOutputClassName), InputDescriptor.create(dummyInputClassName))); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testVerifyOneToOneNoInferParallelism"); dag.addVertex(v1); dag.addVertex(v2); dag.addEdge(e1); @@ -234,7 +234,7 @@ public class TestDAGVerify { DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create(dummyOutputClassName), InputDescriptor.create(dummyInputClassName))); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testVerifyOneToOneIncorrectParallelism1"); dag.addVertex(v1); dag.addVertex(v2); dag.addVertex(v3); @@ -280,7 +280,7 @@ public class TestDAGVerify { DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create(dummyOutputClassName), InputDescriptor.create(dummyInputClassName))); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testVerifyOneToOneIncorrectParallelism2"); dag.addVertex(v1); dag.addVertex(v2); dag.addVertex(v3); @@ -290,7 +290,7 @@ public class TestDAGVerify { dag.addEdge(e3); try { dag.verify(); - Assert.assertTrue(false); + Assert.fail(); } catch (TezUncheckedException e) { Assert.assertTrue(e.getMessage().contains( "1-1 Edge. Destination vertex parallelism must match source vertex")); @@ -310,7 +310,7 @@ public class TestDAGVerify { DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create(dummyOutputClassName), InputDescriptor.create(dummyInputClassName))); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testVerifyBroadcast"); dag.addVertex(v1); dag.addVertex(v2); dag.addEdge(e1); @@ -330,7 +330,7 @@ public class TestDAGVerify { DataSourceType.EPHEMERAL, SchedulingType.SEQUENTIAL, OutputDescriptor.create(dummyOutputClassName), InputDescriptor.create(dummyInputClassName))); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testVerify3"); dag.addVertex(v1); dag.addVertex(v2); dag.addEdge(e1); @@ -350,7 +350,7 @@ public class TestDAGVerify { DataSourceType.EPHEMERAL, SchedulingType.CONCURRENT, OutputDescriptor.create(dummyOutputClassName), InputDescriptor.create(dummyInputClassName))); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testVerify4"); dag.addVertex(v1); dag.addVertex(v2); dag.addEdge(e1); @@ -397,7 +397,7 @@ public class TestDAGVerify { DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("dummy output class"), InputDescriptor.create("dummy input class"))); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testCycle1"); dag.addVertex(v1); dag.addVertex(v2); dag.addVertex(v3); @@ -457,7 +457,7 @@ public class TestDAGVerify { DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("dummy output class"), InputDescriptor.create("dummy input class"))); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testCycle2"); dag.addVertex(v1); dag.addVertex(v2); dag.addVertex(v3); @@ -489,7 +489,7 @@ public class TestDAGVerify { DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("dummy output class"), InputDescriptor.create("dummy input class"))); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testSelfCycle"); dag.addVertex(v1); dag.addEdge(e1); try{ @@ -513,7 +513,7 @@ public class TestDAGVerify { ProcessorDescriptor.create("MapProcessor"), dummyTaskCount, dummyTaskResource); try { - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-repeatedVertexName"); dag.addVertex(v1); dag.addVertex(v1repeat); dag.verify(); @@ -543,7 +543,7 @@ public class TestDAGVerify { OutputDescriptor.create("dummy output class"), InputDescriptor.create("dummy input class"))); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testInputAndInputVertexNameCollision"); dag.addVertex(v1); dag.addVertex(v2); dag.addEdge(e1); @@ -567,7 +567,7 @@ public class TestDAGVerify { OutputDescriptor.create("dummy output class"), InputDescriptor.create("dummy input class"))); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testOutputAndOutputVertexNameCollision"); dag.addVertex(v1); dag.addVertex(v2); dag.addEdge(e1); @@ -585,7 +585,7 @@ public class TestDAGVerify { v1.addDataSink("v2", DataSinkDescriptor.create(null, null, null)); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testOutputAndVertexNameCollision"); dag.addVertex(v1); dag.addVertex(v2); dag.verify(); @@ -602,7 +602,7 @@ public class TestDAGVerify { v1.addDataSource("v2", DataSourceDescriptor.create(null, null, null)); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testInputAndVertexNameCollision"); dag.addVertex(v1); dag.addVertex(v2); dag.verify(); @@ -632,7 +632,7 @@ public class TestDAGVerify { DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("dummy output class"), InputDescriptor.create("dummy input class"))); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-BinaryInputAllowed"); dag.addVertex(v1); dag.addVertex(v2); dag.addVertex(v3); @@ -656,7 +656,7 @@ public class TestDAGVerify { ProcessorDescriptor.create("Processor"), dummyTaskCount, dummyTaskResource); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testVertexGroupWithMultipleOutputEdges"); VertexGroup uv12 = dag.createVertexGroup("uv12", v1, v2); OutputDescriptor outDesc = new OutputDescriptor(); uv12.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, null, null)); @@ -712,7 +712,7 @@ public class TestDAGVerify { ProcessorDescriptor.create("Processor"), dummyTaskCount, dummyTaskResource); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testVertexGroup"); String groupName1 = "uv12"; VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2); OutputDescriptor outDesc = new OutputDescriptor(); @@ -794,7 +794,7 @@ public class TestDAGVerify { ProcessorDescriptor.create("Processor"), -1, dummyTaskResource); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testVertexGroupOneToOne"); String groupName1 = "uv12"; VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2); OutputDescriptor outDesc = new OutputDescriptor(); @@ -856,7 +856,7 @@ public class TestDAGVerify { DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("dummy output class"), InputDescriptor.create("dummy input class"))); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-BinaryOutput"); dag.addVertex(v1); dag.addVertex(v2); dag.addVertex(v3); @@ -874,7 +874,7 @@ public class TestDAGVerify { public void testDagWithNoVertices() { IllegalStateException ex=null; try { - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testDagWithNoVertices"); dag.verify(); } catch (IllegalStateException e){ @@ -921,7 +921,7 @@ public class TestDAGVerify { @Test(timeout = 5000) public void testMultipleRootInputsAllowed() { - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testMultipleRootInputsAllowed"); ProcessorDescriptor pd1 = ProcessorDescriptor.create("processor1") .setUserPayload(UserPayload.create(ByteBuffer.wrap("processor1Bytes".getBytes()))); Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1)); @@ -966,7 +966,7 @@ public class TestDAGVerify { InputInitializerDescriptor.create(dummyInputInitClassName), dummyTaskCount, null, vLoc, lrs2); v1.addDataSource("i1", ds); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testDAGCreateDataInference"); dag.addVertex(v1); dag.addTaskLocalFiles(lrs1); DAGPlan dagPlan = dag.createDag(new TezConfiguration(), null, null, null, true); @@ -1007,7 +1007,7 @@ public class TestDAGVerify { null, -1, null, null, lrs2); v1.addDataSource("i1", ds); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testInferredFilesFail"); dag.addVertex(v1); dag.addTaskLocalFiles(lrs); try { @@ -1027,7 +1027,7 @@ public class TestDAGVerify { @Test(timeout = 5000) public void testDAGAccessControls() { - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testDAGAccessControls"); ProcessorDescriptor pd1 = ProcessorDescriptor.create("processor1") .setUserPayload(UserPayload.create(ByteBuffer.wrap("processor1Bytes".getBytes()))); Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1)); @@ -1055,7 +1055,7 @@ public class TestDAGVerify { // v1 has input initializer @Test(timeout = 5000) public void testDAGInvalidParallelism1() { - DAG dag = DAG.create("testDAG"); + DAG dag = DAG.create("DAG-testDAGInvalidParallelism1"); Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create(dummyProcessorClassName)); dag.addVertex(v1); try { @@ -1076,7 +1076,7 @@ public class TestDAGVerify { // v1 has custom vertex manager @Test(timeout = 5000) public void testDAGInvalidParallelism2() { - DAG dag = DAG.create("testDAG"); + DAG dag = DAG.create("DAG-testDAGInvalidParallelism2"); Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create(dummyProcessorClassName)); dag.addVertex(v1); try { @@ -1095,7 +1095,7 @@ public class TestDAGVerify { // v1 has 1-1 united source vertex v0 which has input initializer @Test(timeout = 5000) public void testDAGInvalidParallelism3() { - DAG dag = DAG.create("testDAG"); + DAG dag = DAG.create("DAG-testDAGInvalidParallelism3"); Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create(dummyProcessorClassName)); dag.addVertex(v1); try { @@ -1122,7 +1122,7 @@ public class TestDAGVerify { // v1 has an 1-1 united parent v0 which has custom vertex manager @Test//(timeout = 5000) public void testDAGInvalidParallelism4() { - DAG dag = DAG.create("testDAG"); + DAG dag = DAG.create("DAG-testDAGInvalidParallelism4"); Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create(dummyProcessorClassName)); dag.addVertex(v1); try { @@ -1146,7 +1146,7 @@ public class TestDAGVerify { @Test public void testDAGWithSplitsOnClient() { - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-testDAGWithSplitsOnClient"); // Mimic map which has a data source and shards set when splits are generated in the client Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create(dummyProcessorClassName)); @@ -1170,7 +1170,7 @@ public class TestDAGVerify { // Verifies failure in case of a file size difference. Does not verify sha differences. @Test(timeout = 5000) public void testDAGWithConflictingResource() { - DAG dag = DAG.create("dag"); + DAG dag = DAG.create("DAG-testDAGWithConflictingResource"); Map<String, LocalResource> localResourceMap = new HashMap<>(); String commonResourceKey = "local resource"; localResourceMap.put("lr", LocalResource.newInstance(null, LocalResourceType.FILE, diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java index 1aea8f793..5f6552ee8 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java @@ -75,7 +75,7 @@ public class TestDAGClientAMProtocolBlockingPBServerImpl { @SuppressWarnings("unchecked") public void testSubmitDagInSessionWithLargeDagPlan() throws Exception { int maxIPCMsgSize = 1024; - String dagPlanName = "dagplan-name"; + String dagPlanName = "DAG-testSubmitDagInSessionWithLargeDagPlan"; File requestFile = tmpFolder.newFile("request-file"); TezConfiguration conf = new TezConfiguration(); conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, maxIPCMsgSize); diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java index 6c0231c9b..6ec73e74e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java @@ -900,7 +900,7 @@ public class TestMockDAGAppMaster { lrVertex.put(lrName2, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test1"), LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1)); - DAG dag = DAG.create("test").addTaskLocalFiles(lrDAG); + DAG dag = DAG.create("DAG-testMultipleSubmissions").addTaskLocalFiles(lrDAG); Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5).addTaskLocalFiles(lrVertex); dag.addVertex(vA); @@ -1212,7 +1212,7 @@ public class TestMockDAGAppMaster { MockContainerLauncher mockLauncher = mockApp.getContainerLauncher(); mockLauncher.startScheduling(true); - DAG dag = DAG.create("test"); + DAG dag = DAG.create("DAG-testDAGFinishedRecoveryError"); Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5); dag.addVertex(vA); diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java index 302281af9..e7fc161dd 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java @@ -300,7 +300,7 @@ public class TestSpeculation { TezConfiguration.TEZ_AM_LEGACY_SPECULATIVE_SINGLE_TASK_VERTEX_TIMEOUT, entry.getKey()); - DAG dag = DAG.create("test"); + DAG dag = DAG.create("DAG-testSingleTaskSpeculation"); Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 1); @@ -343,7 +343,7 @@ public class TestSpeculation { * @throws Exception the exception */ public void testBasicSpeculation(boolean withProgress) throws Exception { - DAG dag = DAG.create("test"); + DAG dag = DAG.create("DAG-testBasicSpeculation"); Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5); dag.addVertex(vA); @@ -426,7 +426,7 @@ public class TestSpeculation { @Retry @Test (timeout=30000) public void testBasicSpeculationPerVertexConf() throws Exception { - DAG dag = DAG.create("test"); + DAG dag = DAG.create("DAG-testBasicSpeculationPerVertexConf"); String vNameNoSpec = "A"; String vNameSpec = "B"; Vertex vA = Vertex.create(vNameNoSpec, ProcessorDescriptor.create("Proc.class"), 5); @@ -487,7 +487,7 @@ public class TestSpeculation { @Retry @Test (timeout=30000) public void testBasicSpeculationNotUseful() throws Exception { - DAG dag = DAG.create("test"); + DAG dag = DAG.create("DAG-testBasicSpeculationNotUseful"); Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5); dag.addVertex(vA); diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java index 436c522ef..f4d2daaac 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java @@ -400,7 +400,7 @@ public class TestCommit { // v2->v3 // vertex_group (v1, v2) private DAGPlan createDAGPlan(boolean vertexGroupCommitSucceeded, - boolean v3CommitSucceeded) throws Exception { + boolean v3CommitSucceeded, String dagName) throws Exception { LOG.info("Setting up group dag plan"); int dummyTaskCount = 1; Resource dummyTaskResource = Resource.newInstance(1, 1); @@ -414,7 +414,7 @@ public class TestCommit { "vertex3", ProcessorDescriptor.create("Processor"), dummyTaskCount, dummyTaskResource); - DAG dag = DAG.create("testDag"); + DAG testDag = DAG.create("DAG-" + dagName); String groupName1 = "uv12"; OutputCommitterDescriptor ocd1 = OutputCommitterDescriptor.create( CountingOutputCommitter.class.getName()).setUserPayload( @@ -427,8 +427,8 @@ public class TestCommit { .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig( !v3CommitSucceeded, true).toUserPayload()))); - org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, - v1, v2); + org.apache.tez.dag.api.VertexGroup uv12 = + testDag.createVertexGroup(groupName1, v1, v2); OutputDescriptor outDesc = OutputDescriptor.create("output.class"); uv12.addDataSink("v12Out", DataSinkDescriptor.create(outDesc, ocd1, null)); v3.addDataSink("v3Out", DataSinkDescriptor.create(outDesc, ocd2, null)); @@ -440,18 +440,19 @@ public class TestCommit { InputDescriptor.create("dummy input class")), InputDescriptor .create("merge.class")); - dag.addVertex(v1); - dag.addVertex(v2); - dag.addVertex(v3); - dag.addEdge(e1); - return dag.createDag(conf, null, null, null, true); + testDag.addVertex(v1); + testDag.addVertex(v2); + testDag.addVertex(v3); + testDag.addEdge(e1); + return testDag.createDag(conf, null, null, null, true); } // v1->v3 // v2->v3 // vertex_group (v1, v2) has 2 shared outputs - private DAGPlan createDAGPlanWith2VertexGroupOutputs(boolean vertexGroupCommitSucceeded1, - boolean vertexGroupCommitSucceeded2, boolean v3CommitSucceeded) throws Exception { + private DAGPlan createDAGPlanWith2VertexGroupOutputs( + boolean vertexGroupCommitSucceeded1, boolean vertexGroupCommitSucceeded2, + boolean v3CommitSucceeded, String dagName) throws Exception { LOG.info("Setting up group dag plan"); int dummyTaskCount = 1; Resource dummyTaskResource = Resource.newInstance(1, 1); @@ -465,7 +466,7 @@ public class TestCommit { "vertex3", ProcessorDescriptor.create("Processor"), dummyTaskCount, dummyTaskResource); - DAG dag = DAG.create("testDag"); + DAG testDag = DAG.create("DAG-" + dagName); String groupName1 = "uv12"; OutputCommitterDescriptor ocd1 = OutputCommitterDescriptor.create( CountingOutputCommitter.class.getName()).setUserPayload( @@ -483,8 +484,8 @@ public class TestCommit { .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig( !v3CommitSucceeded, true).toUserPayload()))); - org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, - v1, v2); + org.apache.tez.dag.api.VertexGroup uv12 = + testDag.createVertexGroup(groupName1, v1, v2); OutputDescriptor outDesc = OutputDescriptor.create("output.class"); uv12.addDataSink("v12Out1", DataSinkDescriptor.create(outDesc, ocd1, null)); uv12.addDataSink("v12Out2", DataSinkDescriptor.create(outDesc, ocd2, null)); @@ -497,21 +498,22 @@ public class TestCommit { InputDescriptor.create("dummy input class")), InputDescriptor .create("merge.class")); - dag.addVertex(v1); - dag.addVertex(v2); - dag.addVertex(v3); - dag.addEdge(e1); - return dag.createDag(conf, null, null, null, true); + testDag.addVertex(v1); + testDag.addVertex(v2); + testDag.addVertex(v3); + testDag.addEdge(e1); + return testDag.createDag(conf, null, null, null, true); } private DAGPlan createDAGPlan_SingleVertexWith2Committer( - boolean commit1Succeed, boolean commit2Succeed) throws IOException { - return createDAGPlan_SingleVertexWith2Committer(commit1Succeed, commit2Succeed, false); + boolean commit1Succeed, boolean commit2Succeed, String dagName) throws IOException { + return createDAGPlan_SingleVertexWith2Committer(commit1Succeed, commit2Succeed, false, dagName); } // used for route event error in VM - private DAGPlan createDAGPlan_SingleVertexWith2Committer - (boolean commit1Succeed, boolean commit2Succeed, boolean customVM) throws IOException { + private DAGPlan createDAGPlan_SingleVertexWith2Committer( + boolean commit1Succeed, boolean commit2Succeed, boolean customVM, + String dagName) throws IOException { LOG.info("Setting up group dag plan"); int dummyTaskCount = 1; Resource dummyTaskResource = Resource.newInstance(1, 1); @@ -534,12 +536,12 @@ public class TestCommit { .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig( !commit2Succeed, true).toUserPayload()))); - DAG dag = DAG.create("testDag"); - dag.addVertex(v1); + DAG testDag = DAG.create("DAG-" + dagName); + testDag.addVertex(v1); OutputDescriptor outDesc = OutputDescriptor.create("output.class"); v1.addDataSink("v1Out_1", DataSinkDescriptor.create(outDesc, ocd1, null)); v1.addDataSink("v1Out_2", DataSinkDescriptor.create(outDesc, ocd2, null)); - return dag.createDag(conf, null, null, null, true); + return testDag.createDag(conf, null, null, null, true); } private void initDAG(DAGImpl dag) { @@ -559,7 +561,8 @@ public class TestCommit { public void testVertexCommit_OnDAGSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, true); - setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true)); + setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true, + "testVertexCommit_OnDAGSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -590,7 +593,8 @@ public class TestCommit { public void testVertexCommit_OnVertexSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); - setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true)); + setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true, + "testVertexCommit_OnVertexSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -629,7 +633,8 @@ public class TestCommit { public void testVertexCommitFail1_OnVertexSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); - setupDAG(createDAGPlan_SingleVertexWith2Committer(false, true)); + setupDAG(createDAGPlan_SingleVertexWith2Committer(false, true, + "testVertexCommitFail1_OnVertexSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -665,7 +670,8 @@ public class TestCommit { public void testVertexCommitFail2_OnVertexSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); - setupDAG(createDAGPlan_SingleVertexWith2Committer(true, false)); + setupDAG(createDAGPlan_SingleVertexWith2Committer(true, false, + "testVertexCommitFail2_OnVertexSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -703,7 +709,8 @@ public class TestCommit { public void testVertexKilledWhileCommitting() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); - setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true)); + setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true, + "testVertexKilledWhileCommitting")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -742,7 +749,8 @@ public class TestCommit { public void testVertexRescheduleWhileCommitting() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); - setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true)); + setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true, + "testVertexRescheduleWhileCommitting")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -782,7 +790,8 @@ public class TestCommit { public void testVertexRouteEventErrorWhileCommitting() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); - setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true, true)); + setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true, true, + "testVertexRouteEventErrorWhileCommitting")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -827,7 +836,8 @@ public class TestCommit { public void testVertexInternalErrorWhileCommiting() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); - setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true)); + setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true, + "testVertexInternalErrorWhileCommiting")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -868,7 +878,7 @@ public class TestCommit { public void testDAGCommitSucceeded_OnDAGSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, true); - setupDAG(createDAGPlan(true, true)); + setupDAG(createDAGPlan(true, true, "testDAGCommitSucceeded_OnDAGSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -924,7 +934,7 @@ public class TestCommit { public void testDAGCommitFail1_OnDAGSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, true); - setupDAG(createDAGPlan(true, false)); + setupDAG(createDAGPlan(true, false, "testDAGCommitFail1_OnDAGSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -985,7 +995,7 @@ public class TestCommit { public void testDAGCommitFail2_OnDAGSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, true); - setupDAG(createDAGPlan(false, true)); + setupDAG(createDAGPlan(false, true, "testDAGCommitFail2_OnDAGSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -1045,7 +1055,7 @@ public class TestCommit { public void testDAGCommitSucceeded1_OnVertexSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); - setupDAG(createDAGPlan(true, true)); + setupDAG(createDAGPlan(true, true, "testDAGCommitSucceeded1_OnVertexSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -1103,7 +1113,7 @@ public class TestCommit { public void testDAGCommitSucceeded2_OnVertexSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); - setupDAG(createDAGPlan(true, true)); + setupDAG(createDAGPlan(true, true, "testDAGCommitSucceeded2_OnVertexSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -1163,7 +1173,7 @@ public class TestCommit { public void testDAGCommitSucceeded3_OnVertexSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); - setupDAG(createDAGPlanWith2VertexGroupOutputs(true, true, true)); + setupDAG(createDAGPlanWith2VertexGroupOutputs(true, true, true, "testDAGCommitSucceeded3_OnVertexSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -1228,7 +1238,7 @@ public class TestCommit { public void testDAGCommitFail1_OnVertexSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); - setupDAG(createDAGPlan(false, true)); + setupDAG(createDAGPlan(false, true, "testDAGCommitFail1_OnVertexSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -1287,7 +1297,7 @@ public class TestCommit { public void testDAGCommitFail2_OnVertexSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); - setupDAG(createDAGPlan(true, false)); + setupDAG(createDAGPlan(true, false, "testDAGCommitFail2_OnVertexSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -1346,7 +1356,7 @@ public class TestCommit { public void testDAGCommitFail3_OnVertexSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); - setupDAG(createDAGPlan(true, false)); + setupDAG(createDAGPlan(true, false, "testDAGCommitFail3_OnVertexSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -1409,7 +1419,7 @@ public class TestCommit { public void testDAGCommitFail4_OnVertexSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); - setupDAG(createDAGPlan(false, true)); + setupDAG(createDAGPlan(false, true, "testDAGCommitFail4_OnVertexSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -1468,7 +1478,7 @@ public class TestCommit { public void testDAGInternalErrorWhileCommiting_OnDAGSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, true); - setupDAG(createDAGPlan(true, true)); + setupDAG(createDAGPlan(true, true, "testDAGInternalErrorWhileCommiting_OnDAGSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -1530,7 +1540,7 @@ public class TestCommit { private void _testDAGTerminatedWhileCommitting1_OnDAGSuccess(DAGTerminationCause terminationCause) throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, true); - setupDAG(createDAGPlan(true, true)); + setupDAG(createDAGPlan(true, true, "_testDAGTerminatedWhileCommitting1_OnDAGSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -1595,7 +1605,7 @@ public class TestCommit { private void _testDAGTerminatedWhileCommitting1_OnVertexSuccess(DAGTerminationCause terminationCause) throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); - setupDAG(createDAGPlan(true, true)); + setupDAG(createDAGPlan(true, true, "_testDAGTerminatedWhileCommitting1_OnVertexSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -1665,7 +1675,7 @@ public class TestCommit { private void _testDAGKilledWhileRunning_OnVertexSuccess(DAGTerminationCause terminationCause) throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); - setupDAG(createDAGPlan(true, true)); + setupDAG(createDAGPlan(true, true, "_testDAGKilledWhileRunning_OnVertexSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -1724,7 +1734,7 @@ public class TestCommit { public void testDAGCommitVertexRerunWhileCommitting_OnDAGSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, true); - setupDAG(createDAGPlan(true, true)); + setupDAG(createDAGPlan(true, true, "testDAGCommitVertexRerunWhileCommitting_OnDAGSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -1783,7 +1793,7 @@ public class TestCommit { public void testDAGCommitInternalErrorWhileCommiting_OnDAGSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, true); - setupDAG(createDAGPlan(true, true)); + setupDAG(createDAGPlan(true, true, "testDAGCommitInternalErrorWhileCommiting_OnDAGSuccess")); initDAG(dag); startDAG(dag); VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1"); @@ -1831,7 +1841,7 @@ public class TestCommit { public void testVertexGroupCommitFinishedEventFail_OnVertexSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false); - setupDAG(createDAGPlan(true, true)); + setupDAG(createDAGPlan(true, true, "testVertexGroupCommitFinishedEventFail_OnVertexSuccess")); historyEventHandler.failVertexGroupCommitFinishedEvent = true; initDAG(dag); @@ -1886,7 +1896,7 @@ public class TestCommit { public void testDAGCommitStartedEventFail_OnDAGSuccess() throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, true); - setupDAG(createDAGPlan(true, true)); + setupDAG(createDAGPlan(true, true, "testDAGCommitStartedEventFail_OnDAGSuccess")); historyEventHandler.failDAGCommitStartedEvent = true; initDAG(dag); @@ -1947,7 +1957,7 @@ public class TestCommit { private void _testCommitCanceled_OnDAGSuccess(DAGTerminationCause terminationCause) throws Exception { conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, true); - setupDAG(createDAGPlan(true, true)); + setupDAG(createDAGPlan(true, true, "testDAGCommitStartedEventFail_OnDAGSuccess")); // create customized ThreadPoolExecutor to wait before schedule new task rawExecutor = new ControlledThreadPoolExecutor(1); execService = MoreExecutors.listeningDecorator(rawExecutor); diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index 9a3292e8b..025bf8ad3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -48,6 +48,8 @@ import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag; import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.hadoop.shim.HadoopShim; +import org.junit.Rule; +import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -161,6 +163,9 @@ import com.google.protobuf.ByteString; public class TestDAGImpl { + @Rule + public TestName testName = new TestName(); + private static final Logger LOG = LoggerFactory.getLogger(TestDAGImpl.class); private DAGPlan dagPlan; private TezDAGID dagId; @@ -426,7 +431,7 @@ public class TestDAGImpl { } // Create a plan with 3 vertices: A, B, C. Group(A,B)->C - static DAGPlan createGroupDAGPlan() { + static DAGPlan createGroupDAGPlan(String dagName) { LOG.info("Setting up group dag plan"); int dummyTaskCount = 1; Resource dummyTaskResource = Resource.newInstance(1, 1); @@ -440,7 +445,7 @@ public class TestDAGImpl { ProcessorDescriptor.create("Processor"), dummyTaskCount, dummyTaskResource); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-" + dagName); String groupName1 = "uv12"; OutputCommitterDescriptor ocd = OutputCommitterDescriptor.create( TotalCountingOutputCommitter.class.getName()); @@ -906,7 +911,7 @@ public class TestDAGImpl { doReturn(defaultShim).when(groupAppContext).getHadoopShim(); groupDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 3); - groupDagPlan = createGroupDAGPlan(); + groupDagPlan = createGroupDAGPlan(testName.getMethodName()); groupDag = new DAGImpl(groupDagId, conf, groupDagPlan, dispatcher.getEventHandler(), taskCommunicatorManagerInterface, fsTokens, clock, "user", thh, diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 10ecfe503..139f2fd91 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -809,9 +809,9 @@ public class TestVertexImpl { return dag; } - private DAGPlan createDAGPlanWithMixedEdges() { + private DAGPlan createDAGPlanWithMixedEdges(String dagName) { LOG.info("Setting up mixed edge dag plan"); - org.apache.tez.dag.api.DAG dag = org.apache.tez.dag.api.DAG.create("MixedEdges"); + org.apache.tez.dag.api.DAG dag = org.apache.tez.dag.api.DAG.create("DAG-" + dagName); org.apache.tez.dag.api.Vertex v1 = org.apache.tez.dag.api.Vertex.create("vertex1", ProcessorDescriptor.create("v1.class"), 1, Resource.newInstance(0, 0)); org.apache.tez.dag.api.Vertex v2 = org.apache.tez.dag.api.Vertex.create("vertex2", @@ -4602,7 +4602,7 @@ public class TestVertexImpl { @Test(timeout = 5000) public void testVertexManagerHeuristic() throws TezException { setupPreDagCreation(); - dagPlan = createDAGPlanWithMixedEdges(); + dagPlan = createDAGPlanWithMixedEdges("testVertexManagerHeuristic"); setupPostDagCreation(false); initAllVertices(VertexState.INITED); Assert.assertEquals(ImmediateStartVertexManager.class, diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java index 4d4577ac7..f70224dc7 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java @@ -56,7 +56,7 @@ import com.google.common.collect.Sets; public class TestDAGUtils { @SuppressWarnings("deprecation") - private DAGPlan createDAG() { + private DAGPlan createDAG(String dagName) { // Create a plan with 3 vertices: A, B, C. Group(A,B)->C Configuration conf = new Configuration(false); int dummyTaskCount = 1; @@ -73,7 +73,7 @@ public class TestDAGUtils { ProcessorDescriptor.create("Processor").setHistoryText("vertex3 Processor HistoryText"), dummyTaskCount, dummyTaskResource); - DAG dag = DAG.create("testDag"); + DAG dag = DAG.create("DAG-" + dagName); dag.setCallerContext(CallerContext.create("context1", "callerId1", "callerType1", "desc1")); dag.setDAGInfo("dagInfo"); String groupName1 = "uv12"; @@ -102,7 +102,7 @@ public class TestDAGUtils { @Test(timeout = 5000) @SuppressWarnings("unchecked") public void testConvertDAGPlanToATSMap() throws IOException, JSONException { - DAGPlan dagPlan = createDAG(); + DAGPlan dagPlan = createDAG("testConvertDAGPlanToATSMap"); Map<String,TezVertexID> idNameMap = new HashMap<String, TezVertexID>(); ApplicationId appId = ApplicationId.newInstance(1, 1); TezDAGID dagId = TezDAGID.getInstance(appId, 1); @@ -115,7 +115,8 @@ public class TestDAGUtils { Map<String, Object> atsMap = DAGUtils.convertDAGPlanToATSMap(dagPlan); Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_NAME_KEY)); - Assert.assertEquals("testDag", atsMap.get(DAGUtils.DAG_NAME_KEY)); + Assert.assertEquals("DAG-testConvertDAGPlanToATSMap", + atsMap.get(DAGUtils.DAG_NAME_KEY)); Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_INFO_KEY)); Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_CONTEXT_KEY)); Map<String, String> contextMap = (Map<String, String>)atsMap.get(DAGUtils.DAG_CONTEXT_KEY); diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java index 95d5bcf30..339c46780 100644 --- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java +++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java @@ -189,7 +189,7 @@ public class TestMRRJobsDAGApi { public void testSleepJob() throws TezException, IOException, InterruptedException { SleepProcessorConfig spConf = new SleepProcessorConfig(1); - DAG dag = DAG.create("TezSleepProcessor"); + DAG dag = DAG.create("DAG-testSleepJob"); Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, Resource.newInstance(1024, 1)); @@ -232,7 +232,7 @@ public class TestMRRJobsDAGApi { public void testNonDefaultFSStagingDir() throws Exception { SleepProcessorConfig spConf = new SleepProcessorConfig(1); - DAG dag = DAG.create("TezSleepProcessor"); + DAG dag = DAG.create("DAG-testNonDefaultFSStagingDir"); Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create( SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, Resource.newInstance(1024, 1)); diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAM.java b/tez-tests/src/test/java/org/apache/tez/test/TestAM.java index 93fb0b473..a31fa7e8a 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestAM.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestAM.java @@ -103,7 +103,7 @@ public class TestAM { public void testAMWebUIService() throws TezException, IOException, InterruptedException { SleepProcessorConfig spConf = new SleepProcessorConfig(1); - DAG dag = DAG.create("TezSleepProcessor"); + DAG dag = DAG.create("DAG-testAMWebUIService"); Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1, Resource.newInstance(1024, 1)); diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java index b7ba9dd58..bd7074650 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java @@ -843,7 +843,7 @@ public class TestFaultTolerance { public void testCartesianProduct() throws Exception { Configuration dagConf = new Configuration(); dagConf.setDouble(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, 0.25); - DAG dag = DAG.create("dag"); + DAG dag = DAG.create("DAG-testCartesianProduct"); Configuration vertexConf = new Configuration(); vertexConf.setInt(TestProcessor.getVertexConfName( diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java index 14832d36c..035d39b13 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java @@ -133,7 +133,7 @@ public class TestLocalMode { TezClient tezClient1 = TezClient.create("commonName", tezConf1, true); tezClient1.start(); - DAG dag1 = createSimpleDAG("dag1", SleepProcessor.class.getName()); + DAG dag1 = createSimpleDAG("testMultipleClientsWithSession", SleepProcessor.class.getName()); DAGClient dagClient1 = tezClient1.submitDAG(dag1); dagClient1.waitForCompletion(); @@ -145,7 +145,7 @@ public class TestLocalMode { tezClient1.stop(); TezConfiguration tezConf2 = createConf(); - DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName()); + DAG dag2 = createSimpleDAG("testMultipleClientsWithSession_2", SleepProcessor.class.getName()); TezClient tezClient2 = TezClient.create("commonName", tezConf2, true); tezClient2.start(); DAGClient dagClient2 = tezClient2.submitDAG(dag2); @@ -165,7 +165,7 @@ public class TestLocalMode { TezClient tezClient1 = TezClient.create("commonName", tezConf1, false); tezClient1.start(); - DAG dag1 = createSimpleDAG("dag1", SleepProcessor.class.getName()); + DAG dag1 = createSimpleDAG("testMultipleClientsWithoutSession", SleepProcessor.class.getName()); DAGClient dagClient1 = tezClient1.submitDAG(dag1); dagClient1.waitForCompletion(); @@ -177,7 +177,7 @@ public class TestLocalMode { TezConfiguration tezConf2 = createConf(); - DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName()); + DAG dag2 = createSimpleDAG("testMultipleClientsWithoutSession_2", SleepProcessor.class.getName()); TezClient tezClient2 = TezClient.create("commonName", tezConf2, false); tezClient2.start(); DAGClient dagClient2 = tezClient2.submitDAG(dag2); @@ -198,7 +198,7 @@ public class TestLocalMode { TezClient tezClient1 = TezClient.create("commonName", tezConf1, false); tezClient1.start(); - DAG dag1 = createSimpleDAG("dag1", SleepProcessor.class.getName()); + DAG dag1 = createSimpleDAG("testNoSysExitOnSuccessfulDAG", SleepProcessor.class.getName()); DAGClient dagClient1 = tezClient1.submitDAG(dag1); dagClient1.waitForCompletion(); @@ -213,14 +213,14 @@ public class TestLocalMode { } @Test(timeout = 20000) - public void testNoSysExitOnFailinglDAG() throws TezException, InterruptedException, + public void testNoSysExitOnFailingDAG() throws TezException, InterruptedException, IOException { TezConfiguration tezConf1 = createConf(); // Run in non-session mode so that the AM terminates TezClient tezClient1 = TezClient.create("commonName", tezConf1, false); tezClient1.start(); - DAG dag1 = createSimpleDAG("dag1", FailingProcessor.class.getName()); + DAG dag1 = createSimpleDAG("testNoSysExitOnFailingDAG", FailingProcessor.class.getName()); DAGClient dagClient1 = tezClient1.submitDAG(dag1); dagClient1.waitForCompletion(); @@ -261,11 +261,14 @@ public class TestLocalMode { } private DAG createSimpleDAG(String dagName, String processorName) { - DAG dag = DAG.create(dagName).addVertex( + DAG dag = DAG.create(generateDagName("DAG-" + dagName)).addVertex( Vertex.create(SleepProcessor.SLEEP_VERTEX_NAME, ProcessorDescriptor.create(processorName).setUserPayload( new SleepProcessor.SleepProcessorConfig(SLEEP_PROCESSOR_TIME_TO_SLEEP_MS).toUserPayload()), 1)); return dag; } + private String generateDagName(String baseName) { + return baseName + (useDfs ? "_useDfs" : "") + (useLocalModeWithoutNetwork ? "_useLocalModeWithoutNetwork" : ""); + } @Test(timeout=30000) public void testMultiDAGsOnSession() throws IOException, TezException, InterruptedException {