Repository: tez Updated Branches: refs/heads/master 261bbdd59 -> a37a367b4
TEZ-3958: Add internal vertex priority information into the tez dag.dot debug information (Jaume Marhuenda via Gopal V) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5949b0ce Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5949b0ce Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5949b0ce Branch: refs/heads/master Commit: 5949b0cef6e9b23c1ac5c75461c027b40cafc91d Parents: 261bbdd Author: Jaume Marhuenda <[email protected]> Authored: Mon Aug 27 20:34:41 2018 -0700 Committer: Gopal V <[email protected]> Committed: Mon Aug 27 20:35:20 2018 -0700 ---------------------------------------------------------------------- tez-dag/src/main/java/org/apache/tez/Utils.java | 167 +++++++++++++++++++ .../org/apache/tez/dag/app/DAGAppMaster.java | 80 +-------- .../java/org/apache/tez/dag/app/dag/DAG.java | 9 + .../apache/tez/dag/app/dag/DAGScheduler.java | 22 +++ .../apache/tez/dag/app/dag/impl/DAGImpl.java | 9 + .../app/dag/impl/DAGSchedulerNaturalOrder.java | 5 +- .../DAGSchedulerNaturalOrderControlled.java | 5 +- .../tez/dag/app/dag/impl/TestDAGScheduler.java | 13 +- 8 files changed, 222 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/5949b0ce/tez-dag/src/main/java/org/apache/tez/Utils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/Utils.java b/tez-dag/src/main/java/org/apache/tez/Utils.java index 6f03a67..dbde327 100644 --- a/tez-dag/src/main/java/org/apache/tez/Utils.java +++ b/tez-dag/src/main/java/org/apache/tez/Utils.java @@ -14,18 +14,30 @@ package org.apache.tez; +import javax.annotation.Nullable; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.event.Event; +import org.apache.tez.common.TezCommonUtils; +import 
org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.dag.app.dag.DAGScheduler; import org.apache.tez.dag.app.dag.DAGTerminationCause; +import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.utils.Graph; import org.apache.tez.serviceplugins.api.DagInfo; import org.apache.tez.serviceplugins.api.ServicePluginError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.util.HashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + @InterfaceAudience.Private /** * Utility class within the tez-dag module @@ -34,6 +46,11 @@ public class Utils { private static final Logger LOG = LoggerFactory.getLogger(Utils.class); + /** + * Pattern to clean the labels in the .dot generation. + */ + private static Pattern sanitizeLabelPattern = Pattern.compile("[:\\-\\W]+"); + public static String getContainerLauncherIdentifierString(int launcherIndex, AppContext appContext) { String name; try { @@ -92,6 +109,156 @@ public class Utils { } } + /** + * Generate a visualization file. + * @param dag DAG. + * @param dagPB DAG plan. + * @param scheduler scheduler that provide the priorities of the vertexes. + */ + public static void generateDAGVizFile(final DAG dag, + final DAGProtos.DAGPlan dagPB, @Nullable final DAGScheduler scheduler) { + generateDAGVizFile(dag, dagPB, TezCommonUtils.getTrimmedStrings( + System.getenv(ApplicationConstants.Environment.LOG_DIRS.name())), + scheduler); + } + + /** + * Generate a visualization file. + * @param dag DAG. + * @param dagPB DAG plan. + * @param logDirs directories where the file will be written. + * @param scheduler scheduler that will provide the priorities + * of the vertexes. 
+ */ + public static void generateDAGVizFile(final DAG dag, + final DAGProtos.DAGPlan dagPB, + final String[] logDirs, final @Nullable DAGScheduler scheduler) { + TezDAGID dagId = dag.getID(); + + HashMap<String, Vertex> nameToVertex = null; + if (scheduler != null) { + nameToVertex = new HashMap<>(dag.getVertices().size()); + for (Vertex v: dag.getVertices().values()) { + nameToVertex.put(v.getName(), v); + } + } + + Graph graph = new Graph(sanitizeLabelForViz(dagPB.getName())); + for (DAGProtos.VertexPlan vertexPlan : dagPB.getVertexList()) { + StringBuilder nodeLabel = new StringBuilder( + sanitizeLabelForViz(vertexPlan.getName()) + + "[" + getShortClassName( + vertexPlan.getProcessorDescriptor().getClassName())); + + if (scheduler != null) { + Vertex vertex = nameToVertex.get(vertexPlan.getName()); + if (vertex != null) { + try { + int priority = (scheduler.getPriorityLowLimit(dag, vertex) + + scheduler.getPriorityHighLimit(dag,vertex)) / 2; + nodeLabel.append(", priority=").append(priority).append("]"); + } catch (UnsupportedOperationException e) { + LOG.info("The DAG graphviz file with priorities will not" + + " be generate since the scheduler " + + scheduler.getClass().getSimpleName() + " doesn't" + + " override the methods to get the priorities"); + return; + } + } + } + Graph.Node n = graph.newNode(sanitizeLabelForViz(vertexPlan.getName()), + nodeLabel.toString()); + for (DAGProtos.RootInputLeafOutputProto input + : vertexPlan.getInputsList()) { + Graph.Node inputNode = graph.getNode( + sanitizeLabelForViz(vertexPlan.getName()) + + "_" + sanitizeLabelForViz(input.getName())); + inputNode.setLabel(sanitizeLabelForViz(vertexPlan.getName()) + + "[" + sanitizeLabelForViz(input.getName()) + "]"); + inputNode.setShape("box"); + inputNode.addEdge(n, "Input" + + " [inputClass=" + getShortClassName( + input.getIODescriptor().getClassName()) + + ", initializer=" + getShortClassName( + input.getControllerDescriptor().getClassName()) + "]"); + } + for 
(DAGProtos.RootInputLeafOutputProto output + : vertexPlan.getOutputsList()) { + Graph.Node outputNode = graph.getNode(sanitizeLabelForViz( + vertexPlan.getName()) + + "_" + sanitizeLabelForViz(output.getName())); + outputNode.setLabel(sanitizeLabelForViz(vertexPlan.getName()) + + "[" + sanitizeLabelForViz(output.getName()) + "]"); + outputNode.setShape("box"); + n.addEdge(outputNode, "Output" + + " [outputClass=" + getShortClassName( + output.getIODescriptor().getClassName()) + + ", committer=" + getShortClassName( + output.getControllerDescriptor().getClassName()) + "]"); + } + } + + for (DAGProtos.EdgePlan e : dagPB.getEdgeList()) { + + Graph.Node n = graph.getNode(sanitizeLabelForViz( + e.getInputVertexName())); + n.addEdge(graph.getNode(sanitizeLabelForViz( + e.getOutputVertexName())), + "[" + + "input=" + getShortClassName(e.getEdgeSource().getClassName()) + + ", output=" + getShortClassName( + e.getEdgeDestination().getClassName()) + + ", dataMovement=" + e.getDataMovementType().name().trim() + + ", schedulingType=" + + e.getSchedulingType().name().trim() + "]"); + } + + String outputFile = ""; + if (logDirs != null && logDirs.length != 0) { + outputFile += logDirs[0]; + outputFile += File.separator; + } + outputFile += dagId.toString(); + // Means we have set the priorities + if (scheduler != null) { + outputFile += "_priority"; + } + outputFile += ".dot"; + + try { + LOG.info("Generating DAG graphviz file" + + ", dagId=" + dagId.toString() + + ", filePath=" + outputFile); + graph.save(outputFile); + } catch (Exception e) { + LOG.warn("Error occurred when trying to save graph structure" + + " for dag " + dagId.toString(), e); + } + } + + /** + * Get the short name of the class. 
+ * @param className long name + * @return short name + */ + private static String getShortClassName(final String className) { + int pos = className.lastIndexOf("."); + if (pos != -1 && pos < className.length() - 1) { + return className.substring(pos + 1); + } + return className; + } + + /** + * Replace some characters with underscores. + * @param label label to sanitize + * @return the label with the replaced characters + */ + private static String sanitizeLabelForViz(final String label) { + Matcher m = sanitizeLabelPattern.matcher(label); + return m.replaceAll("_"); + } + @SuppressWarnings("unchecked") private static void sendEvent(AppContext appContext, Event<?> event) { appContext.getEventHandler().handle(event); http://git-wip-us.apache.org/repos/asf/tez/blob/5949b0ce/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index c4b8df0..42a9d57 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -55,8 +55,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; @@ -66,6 +64,7 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Options; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tez.Utils; import org.apache.tez.client.CallerContext; import org.apache.tez.client.TezClientUtils; import org.apache.tez.common.TezUtils; @@ -131,7 +130,6 @@ import 
org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.client.DAGClientHandler; import org.apache.tez.dag.api.client.DAGClientServer; -import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; @@ -179,7 +177,6 @@ import org.apache.tez.dag.history.events.DAGSubmittedEvent; import org.apache.tez.dag.history.utils.DAGUtils; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.dag.utils.Graph; import org.apache.tez.dag.utils.RelocalizationUtils; import org.apache.tez.dag.utils.Simple2LevelVersionComparator; import org.apache.tez.hadoop.shim.HadoopShim; @@ -227,8 +224,6 @@ public class DAGAppMaster extends AbstractService { public static final int SHUTDOWN_HOOK_PRIORITY = 30; private static final Joiner PATH_JOINER = Joiner.on('/'); - private static Pattern sanitizeLabelPattern = Pattern.compile("[:\\-\\W]+"); - @VisibleForTesting static final String INVALID_SESSION_ERR_MSG = "Initial application attempt in session mode failed. 
" + "Application cannot recover and continue properly as DAG recovery has been disabled"; @@ -1038,82 +1033,11 @@ public class DAGAppMaster extends AbstractService { LOG.warn("Failed to generate json for DAG", e); } - generateDAGVizFile(dagId, dagPB, logDirs); + Utils.generateDAGVizFile(newDag, dagPB, logDirs, newDag.getDAGScheduler()); writePBTextFile(newDag); return newDag; } // end createDag() - String getShortClassName(String className) { - int pos = className.lastIndexOf("."); - if (pos != -1 && pos < className.length()-1) { - return className.substring(pos+1); - } - return className; - } - - - private String sanitizeLabelForViz(String label) { - Matcher m = sanitizeLabelPattern.matcher(label); - return m.replaceAll("_"); - } - - private void generateDAGVizFile(TezDAGID dagId, DAGPlan dagPB, String[] logDirs) { - Graph graph = new Graph(sanitizeLabelForViz(dagPB.getName())); - - for (VertexPlan v : dagPB.getVertexList()) { - String nodeLabel = sanitizeLabelForViz(v.getName()) - + "[" + getShortClassName(v.getProcessorDescriptor().getClassName() + "]"); - Graph.Node n = graph.newNode(sanitizeLabelForViz(v.getName()), nodeLabel); - for (DAGProtos.RootInputLeafOutputProto input : v.getInputsList()) { - Graph.Node inputNode = graph.getNode(sanitizeLabelForViz(v.getName()) - + "_" + sanitizeLabelForViz(input.getName())); - inputNode.setLabel(sanitizeLabelForViz(v.getName()) - + "[" + sanitizeLabelForViz(input.getName()) + "]"); - inputNode.setShape("box"); - inputNode.addEdge(n, "Input" - + " [inputClass=" + getShortClassName(input.getIODescriptor().getClassName()) - + ", initializer=" + getShortClassName(input.getControllerDescriptor().getClassName()) + "]"); - } - for (DAGProtos.RootInputLeafOutputProto output : v.getOutputsList()) { - Graph.Node outputNode = graph.getNode(sanitizeLabelForViz(v.getName()) - + "_" + sanitizeLabelForViz(output.getName())); - outputNode.setLabel(sanitizeLabelForViz(v.getName()) - + "[" + sanitizeLabelForViz(output.getName()) + "]"); 
- outputNode.setShape("box"); - n.addEdge(outputNode, "Output" - + " [outputClass=" + getShortClassName(output.getIODescriptor().getClassName()) - + ", committer=" + getShortClassName(output.getControllerDescriptor().getClassName()) + "]"); - } - } - - for (DAGProtos.EdgePlan e : dagPB.getEdgeList()) { - - Graph.Node n = graph.getNode(sanitizeLabelForViz(e.getInputVertexName())); - n.addEdge(graph.getNode(sanitizeLabelForViz(e.getOutputVertexName())), - "[" - + "input=" + getShortClassName(e.getEdgeSource().getClassName()) - + ", output=" + getShortClassName(e.getEdgeDestination().getClassName()) - + ", dataMovement=" + e.getDataMovementType().name().trim() - + ", schedulingType=" + e.getSchedulingType().name().trim() + "]"); - } - - String outputFile = ""; - if (logDirs != null && logDirs.length != 0) { - outputFile += logDirs[0]; - outputFile += File.separator; - } - outputFile += dagId.toString() + ".dot"; - - try { - LOG.info("Generating DAG graphviz file" - + ", dagId=" + dagId.toString() - + ", filePath=" + outputFile); - graph.save(outputFile); - } catch (Exception e) { - LOG.warn("Error occurred when trying to save graph structure" - + " for dag " + dagId.toString(), e); - } - } private void writePBTextFile(DAG dag) { if (dag.getConf().getBoolean(TezConfiguration.TEZ_GENERATE_DEBUG_ARTIFACTS, http://git-wip-us.apache.org/repos/asf/tez/blob/5949b0ce/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java index 10c4257..5c2eba1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java @@ -38,6 +38,8 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.serviceplugins.api.DagInfo; +import javax.annotation.Nullable; + /** * 
Main interface to interact with the job. */ @@ -97,4 +99,11 @@ public interface DAG extends DagInfo { org.apache.tez.dag.api.Vertex.VertexExecutionContext getDefaultExecutionContext(); + /** + * + * @return the DAGScheduler that will schedule + * this DAG, null if it doesn't exist + */ + @Nullable DAGScheduler getDAGScheduler(); + } http://git-wip-us.apache.org/repos/asf/tez/blob/5949b0ce/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java index 3055cd3..2fa735e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java @@ -89,4 +89,26 @@ public abstract class DAGScheduler { public abstract void scheduleTaskEx(DAGEventSchedulerUpdate event); public abstract void taskCompletedEx(DAGEventSchedulerUpdate event); + + /** + * Get the low limit priority for a particular vertex. + * @param vertex to get the priority of + * @return the priority + */ + public int getPriorityLowLimit(final DAG dag, final Vertex vertex) { + final int vertexDistanceFromRoot = vertex.getDistanceFromRoot(); + return ((vertexDistanceFromRoot + 1) * dag.getTotalVertices() * 3) + + (vertex.getVertexId().getId() * 3); + } + + /** + * Get the high limit priority for a particular vertex. Defaults + * to the low limit priority minus two.
+ * @param vertex to get the priority of + * @return the priority + */ + public int getPriorityHighLimit(final DAG dag, final Vertex vertex) { + return getPriorityLowLimit(dag, vertex) - 2; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/5949b0ce/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index bd5e0ff..6dcc7f0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -42,6 +42,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tez.Utils; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.LimitExceededException; import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag; @@ -1620,6 +1621,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } } + // This is going to override the previously generated file + // which didn't have the priorities + Utils.generateDAGVizFile(this, jobPlan, dagScheduler); return DAGState.INITED; } @@ -2382,6 +2386,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } } + @Override + public DAGScheduler getDAGScheduler() { + return dagScheduler; + } + // output of either vertex or vertex group public static class OutputKey { String outputName; http://git-wip-us.apache.org/repos/asf/tez/blob/5949b0ce/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java index 3a16f46..2383db8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java @@ -46,11 +46,10 @@ public class DAGSchedulerNaturalOrder extends DAGScheduler { public void scheduleTaskEx(DAGEventSchedulerUpdate event) { TaskAttempt attempt = event.getAttempt(); Vertex vertex = dag.getVertex(attempt.getVertexID()); - int vertexDistanceFromRoot = vertex.getDistanceFromRoot(); // natural priority. Handles failures and retries. - int priorityLowLimit = ((vertexDistanceFromRoot + 1) * dag.getTotalVertices() * 3) + (vertex.getVertexId().getId() * 3); - int priorityHighLimit = priorityLowLimit - 2; + int priorityLowLimit = getPriorityLowLimit(dag, vertex); + int priorityHighLimit = getPriorityHighLimit(dag, vertex); if (LOG.isDebugEnabled()) { LOG.debug("Scheduling " + attempt.getID() + " between priorityLow: " + priorityLowLimit http://git-wip-us.apache.org/repos/asf/tez/blob/5949b0ce/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java index 34cc92f..c51783b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java @@ -76,11 +76,10 @@ public class DAGSchedulerNaturalOrderControlled extends DAGScheduler { public void scheduleTaskEx(DAGEventSchedulerUpdate event) { TaskAttempt attempt = event.getAttempt(); Vertex vertex = dag.getVertex(attempt.getVertexID()); - int vertexDistanceFromRoot = 
vertex.getDistanceFromRoot(); // natural priority. Handles failures and retries. - int priorityLowLimit = ((vertexDistanceFromRoot + 1) * dag.getTotalVertices() * 3) + (vertex.getVertexId().getId() * 3); - int priorityHighLimit = priorityLowLimit - 2; + int priorityLowLimit = getPriorityLowLimit(dag, vertex); + int priorityHighLimit = getPriorityHighLimit(dag, vertex); TaskAttemptEventSchedule attemptEvent = new TaskAttemptEventSchedule( attempt.getID(), priorityLowLimit, priorityHighLimit); http://git-wip-us.apache.org/repos/asf/tez/blob/5949b0ce/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java index f38f689..07c361a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java @@ -58,14 +58,19 @@ public class TestDAGScheduler { TaskAttempt mockAttempt = mock(TaskAttempt.class); when(mockDag.getVertex((TezVertexID) any())).thenReturn(mockVertex); when(mockDag.getTotalVertices()).thenReturn(4); - when(mockVertex.getDistanceFromRoot()).thenReturn(0).thenReturn(1) - .thenReturn(2); + when(mockVertex.getDistanceFromRoot()) + .thenReturn(0).thenReturn(0) + .thenReturn(1).thenReturn(1) + .thenReturn(2).thenReturn(2); TezVertexID vId0 = TezVertexID.fromString("vertex_1436907267600_195589_1_00"); TezVertexID vId1 = TezVertexID.fromString("vertex_1436907267600_195589_1_01"); TezVertexID vId2 = TezVertexID.fromString("vertex_1436907267600_195589_1_02"); TezVertexID vId3 = TezVertexID.fromString("vertex_1436907267600_195589_1_03"); - when(mockVertex.getVertexId()).thenReturn(vId0).thenReturn(vId1) - .thenReturn(vId2).thenReturn(vId3); + when(mockVertex.getVertexId()) + .thenReturn(vId0).thenReturn(vId0) + 
.thenReturn(vId1).thenReturn(vId1) + .thenReturn(vId2).thenReturn(vId2) + .thenReturn(vId3).thenReturn(vId3); DAGEventSchedulerUpdate event = new DAGEventSchedulerUpdate( DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt);
