Repository: tez Updated Branches: refs/heads/master e5a79fd44 -> 5ba6cf9d3
TEZ-2739. Improve handling of read errors in critical path analyzer (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5ba6cf9d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5ba6cf9d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5ba6cf9d Branch: refs/heads/master Commit: 5ba6cf9d3cc2812497d939322003ec05227bd672 Parents: e5a79fd Author: Bikas Saha <[email protected]> Authored: Mon Aug 31 11:10:57 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Mon Aug 31 11:10:57 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/history/ATSImportTool.java | 2 +- .../tez/history/parser/datamodel/TaskInfo.java | 9 +- tez-tools/analyzers/job-analyzer/pom.xml | 109 ++- .../analyzer/plugins/CriticalPathAnalyzer.java | 138 +++- .../org/apache/tez/analyzer/utils/SVGUtils.java | 24 +- .../org/apache/tez/analyzer/TestAnalyzer.java | 662 +++++++++++++++++++ 7 files changed, 901 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/5ba6cf9d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e145916..029d776 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,7 @@ ALL CHANGES: same name TEZ-2747. Update master to reflect 0.8.0-alpha release. TEZ-2662. Provide a way to check whether AM or task opts are valid and error if not. + TEZ-2739. 
Improve handling of read errors in critical path analyzer Release 0.8.0-alpha: 2015-08-29 http://git-wip-us.apache.org/repos/asf/tez/blob/5ba6cf9d/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java index 0e53d27..737df76 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java @@ -433,7 +433,7 @@ public class ATSImportTool extends Configured implements Tool { } @VisibleForTesting - static int process(String[] args) { + public static int process(String[] args) { Options options = buildOptions(); int result = -1; try { http://git-wip-us.apache.org/repos/asf/tez/blob/5ba6cf9d/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java index 7a89166..c6f89d6 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java @@ -20,6 +20,7 @@ package org.apache.tez.history.parser.datamodel; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; +import com.google.common.base.Strings; import com.google.common.collect.Iterables; import com.google.common.collect.LinkedHashMultimap; import com.google.common.collect.Lists; @@ -205,14 +206,14 @@ public class 
TaskInfo extends BaseInfo { * @return TaskAttemptInfo */ public final TaskAttemptInfo getSuccessfulTaskAttempt() { - if (isNotNullOrEmpty(getSuccessfulAttemptId())) { + if (!Strings.isNullOrEmpty(getSuccessfulAttemptId())) { for (TaskAttemptInfo attemptInfo : getTaskAttempts()) { if (attemptInfo.getTaskAttemptId().equals(getSuccessfulAttemptId())) { return attemptInfo; } } } - // fall back to checking status if successful attemt id is not available + // fall back to checking status if successful attempt id is not available for (TaskAttemptInfo attemptInfo : getTaskAttempts()) { if (attemptInfo.getStatus().equalsIgnoreCase(TaskAttemptState.SUCCEEDED.toString())) { return attemptInfo; @@ -350,8 +351,4 @@ public class TaskInfo extends BaseInfo { sb.append("]"); return sb.toString(); } - - private static boolean isNotNullOrEmpty(String str) { - return str != null && !str.isEmpty(); - } } http://git-wip-us.apache.org/repos/asf/tez/blob/5ba6cf9d/tez-tools/analyzers/job-analyzer/pom.xml ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/pom.xml b/tez-tools/analyzers/job-analyzer/pom.xml index 13cf48d..37a06bf 100644 --- a/tez-tools/analyzers/job-analyzer/pom.xml +++ b/tez-tools/analyzers/job-analyzer/pom.xml @@ -26,10 +26,6 @@ <dependencies> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </dependency> - <dependency> <groupId>io.dropwizard.metrics</groupId> <artifactId>metrics-core</artifactId> </dependency> @@ -39,9 +35,114 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-dag</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + 
<artifactId>tez-tests</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-tests</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-yarn-timeline-history</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-yarn-timeline-history</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-tests</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-common</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.codehaus.jettison</groupId> + 
<artifactId>jettison</artifactId> + </dependency> + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/tez/blob/5ba6cf9d/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java index 448e785..c8d4225 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java @@ -33,6 +33,7 @@ import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.Ent import org.apache.tez.analyzer.utils.SVGUtils; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.history.parser.datamodel.Container; import org.apache.tez.history.parser.datamodel.DagInfo; import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; @@ -49,11 +50,15 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { String succeededState = StringInterner.weakIntern(TaskAttemptState.SUCCEEDED.name()); 
String failedState = StringInterner.weakIntern(TaskAttemptState.FAILED.name()); - private final static String DATA_DEPENDENCY = "Data-Dependency"; - private final static String INIT_DEPENDENCY = "Init-Dependency"; - private final static String COMMIT_DEPENDENCY = "Commit-Dependency"; - private final static String NON_DATA_DEPENDENCY = "Non-Data-Dependency"; - private final static String OUTPUT_LOST = "Previous version outputs lost"; + public enum CriticalPathDependency { + DATA_DEPENDENCY, + INIT_DEPENDENCY, + COMMIT_DEPENDENCY, + RETRY_DEPENDENCY, + OUTPUT_RECREATE_DEPENDENCY + } + + public static final String DRAW_SVG = "tez.critical-path-analyzer.draw-svg"; public static class CriticalPathStep { public enum EntityType { @@ -64,7 +69,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { EntityType type; TaskAttemptInfo attempt; - String reason; // reason linking this to the previous step on the critical path + CriticalPathDependency reason; // reason linking this to the previous step on the critical path long startCriticalPathTime; // time at which attempt is on critical path long stopCriticalPathTime; // time at which attempt is off critical path List<String> notes = Lists.newLinkedList(); @@ -85,7 +90,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { public long getStopCriticalTime() { return stopCriticalPathTime; } - public String getReason() { + public CriticalPathDependency getReason() { return reason; } public List<String> getNotes() { @@ -128,7 +133,13 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { analyzeCriticalPath(dagInfo); - saveCriticalPathAsSVG(dagInfo); + if (getConf().getBoolean(DRAW_SVG, true)) { + saveCriticalPathAsSVG(dagInfo); + } + } + + public List<CriticalPathStep> getCriticalPath() { + return criticalPath; } private void saveCriticalPathAsSVG(DagInfo dagInfo) { @@ -212,7 +223,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase 
implements Analyzer { // add the commit step currentStep.stopCriticalPathTime = dagInfo.getFinishTime(); currentStep.startCriticalPathTime = currentAttemptStopCriticalPathTime; - currentStep.reason = COMMIT_DEPENDENCY; + currentStep.reason = CriticalPathDependency.COMMIT_DEPENDENCY; tempCP.add(currentStep); while (true) { @@ -233,7 +244,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { long startCriticalPathTime = 0; String nextAttemptId = null; - String reason = null; + CriticalPathDependency reason = null; if (dataDependency) { // last data event was produced after the attempt was scheduled. use // data dependency @@ -242,7 +253,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { if (!Strings.isNullOrEmpty(currentAttempt.getLastDataEventSourceTA())) { // there is a valid data causal TA. Use it. nextAttemptId = currentAttempt.getLastDataEventSourceTA(); - reason = DATA_DEPENDENCY; + reason = CriticalPathDependency.DATA_DEPENDENCY; startCriticalPathTime = currentAttempt.getLastDataEventTime(); System.out.println("Using data dependency " + nextAttemptId); } else { @@ -252,7 +263,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { "Vertex: " + vertex.getVertexId() + " has no external inputs but the last data event " + "TA is null for " + currentAttempt.getTaskAttemptId()); nextAttemptId = null; - reason = INIT_DEPENDENCY; + reason = CriticalPathDependency.INIT_DEPENDENCY; System.out.println("Using init dependency"); } } else { @@ -262,9 +273,9 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { if (!Strings.isNullOrEmpty(currentAttempt.getCreationCausalTA())) { // there is a scheduling causal TA. Use it. 
nextAttemptId = currentAttempt.getCreationCausalTA(); - reason = NON_DATA_DEPENDENCY; + reason = CriticalPathDependency.RETRY_DEPENDENCY; TaskAttemptInfo nextAttempt = attempts.get(nextAttemptId); - if (nextAttempt != null) { + if (nextAttemptId != null) { VertexInfo currentVertex = currentAttempt.getTaskInfo().getVertexInfo(); VertexInfo nextVertex = nextAttempt.getTaskInfo().getVertexInfo(); if (!nextVertex.getVertexName().equals(currentVertex.getVertexName())){ @@ -272,7 +283,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { for (VertexInfo outVertex : currentVertex.getOutputVertices()) { if (nextVertex.getVertexName().equals(outVertex.getVertexName())) { // next vertex is an output vertex - reason = OUTPUT_LOST; + reason = CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY; break; } } @@ -286,7 +297,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { // there is a data event going to the vertex. Count the time between data event and // scheduling time as Initializer/Manager overhead and follow data dependency nextAttemptId = currentAttempt.getLastDataEventSourceTA(); - reason = DATA_DEPENDENCY; + reason = CriticalPathDependency.DATA_DEPENDENCY; startCriticalPathTime = currentAttempt.getLastDataEventTime(); long overhead = currentAttempt.getCreationTime() - currentAttempt.getLastDataEventTime(); @@ -299,17 +310,102 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { // or the vertex has external input but does not use events // or the vertex has no external inputs or edges nextAttemptId = null; - reason = INIT_DEPENDENCY; + reason = CriticalPathDependency.INIT_DEPENDENCY; System.out.println("Using init dependency"); } } } - + + + if (!Strings.isNullOrEmpty(nextAttemptId)) { + TaskAttemptInfo nextAttempt = attempts.get(nextAttemptId); + TaskAttemptInfo attemptToCheck = nextAttempt; + + // check if the next attempt is already on critical path to prevent infinite loop 
+ boolean foundLoop = false; + CriticalPathDependency prevReason = null; + for (CriticalPathStep previousStep : tempCP) { + if (previousStep.attempt.equals(attemptToCheck)) { + foundLoop = true; + prevReason = previousStep.reason; + } + } + + if (foundLoop) { + // found a loop - find the next step based on heuristics + /* only the losing outputs causes us to backtrack. There are 2 cases + * 1) Step N reported last data event to this step + * -> Step N+1 (current step) is the retry for read error reported + * -> read error was reported by the Step N attempt and it did not exit after the + * error + * -> So scheduling dependency of Step N points back to step N+1 + * 2) Step N reported last data event to this step + * -> Step N+1 is a retry for a read error reported + * -> Step N+2 is the attempt that reported the read error + * -> Step N+3 is the last data event of N+2 and points back to N+1 + */ + System.out.println("Reset " + currentAttempt.getTaskAttemptId() + + " cause: " + currentAttempt.getTerminationCause() + + " time: " + currentAttempt.getFinishTime() + + " reason: " + reason + + " because of: " + attemptToCheck.getTaskAttemptId()); + TaskAttemptInfo attemptWithLostAncestor = currentAttempt; + if (reason != CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY) { + // Case 2 above. 
If reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY + // then it's Case 1 above + Preconditions.checkState(prevReason.equals( + CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), prevReason); + reason = CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY; + attemptWithLostAncestor = nextAttempt; + } + System.out.println("Reset " + currentAttempt.getTaskAttemptId() + + " cause: " + currentAttempt.getTerminationCause() + + " time: " + currentAttempt.getFinishTime() + + " reason: " + reason + + " because of: " + attemptToCheck.getTaskAttemptId() + + " looking at: " + attemptWithLostAncestor.getTaskAttemptId()); + Preconditions.checkState(reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY); + // we don't track all input events to the consumer. So just jump to + // the previous successful version of the current attempt + TaskAttemptInfo prevSuccAttempt = null; + for (TaskAttemptInfo prevAttempt : attemptWithLostAncestor.getTaskInfo().getTaskAttempts()) { + System.out.println("Looking at " + prevAttempt.getTaskAttemptId() + + " cause: " + prevAttempt.getTerminationCause() + + " time: " + prevAttempt.getFinishTime()); + if (prevAttempt.getTerminationCause() + .equals(TaskAttemptTerminationCause.OUTPUT_LOST.name())) { + if (prevAttempt.getFinishTime() < currentAttempt.getFinishTime()) { + // attempt finished before current attempt + if (prevSuccAttempt == null + || prevAttempt.getFinishTime() > prevSuccAttempt.getFinishTime()) { + // keep the latest attempt that had lost outputs + prevSuccAttempt = prevAttempt; + } + } + } + } + Preconditions.checkState(prevSuccAttempt != null, + attemptWithLostAncestor.getTaskAttemptId()); + System.out + .println("Resetting nextAttempt to : " + prevSuccAttempt.getTaskAttemptId() + + " from " + nextAttempt.getTaskAttemptId()); + nextAttemptId = prevSuccAttempt.getTaskAttemptId(); + if (attemptWithLostAncestor == currentAttempt) { + startCriticalPathTime = currentAttempt.getCreationTime(); + } else { + startCriticalPathTime 
= prevSuccAttempt.getFinishTime(); + } + } + + } + currentStep.startCriticalPathTime = startCriticalPathTime; currentStep.reason = reason; + + Preconditions.checkState(currentStep.stopCriticalPathTime >= currentStep.startCriticalPathTime); if (Strings.isNullOrEmpty(nextAttemptId)) { - Preconditions.checkState(reason.equals(INIT_DEPENDENCY)); + Preconditions.checkState(reason.equals(CriticalPathDependency.INIT_DEPENDENCY)); Preconditions.checkState(startCriticalPathTime == 0); // no predecessor attempt found. this is the last step in the critical path // assume attempts start critical path time is when its scheduled. before that is @@ -321,7 +417,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { currentStep = new CriticalPathStep(currentAttempt, EntityType.VERTEX_INIT); currentStep.stopCriticalPathTime = initStepStopCriticalTime; currentStep.startCriticalPathTime = dagInfo.getStartTime(); - currentStep.reason = INIT_DEPENDENCY; + currentStep.reason = CriticalPathDependency.INIT_DEPENDENCY; tempCP.add(currentStep); if (!tempCP.isEmpty()) { @@ -348,7 +444,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { String entity = (step.getType() == EntityType.ATTEMPT ? step.getAttempt().getTaskAttemptId() : (step.getType() == EntityType.VERTEX_INIT ? 
step.attempt.getTaskInfo().getVertexInfo().getVertexName() : "DAG COMMIT")); - String [] record = {entity, step.getReason(), + String [] record = {entity, step.getReason().name(), step.getAttempt().getDetailedStatus(), String.valueOf(step.getStartCriticalTime()), String.valueOf(step.getStopCriticalTime()), Joiner.on(";").join(step.getNotes())}; http://git-wip-us.apache.org/repos/asf/tez/blob/5ba6cf9d/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java index 50fe033..44408d4 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java @@ -252,19 +252,19 @@ public class SVGUtils { // draw legend int legendX = 0; int legendY = (criticalPath.size() + 2) * STEP_GAP; - int legendWidth = 10000; + int legendWidth = dagFinishTimeInterval/5; - addRectStr(legendX, legendWidth, legendY, STEP_GAP, VERTEX_INIT_COMMIT_COLOR, BORDER_COLOR, RECT_OPACITY, ""); - addTextStr(legendX, legendY + STEP_GAP/2, "Vertex Init/Commit Overhead", "left", TEXT_SIZE, ""); - legendY += STEP_GAP; - addRectStr(legendX, legendWidth, legendY, STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, ""); - addTextStr(legendX, legendY + STEP_GAP/2, "Task Allocation Overhead", "left", TEXT_SIZE, ""); - legendY += STEP_GAP; - addRectStr(legendX, legendWidth, legendY, STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, ""); - addTextStr(legendX, legendY + STEP_GAP/2, "Task Launch Overhead", "left", TEXT_SIZE, ""); - legendY += STEP_GAP; - addRectStr(legendX, legendWidth, legendY, STEP_GAP, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, ""); - 
addTextStr(legendX, legendY + STEP_GAP/2, "Task Execution Time", "left", TEXT_SIZE, ""); + addRectStr(legendX, legendWidth, legendY, STEP_GAP/2, VERTEX_INIT_COMMIT_COLOR, BORDER_COLOR, RECT_OPACITY, ""); + addTextStr(legendX, legendY + STEP_GAP/3, "Vertex Init/Commit Overhead", "left", TEXT_SIZE, ""); + legendY += STEP_GAP/2; + addRectStr(legendX, legendWidth, legendY, STEP_GAP/2, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, ""); + addTextStr(legendX, legendY + STEP_GAP/3, "Task Allocation Overhead", "left", TEXT_SIZE, ""); + legendY += STEP_GAP/2; + addRectStr(legendX, legendWidth, legendY, STEP_GAP/2, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, ""); + addTextStr(legendX, legendY + STEP_GAP/3, "Task Launch Overhead", "left", TEXT_SIZE, ""); + legendY += STEP_GAP/2; + addRectStr(legendX, legendWidth, legendY, STEP_GAP/2, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, ""); + addTextStr(legendX, legendY + STEP_GAP/3, "Task Execution Time", "left", TEXT_SIZE, ""); Y_MAX += Y_BASE*2; X_MAX += X_BASE*2; http://git-wip-us.apache.org/repos/asf/tez/blob/5ba6cf9d/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java new file mode 100644 index 0000000..9a75461 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java @@ -0,0 +1,662 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer; +import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathDependency; +import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep; +import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.EntityType; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.history.ATSImportTool; +import org.apache.tez.history.parser.ATSFileParser; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.test.SimpleTestDAG; +import org.apache.tez.test.SimpleTestDAG3Vertices; +import org.apache.tez.test.TestInput; +import 
org.apache.tez.test.TestProcessor; +import org.apache.tez.test.dag.SimpleReverseVTestDAG; +import org.apache.tez.test.dag.SimpleVTestDAG; +import org.apache.tez.tests.MiniTezClusterWithTimeline; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; + +public class TestAnalyzer { + private static final Logger LOG = LoggerFactory.getLogger(TestAnalyzer.class); + + private static String TEST_ROOT_DIR = + "target" + Path.SEPARATOR + TestAnalyzer.class.getName() + "-tmpDir"; + private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + "download"; + private final static String SIMPLE_HISTORY_DIR = "/tmp/simplehistory/"; + + private static MiniDFSCluster dfsCluster; + private static MiniTezClusterWithTimeline miniTezCluster; + + private static Configuration conf = new Configuration(); + private static FileSystem fs; + + private static TezClient tezSession = null; + + private static int numDAGs = 0; + + @BeforeClass + public static void setupClass() throws Exception { + conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH, false); + EditLogFileOutputStream.setShouldSkipFsyncForTesting(true); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); + dfsCluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); + fs = dfsCluster.getFileSystem(); + conf.set("fs.defaultFS", fs.getUri().toString()); + + setupTezCluster(); + numDAGs = 0; + } + + @AfterClass + public static void tearDownClass() throws Exception { + LOG.info("Stopping mini clusters"); + if (tezSession != null) { + tezSession.stop(); + } + if (miniTezCluster != null) { + miniTezCluster.stop(); + miniTezCluster = null; + } + if (dfsCluster != null) { + dfsCluster.shutdown(); + dfsCluster = null; + } + } + + public 
CriticalPathAnalyzer setupCPAnalyzer() { + Configuration analyzerConf = new Configuration(false); + analyzerConf.setBoolean(CriticalPathAnalyzer.DRAW_SVG, false); + CriticalPathAnalyzer cp = new CriticalPathAnalyzer(); + cp.setConf(analyzerConf); + return cp; + } + + public static void setupTezCluster() throws Exception { + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3 * 1000); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3 * 1000); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 2); + + //Enable per edge counters + conf.setBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, true); + conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, ATSHistoryLoggingService + .class.getName()); + + conf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, SIMPLE_HISTORY_DIR); + + miniTezCluster = + new MiniTezClusterWithTimeline(TestAnalyzer.class.getName(), 4, 1, 1, true); + + miniTezCluster.init(conf); + miniTezCluster.start(); + + TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig()); + tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "0.0.0.0:8188"); + tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + ATSHistoryLoggingService.class.getName()); + + Path remoteStagingDir = dfsCluster.getFileSystem().makeQualified(new Path(TEST_ROOT_DIR, String + .valueOf(new Random().nextInt(100000)))); + + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, + remoteStagingDir.toString()); + tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false); + + tezSession = TezClient.create("TestFaultTolerance", tezConf, true); + 
tezSession.start(); + + } + + StepCheck createStep(String attempt, CriticalPathDependency reason) { + return new StepCheck(attempt, reason); + } + + class StepCheck { + String attempt; // attempt is the TaskAttemptInfo short name with regex + CriticalPathDependency reason; + StepCheck(String attempt, CriticalPathDependency reason) { + this.attempt = attempt; + this.reason = reason; + } + String getAttemptDetail() { + return attempt; + } + CriticalPathDependency getReason() { + return reason; + } + } + + DagInfo runDAGAndVerify(DAG dag, DAGStatus.State finalState, List<StepCheck[]> steps) throws Exception { + tezSession.waitTillReady(); + numDAGs++; + LOG.info("XXX Running DAG name: " + dag.getName()); + DAGClient dagClient = tezSession.submitDAG(dag); + DAGStatus dagStatus = dagClient.getDAGStatus(null); + while (!dagStatus.isCompleted()) { + LOG.info("Waiting for dag to complete. Sleeping for 500ms." + + " DAG name: " + dag.getName() + + " DAG appContext: " + dagClient.getExecutionContext() + + " Current state: " + dagStatus.getState()); + Thread.sleep(100); + dagStatus = dagClient.getDAGStatus(null); + } + + Assert.assertEquals(finalState, dagStatus.getState()); + + String dagId = TezDAGID.getInstance(tezSession.getAppMasterApplicationId(), numDAGs).toString(); + DagInfo dagInfo = getDagInfo(dagId); + + verifyCriticalPath(dagInfo, steps); + return dagInfo; + } + + DagInfo getDagInfo(String dagId) throws Exception { + // sleep for a bit to let ATS events be sent from AM + Thread.sleep(1000); + //Export the data from ATS + String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR }; + + int result = ATSImportTool.process(args); + assertTrue(result == 0); + + //Parse ATS data and verify results + //Parse downloaded contents + File downloadedFile = new File(DOWNLOAD_DIR + + Path.SEPARATOR + dagId + + Path.SEPARATOR + dagId + ".zip"); + ATSFileParser parser = new ATSFileParser(downloadedFile); + DagInfo dagInfo = parser.getDAGData(dagId); + 
assertTrue(dagInfo.getDagId().equals(dagId)); + return dagInfo; + } + + void verifyCriticalPath(DagInfo dagInfo, List<StepCheck[]> stepsOptions) throws Exception { + CriticalPathAnalyzer cp = setupCPAnalyzer(); + cp.analyze(dagInfo); + + List<CriticalPathStep> criticalPath = cp.getCriticalPath(); + + for (CriticalPathStep step : criticalPath) { + LOG.info("XXX Step: " + step.getType()); + if (step.getType() == EntityType.ATTEMPT) { + LOG.info("XXX Attempt: " + step.getAttempt().getShortName() + " " + step.getAttempt().getDetailedStatus()); + } + LOG.info("XXX Reason: " + step.getReason()); + String notes = Joiner.on(";").join(step.getNotes()); + LOG.info("XXX Notes: " + notes); + } + + boolean foundMatchingLength = false; + for (StepCheck[] steps : stepsOptions) { + if (steps.length + 2 == criticalPath.size()) { + foundMatchingLength = true; + Assert.assertEquals(CriticalPathStep.EntityType.VERTEX_INIT, criticalPath.get(0).getType()); + Assert.assertEquals(criticalPath.get(1).getAttempt().getShortName(), + criticalPath.get(0).getAttempt().getShortName()); + + for (int i=1; i<criticalPath.size() - 1; ++i) { + CriticalPathStep step = criticalPath.get(i); + Assert.assertEquals(CriticalPathStep.EntityType.ATTEMPT, step.getType()); + Assert.assertTrue(steps[i-1].getAttemptDetail(), + step.getAttempt().getShortName().matches(steps[i-1].getAttemptDetail())); + //Assert.assertEquals(steps[i-1].getAttemptDetail(), step.getAttempt().getShortName()); + Assert.assertEquals(steps[i-1].getReason(), step.getReason()); + } + + Assert.assertEquals(CriticalPathStep.EntityType.DAG_COMMIT, + criticalPath.get(criticalPath.size() - 1).getType()); + break; + } + } + + Assert.assertTrue(foundMatchingLength); + + } + + @Test (timeout=60000) + public void testBasicSuccessScatterGather() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); + StepCheck[] check = { + createStep("v1 : 000000_0", 
CriticalPathDependency.INIT_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY) + }; + DAG dag = SimpleTestDAG.createDAG("testBasicSuccessScatterGather", testConf); + runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + } + + @Test (timeout=60000) + public void testBasicTaskFailure() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); + testConf.setBoolean(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true); + testConf.set(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0"); + testConf.setInt(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 0); + + StepCheck[] check = { + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.RETRY_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + }; + DAG dag = SimpleTestDAG.createDAG("testBasicTaskFailure", testConf); + runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + } + + @Test (timeout=60000) + public void testTaskMultipleFailures() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); + testConf.setBoolean(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true); + testConf.set(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0"); + testConf.setInt(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 1); + + StepCheck[] check = { + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.RETRY_DEPENDENCY), + createStep("v1 : 
000000_2", CriticalPathDependency.RETRY_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + }; + + DAG dag = SimpleTestDAG.createDAG("testTaskMultipleFailures", testConf); + runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + } + + @Test (timeout=60000) + public void testBasicInputFailureWithExit() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), true); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0"); + + StepCheck[] check = { + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v2 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY), + }; + + DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithExit", testConf); + runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + } + + @Test (timeout=60000) + public void testBasicInputFailureWithoutExit() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0"); + 
testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0"); + + StepCheck[] check = { + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + }; + + DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithoutExit", testConf); + runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + } + + @Test (timeout=60000) + public void testMultiVersionInputFailureWithExit() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), true); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0,1"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0"); + testConf.setInt(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), 1); + + StepCheck[] check = { + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v2 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v1 : 000000_2", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v2 : 000000_2", CriticalPathDependency.DATA_DEPENDENCY), + }; + + 
DAG dag = SimpleTestDAG.createDAG("testMultiVersionInputFailureWithExit", testConf); + runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + } + + @Test (timeout=60000) + public void testMultiVersionInputFailureWithoutExit() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0"); + testConf.setInt(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), 1); + + StepCheck[] check = { + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v1 : 000000_2", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + }; + + DAG dag = SimpleTestDAG.createDAG("testMultiVersionInputFailureWithoutExit", testConf); + runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + } + + /** + * Sets configuration for cascading input failure tests that + * use SimpleTestDAG3Vertices. + * @param testConf configuration + * @param failAndExit whether input failure should trigger attempt exit + */ + private void setCascadingInputFailureConfig(Configuration testConf, + boolean failAndExit) { + // v2 attempt0 succeeds. + // v2 task0 attempt1 input0 fails up to version 0. 
+ testConf.setInt(SimpleTestDAG3Vertices.TEZ_SIMPLE_DAG_NUM_TASKS, 1); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), failAndExit); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "1"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0"); + testConf.setInt(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), + 0); + + //v3 all-tasks attempt0 input0 fails up to version 0. + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"), true); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v3"), failAndExit); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "-1"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "0"); + testConf.setInt(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"), + 0); + } + + /** + * Test cascading input failure without exit. Expecting success. + * v1 -- v2 -- v3 + * v3 all-tasks attempt0 input0 fails. Wait. Triggering v2 rerun. + * v2 task0 attempt1 input0 fails. Wait. Triggering v1 rerun. + * v1 attempt1 rerun and succeeds. v2 accepts v1 attempt1 output. v2 attempt1 succeeds. + * v3 attempt0 accepts v2 attempt1 output. + * + * AM vertex succeeded order is v1, v2, v1, v2, v3. 
+ * @throws Exception + */ + @Test (timeout=60000) + public void testCascadingInputFailureWithoutExitSuccess() throws Exception { + Configuration testConf = new Configuration(false); + setCascadingInputFailureConfig(testConf, false); + + StepCheck[] check = { + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v2 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + }; + + DAG dag = SimpleTestDAG3Vertices.createDAG( + "testCascadingInputFailureWithoutExitSuccess", testConf); + runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + } + + /** + * Test cascading input failure with exit. Expecting success. + * v1 -- v2 -- v3 + * v3 all-tasks attempt0 input0 fails. v3 attempt0 exits. Triggering v2 rerun. + * v2 task0 attempt1 input0 fails. v2 attempt1 exits. Triggering v1 rerun. + * v1 attempt1 rerun and succeeds. v2 accepts v1 attempt1 output. v2 attempt2 succeeds. + * v3 attempt1 accepts v2 attempt2 output. + * + * AM vertex succeeded order is v1, v2, v3, v1, v2, v3. 
+ * @throws Exception + */ + @Test (timeout=60000) + public void testCascadingInputFailureWithExitSuccess() throws Exception { + Configuration testConf = new Configuration(false); + setCascadingInputFailureConfig(testConf, true); + + StepCheck[] check = { + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v2 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v2 : 000000_2", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v3 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY), + }; + + DAG dag = SimpleTestDAG3Vertices.createDAG( + "testCascadingInputFailureWithExitSuccess", testConf); + runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + } + + /** + * Input failure of v3 causes rerun of both v1 and v2 vertices. 
+ * v1 v2 + * \ / + * v3 + * + * @throws Exception + */ + @Test (timeout=60000) + public void testInputFailureCausesRerunOfTwoVerticesWithoutExit() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleVTestDAG.TEZ_SIMPLE_V_DAG_NUM_TASKS, 1); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"), true); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v3"), false); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "-1"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"), "1"); + + StepCheck[] check = { + // use regex for either vertices being possible on the path + createStep("v[12] : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v[12] : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v[12] : 000000_2", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + }; + + DAG dag = SimpleVTestDAG.createDAG( + "testInputFailureCausesRerunOfTwoVerticesWithoutExit", testConf); + runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + } + + /** + * Downstream(v3) attempt failure of a vertex connected with + * 2 upstream vertices. 
+ * v1 v2 + * \ / + * v3 + * + * @throws Exception + */ + @Test (timeout=60000) + public void testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleVTestDAG.TEZ_SIMPLE_V_DAG_NUM_TASKS, 1); + testConf.setBoolean(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v3"), true); + testConf.set(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v3"), "0"); + testConf.setInt(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v3"), 1); + + StepCheck[] check = { + // use regex for either vertices being possible on the path + createStep("v[12] : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v3 : 000000_1", CriticalPathDependency.RETRY_DEPENDENCY), + createStep("v3 : 000000_2", CriticalPathDependency.RETRY_DEPENDENCY), + }; + + DAG dag = SimpleVTestDAG.createDAG( + "testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure", testConf); + runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + } + + /** + * Input failures of v2 and v3 trigger a v1 rerun. + * Both v2 and v3 report error on v1 and don't exit. So one of them triggers next + * version of v1 and also consumes the output of the next version. While the other + * consumes the output of the next version of v1. + * Reruns can send output to 2 downstream vertices. + * v1 + * / \ + * v2 v3 + * + * Also covers multiple consumer vertices reporting failure against the same producer task. 
+ * @throws Exception + */ + @Test (timeout=60000) + public void testInputFailureRerunCanSendOutputToTwoDownstreamVertices() throws Exception { + Configuration testConf = new Configuration(false); + testConf.setInt(SimpleReverseVTestDAG.TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS, 1); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), false); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "-1"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "-1"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), "0"); + + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"), true); + testConf.setBoolean(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v3"), false); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "-1"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"), "0"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "-1"); + testConf.set(TestInput.getVertexConfName( + TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"), "0"); + + List<StepCheck[]> stepsOptions = Lists.newLinkedList(); + StepCheck[] check1 = { + // use regex for either vertices being possible on the path + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v[23] : 000000_0", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + 
createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + }; + StepCheck[] check2 = { + createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), + }; + stepsOptions.add(check1); + stepsOptions.add(check2); + DAG dag = SimpleReverseVTestDAG.createDAG( + "testInputFailureRerunCanSendOutputToTwoDownstreamVertices", testConf); + runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, stepsOptions); + } + +} \ No newline at end of file
