Repository: tez Updated Branches: refs/heads/master e1fc9cc4f -> b6f15dcdc
TEZ-2810. Support for showing allocation delays due to internal preemption (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b6f15dcd Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b6f15dcd Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b6f15dcd Branch: refs/heads/master Commit: b6f15dcdcf3cae841c361983f32229858ff79f42 Parents: e1fc9cc Author: Bikas Saha <[email protected]> Authored: Fri Sep 11 17:31:34 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Fri Sep 11 17:31:34 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../history/parser/datamodel/VertexInfo.java | 1 + .../analyzer/plugins/CriticalPathAnalyzer.java | 176 ++++++++++++------- .../org/apache/tez/analyzer/utils/SVGUtils.java | 48 +++-- .../org/apache/tez/analyzer/TestAnalyzer.java | 95 ++++++++-- 5 files changed, 226 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/b6f15dcd/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 00b8282..60010d0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.8.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2810. Support for showing allocation delays due to internal preemption TEZ-2808. Race condition between preemption and container assignment TEZ-2807. Log data in the finish event instead of the start event TEZ-2799. 
SimpleHistoryParser NPE http://git-wip-us.apache.org/repos/asf/tez/blob/b6f15dcd/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java index 94547d4..7259667 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java @@ -403,6 +403,7 @@ public class VertexInfo extends BaseInfo { return Collections.unmodifiableList(outputVertices); } + // expensive method to call for large DAGs as it creates big lists on every call private List<TaskAttemptInfo> getTaskAttemptsInternal() { List<TaskAttemptInfo> taskAttemptInfos = Lists.newLinkedList(); for (TaskInfo taskInfo : getTasks()) { http://git-wip-us.apache.org/repos/asf/tez/blob/b6f15dcd/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java index 350f783..3eb7701 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java @@ -33,11 +33,13 @@ import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.Ent import org.apache.tez.analyzer.utils.SVGUtils; import org.apache.tez.dag.api.TezException; import 
org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.history.parser.datamodel.Container; import org.apache.tez.history.parser.datamodel.DagInfo; import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; import org.apache.tez.history.parser.datamodel.VertexInfo; import org.apache.tez.history.parser.datamodel.TaskAttemptInfo.DataDependencyEvent; +import org.apache.tez.history.parser.datamodel.TaskInfo; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -112,13 +114,15 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { TaskAttemptInfo lastAttempt = null; long lastAttemptFinishTime = 0; for (VertexInfo vertex : dagInfo.getVertices()) { - for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) { - attempts.put(attempt.getTaskAttemptId(), attempt); - if (attempt.getStatus().equals(succeededState) || - attempt.getStatus().equals(failedState)) { - if (lastAttemptFinishTime < attempt.getFinishTime()) { - lastAttempt = attempt; - lastAttemptFinishTime = attempt.getFinishTime(); + for (TaskInfo task : vertex.getTasks()) { + for (TaskAttemptInfo attempt : task.getTaskAttempts()) { + attempts.put(attempt.getTaskAttemptId(), attempt); + if (attempt.getStatus().equals(succeededState) || + attempt.getStatus().equals(failedState)) { + if (lastAttemptFinishTime < attempt.getFinishTime()) { + lastAttempt = attempt; + lastAttemptFinishTime = attempt.getFinishTime(); + } } } } @@ -149,68 +153,112 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { svg.saveCriticalPathAsSVG(dagInfo, outputFileName, criticalPath); } - private void analyzeCriticalPath(DagInfo dag) { - if (!criticalPath.isEmpty()) { - System.out.println("Walking critical path for dag " + dag.getDagId()); - long dagStartTime = dag.getStartTime(); - long dagTime = dag.getFinishTime() - dagStartTime; - long totalAttemptCriticalTime = 0; - 
for (int i = 0; i < criticalPath.size(); ++i) { - CriticalPathStep step = criticalPath.get(i); - totalAttemptCriticalTime += (step.stopCriticalPathTime - step.startCriticalPathTime); - TaskAttemptInfo attempt = step.attempt; - if (step.getType() == EntityType.ATTEMPT) { - // analyze execution overhead - long avgExecutionTime = attempt.getTaskInfo().getVertexInfo() - .getAvgExecutionTimeInterval(); - if (avgExecutionTime * 1.25 < attempt.getExecutionTimeInterval()) { - step.notes - .add("Potential straggler. Execution time " + - SVGUtils.getTimeStr(attempt.getExecutionTimeInterval()) - + " compared to vertex average of " + - SVGUtils.getTimeStr(avgExecutionTime)); + private void analyzeAllocationOverhead(DagInfo dag) { + List<TaskAttemptInfo> preemptedAttempts = Lists.newArrayList(); + for (VertexInfo v : dag.getVertices()) { + for (TaskInfo t : v.getTasks()) { + for (TaskAttemptInfo a : t.getTaskAttempts()) { + if (a.getTerminationCause().equals( + TaskAttemptTerminationCause.INTERNAL_PREEMPTION.name())) { + System.out.println("Found preempted attempt " + a.getTaskAttemptId()); + preemptedAttempts.add(a); } - - if (attempt.getStartTime() > step.startCriticalPathTime) { - // the attempt is critical before launching. 
So allocation overhead needs analysis - // analyzer allocation overhead - Container container = attempt.getContainer(); - if (container != null) { - Collection<TaskAttemptInfo> attempts = dag.getContainerMapping().get(container); - if (attempts != null && !attempts.isEmpty()) { - // arrange attempts by allocation time - List<TaskAttemptInfo> attemptsList = Lists.newArrayList(attempts); - Collections.sort(attemptsList, TaskAttemptInfo.orderingOnAllocationTime()); - // walk the list to record allocation time before the current attempt - long containerPreviousAllocatedTime = 0; - for (TaskAttemptInfo containerAttempt : attemptsList) { - if (containerAttempt.getTaskAttemptId().equals(attempt.getTaskAttemptId())) { - break; - } - System.out.println("Container: " + container.getId() + " running att: " + - containerAttempt.getTaskAttemptId() + " wait att: " + attempt.getTaskAttemptId()); - containerPreviousAllocatedTime += containerAttempt.getAllocationToEndTimeInterval(); - } - if (containerPreviousAllocatedTime == 0) { - step.notes.add("Container " + container.getId() + " newly allocated."); - } else { - if (containerPreviousAllocatedTime >= attempt.getCreationToAllocationTimeInterval()) { - step.notes.add("Container " + container.getId() + " was fully allocated"); - } else { - step.notes.add("Container " + container.getId() + " allocated for " + - SVGUtils.getTimeStr(containerPreviousAllocatedTime) + " out of " + - SVGUtils.getTimeStr(attempt.getCreationToAllocationTimeInterval()) + - " of allocation wait time"); - } - } - } + } + } + } + for (int i = 0; i < criticalPath.size(); ++i) { + CriticalPathStep step = criticalPath.get(i); + TaskAttemptInfo attempt = step.attempt; + if (step.getType() != EntityType.ATTEMPT) { + continue; + } + + long creationTime = attempt.getCreationTime(); + long allocationTime = attempt.getAllocationTime(); + if (allocationTime < step.startCriticalPathTime) { + // allocated before it became critical + continue; + } + + // the attempt is 
critical before allocation. So allocation overhead needs analysis + Container container = attempt.getContainer(); + if (container != null) { + Collection<TaskAttemptInfo> attempts = dag.getContainerMapping().get(container); + if (attempts != null && !attempts.isEmpty()) { + // arrange attempts by allocation time + List<TaskAttemptInfo> attemptsList = Lists.newArrayList(attempts); + Collections.sort(attemptsList, TaskAttemptInfo.orderingOnAllocationTime()); + // walk the list to record allocation time before the current attempt + long containerPreviousAllocatedTime = 0; + for (TaskAttemptInfo containerAttempt : attemptsList) { + if (containerAttempt.getTaskAttemptId().equals(attempt.getTaskAttemptId())) { + break; + } + System.out.println("Container: " + container.getId() + " running att: " + + containerAttempt.getTaskAttemptId() + " wait att: " + attempt.getTaskAttemptId()); + containerPreviousAllocatedTime += containerAttempt.getAllocationToEndTimeInterval(); + } + if (containerPreviousAllocatedTime == 0) { + step.notes.add("Container " + container.getId() + " newly allocated."); + } else { + if (containerPreviousAllocatedTime >= attempt.getCreationToAllocationTimeInterval()) { + step.notes.add("Container " + container.getId() + " was fully allocated"); + } else { + step.notes.add("Container " + container.getId() + " allocated for " + + SVGUtils.getTimeStr(containerPreviousAllocatedTime) + " out of " + + SVGUtils.getTimeStr(attempt.getCreationToAllocationTimeInterval()) + + " of allocation wait time"); } } } + // look for internal preemptions while attempt was waiting for allocation + for (TaskAttemptInfo a : preemptedAttempts) { + if (a.getFinishTime() > creationTime && a.getFinishTime() < allocationTime){ + // found an attempt that was preempted within this time interval + step.notes.add("Potentially waited for preemption of " + a.getShortName()); + } + } } - System.out - .println("DAG time taken: " + dagTime + " TotalAttemptTime: " + totalAttemptCriticalTime - 
+ " DAG finish time: " + dag.getFinishTime() + " DAG start time: " + dagStartTime); + } + } + + private void analyzeStragglers(DagInfo dag) { + long dagStartTime = dag.getStartTime(); + long dagTime = dag.getFinishTime() - dagStartTime; + long totalAttemptCriticalTime = 0; + for (int i = 0; i < criticalPath.size(); ++i) { + CriticalPathStep step = criticalPath.get(i); + totalAttemptCriticalTime += (step.stopCriticalPathTime - step.startCriticalPathTime); + TaskAttemptInfo attempt = step.attempt; + if (step.getType() == EntityType.ATTEMPT) { + // analyze execution overhead + if (attempt.getLastDataEvents().size() > 1) { + // there were read errors. that could have delayed the attempt. ignore this + continue; + } + long avgExecutionTime = attempt.getTaskInfo().getVertexInfo() + .getAvgExecutionTimeInterval(); + if (avgExecutionTime <= 0) { + continue; + } + if (avgExecutionTime * 1.25 < attempt.getExecutionTimeInterval()) { + step.notes + .add("Potential straggler. Execution time " + + SVGUtils.getTimeStr(attempt.getExecutionTimeInterval()) + + " compared to vertex average of " + + SVGUtils.getTimeStr(avgExecutionTime)); + } + } + } + System.out + .println("DAG time taken: " + dagTime + " TotalAttemptTime: " + totalAttemptCriticalTime + + " DAG finish time: " + dag.getFinishTime() + " DAG start time: " + dagStartTime); + } + + private void analyzeCriticalPath(DagInfo dag) { + if (!criticalPath.isEmpty()) { + analyzeStragglers(dag); + analyzeAllocationOverhead(dag); } } http://git-wip-us.apache.org/repos/asf/tez/blob/b6f15dcd/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java index 61b1676..88a2105 100644 --- 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java @@ -50,10 +50,11 @@ public class SVGUtils { private static final int STEP_GAP = 50; private static final int TEXT_SIZE = 20; private static final String RUNTIME_COLOR = "LightGreen"; + private static final String ERROR_COLOR = "Tomato"; private static final String ALLOCATION_OVERHEAD_COLOR = "GoldenRod"; private static final String LAUNCH_OVERHEAD_COLOR = "DarkSalmon"; private static final String BORDER_COLOR = "Sienna"; - private static final String VERTEX_INIT_COMMIT_COLOR = "orange"; + private static final String VERTEX_INIT_COMMIT_COLOR = "LightSalmon"; private static final String CRITICAL_COLOR = "IndianRed"; private static final float RECT_OPACITY = 1.0f; private static final String TITLE_BR = " "; @@ -162,8 +163,10 @@ public class SVGUtils { int startCriticalTimeInterval = (int) (step.getStartCriticalTime() - dagStartTime); int stopCriticalTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime); int creationTimeInterval = (int) (attempt.getCreationTime() - dagStartTime); - int allocationTimeInterval = (int) (attempt.getAllocationTime() - dagStartTime); - int launchTimeInterval = (int) (attempt.getStartTime() - dagStartTime); + int allocationTimeInterval = attempt.getAllocationTime() > 0 ? + (int) (attempt.getAllocationTime() - dagStartTime) : 0; + int launchTimeInterval = attempt.getStartTime() > 0 ? 
+ (int) (attempt.getStartTime() - dagStartTime) : 0; int finishTimeInterval = (int) (attempt.getFinishTime() - dagStartTime); System.out.println(attempt.getTaskAttemptId() + " " + creationTimeInterval + " " + allocationTimeInterval + " " + launchTimeInterval + " " + finishTimeInterval); @@ -178,22 +181,37 @@ public class SVGUtils { title.append("Critical start at: " + getTimeStr(startCriticalTimeInterval)).append(TITLE_BR); title.append("Critical stop at: " + getTimeStr(stopCriticalTimeInterval)).append(TITLE_BR); title.append("Created at: " + getTimeStr(creationTimeInterval)).append(TITLE_BR); - title.append("Allocated at: " + getTimeStr(allocationTimeInterval)).append(TITLE_BR); - title.append("Launched at: " + getTimeStr(launchTimeInterval)).append(TITLE_BR); + if (allocationTimeInterval > 0) { + title.append("Allocated at: " + getTimeStr(allocationTimeInterval)).append(TITLE_BR); + } + if (launchTimeInterval > 0) { + title.append("Launched at: " + getTimeStr(launchTimeInterval)).append(TITLE_BR); + } title.append("Finished at: " + getTimeStr(finishTimeInterval)).append(TITLE_BR); title.append(Joiner.on(TITLE_BR).join(step.getNotes())); String titleStr = title.toString(); - addRectStr(creationTimeInterval, allocationTimeInterval - creationTimeInterval, - yOffset * STEP_GAP, STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, - titleStr); - - addRectStr(allocationTimeInterval, launchTimeInterval - allocationTimeInterval, - yOffset * STEP_GAP, STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, - titleStr); - - addRectStr(launchTimeInterval, finishTimeInterval - launchTimeInterval, yOffset * STEP_GAP, - STEP_GAP, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr); + // handle cases when attempt fails before allocation or launch + if (allocationTimeInterval > 0) { + addRectStr(creationTimeInterval, allocationTimeInterval - creationTimeInterval, + yOffset * STEP_GAP, STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, + 
titleStr); + if (launchTimeInterval > 0) { + addRectStr(allocationTimeInterval, launchTimeInterval - allocationTimeInterval, + yOffset * STEP_GAP, STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, + titleStr); + addRectStr(launchTimeInterval, finishTimeInterval - launchTimeInterval, yOffset * STEP_GAP, + STEP_GAP, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr); + } else { + // no launch - so allocate to finish drawn + addRectStr(allocationTimeInterval, finishTimeInterval - allocationTimeInterval, yOffset * STEP_GAP, + STEP_GAP, ERROR_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr); + } + } else { + // no allocation - so create to finish drawn + addRectStr(creationTimeInterval, finishTimeInterval - creationTimeInterval, yOffset * STEP_GAP, + STEP_GAP, ERROR_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr); + } addTextStr((finishTimeInterval + creationTimeInterval) / 2, (yOffset * STEP_GAP + STEP_GAP / 2), attempt.getShortName(), "middle", TEXT_SIZE, http://git-wip-us.apache.org/repos/asf/tez/blob/b6f15dcd/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java index f3a69a6..f54a15a 100644 --- a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java @@ -42,6 +42,7 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.history.ATSImportTool; import 
org.apache.tez.history.parser.ATSFileParser; @@ -136,7 +137,7 @@ public class TestAnalyzer { conf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, SIMPLE_HISTORY_DIR); miniTezCluster = - new MiniTezClusterWithTimeline(TestAnalyzer.class.getName(), 4, 1, 1, true); + new MiniTezClusterWithTimeline(TestAnalyzer.class.getName(), 1, 1, 1, true); miniTezCluster.init(conf); miniTezCluster.start(); @@ -161,15 +162,26 @@ public class TestAnalyzer { } StepCheck createStep(String attempt, CriticalPathDependency reason) { - return new StepCheck(attempt, reason); + return createStep(attempt, reason, null, null); + } + + StepCheck createStep(String attempt, CriticalPathDependency reason, + TaskAttemptTerminationCause errCause, List<String> notes) { + return new StepCheck(attempt, reason, errCause, notes); } class StepCheck { String attempt; // attempt is the TaskAttemptInfo short name with regex CriticalPathDependency reason; - StepCheck(String attempt, CriticalPathDependency reason) { + TaskAttemptTerminationCause errCause; + List<String> notesStr; + + StepCheck(String attempt, CriticalPathDependency reason, + TaskAttemptTerminationCause cause, List<String> notes) { this.attempt = attempt; this.reason = reason; + this.errCause = cause; + this.notesStr = notes; } String getAttemptDetail() { return attempt; @@ -177,6 +189,12 @@ public class TestAnalyzer { CriticalPathDependency getReason() { return reason; } + TaskAttemptTerminationCause getErrCause() { + return errCause; + } + List<String> getNotesStr() { + return notesStr; + } } DagInfo runDAGAndVerify(DAG dag, DAGStatus.State finalState, List<StepCheck[]> steps) throws Exception { @@ -232,7 +250,8 @@ public class TestAnalyzer { for (CriticalPathStep step : criticalPath) { LOG.info("ABC Step: " + step.getType()); if (step.getType() == EntityType.ATTEMPT) { - LOG.info("ABC Attempt: " + step.getAttempt().getShortName() + " " + step.getAttempt().getDetailedStatus()); + LOG.info("ABC Attempt: " + 
step.getAttempt().getShortName() + + " " + step.getAttempt().getDetailedStatus()); } LOG.info("ABC Reason: " + step.getReason()); String notes = Joiner.on(";").join(step.getNotes()); @@ -248,12 +267,22 @@ public class TestAnalyzer { criticalPath.get(0).getAttempt().getShortName()); for (int i=1; i<criticalPath.size() - 1; ++i) { + StepCheck check = steps[i-1]; CriticalPathStep step = criticalPath.get(i); Assert.assertEquals(CriticalPathStep.EntityType.ATTEMPT, step.getType()); - Assert.assertTrue(steps[i-1].getAttemptDetail(), - step.getAttempt().getShortName().matches(steps[i-1].getAttemptDetail())); - //Assert.assertEquals(steps[i-1].getAttemptDetail(), step.getAttempt().getShortName()); + Assert.assertTrue(check.getAttemptDetail(), + step.getAttempt().getShortName().matches(check.getAttemptDetail())); Assert.assertEquals(steps[i-1].getReason(), step.getReason()); + if (check.getErrCause() != null) { + Assert.assertEquals(check.getErrCause(), + TaskAttemptTerminationCause.valueOf(step.getAttempt().getTerminationCause())); + } + if (check.getNotesStr() != null) { + String notes = Joiner.on("#").join(step.getNotes()); + for (String note : check.getNotesStr()) { + Assert.assertTrue(note, notes.contains(notes)); + } + } } Assert.assertEquals(CriticalPathStep.EntityType.DAG_COMMIT, @@ -435,16 +464,17 @@ public class TestAnalyzer { * @param failAndExit whether input failure should trigger attempt exit */ private void setCascadingInputFailureConfig(Configuration testConf, - boolean failAndExit) { + boolean failAndExit, + int numTasks) { // v2 attempt0 succeeds. - // v2 task0 attempt1 input0 fails up to version 0. - testConf.setInt(SimpleTestDAG3Vertices.TEZ_SIMPLE_DAG_NUM_TASKS, 1); + // v2 all tasks attempt1 input0 fail up to version 0. 
+ testConf.setInt(SimpleTestDAG3Vertices.TEZ_SIMPLE_DAG_NUM_TASKS, numTasks); testConf.setBoolean(TestInput.getVertexConfName( TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true); testConf.setBoolean(TestInput.getVertexConfName( TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), failAndExit); testConf.set(TestInput.getVertexConfName( - TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0"); + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "-1"); testConf.set(TestInput.getVertexConfName( TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "1"); testConf.set(TestInput.getVertexConfName( @@ -453,17 +483,17 @@ public class TestAnalyzer { TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), 0); - //v3 all-tasks attempt0 input0 fails up to version 0. + //v3 task0 attempt0 all inputs fails up to version 0. testConf.setBoolean(TestInput.getVertexConfName( TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"), true); testConf.setBoolean(TestInput.getVertexConfName( TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v3"), failAndExit); testConf.set(TestInput.getVertexConfName( - TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "-1"); + TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "0"); testConf.set(TestInput.getVertexConfName( TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"), "0"); testConf.set(TestInput.getVertexConfName( - TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "0"); + TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "-1"); testConf.setInt(TestInput.getVertexConfName( TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"), 0); @@ -483,7 +513,7 @@ public class TestAnalyzer { @Test (timeout=60000) public void testCascadingInputFailureWithoutExitSuccess() throws Exception { Configuration testConf = new Configuration(false); - setCascadingInputFailureConfig(testConf, false); + setCascadingInputFailureConfig(testConf, false, 1); StepCheck[] check = { createStep("v1 : 000000_0", 
CriticalPathDependency.INIT_DEPENDENCY), @@ -514,7 +544,7 @@ public class TestAnalyzer { @Test (timeout=60000) public void testCascadingInputFailureWithExitSuccess() throws Exception { Configuration testConf = new Configuration(false); - setCascadingInputFailureConfig(testConf, true); + setCascadingInputFailureConfig(testConf, true, 1); StepCheck[] check = { createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), @@ -532,6 +562,39 @@ public class TestAnalyzer { } /** + * 1 NM is running and can run 4 containers based on YARN mini cluster defaults and + * Tez defaults for AM/task memory + * v3 task0 reports read errors against both tasks of v2. This re-starts both of them. + * Now all 4 slots are occupied 1 AM + 3 tasks + * Now retries of v2 report read error against 1 task of v1. That re-starts. + * Retry of v1 task has no space - so it preempts the least priority task (current tez logic) + * v3 is preempted and re-run. Shows up on critical path as preempted failure. + * Also v1 retry attempts note show that it caused preemption of v3 + * @throws Exception + */ + @Test (timeout=60000) + public void testInternalPreemption() throws Exception { + Configuration testConf = new Configuration(false); + setCascadingInputFailureConfig(testConf, false, 2); + + StepCheck[] check = { + createStep("v1 : 00000[01]_0", CriticalPathDependency.INIT_DEPENDENCY), + createStep("v2 : 00000[01]_0", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v3 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY, + TaskAttemptTerminationCause.INTERNAL_PREEMPTION, null), + createStep("v2 : 00000[01]_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), + createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY, + null, Collections.singletonList("preemption of v3")), + createStep("v2 : 00000[01]_1", CriticalPathDependency.DATA_DEPENDENCY), + createStep("v3 : 000000_1", CriticalPathDependency.DATA_DEPENDENCY) + }; + + DAG dag = 
SimpleTestDAG3Vertices.createDAG( + "testInternalPreemption", testConf); + runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + } + + /** * Input failure of v3 causes rerun of both v1 and v2 vertices. * v1 v2 * \ /
