Repository: tez Updated Branches: refs/heads/branch-0.7 f478befcc -> 60c26c7b2
TEZ-3294. DAG.createDag() does not clear local state on repeat calls. (Harish Jaiprakash via hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/60c26c7b Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/60c26c7b Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/60c26c7b Branch: refs/heads/branch-0.7 Commit: 60c26c7b23e76dd80389c040cd74727c19217eca Parents: f478bef Author: Hitesh Shah <[email protected]> Authored: Tue Jun 14 15:02:38 2016 -0700 Committer: Hitesh Shah <[email protected]> Committed: Tue Jun 14 15:02:38 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../main/java/org/apache/tez/dag/api/DAG.java | 33 ++++++++++++-------- .../org/apache/tez/client/TestTezClient.java | 29 +++++++++++++++++ .../java/org/apache/tez/dag/api/TestDAG.java | 28 +++++++++++++++++ 4 files changed, 78 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/60c26c7b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8d70d97..880bfeb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3294. DAG.createDag() does not clear local state on repeat calls. TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce. TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870 TEZ-3278. Hide Swimlane from Tez UI http://git-wip-us.apache.org/repos/asf/tez/blob/60c26c7b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java index 96788c5..cbcf32b 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -21,9 +21,11 @@ import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -97,8 +99,6 @@ public class DAG { CallerContext callerContext; private Map<String,String> dagConf = new HashMap<String, String>(); - private Stack<String> topologicalVertexStack = new Stack<String>(); - private DAG(String name) { this.name = name; } @@ -529,7 +529,7 @@ public class DAG { } @VisibleForTesting - void verify(boolean restricted) throws IllegalStateException { + Deque<String> verify(boolean restricted) throws IllegalStateException { if (vertices.isEmpty()) { throw new IllegalStateException("Invalid dag containing 0 vertices"); } @@ -632,8 +632,8 @@ public class DAG { // When additional inputs are supported, this can be chceked easily (and early) // within the addInput / addOutput call itself. - detectCycles(edgeMap, vertexMap); - + Deque<String> topologicalVertexStack = detectCycles(edgeMap, vertexMap); + checkAndInferOneToOneParallelism(); if (restricted) { @@ -650,29 +650,36 @@ public class DAG { } } } + + return topologicalVertexStack; } // Adaptation of Tarjan's algorithm for connected components. // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm - private void detectCycles(Map<Vertex, List<Edge>> edgeMap, Map<String, AnnotatedVertex> vertexMap) + private Deque<String> detectCycles(Map<Vertex, List<Edge>> edgeMap, + Map<String, AnnotatedVertex> vertexMap) throws IllegalStateException { + Deque<String> topologicalVertexStack = new LinkedList<String>(); Integer nextIndex = 0; // boxed integer so it is passed by reference. Stack<AnnotatedVertex> stack = new Stack<DAG.AnnotatedVertex>(); for (AnnotatedVertex av : vertexMap.values()) { if (av.index == -1) { assert stack.empty(); - strongConnect(av, vertexMap, edgeMap, stack, nextIndex); + strongConnect(av, vertexMap, edgeMap, stack, nextIndex, topologicalVertexStack); } } + return topologicalVertexStack; } // part of Tarjan's algorithm for connected components. // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm private void strongConnect( - AnnotatedVertex av, - Map<String, AnnotatedVertex> vertexMap, - Map<Vertex, List<Edge>> edgeMap, - Stack<AnnotatedVertex> stack, Integer nextIndex) throws IllegalStateException { + AnnotatedVertex av, + Map<String, AnnotatedVertex> vertexMap, + Map<Vertex, List<Edge>> edgeMap, + Stack<AnnotatedVertex> stack, + Integer nextIndex, + Deque<String> topologicalVertexStack) throws IllegalStateException { av.index = nextIndex; av.lowlink = nextIndex; nextIndex++; @@ -684,7 +691,7 @@ public class DAG { for (Edge e : edgeMap.get(av.v)) { AnnotatedVertex outVertex = vertexMap.get(e.getOutputVertex().getName()); if (outVertex.index == -1) { - strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex); + strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex, topologicalVertexStack); av.lowlink = Math.min(av.lowlink, outVertex.lowlink); } else if (outVertex.onstack) { // strongly connected component detected, but we will wait till later so that the full cycle can be displayed. @@ -737,7 +744,7 @@ public class DAG { Map<String, LocalResource> tezJarResources, LocalResource binaryConfig, boolean tezLrsAsArchive, Map<String, String> additionalConfigs, JavaOptsChecker javaOptsChecker) { - verify(true); + Deque<String> topologicalVertexStack = verify(true); DAGPlan.Builder dagBuilder = DAGPlan.newBuilder(); dagBuilder.setName(this.name); http://git-wip-us.apache.org/repos/asf/tez/blob/60c26c7b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java index c84df29..08bb156 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java @@ -61,6 +61,8 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConfigurationConstants; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB; @@ -509,4 +511,31 @@ public class TestTezClient { Assert.fail("Failed to retrieve local host information"); } } + + @Test(timeout = 5000) + public void testClientResubmit() throws Exception { + TezClientForTest client = configureAndCreateTezClient(null, true, null); + client.start(); + Map<String, LocalResource> lrDAG = Collections.singletonMap("LR1", + LocalResource.newInstance( + URL.newInstance("file", "localhost", 0, "/test1"), + LocalResourceType.FILE, + LocalResourceVisibility.PUBLIC, 1, 1)); + Vertex vertex1 = Vertex.create("Vertex1", ProcessorDescriptor.create("P1"), 1, + Resource.newInstance(1, 1)); + vertex1.setTaskLaunchCmdOpts("-XX:+UseParallelGC -XX:+UseG1GC"); + Vertex vertex2 = Vertex.create("Vertex2", ProcessorDescriptor.create("P2"), 1, + Resource.newInstance(1, 1)); + vertex2.setTaskLaunchCmdOpts("-XX:+UseParallelGC -XX:+UseG1GC"); + DAG dag = DAG.create("DAG").addVertex(vertex1).addVertex(vertex2).addTaskLocalFiles(lrDAG); + for (int i = 0; i < 3; ++i) { + try { + client.submitDAG(dag); + Assert.fail("Expected TezUncheckedException here."); + } catch(TezUncheckedException ex) { + Assert.assertTrue(ex.getMessage().contains("Invalid/conflicting GC options found")); + } + } + client.stop(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/60c26c7b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java index aeef846..ebee17d 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java @@ -18,11 +18,19 @@ package org.apache.tez.dag.api; +import java.util.Collections; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.tez.client.CallerContext; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.junit.Assert; import org.junit.Test; @@ -333,4 +341,24 @@ public class TestDAG { } + @Test + public void testRecreateDAG() { + Map<String, LocalResource> lrDAG = Collections.singletonMap("LR1", + LocalResource.newInstance( + URL.newInstance("file", "localhost", 0, "/test1"), + LocalResourceType.FILE, + LocalResourceVisibility.PUBLIC, 1, 1)); + Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor1"), 1, + Resource.newInstance(1, 1)); + Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("dummyProcessor2"), 1, + Resource.newInstance(1, 1)); + DAG dag = DAG.create("dag1").addVertex(v1).addVertex(v2).addTaskLocalFiles(lrDAG); + + TezConfiguration tezConf = new TezConfiguration(); + DAGPlan firstPlan = dag.createDag(tezConf, null, null, null, false); + for (int i = 0; i < 3; ++i) { + DAGPlan dagPlan = dag.createDag(tezConf, null, null, null, false); + Assert.assertEquals(dagPlan, firstPlan); + } + } }
