Repository: tez Updated Branches: refs/heads/master 51fb3e4cf -> 7f699f18f
TEZ-3294. DAG.createDag() does not clear local state on repeat calls. (Harish Jaiprakash via hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7f699f18 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7f699f18 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7f699f18 Branch: refs/heads/master Commit: 7f699f18ff94ca2b44a0eda650f31e1054c79ade Parents: 51fb3e4 Author: Hitesh Shah <[email protected]> Authored: Mon Jun 13 19:27:49 2016 -0700 Committer: Hitesh Shah <[email protected]> Committed: Mon Jun 13 19:27:49 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 3 ++ .../main/java/org/apache/tez/dag/api/DAG.java | 33 ++++++++++++-------- .../org/apache/tez/client/TestTezClient.java | 28 +++++++++++++++++ .../java/org/apache/tez/dag/api/TestDAG.java | 28 +++++++++++++++++ 4 files changed, 79 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/7f699f18/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 73a11fa..b9c6b0e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3294. DAG.createDag() does not clear local state on repeat calls. TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce. TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870 TEZ-3295. TestOrderedWordCount should handle relative input/output paths. @@ -59,6 +60,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3294. DAG.createDag() does not clear local state on repeat calls. TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce. TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870 TEZ-3290. Set full task attempt id string in MRInput configuration object. @@ -508,6 +510,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3294. DAG.createDag() does not clear local state on repeat calls. TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce. TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870 TEZ-3280. LOG MRInputHelpers split generation message as INFO http://git-wip-us.apache.org/repos/asf/tez/blob/7f699f18/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java index 902ff21..0eb51e1 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -21,9 +21,11 @@ import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -99,8 +101,6 @@ public class DAG { private Map<String,String> dagConf = new HashMap<String, String>(); private VertexExecutionContext defaultExecutionContext; - private Stack<String> topologicalVertexStack = new Stack<String>(); - private DAG(String name) { this.name = name; } @@ -552,7 +552,7 @@ public class DAG { } @VisibleForTesting - void verify(boolean restricted) throws IllegalStateException { + Deque<String> verify(boolean restricted) throws IllegalStateException { if (vertices.isEmpty()) { throw new IllegalStateException("Invalid dag containing 0 vertices"); } @@ -655,8 +655,8 @@ public class DAG { // When additional inputs are supported, this can be chceked easily (and early) // within the addInput / addOutput call itself. - detectCycles(edgeMap, vertexMap); - + Deque<String> topologicalVertexStack = detectCycles(edgeMap, vertexMap); + checkAndInferOneToOneParallelism(); if (restricted) { @@ -673,29 +673,36 @@ public class DAG { } } } + + return topologicalVertexStack; } // Adaptation of Tarjan's algorithm for connected components. // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm - private void detectCycles(Map<Vertex, List<Edge>> edgeMap, Map<String, AnnotatedVertex> vertexMap) + private Deque<String> detectCycles(Map<Vertex, List<Edge>> edgeMap, + Map<String, AnnotatedVertex> vertexMap) throws IllegalStateException { + Deque<String> topologicalVertexStack = new LinkedList<String>(); Integer nextIndex = 0; // boxed integer so it is passed by reference. Stack<AnnotatedVertex> stack = new Stack<DAG.AnnotatedVertex>(); for (AnnotatedVertex av : vertexMap.values()) { if (av.index == -1) { assert stack.empty(); - strongConnect(av, vertexMap, edgeMap, stack, nextIndex); + strongConnect(av, vertexMap, edgeMap, stack, nextIndex, topologicalVertexStack); } } + return topologicalVertexStack; } // part of Tarjan's algorithm for connected components. // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm private void strongConnect( - AnnotatedVertex av, - Map<String, AnnotatedVertex> vertexMap, - Map<Vertex, List<Edge>> edgeMap, - Stack<AnnotatedVertex> stack, Integer nextIndex) throws IllegalStateException { + AnnotatedVertex av, + Map<String, AnnotatedVertex> vertexMap, + Map<Vertex, List<Edge>> edgeMap, + Stack<AnnotatedVertex> stack, + Integer nextIndex, + Deque<String> topologicalVertexStack) throws IllegalStateException { av.index = nextIndex; av.lowlink = nextIndex; nextIndex++; @@ -707,7 +714,7 @@ public class DAG { for (Edge e : edgeMap.get(av.v)) { AnnotatedVertex outVertex = vertexMap.get(e.getOutputVertex().getName()); if (outVertex.index == -1) { - strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex); + strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex, topologicalVertexStack); av.lowlink = Math.min(av.lowlink, outVertex.lowlink); } else if (outVertex.onstack) { // strongly connected component detected, but we will wait till later so that the full cycle can be displayed. @@ -760,7 +767,7 @@ public class DAG { Map<String, LocalResource> tezJarResources, LocalResource binaryConfig, boolean tezLrsAsArchive, Map<String, String> additionalConfigs, ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker javaOptsChecker) { - verify(true); + Deque<String> topologicalVertexStack = verify(true); DAGPlan.Builder dagBuilder = DAGPlan.newBuilder(); dagBuilder.setName(this.name); http://git-wip-us.apache.org/repos/asf/tez/blob/7f699f18/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java index 42b762c..d49ba48 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java @@ -77,6 +77,7 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConfigurationConstants; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; @@ -745,4 +746,31 @@ public class TestTezClient { Assert.fail("Failed to retrieve local host information"); } } + + @Test(timeout = 5000) + public void testClientResubmit() throws Exception { + TezClientForTest client = configureAndCreateTezClient(null, true, null); + client.start(); + Map<String, LocalResource> lrDAG = Collections.singletonMap("LR1", + LocalResource.newInstance( + URL.newInstance("file", "localhost", 0, "/test1"), + LocalResourceType.FILE, + LocalResourceVisibility.PUBLIC, 1, 1)); + Vertex vertex1 = Vertex.create("Vertex1", ProcessorDescriptor.create("P1"), 1, + Resource.newInstance(1, 1)); + vertex1.setTaskLaunchCmdOpts("-XX:+UseParallelGC -XX:+UseG1GC"); + Vertex vertex2 = Vertex.create("Vertex2", ProcessorDescriptor.create("P2"), 1, + Resource.newInstance(1, 1)); + vertex2.setTaskLaunchCmdOpts("-XX:+UseParallelGC -XX:+UseG1GC"); + DAG dag = DAG.create("DAG").addVertex(vertex1).addVertex(vertex2).addTaskLocalFiles(lrDAG); + for (int i = 0; i < 3; ++i) { + try { + client.submitDAG(dag); + Assert.fail("Expected TezUncheckedException here."); + } catch(TezUncheckedException ex) { + Assert.assertTrue(ex.getMessage().contains("Invalid/conflicting GC options found")); + } + } + client.stop(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/7f699f18/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java index 839e780..ae5dfbb 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java @@ -18,11 +18,19 @@ package org.apache.tez.dag.api; +import java.util.Collections; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.tez.client.CallerContext; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.junit.Assert; import org.junit.Test; @@ -333,4 +341,24 @@ public class TestDAG { } + @Test + public void testRecreateDAG() { + Map<String, LocalResource> lrDAG = Collections.singletonMap("LR1", + LocalResource.newInstance( + URL.newInstance("file", "localhost", 0, "/test1"), + LocalResourceType.FILE, + LocalResourceVisibility.PUBLIC, 1, 1)); + Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor1"), 1, + Resource.newInstance(1, 1)); + Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("dummyProcessor2"), 1, + Resource.newInstance(1, 1)); + DAG dag = DAG.create("dag1").addVertex(v1).addVertex(v2).addTaskLocalFiles(lrDAG); + + TezConfiguration tezConf = new TezConfiguration(); + DAGPlan firstPlan = dag.createDag(tezConf, null, null, null, false); + for (int i = 0; i < 3; ++i) { + DAGPlan dagPlan = dag.createDag(tezConf, null, null, null, false); + Assert.assertEquals(dagPlan, firstPlan); + } + } }
