Repository: tez Updated Branches: refs/heads/branch-0.7 868ca531c -> 8c8db7c51
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java new file mode 100644 index 0000000..b91cdc4 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tez.analyzer.utils; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.output.FileWriterWithEncoding; +import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep; +import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.EntityType; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; + +import com.google.common.base.Joiner; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.text.DecimalFormat; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class SVGUtils { + + private static int MAX_DAG_RUNTIME = 0; + private static final int SCREEN_WIDTH = 1800; + + public SVGUtils() { + } + + private int Y_MAX; + private int X_MAX; + private static final DecimalFormat secondFormat = new DecimalFormat("#.##"); + private static final int X_BASE = 100; + private static final int Y_BASE = 100; + private static final int TICK = 1; + private static final int STEP_GAP = 50; + private static final int TEXT_SIZE = 20; + private static final String RUNTIME_COLOR = "LightGreen"; + private static final String ALLOCATION_OVERHEAD_COLOR = "GoldenRod"; + private static final String LAUNCH_OVERHEAD_COLOR = "DarkSalmon"; + private static final String BORDER_COLOR = "Sienna"; + private static final String VERTEX_INIT_COMMIT_COLOR = "LightSalmon"; + private static final String CRITICAL_COLOR = "IndianRed"; + private static final float RECT_OPACITY = 1.0f; + private static final String TITLE_BR = " "; + + public static String getTimeStr(final long millis) { + long minutes = TimeUnit.MILLISECONDS.toMinutes(millis) + - TimeUnit.HOURS.toMinutes(TimeUnit.MILLISECONDS.toHours(millis)); + long hours = TimeUnit.MILLISECONDS.toHours(millis); + StringBuilder b = new StringBuilder(); + b.append(hours == 0 ? "" : String.valueOf(hours) + "h"); + b.append(minutes == 0 ? 
"" : String.valueOf(minutes) + "m"); + long seconds = millis - TimeUnit.MINUTES.toMillis( + TimeUnit.MILLISECONDS.toMinutes(millis)); + b.append(secondFormat.format(seconds/1000.0) + "s"); + + return b.toString(); + } + + List<String> svgLines = new LinkedList<String>(); + + private final int addOffsetX(int x) { + int xOff = x + X_BASE; + X_MAX = Math.max(X_MAX, xOff); + return xOff; + } + + private final int addOffsetY(int y) { + int yOff = y + Y_BASE; + Y_MAX = Math.max(Y_MAX, yOff); + return yOff; + } + + private int scaleDown(int len) { + return Math.round((len * 1.0f / MAX_DAG_RUNTIME) * SCREEN_WIDTH); + } + + private void addRectStr(int x, int width, int y, int height, + String fillColor, String borderColor, float opacity, String title) { + String rectStyle = "stroke: " + borderColor + "; fill: " + fillColor + "; opacity: " + opacity; + String rectStr = "<rect x=\"" + addOffsetX(scaleDown(x)) + "\"" + + " y=\"" + addOffsetY(y) + "\"" + + " width=\"" + scaleDown(width) + "\"" + + " height=\"" + height + "\"" + + " style=\"" + rectStyle + "\"" + + " >" + + " <title>" + title +"</title>" + + " </rect>"; + svgLines.add(rectStr); + } + + private void addTextStr(int x, int y, String text, String anchor, int size, String title, boolean italic) { + String textStyle = "text-anchor: " + anchor + "; font-style: " + (italic?"italic":"normal") + + "; font-size: " + size + "px;"; + String textStr = "<text x=\"" + addOffsetX(scaleDown(x)) + "\" " + + "y=\"" + addOffsetY(y) + "\" " + + "style=\"" + textStyle + "\" transform=\"\">" + + text + + " <title>" + title +"</title>" + + "</text>"; + svgLines.add(textStr); + } + + private void addLineStr(int x1, int y1, int x2, int y2, String color, String title, int width) { + String style = "stroke: " + color + "; stroke-width:" + width; + String str = "<line x1=\"" + addOffsetX(scaleDown(x1)) + "\"" + + " y1=\"" + addOffsetY(y1) + "\"" + + " x2=\"" + addOffsetX(scaleDown(x2)) + "\"" + + " y2=\"" + addOffsetY(y2) + "\"" + + " 
style=\"" + style + "\"" + + " >" + + " <title>" + title +"</title>" + + " </line>"; + svgLines.add(str); + } + + public void drawStep(CriticalPathStep step, long dagStartTime, int yOffset) { + if (step.getType() != EntityType.ATTEMPT) { + // draw initial vertex or final commit overhead + StringBuilder title = new StringBuilder(); + String text = null; + if (step.getType() == EntityType.VERTEX_INIT) { + String vertex = step.getAttempt().getTaskInfo().getVertexInfo().getVertexName(); + text = vertex + " : Init"; + title.append(text).append(TITLE_BR); + } else { + text = "Output Commit"; + title.append(text).append(TITLE_BR); + } + title.append("Critical Path Dependency: " + step.getReason()).append(TITLE_BR); + title.append( + "Critical Time: " + getTimeStr(step.getStopCriticalTime() - step.getStartCriticalTime())) + .append(""); + title.append(Joiner.on(TITLE_BR).join(step.getNotes())); + String titleStr = title.toString(); + int stopTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime); + int startTimeInterval = (int) (step.getStartCriticalTime() - dagStartTime); + addRectStr(startTimeInterval, + (stopTimeInterval - startTimeInterval), yOffset * STEP_GAP, STEP_GAP, + VERTEX_INIT_COMMIT_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr); + addTextStr((stopTimeInterval + startTimeInterval) / 2, + (yOffset * STEP_GAP + STEP_GAP / 2), + text, "middle", + TEXT_SIZE, titleStr, false); + } else { + TaskAttemptInfo attempt = step.getAttempt(); + int startCriticalTimeInterval = (int) (step.getStartCriticalTime() - dagStartTime); + int stopCriticalTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime); + int creationTimeInterval = (int) (attempt.getCreationTime() - dagStartTime); + int allocationTimeInterval = attempt.getAllocationTime() > 0 ? + (int) (attempt.getAllocationTime() - dagStartTime) : 0; + int launchTimeInterval = attempt.getStartTime() > 0 ? 
+ (int) (attempt.getStartTime() - dagStartTime) : 0; + int finishTimeInterval = (int) (attempt.getFinishTime() - dagStartTime); + System.out.println(attempt.getTaskAttemptId() + " " + creationTimeInterval + " " + + allocationTimeInterval + " " + launchTimeInterval + " " + finishTimeInterval); + + StringBuilder title = new StringBuilder(); + title.append("Attempt: " + attempt.getTaskAttemptId()).append(TITLE_BR); + title.append("Critical Path Dependency: " + step.getReason()).append(TITLE_BR); + title.append("Completion Status: " + attempt.getDetailedStatus()).append(TITLE_BR); + title.append( + "Critical Time Contribution: " + + getTimeStr(step.getStopCriticalTime() - step.getStartCriticalTime())).append(TITLE_BR); + title.append("Critical start at: " + getTimeStr(startCriticalTimeInterval)).append(TITLE_BR); + title.append("Critical stop at: " + getTimeStr(stopCriticalTimeInterval)).append(TITLE_BR); + title.append("Created at: " + getTimeStr(creationTimeInterval)).append(TITLE_BR); + if (allocationTimeInterval > 0) { + title.append("Allocated at: " + getTimeStr(allocationTimeInterval)).append(TITLE_BR); + } + if (launchTimeInterval > 0) { + title.append("Launched at: " + getTimeStr(launchTimeInterval)).append(TITLE_BR); + } + title.append("Finished at: " + getTimeStr(finishTimeInterval)).append(TITLE_BR); + title.append(Joiner.on(TITLE_BR).join(step.getNotes())); + String titleStr = title.toString(); + + // handle cases when attempt fails before allocation or launch + if (allocationTimeInterval > 0) { + addRectStr(creationTimeInterval, allocationTimeInterval - creationTimeInterval, + yOffset * STEP_GAP, STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, + titleStr); + if (launchTimeInterval > 0) { + addRectStr(allocationTimeInterval, launchTimeInterval - allocationTimeInterval, + yOffset * STEP_GAP, STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, + titleStr); + addRectStr(launchTimeInterval, finishTimeInterval - launchTimeInterval, 
yOffset * STEP_GAP, + STEP_GAP, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr); + } else { + // no launch - so allocate to finish drawn - ended while launching + addRectStr(allocationTimeInterval, finishTimeInterval - allocationTimeInterval, yOffset * STEP_GAP, + STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr); + } + } else { + // no allocation - so create to finish drawn - ended while allocating + addRectStr(creationTimeInterval, finishTimeInterval - creationTimeInterval, yOffset * STEP_GAP, + STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr); + } + + addTextStr((finishTimeInterval + creationTimeInterval) / 2, + (yOffset * STEP_GAP + STEP_GAP / 2), attempt.getShortName(), "middle", TEXT_SIZE, + titleStr, !attempt.isSucceeded()); + } + } + + private void drawCritical(DagInfo dagInfo, List<CriticalPathStep> criticalPath) { + long dagStartTime = dagInfo.getStartTime(); + int dagStartTimeInterval = 0; // this is 0 since we are offseting from the dag start time + int dagFinishTimeInterval = (int) (dagInfo.getFinishTime() - dagStartTime); + if (dagInfo.getFinishTime() <= 0) { + // AM crashed. 
no dag finish time written + dagFinishTimeInterval =(int) (criticalPath.get(criticalPath.size()-1).getStopCriticalTime() + - dagStartTime); + } + MAX_DAG_RUNTIME = dagFinishTimeInterval; + + // draw grid + addLineStr(dagStartTimeInterval, 0, dagFinishTimeInterval, 0, BORDER_COLOR, "", TICK); + int yGrid = (criticalPath.size() + 2)*STEP_GAP; + for (int i=0; i<11; ++i) { + int x = Math.round(((dagFinishTimeInterval - dagStartTimeInterval)/10.0f)*i); + addLineStr(x, 0, x, yGrid, BORDER_COLOR, "", TICK); + addTextStr(x, 0, getTimeStr(x), "left", TEXT_SIZE, "", false); + } + addLineStr(dagStartTimeInterval, yGrid, dagFinishTimeInterval, yGrid, BORDER_COLOR, "", TICK); + addTextStr((dagFinishTimeInterval + dagStartTimeInterval) / 2, yGrid + STEP_GAP, + "Critical Path for " + dagInfo.getName() + " (" + dagInfo.getDagId() + ")", "middle", + TEXT_SIZE, "", false); + + // draw steps + for (int i=1; i<=criticalPath.size(); ++i) { + CriticalPathStep step = criticalPath.get(i-1); + drawStep(step, dagStartTime, i); + } + + // draw critical path on top + for (int i=1; i<=criticalPath.size(); ++i) { + CriticalPathStep step = criticalPath.get(i-1); + boolean isLast = i == criticalPath.size(); + + // draw critical path for step + int startCriticalTimeInterval = (int) (step.getStartCriticalTime() - dagStartTime); + int stopCriticalTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime); + addLineStr(startCriticalTimeInterval, (i + 1) * STEP_GAP, stopCriticalTimeInterval, + (i + 1) * STEP_GAP, CRITICAL_COLOR, "Critical Time " + step.getAttempt().getShortName(), TICK*5); + + if (isLast) { + // last step. 
add commit overhead + int stepStopCriticalTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime); + addLineStr(stepStopCriticalTimeInterval, (i + 1) * STEP_GAP, dagFinishTimeInterval, + (i + 1) * STEP_GAP, CRITICAL_COLOR, + "Critical Time " + step.getAttempt().getTaskInfo().getVertexInfo().getVertexName(), TICK*5); + } else { + // connect to next step in critical path + addLineStr(stopCriticalTimeInterval, (i + 1) * STEP_GAP, stopCriticalTimeInterval, + (i + 2) * STEP_GAP, CRITICAL_COLOR, "Critical Time " + step.getAttempt().getShortName(), TICK*5); + } + } + + // draw legend + int legendX = 0; + int legendY = (criticalPath.size() + 2) * STEP_GAP; + int legendWidth = dagFinishTimeInterval/5; + + addRectStr(legendX, legendWidth, legendY, STEP_GAP/2, VERTEX_INIT_COMMIT_COLOR, BORDER_COLOR, RECT_OPACITY, ""); + addTextStr(legendX, legendY + STEP_GAP/3, "Vertex Init/Commit Overhead", "left", TEXT_SIZE, "", false); + legendY += STEP_GAP/2; + addRectStr(legendX, legendWidth, legendY, STEP_GAP/2, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, ""); + addTextStr(legendX, legendY + STEP_GAP/3, "Task Allocation Overhead", "left", TEXT_SIZE, "", false); + legendY += STEP_GAP/2; + addRectStr(legendX, legendWidth, legendY, STEP_GAP/2, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, ""); + addTextStr(legendX, legendY + STEP_GAP/3, "Task Launch Overhead", "left", TEXT_SIZE, "", false); + legendY += STEP_GAP/2; + addRectStr(legendX, legendWidth, legendY, STEP_GAP/2, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, ""); + addTextStr(legendX, legendY + STEP_GAP/3, "Task Execution Time", "left", TEXT_SIZE, "", false); + + Y_MAX += Y_BASE*2; + X_MAX += X_BASE*2; + } + + public void saveCriticalPathAsSVG(DagInfo dagInfo, + String fileName, List<CriticalPathStep> criticalPath) { + drawCritical(dagInfo, criticalPath); + saveFileStr(fileName); + } + + private void saveFileStr(String fileName) { + String header = "<?xml version=\"1.0\" standalone=\"no\"?> " + + "<!DOCTYPE svg 
PUBLIC \"-//W3C//DTD SVG 1.1//EN\" " + + "\"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">" + + "<svg xmlns=\"http://www.w3.org/2000/svg\" version=\"1.1\" " + + "xmlns:xlink=\"http://www.w3.org/1999/xlink\" " + + "height=\"" + Y_MAX + "\" " + + "width=\"" + X_MAX + "\"> " + + "<script type=\"text/ecmascript\" " + + "xlink:href=\"http://code.jquery.com/jquery-2.1.1.min.js\" />"; + String footer = "</svg>"; + String newline = System.getProperty("line.separator"); + BufferedWriter writer = null; + try { + writer = new BufferedWriter(new FileWriterWithEncoding(fileName, "UTF-8")); + writer.write(header); + writer.write(newline); + for (String str : svgLines) { + writer.write(str); + writer.write(newline); + } + writer.write(footer); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + if (writer != null) { + IOUtils.closeQuietly(writer); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java new file mode 100644 index 0000000..8bcf265 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer.utils; + +import com.sun.istack.Nullable; +import org.apache.tez.dag.utils.Graph; +import org.apache.tez.history.parser.datamodel.AdditionalInputOutputDetails; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.EdgeInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; + +import java.io.IOException; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class Utils { + + private static Pattern sanitizeLabelPattern = Pattern.compile("[:\\-\\W]+"); + + public static String getShortClassName(String className) { + int pos = className.lastIndexOf("."); + if (pos != -1 && pos < className.length() - 1) { + return className.substring(pos + 1); + } + return className; + } + + public static String sanitizeLabelForViz(String label) { + Matcher m = sanitizeLabelPattern.matcher(label); + return m.replaceAll("_"); + } + + public static void generateDAGVizFile(DagInfo dagInfo, String fileName, + @Nullable List<String> criticalVertices) throws IOException { + Graph graph = new Graph(sanitizeLabelForViz(dagInfo.getName())); + + for (VertexInfo v : dagInfo.getVertices()) { + String nodeLabel = sanitizeLabelForViz(v.getVertexName()) + + "[" + getShortClassName(v.getProcessorClassName() + + ", tasks=" + v.getTasks().size() + ", time=" + v.getTimeTaken() +" ms]"); + Graph.Node n = graph.newNode(sanitizeLabelForViz(v.getVertexName()), nodeLabel); + + boolean criticalVertex = (criticalVertices != null) ? 
criticalVertices.contains(v + .getVertexName()) : false; + if (criticalVertex) { + n.setColor("red"); + } + + + for (AdditionalInputOutputDetails input : v.getAdditionalInputInfoList()) { + Graph.Node inputNode = graph.getNode(sanitizeLabelForViz(v.getVertexName()) + + "_" + sanitizeLabelForViz(input.getName())); + inputNode.setLabel(sanitizeLabelForViz(v.getVertexName()) + + "[" + sanitizeLabelForViz(input.getName()) + "]"); + inputNode.setShape("box"); + inputNode.addEdge(n, "Input name=" + input.getName() + + " [inputClass=" + getShortClassName(input.getClazz()) + + ", initializer=" + getShortClassName(input.getInitializer()) + "]"); + } + for (AdditionalInputOutputDetails output : v.getAdditionalOutputInfoList()) { + Graph.Node outputNode = graph.getNode(sanitizeLabelForViz(v.getVertexName()) + + "_" + sanitizeLabelForViz(output.getName())); + outputNode.setLabel(sanitizeLabelForViz(v.getVertexName()) + + "[" + sanitizeLabelForViz(output.getName()) + "]"); + outputNode.setShape("box"); + n.addEdge(outputNode, "Output name=" + output.getName() + + " [outputClass=" + getShortClassName(output.getClazz()) + + ", committer=" + getShortClassName(output.getInitializer()) + "]"); + } + + } + + for (EdgeInfo e : dagInfo.getEdges()) { + Graph.Node n = graph.getNode(sanitizeLabelForViz(e.getInputVertexName())); + n.addEdge(graph.getNode(sanitizeLabelForViz(e.getOutputVertexName())), + "[input=" + getShortClassName(e.getEdgeSourceClass()) + + ", output=" + getShortClassName(e.getEdgeDestinationClass()) + + ", dataMovement=" + e.getDataMovementType().trim() + "]"); + } + + graph.save(fileName); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java 
b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java new file mode 100644 index 0000000..f680f59 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java @@ -0,0 +1,823 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.analyzer; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer; +import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathDependency; +import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep; +import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.EntityType; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService; +import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.history.ATSImportTool; +import org.apache.tez.history.parser.ATSFileParser; +import org.apache.tez.history.parser.SimpleHistoryParser; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.test.SimpleTestDAG; +import org.apache.tez.test.SimpleTestDAG3Vertices; +import org.apache.tez.test.TestInput; +import org.apache.tez.test.TestProcessor; +import org.apache.tez.test.dag.SimpleReverseVTestDAG; +import org.apache.tez.test.dag.SimpleVTestDAG; 
+import org.apache.tez.tests.MiniTezClusterWithTimeline; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; + +public class TestAnalyzer { + private static final Logger LOG = LoggerFactory.getLogger(TestAnalyzer.class); + + private static String TEST_ROOT_DIR = + "target" + Path.SEPARATOR + TestAnalyzer.class.getName() + "-tmpDir"; + private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + "download"; + private final static String SIMPLE_HISTORY_DIR = "/tmp/simplehistory/"; + private final static String HISTORY_TXT = "history.txt"; + + private static MiniDFSCluster dfsCluster; + private static MiniTezClusterWithTimeline miniTezCluster; + + private static Configuration conf = new Configuration(); + private static FileSystem fs; + + private static TezClient tezSession = null; + + private boolean usingATS = true; + private boolean downloadedSimpleHistoryFile = false; + + @BeforeClass + public static void setupClass() throws Exception { + conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH, false); + EditLogFileOutputStream.setShouldSkipFsyncForTesting(true); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); + dfsCluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); + fs = dfsCluster.getFileSystem(); + conf.set("fs.defaultFS", fs.getUri().toString()); + + setupTezCluster(); + } + + @AfterClass + public static void tearDownClass() throws Exception { + LOG.info("Stopping mini clusters"); + if (miniTezCluster != null) { + miniTezCluster.stop(); + miniTezCluster = null; + } + if (dfsCluster != null) { + dfsCluster.shutdown(); + dfsCluster = null; + } + } + + private CriticalPathAnalyzer setupCPAnalyzer() { + Configuration analyzerConf = new Configuration(false); + 
analyzerConf.setBoolean(CriticalPathAnalyzer.DRAW_SVG, false); + CriticalPathAnalyzer cp = new CriticalPathAnalyzer(); + cp.setConf(analyzerConf); + return cp; + } + + private static void setupTezCluster() throws Exception { + // make the test run faster by speeding heartbeat frequency + conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); + conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, ATSHistoryLoggingService + .class.getName()); + + miniTezCluster = + new MiniTezClusterWithTimeline(TestAnalyzer.class.getName(), 1, 1, 1, true); + + miniTezCluster.init(conf); + miniTezCluster.start(); + } + + private TezConfiguration createCommonTezLog() throws Exception { + TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig()); + + tezConf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, 100); + Path remoteStagingDir = dfsCluster.getFileSystem().makeQualified(new Path(TEST_ROOT_DIR, String + .valueOf(new Random().nextInt(100000)))); + + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, + remoteStagingDir.toString()); + tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false); + + return tezConf; + } + + private void createTezSessionATS() throws Exception { + TezConfiguration tezConf = createCommonTezLog(); + tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "0.0.0.0:8188"); + tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + ATSHistoryLoggingService.class.getName()); + + Path remoteStagingDir = dfsCluster.getFileSystem().makeQualified(new Path(TEST_ROOT_DIR, String + .valueOf(new Random().nextInt(100000)))); + + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, + 
remoteStagingDir.toString()); + tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false); + + tezSession = TezClient.create("TestAnalyzer", tezConf, true); + tezSession.start(); + } + + private void createTezSessionSimpleHistory() throws Exception { + TezConfiguration tezConf = createCommonTezLog(); + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + SimpleHistoryLoggingService.class.getName()); + + tezConf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, SIMPLE_HISTORY_DIR); + + Path remoteStagingDir = dfsCluster.getFileSystem().makeQualified(new Path(TEST_ROOT_DIR, String + .valueOf(new Random().nextInt(100000)))); + + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, + remoteStagingDir.toString()); + tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false); + + tezSession = TezClient.create("TestFaultTolerance", tezConf, true); + tezSession.start(); + } + + private StepCheck createStep(String attempt, CriticalPathDependency reason) { + return createStep(attempt, reason, null, null); + } + + private StepCheck createStep(String attempt, CriticalPathDependency reason, + TaskAttemptTerminationCause errCause, List<String> notes) { + return new StepCheck(attempt, reason, errCause, notes); + } + + private class StepCheck { + String attempt; // attempt is the TaskAttemptInfo short name with regex + CriticalPathDependency reason; + TaskAttemptTerminationCause errCause; + List<String> notesStr; + + StepCheck(String attempt, CriticalPathDependency reason, + TaskAttemptTerminationCause cause, List<String> notes) { + this.attempt = attempt; + this.reason = reason; + this.errCause = cause; + this.notesStr = notes; + } + String getAttemptDetail() { + return attempt; + } + CriticalPathDependency getReason() { + return reason; + } + TaskAttemptTerminationCause getErrCause() { + return errCause; + } + List<String> getNotesStr() { + return notesStr; + } + } + + private void runDAG(DAG dag, DAGStatus.State finalState) 
throws Exception { + tezSession.waitTillReady(); + LOG.info("ABC Running DAG name: " + dag.getName()); + DAGClient dagClient = tezSession.submitDAG(dag); + DAGStatus dagStatus = dagClient.getDAGStatus(null); + while (!dagStatus.isCompleted()) { + LOG.info("Waiting for dag to complete. Sleeping for 500ms." + + " DAG name: " + dag.getName() + + " DAG appContext: " + dagClient.getExecutionContext() + + " Current state: " + dagStatus.getState()); + Thread.sleep(100); + dagStatus = dagClient.getDAGStatus(null); + } + + Assert.assertEquals(finalState, dagStatus.getState()); + } + + private void verify(ApplicationId appId, int dagNum, List<StepCheck[]> steps) throws Exception { + String dagId = TezDAGID.getInstance(appId, dagNum).toString(); + DagInfo dagInfo = getDagInfo(dagId); + + verifyCriticalPath(dagInfo, steps); + } + + private DagInfo getDagInfo(String dagId) throws Exception { + // sleep for a bit to let ATS events be sent from AM + DagInfo dagInfo = null; + if (usingATS) { + //Export the data from ATS + String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR }; + + int result = ATSImportTool.process(args); + assertTrue(result == 0); + + //Parse ATS data and verify results + //Parse downloaded contents + File downloadedFile = new File(DOWNLOAD_DIR + + Path.SEPARATOR + dagId + ".zip"); + ATSFileParser parser = new ATSFileParser(downloadedFile); + dagInfo = parser.getDAGData(dagId); + assertTrue(dagInfo.getDagId().equals(dagId)); + } else { + if (!downloadedSimpleHistoryFile) { + downloadedSimpleHistoryFile = true; + TezDAGID tezDAGID = TezDAGID.fromString(dagId); + ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(tezDAGID + .getApplicationId(), 1); + Path historyPath = new Path(miniTezCluster.getConfig().get("fs.defaultFS") + + SIMPLE_HISTORY_DIR + HISTORY_TXT + "." 
+ + applicationAttemptId); + FileSystem fs = historyPath.getFileSystem(miniTezCluster.getConfig()); + + Path localPath = new Path(DOWNLOAD_DIR, HISTORY_TXT); + fs.copyToLocalFile(historyPath, localPath); + } + //Now parse via SimpleHistory + File localFile = new File(DOWNLOAD_DIR, HISTORY_TXT); + SimpleHistoryParser parser = new SimpleHistoryParser(localFile); + dagInfo = parser.getDAGData(dagId); + assertTrue(dagInfo.getDagId().equals(dagId)); + } + return dagInfo; + } + + private void verifyCriticalPath(DagInfo dagInfo, List<StepCheck[]> stepsOptions) throws Exception { + CriticalPathAnalyzer cp = setupCPAnalyzer(); + cp.analyze(dagInfo); + + List<CriticalPathStep> criticalPath = cp.getCriticalPath(); + + for (CriticalPathStep step : criticalPath) { + LOG.info("ABC Step: " + step.getType()); + if (step.getType() == EntityType.ATTEMPT) { + LOG.info("ABC Attempt: " + step.getAttempt().getShortName() + + " " + step.getAttempt().getDetailedStatus()); + } + LOG.info("ABC Reason: " + step.getReason()); + String notes = Joiner.on(";").join(step.getNotes()); + LOG.info("ABC Notes: " + notes); + } + + boolean foundMatchingLength = false; + for (StepCheck[] steps : stepsOptions) { + if (steps.length + 2 == criticalPath.size()) { + foundMatchingLength = true; + Assert.assertEquals(CriticalPathStep.EntityType.VERTEX_INIT, criticalPath.get(0).getType()); + Assert.assertEquals(criticalPath.get(1).getAttempt().getShortName(), + criticalPath.get(0).getAttempt().getShortName()); + + for (int i=1; i<criticalPath.size() - 1; ++i) { + StepCheck check = steps[i-1]; + CriticalPathStep step = criticalPath.get(i); + Assert.assertEquals(CriticalPathStep.EntityType.ATTEMPT, step.getType()); + Assert.assertTrue(check.getAttemptDetail(), + step.getAttempt().getShortName().matches(check.getAttemptDetail())); + Assert.assertEquals(steps[i-1].getReason(), step.getReason()); + if (check.getErrCause() != null) { + Assert.assertEquals(check.getErrCause(), + 
TaskAttemptTerminationCause.valueOf(step.getAttempt().getTerminationCause())); + } + if (check.getNotesStr() != null) { + String notes = Joiner.on("#").join(step.getNotes()); + // each expected note must occur in this step's joined notes + for (String note : check.getNotesStr()) { + Assert.assertTrue(note, notes.contains(note)); + } + } + } + + Assert.assertEquals(CriticalPathStep.EntityType.DAG_COMMIT, + criticalPath.get(criticalPath.size() - 1).getType()); + break; + } + } + + Assert.assertTrue(foundMatchingLength); + + } + + @Test (timeout=300000) + public void testWithATS() throws Exception { + usingATS = true; + createTezSessionATS(); + runTests(); + } + + @Test (timeout=300000) + public void testWithSimpleHistory() throws Exception { + usingATS = false; + createTezSessionSimpleHistory(); + runTests(); + } + + private void runTests() throws Exception { + ApplicationId appId = tezSession.getAppMasterApplicationId(); + List<List<StepCheck[]>> stepsOptions = Lists.newArrayList(); + // run all test dags + stepsOptions.add(testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure()); + stepsOptions.add(testInputFailureCausesRerunOfTwoVerticesWithoutExit()); + stepsOptions.add(testMultiVersionInputFailureWithoutExit()); + stepsOptions.add(testCascadingInputFailureWithoutExitSuccess()); + stepsOptions.add(testTaskMultipleFailures()); + stepsOptions.add(testBasicInputFailureWithoutExit()); + stepsOptions.add(testBasicTaskFailure()); + stepsOptions.add(testBasicSuccessScatterGather()); + stepsOptions.add(testMultiVersionInputFailureWithExit()); + stepsOptions.add(testBasicInputFailureWithExit()); + stepsOptions.add(testInputFailureRerunCanSendOutputToTwoDownstreamVertices()); + stepsOptions.add(testCascadingInputFailureWithExitSuccess()); + stepsOptions.add(testInternalPreemption()); + + // close session to flush + if (tezSession != null) { + tezSession.stop(); + } + Thread.sleep((TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT*3)/2); + + // verify all dags + for (int i=0; i<stepsOptions.size(); ++i) { + verify(appId, i+1, 
stepsOptions.get(i)); + } + } + + private List<StepCheck[]> testBasicSuccessScatterGather() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); + StepCheck[] check = { + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY) + }; + DAG dag = SimpleTestDAG.createDAG("testBasicSuccessScatterGather", testConf); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); + } + + private List<StepCheck[]> testBasicTaskFailure() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); + testConf.setBoolean(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true); + testConf.set(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0"); + testConf.setInt(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 0); + + StepCheck[] check = { + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.RETRY_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + }; + DAG dag = SimpleTestDAG.createDAG("testBasicTaskFailure", testConf); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); + } + + private List<StepCheck[]> testTaskMultipleFailures() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); + testConf.setBoolean(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true); + testConf.set(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0"); + testConf.setInt(TestProcessor.getVertexConfName( + 
TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 1); + + StepCheck[] check = { + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.RETRY_DEPENDENCY), + createStep("v1 : 000000_2", CriticalPathDependency.RETRY_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + }; + + DAG dag = SimpleTestDAG.createDAG("testTaskMultipleFailures", testConf); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); + } + + private List<StepCheck[]> testBasicInputFailureWithExit() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), true); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0"); + + StepCheck[] check = { + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v2 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY), + }; + + DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithExit", testConf); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); + } + + private List<StepCheck[]> testBasicInputFailureWithoutExit() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); + 
testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0"); + + StepCheck[] check = { + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + }; + + DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithoutExit", testConf); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); + } + + private List<StepCheck[]> testMultiVersionInputFailureWithExit() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), true); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0,1"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0"); + testConf.setInt(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), 1); + + StepCheck[] check = { + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v1 : 000000_1", 
CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v2 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v1 : 000000_2", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v2 : 000000_2", CriticalPathDependency.DATA_DEPENDENCY), + }; + + DAG dag = SimpleTestDAG.createDAG("testMultiVersionInputFailureWithExit", testConf); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); + } + + private List<StepCheck[]> testMultiVersionInputFailureWithoutExit() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0"); + testConf.setInt(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), 1); + + StepCheck[] check = { + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v1 : 000000_2", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + }; + + DAG dag = SimpleTestDAG.createDAG("testMultiVersionInputFailureWithoutExit", testConf); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); + } + + /** + * Sets configuration for cascading input failure tests that + * use SimpleTestDAG3Vertices. 
+ * @param testConf configuration + * @param failAndExit whether input failure should trigger attempt exit + * @param numTasks value stored under SimpleTestDAG3Vertices.TEZ_SIMPLE_DAG_NUM_TASKS + */ + private void setCascadingInputFailureConfig(Configuration testConf, + boolean failAndExit, + int numTasks) { + // v2 attempt0 succeeds. + // v2 all tasks attempt1 input0 fail up to version 0. + testConf.setInt(SimpleTestDAG3Vertices.TEZ_SIMPLE_DAG_NUM_TASKS, numTasks); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), failAndExit); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "-1"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "1"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0"); + testConf.setInt(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), + 0); + + //v3 task0 attempt0 all inputs fail up to version 0. + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"), true); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v3"), failAndExit); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "-1"); + testConf.setInt(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"), + 0); + } + + /** + * Test cascading input failure without exit. Expecting success. + * v1 -- v2 -- v3 + * v3 all-tasks attempt0 input0 fails. Wait. Triggering v2 rerun. + * v2 task0 attempt1 input0 fails. Wait. 
Triggering v1 rerun. + * v1 attempt1 rerun and succeeds. v2 accepts v1 attempt1 output. v2 attempt1 succeeds. + * v3 attempt0 accepts v2 attempt1 output. + * + * AM vertex succeeded order is v1, v2, v1, v2, v3. + * @throws Exception + */ + private List<StepCheck[]> testCascadingInputFailureWithoutExitSuccess() throws Exception { + Configuration testConf = new Configuration(false); + setCascadingInputFailureConfig(testConf, false, 1); + + StepCheck[] check = { + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v2 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v2 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + }; + + DAG dag = SimpleTestDAG3Vertices.createDAG( + "testCascadingInputFailureWithoutExitSuccess", testConf); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); + } + + /** + * Test cascading input failure with exit. Expecting success. + * v1 -- v2 -- v3 + * v3 all-tasks attempt0 input0 fails. v3 attempt0 exits. Triggering v2 rerun. + * v2 task0 attempt1 input0 fails. v2 attempt1 exits. Triggering v1 rerun. + * v1 attempt1 rerun and succeeds. v2 accepts v1 attempt1 output. v2 attempt2 succeeds. + * v3 attempt1 accepts v2 attempt2 output. + * + * AM vertex succeeded order is v1, v2, v3, v1, v2, v3. 
+ * @throws Exception + */ + private List<StepCheck[]> testCascadingInputFailureWithExitSuccess() throws Exception { + Configuration testConf = new Configuration(false); + setCascadingInputFailureConfig(testConf, true, 1); + + StepCheck[] check = { + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v2 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v2 : 000000_2", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v3 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY), + }; + + DAG dag = SimpleTestDAG3Vertices.createDAG( + "testCascadingInputFailureWithExitSuccess", testConf); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); + } + + /** + * 1 NM is running and can run 4 containers based on YARN mini cluster defaults and + * Tez defaults for AM/task memory + * v3 task0 reports read errors against both tasks of v2. This re-starts both of them. + * Now all 4 slots are occupied 1 AM + 3 tasks + * Now retries of v2 report read error against 1 task of v1. That re-starts. + * Retry of v1 task has no space - so it preempts the least priority task (current tez logic) + * v3 is preempted and re-run. Shows up on critical path as preempted failure. 
+ * Also, the notes of the v1 retry attempt show that it caused preemption of v3 + * @throws Exception + */ + private List<StepCheck[]> testInternalPreemption() throws Exception { + Configuration testConf = new Configuration(false); + setCascadingInputFailureConfig(testConf, false, 2); + + StepCheck[] check = { + createStep("v1 : 00000[01]_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v2 : 00000[01]_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY, + TaskAttemptTerminationCause.INTERNAL_PREEMPTION, null), + createStep("v2 : 00000[01]_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY, + null, Collections.singletonList("preemption of v3")), + createStep("v2 : 00000[01]_1", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v3 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY) + }; + + DAG dag = SimpleTestDAG3Vertices.createDAG( + "testInternalPreemption", testConf); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); + } + + /** + * Input failure of v3 causes rerun of both v1 and v2 vertices. 
+ * v1 v2 + * \ / + * v3 + * + * @throws Exception + */ + private List<StepCheck[]> testInputFailureCausesRerunOfTwoVerticesWithoutExit() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleVTestDAG.TEZ_SIMPLE_V_DAG_NUM_TASKS, 1); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"), true); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v3"), false); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "-1"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"), "1"); + + StepCheck[] check = { + // use regex for either vertices being possible on the path + createStep("v[12] : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v[12] : 000000_[01]", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v[12] : 000000_[012]", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v[12] : 000000_[12]", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v[12] : 000000_2", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + }; + + DAG dag = SimpleVTestDAG.createDAG( + "testInputFailureCausesRerunOfTwoVerticesWithoutExit", testConf); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); + 
} + + /** + * Downstream(v3) attempt failure of a vertex connected with + * 2 upstream vertices. + * v1 v2 + * \ / + * v3 + * + * @throws Exception + */ + private List<StepCheck[]> testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure() + throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleVTestDAG.TEZ_SIMPLE_V_DAG_NUM_TASKS, 1); + testConf.setBoolean(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v3"), true); + testConf.set(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v3"), "0"); + testConf.setInt(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v3"), 1); + + StepCheck[] check = { + // use regex for either vertices being possible on the path + createStep("v[12] : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v3 : 000000_1", CriticalPathDependency.RETRY_DEPENDENCY), + createStep("v3 : 000000_2", CriticalPathDependency.RETRY_DEPENDENCY), + }; + + DAG dag = SimpleVTestDAG.createDAG( + "testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure", testConf); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); + } + + /** + * Input failures of v2 and v3 trigger a v1 rerun. + * Both v2 and v3 report an error on v1 and don't exit. So one of them triggers the next + * version of v1 and also consumes the output of the next version, while the other + * consumes the output of the next version of v1. + * Reruns can send output to 2 downstream vertices. + * v1 + * / \ + * v2 v3 + * + * Also covers multiple consumer vertices reporting failure against the same producer task. 
+ * @throws Exception + */ + private List<StepCheck[]> testInputFailureRerunCanSendOutputToTwoDownstreamVertices() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleReverseVTestDAG.TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS, 1); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), false); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "-1"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "-1"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), "0"); + + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"), true); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v3"), false); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "-1"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "-1"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"), "0"); + + StepCheck[] check = { + // use regex for either vertices being possible on the path + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + }; + DAG dag = 
SimpleReverseVTestDAG.createDAG( + "testInputFailureRerunCanSendOutputToTwoDownstreamVertices", testConf); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/pom.xml ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/pom.xml b/tez-tools/analyzers/pom.xml new file mode 100644 index 0000000..afc70a6 --- /dev/null +++ b/tez-tools/analyzers/pom.xml @@ -0,0 +1,51 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.tez</groupId> + <artifactId>tez-tools</artifactId> + <version>0.7.1-SNAPSHOT</version> + </parent> + <artifactId>tez-perf-analyzer</artifactId> + <packaging>pom</packaging> + + <profiles> + <profile> + <id>hadoop24</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <modules> + <module>job-analyzer</module> + </modules> + </profile> + <profile> + <id>hadoop26</id> + <activation> + <property> + <name>!skipATS</name> + </property> + </activation> + <modules> + <module>job-analyzer</module> + </modules> + </profile> + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/pom.xml ---------------------------------------------------------------------- diff --git a/tez-tools/pom.xml b/tez-tools/pom.xml index f487fcb..1d6f929 100644 --- a/tez-tools/pom.xml +++ b/tez-tools/pom.xml @@ -26,6 +26,10 @@ <artifactId>tez-tools</artifactId> <packaging>pom</packaging> + <modules> + <module>analyzers</module> + </modules> + <build> <plugins> <plugin>
