TEZ-31: implement DAG.verify() to validate structural properties of DAGs created through tez-dag-api
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/645c470e Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/645c470e Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/645c470e Branch: refs/heads/master Commit: 645c470eeb1c4476437498fed639c3c20f57c672 Parents: d660948 Author: mikelid <[email protected]> Authored: Tue May 14 17:13:14 2013 -0700 Committer: mikelid <[email protected]> Committed: Tue May 14 17:13:14 2013 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/tez/dag/api/DAG.java | 149 +++++++++++- .../java/org/apache/tez/dag/api/TestDAGVerify.java | 194 +++++++++++++++ 2 files changed, 340 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/645c470e/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java index 171921d..300ef31 100644 --- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Stack; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -87,13 +88,155 @@ public class DAG { // FIXME rename to Topology this.name = name; } - public void verify() throws TezException { // FIXME better exception - //FIXME are task resources compulsory or will the DAG AM put in a default - //for each vertex if not specified? 
+  // AnnotatedVertex is used by verify(): per-vertex bookkeeping for cycle
+  // detection (Tarjan's algorithm) and for the restricted-mode degree checks.
+  // Static because it never needs the enclosing DAG instance.
+  private static class AnnotatedVertex {
+    final Vertex v;
+
+    int index;       // Tarjan discovery index; -1 means "not yet visited"
+    int lowlink;     // Tarjan lowlink: lowest index reached via on-stack vertices
+    boolean onstack; // true while this vertex is on Tarjan's stack
+
+    int inDegree;
+    int outDegree;
+
+    private AnnotatedVertex(Vertex v) {
+      this.v = v;
+      index = -1;
+      lowlink = -1;
+      inDegree = 0;
+      outDegree = 0;
+    }
   }
+
+  /**
+   * Verifies the structure of this DAG in restricted mode; equivalent to
+   * {@code verify(true)}.
+   *
+   * @throws IllegalStateException if the DAG is structurally invalid
+   */
+  public void verify() throws IllegalStateException {
+    verify(true);
+  }
+
+  /**
+   * Verifies structural properties of this DAG.
+   *
+   * <p>Default rules. Illegal:
+   * <ul>
+   *   <li>duplicate vertex names</li>
+   *   <li>an edge whose input or output vertex has not been added to this DAG</li>
+   *   <li>cycles, including self-loops (v1 -> v1)</li>
+   * </ul>
+   * Ok:
+   * <ul>
+   *   <li>orphaned vertex. Occurs in map-only jobs.</li>
+   *   <li>islands. Occurs if a job has unrelated workflows.</li>
+   * </ul>
+   * Not yet categorized:
+   * <ul>
+   *   <li>orphaned vertex in a DAG of more than one vertex. Could be an unrelated map-only job.</li>
+   *   <li>v1 -> v2 via two edges. Perhaps some self-join job would use this?</li>
+   * </ul>
+   *
+   * <p>"Restricted" mode: in the short term the supported DAGs are limited. Pass
+   * {@code restricted = true} to additionally reject any vertex with more than one
+   * input or output edge (n-ary input, n-ary merge).
+   *
+   * @param restricted whether to apply the additional restricted-mode checks
+   * @throws IllegalStateException describing the first violation found
+   */
+  public void verify(boolean restricted) throws IllegalStateException {
+    // check for duplicate vertex names, and prepare for cycle detection
+    Map<String, AnnotatedVertex> vertexMap = new HashMap<String, AnnotatedVertex>();
+    for (Vertex v : vertices) {
+      if (vertexMap.containsKey(v.getVertexName())) {
+        throw new IllegalStateException("DAG contains multiple vertices with name: " + v.getVertexName());
+      }
+      vertexMap.put(v.getVertexName(), new AnnotatedVertex(v));
+    }
+
+    // Group edges by the name of their input vertex and count degrees.
+    // Keying by name (like vertexMap) keeps every lookup consistent.
+    Map<String, List<Edge>> edgeMap = new HashMap<String, List<Edge>>();
+    for (Edge e : edges) {
+      AnnotatedVertex input = lookupEdgeVertex(vertexMap, e.getInputVertex(), "input");
+      AnnotatedVertex output = lookupEdgeVertex(vertexMap, e.getOutputVertex(), "output");
+      String inputName = input.v.getVertexName();
+      List<Edge> edgeList = edgeMap.get(inputName);
+      if (edgeList == null) {
+        edgeList = new ArrayList<Edge>();
+        edgeMap.put(inputName, edgeList);
+      }
+      edgeList.add(e);
+      input.outDegree++;
+      output.inDegree++;
+    }
+
+    detectCycles(edgeMap, vertexMap);
+
+    if (restricted) {
+      // walk vertices in insertion order so the reported violation is deterministic
+      for (Vertex v : vertices) {
+        AnnotatedVertex av = vertexMap.get(v.getVertexName());
+        if (av.inDegree > 1) {
+          throw new IllegalStateException("Vertex has inDegree>1: " + av.v.getVertexName());
+        }
+        if (av.outDegree > 1) {
+          throw new IllegalStateException("Vertex has outDegree>1: " + av.v.getVertexName());
+        }
+      }
+    }
+  }
+
+  // Returns the annotation of an edge endpoint, failing with a descriptive
+  // IllegalStateException (instead of a later NullPointerException) when the
+  // endpoint was never added to this DAG. role is "input" or "output".
+  private static AnnotatedVertex lookupEdgeVertex(Map<String, AnnotatedVertex> vertexMap,
+      Vertex v, String role) throws IllegalStateException {
+    AnnotatedVertex av = vertexMap.get(v.getVertexName());
+    if (av == null) {
+      throw new IllegalStateException("DAG contains an edge whose " + role
+          + " vertex has not been added to the DAG: " + v.getVertexName());
+    }
+    return av;
+  }
+
+  // Adaptation of Tarjan's strongly connected components algorithm, used to
+  // detect cycles: any component of more than one vertex is a cycle.
+  // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
+  private void detectCycles(Map<String, List<Edge>> edgeMap, Map<String, AnnotatedVertex> vertexMap)
+      throws IllegalStateException {
+    // A one-element array is shared by every recursive call, so indices stay
+    // unique across the whole traversal. (A boxed Integer would not work:
+    // Integer is immutable and ++ only rebinds the callee's local variable.)
+    int[] nextIndex = {0};
+    Stack<AnnotatedVertex> stack = new Stack<AnnotatedVertex>();
+    for (Vertex v : vertices) {
+      AnnotatedVertex av = vertexMap.get(v.getVertexName());
+      if (av.index == -1) {
+        assert stack.empty();
+        strongConnect(av, vertexMap, edgeMap, stack, nextIndex);
+      }
+    }
+  }
+
+  // Part of Tarjan's algorithm: depth-first visit of av. Throws as soon as a
+  // strongly connected component with more than one vertex (or a self-loop)
+  // is found.
+  // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
+  private void strongConnect(
+      AnnotatedVertex av,
+      Map<String, AnnotatedVertex> vertexMap,
+      Map<String, List<Edge>> edgeMap,
+      Stack<AnnotatedVertex> stack, int[] nextIndex) throws IllegalStateException {
+    av.index = nextIndex[0];
+    av.lowlink = nextIndex[0];
+    nextIndex[0]++;
+    stack.push(av);
+    av.onstack = true;
+
+    List<Edge> outEdges = edgeMap.get(av.v.getVertexName());
+    if (outEdges != null) {
+      for (Edge e : outEdges) {
+        AnnotatedVertex outVertex = vertexMap.get(e.getOutputVertex().getVertexName());
+        if (outVertex == av) {
+          // A self-loop is a cycle of length one. Tarjan's algorithm would treat
+          // it as a single-vertex component and not report it, so check explicitly.
+          throw new IllegalStateException("DAG contains a cycle: "
+              + av.v.getVertexName() + " <- " + av.v.getVertexName());
+        }
+        if (outVertex.index == -1) {
+          strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex);
+          av.lowlink = Math.min(av.lowlink, outVertex.lowlink);
+        } else if (outVertex.onstack) {
+          // strongly connected component detected, but we will wait till later so that the full cycle can be displayed.
+          // update lowlink in case outputVertex should be considered the root of this component.
+          av.lowlink = Math.min(av.lowlink, outVertex.index);
+        }
+      }
+    }
+
+    if (av.lowlink == av.index) {
+      AnnotatedVertex pop = stack.pop();
+      pop.onstack = false;
+      if (pop != av) {
+        // there was something on the stack other than this "av".
+        // this indicates there is a scc/cycle. It comprises all nodes from top of stack to "av"
+        StringBuilder message = new StringBuilder();
+        message.append(av.v.getVertexName()).append(" <- ");
+        for (; pop != av; pop = stack.pop()) {
+          message.append(pop.v.getVertexName()).append(" <- ");
+          pop.onstack = false;
+        }
+        message.append(av.v.getVertexName());
+        throw new IllegalStateException("DAG contains a cycle: " + message);
+      }
+    }
+  }
+
   // create protobuf message describing DAG
   public DAGPlan createDag(){
+
+    verify(true);
+
     DAGPlan.Builder jobBuilder = DAGPlan.newBuilder();
     jobBuilder.setName(this.name);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/645c470e/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
new file mode 100644
index 0000000..4022604
--- /dev/null
+++ b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
+import org.apache.tez.dag.api.EdgeProperty.SourceType;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the structural checks performed by {@link DAG#verify()}.
+ */
+public class TestDAGVerify {
+
+  private final String dummyProcessorClassName = TestDAGVerify.class.getName();
+  private final String dummyInputClassName = TestDAGVerify.class.getName();
+  private final String dummyOutputClassName = TestDAGVerify.class.getName();
+  private final int dummyTaskCount = 2;
+
+  // Creates a vertex with the dummy processor class name and task count.
+  private Vertex newVertex(String name) {
+    return new Vertex(name, dummyProcessorClassName, dummyTaskCount);
+  }
+
+  // Creates a BIPARTITE/STABLE edge using the dummy input/output class names.
+  private Edge newEdge(Vertex input, Vertex output) {
+    return new Edge(input, output, new EdgeProperty(ConnectionPattern.BIPARTITE,
+        SourceType.STABLE, dummyInputClassName, dummyOutputClassName));
+  }
+
+  // Asserts that dag.verify() throws an IllegalStateException whose message
+  // starts with expectedPrefix.
+  private static void assertVerifyFails(DAG dag, String expectedPrefix) {
+    IllegalStateException ex = null;
+    try {
+      dag.verify();
+    } catch (IllegalStateException e) {
+      ex = e;
+    }
+    Assert.assertNotNull("verify() should have failed with: " + expectedPrefix, ex);
+    Assert.assertTrue("unexpected message: " + ex.getMessage(),
+        ex.getMessage().startsWith(expectedPrefix));
+  }
+
+  //  v1
+  //  |
+  //  v2
+  @Test
+  public void testVerify1() {
+    Vertex v1 = newVertex("v1");
+    Vertex v2 = newVertex("v2");
+    DAG dag = new DAG();
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addEdge(newEdge(v1, v2));
+    dag.verify();
+  }
+
+  //     v1 <----
+  //     |      |
+  //     v2     |
+  //    |  |    |
+  //   v3  v4 ---
+  @Test
+  public void testCycle1() {
+    Vertex v1 = newVertex("v1");
+    Vertex v2 = newVertex("v2");
+    Vertex v3 = newVertex("v3");
+    Vertex v4 = newVertex("v4");
+    DAG dag = new DAG();
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addVertex(v4);
+    dag.addEdge(newEdge(v1, v2));
+    dag.addEdge(newEdge(v2, v3));
+    dag.addEdge(newEdge(v2, v4));
+    dag.addEdge(newEdge(v4, v1));
+    assertVerifyFails(dag, "DAG contains a cycle");
+  }
+
+  //        v1
+  //        |
+  //   ---> v2
+  //   |   |  |
+  //   --- v3  v4
+  @Test
+  public void testCycle2() {
+    Vertex v1 = newVertex("v1");
+    Vertex v2 = newVertex("v2");
+    Vertex v3 = newVertex("v3");
+    Vertex v4 = newVertex("v4");
+    DAG dag = new DAG();
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addVertex(v4);
+    dag.addEdge(newEdge(v1, v2));
+    dag.addEdge(newEdge(v2, v3));
+    dag.addEdge(newEdge(v2, v4));
+    dag.addEdge(newEdge(v3, v2));
+    assertVerifyFails(dag, "DAG contains a cycle");
+  }
+
+  @Test
+  public void repeatedVertexName() {
+    DAG dag = new DAG();
+    dag.addVertex(newVertex("v1"));
+    dag.addVertex(newVertex("v1"));
+    assertVerifyFails(dag, "DAG contains multiple vertices with name");
+  }
+
+  //  v1   v2
+  //   |   |
+  //    v3
+  @Test
+  public void testBinaryInput() {
+    Vertex v1 = newVertex("v1");
+    Vertex v2 = newVertex("v2");
+    Vertex v3 = newVertex("v3");
+    DAG dag = new DAG();
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addEdge(newEdge(v1, v3));
+    dag.addEdge(newEdge(v2, v3));
+    assertVerifyFails(dag, "Vertex has inDegree>1");
+  }
+
+  //     v1
+  //    |   |
+  //   v2   v3
+  @Test
+  public void testBinaryOutput() {
+    Vertex v1 = newVertex("v1");
+    Vertex v2 = newVertex("v2");
+    Vertex v3 = newVertex("v3");
+    DAG dag = new DAG();
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addEdge(newEdge(v1, v2));
+    // second output edge goes to v3 (as drawn), so only v1's out-degree is violated
+    dag.addEdge(newEdge(v1, v3));
+    assertVerifyFails(dag, "Vertex has outDegree>1");
+  }
+}
