Repository: tez Updated Branches: refs/heads/master c35e5cc86 -> fede4c771
TEZ-2888. Make critical path calculation resilient to AM crash (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fede4c77 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fede4c77 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fede4c77 Branch: refs/heads/master Commit: fede4c7713bfa34c825fbc9dc532475f98b67e2e Parents: c35e5cc Author: Bikas Saha <[email protected]> Authored: Tue Oct 27 06:37:28 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Tue Oct 27 06:37:28 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../parser/datamodel/TaskAttemptInfo.java | 14 ++ .../history/parser/datamodel/VertexInfo.java | 20 +- .../analyzer/plugins/CriticalPathAnalyzer.java | 185 +++++++++++++++++-- .../org/apache/tez/analyzer/utils/SVGUtils.java | 8 +- 5 files changed, 202 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/fede4c77/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 69746ac..02df677 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.8.2: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2888. Make critical path calculation resilient to AM crash TEZ-2899. Tez UI: DAG getting created with huge horizontal gap in between vertices TEZ-2907. NPE in IFile.Reader.getLength during final merge operation TEZ-2903. Stop using proprietary APIs in RPCLoadGen. http://git-wip-us.apache.org/repos/asf/tez/blob/fede4c77/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java index acbefea..d373513 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java @@ -162,6 +162,20 @@ public class TaskAttemptInfo extends BaseInfo { public final long getExecutionTimeInterval() { return executionTimeInterval; } + + public final long getPostDataExecutionTimeInterval() { + if (getStartTime() > 0 && getFinishTime() > 0) { + // start time defaults to the actual start time + long postDataStartTime = startTime; + if (getLastDataEvents() != null && !getLastDataEvents().isEmpty()) { + // if last data event is after the start time then use last data event time + long lastEventTime = getLastDataEvents().get(getLastDataEvents().size()-1).getTimestamp(); + postDataStartTime = startTime > lastEventTime ? startTime : lastEventTime; + } + return (getFinishTime() - postDataStartTime); + } + return -1; + } public final long getAllocationToEndTimeInterval() { return (endTime - allocationTime); http://git-wip-us.apache.org/repos/asf/tez/blob/fede4c77/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java index 7259667..50647fe 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java @@ -75,7 +75,7 @@ public class VertexInfo extends BaseInfo { private final List<AdditionalInputOutputDetails> additionalInputInfoList; private final List<AdditionalInputOutputDetails> additionalOutputInfoList; - private long avgExecutionTimeInterval = -1; + private long avgPostDataExecutionTimeInterval = -1; private DagInfo dagInfo; @@ -86,7 +86,7 @@ public class VertexInfo extends BaseInfo { jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase (Constants.TEZ_VERTEX_ID)); - vertexId = StringInterner.weakIntern(jsonObject.optString(Constants.ENTITY_TYPE)); + vertexId = StringInterner.weakIntern(jsonObject.optString(Constants.ENTITY)); taskInfoMap = Maps.newHashMap(); inEdgeList = Lists.newLinkedList(); @@ -195,22 +195,26 @@ public class VertexInfo extends BaseInfo { return getLastTaskToFinish().getFinishTimeInterval(); } - public final long getAvgExecutionTimeInterval() { - if (avgExecutionTimeInterval == -1) { + public final long getAvgPostDataExecutionTimeInterval() { + if (avgPostDataExecutionTimeInterval == -1) { long totalExecutionTime = 0; long totalAttempts = 0; for (TaskInfo task : getTasks()) { TaskAttemptInfo attempt = task.getSuccessfulTaskAttempt(); if (attempt != null) { - totalExecutionTime += attempt.getExecutionTimeInterval(); - totalAttempts++; + // count only time after last data was received + long execTime = attempt.getPostDataExecutionTimeInterval(); + if (execTime >= 0) { + totalExecutionTime += execTime; + totalAttempts++; + } } } if (totalAttempts > 0) { - avgExecutionTimeInterval = Math.round(totalExecutionTime*1.0/totalAttempts); + avgPostDataExecutionTimeInterval = Math.round(totalExecutionTime*1.0/totalAttempts); } } - return avgExecutionTimeInterval; + return avgPostDataExecutionTimeInterval; } public final long getStartTime() { http://git-wip-us.apache.org/repos/asf/tez/blob/fede4c77/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java index 4062142..d4efdf9 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java @@ -19,6 +19,7 @@ package org.apache.tez.analyzer.plugins; import java.io.File; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -104,6 +105,9 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { Map<String, TaskAttemptInfo> attempts = Maps.newHashMap(); + int maxConcurrency = 0; + ArrayList<TimeInfo> concurrencyByTime = Lists.newArrayList(); + public CriticalPathAnalyzer() { } @@ -153,6 +157,92 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { svg.saveCriticalPathAsSVG(dagInfo, outputFileName, criticalPath); } + static class TimeInfo implements Comparable<TimeInfo> { + long timestamp; + int count; + boolean start; + TimeInfo(long timestamp, boolean start) { + this.timestamp = timestamp; + this.start = start; + } + + @Override + public int compareTo(TimeInfo o) { + return Long.compare(this.timestamp, o.timestamp); + } + + @Override + public int hashCode() { + return (int)((timestamp >> 32) ^ timestamp); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if(o == null) { + return false; + } + if (o.getClass() == this.getClass()) { + TimeInfo other = (TimeInfo) o; + return (this.compareTo(other) == 0); + } + else { + return false; + } + } + } + + private void determineConcurrency(DagInfo dag) { + ArrayList<TimeInfo> timeInfo = Lists.newArrayList(); + for (VertexInfo v : dag.getVertices()) { + for (TaskInfo t : v.getTasks()) { + for (TaskAttemptInfo a : t.getTaskAttempts()) { + if (a.getStartTime() > 0) { + timeInfo.add(new TimeInfo(a.getStartTime(), true)); + timeInfo.add(new TimeInfo(a.getFinishTime(), false)); + } + } + } + } + Collections.sort(timeInfo); + + int concurrency = 0; + TimeInfo lastTimeInfo = null; + for (TimeInfo t : timeInfo) { + concurrency += (t.start) ? 1 : -1; + maxConcurrency = (concurrency > maxConcurrency) ? concurrency : maxConcurrency; + if (lastTimeInfo == null || lastTimeInfo.timestamp < t.timestamp) { + lastTimeInfo = t; + lastTimeInfo.count = concurrency; + concurrencyByTime.add(lastTimeInfo); + } else { + // lastTimeInfo.timestamp == t.timestamp + lastTimeInfo.count = concurrency; + } + } +// for (TimeInfo t : concurrencyByTime) { +// System.out.println(t.timestamp + " " + t.count); +// } + } + + private int getIntervalMaxConcurrency(long begin, long end) { + int concurrency = 0; + for (TimeInfo timeInfo : concurrencyByTime) { + if (timeInfo.timestamp < begin) { + continue; + } + if (timeInfo.timestamp > end) { + break; + } + if (timeInfo.count > concurrency) { + concurrency = timeInfo.count; + } + } + return concurrency; + } + private void analyzeAllocationOverhead(DagInfo dag) { List<TaskAttemptInfo> preemptedAttempts = Lists.newArrayList(); for (VertexInfo v : dag.getVertices()) { @@ -175,6 +265,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { long creationTime = attempt.getCreationTime(); long allocationTime = attempt.getAllocationTime(); + long finishTime = attempt.getFinishTime(); if (allocationTime < step.startCriticalPathTime) { // allocated before it became critical continue; @@ -190,7 +281,7 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { Collections.sort(attemptsList, TaskAttemptInfo.orderingOnAllocationTime()); // walk the list to record allocation time before the current attempt long containerPreviousAllocatedTime = 0; - int wavesForVertex = 1; + int reUsesForVertex = 1; for (TaskAttemptInfo containerAttempt : attemptsList) { if (containerAttempt.getTaskAttemptId().equals(attempt.getTaskAttemptId())) { break; @@ -199,14 +290,28 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { attempt.getTaskInfo().getVertexInfo().getVertexId())) { // another task from the same vertex ran in this container. So there are multiple // waves for this vertex on this container. - wavesForVertex++; + reUsesForVertex++; + } + long cAllocTime = containerAttempt.getAllocationTime(); + long cFinishTime = containerAttempt.getFinishTime(); + if (cFinishTime > creationTime) { + // for containerAttempts that used the container while this attempt was waiting + // add up time container was allocated to containerAttempt. Account for allocations + // that started before this attempt was created. + containerPreviousAllocatedTime += + (cFinishTime - (cAllocTime > creationTime ? cAllocTime : creationTime)); } - System.out.println("Container: " + container.getId() + " running att: " + - containerAttempt.getTaskAttemptId() + " wait att: " + attempt.getTaskAttemptId()); - containerPreviousAllocatedTime += containerAttempt.getAllocationToEndTimeInterval(); } - if (wavesForVertex > 1) { - step.notes.add("Container ran multiple waves for this vertex."); + int numVertexTasks = attempt.getTaskInfo().getVertexInfo().getNumTasks(); + int intervalMaxConcurrency = getIntervalMaxConcurrency(creationTime, finishTime); + double numWaves = getWaves(numVertexTasks, intervalMaxConcurrency); + + if (reUsesForVertex > 1) { + step.notes.add("Container ran multiple tasks for this vertex. "); + if (numWaves < 1) { + // less than 1 wave total but still ran more than 1 on this container + step.notes.add("Vertex potentially seeing contention from other branches in the DAG. "); + } } if (containerPreviousAllocatedTime == 0) { step.notes.add("Container newly allocated."); @@ -223,7 +328,12 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { } // look for internal preemptions while attempt was waiting for allocation for (TaskAttemptInfo a : preemptedAttempts) { - if (a.getFinishTime() > creationTime && a.getFinishTime() < allocationTime){ + if (a.getTaskInfo().getVertexInfo().getVertexId() + .equals(attempt.getTaskInfo().getVertexInfo().getVertexId())) { + // dont preempt same vertex task. ideally this should look at priority but we dont have it + continue; + } + if (a.getFinishTime() > creationTime && a.getFinishTime() < allocationTime) { // found an attempt that was preempted within this time interval step.notes.add("Potentially waited for preemption of " + a.getShortName()); } @@ -232,6 +342,41 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { } } + private double getWaves(int numTasks, int concurrency) { + double numWaves = (numTasks*1.0) / concurrency; + numWaves = (double)Math.round(numWaves * 10d) / 10d; // convert to 1 decimal place + return numWaves; + } + + private void analyzeWaves(DagInfo dag) { + for (int i = 0; i < criticalPath.size(); ++i) { + CriticalPathStep step = criticalPath.get(i); + TaskAttemptInfo attempt = step.attempt; + if (step.getType() != EntityType.ATTEMPT) { + continue; + } + long creationTime = attempt.getCreationTime(); + long finishTime = attempt.getFinishTime(); + + int numVertexTasks = attempt.getTaskInfo().getVertexInfo().getNumTasks(); + if (numVertexTasks <= 1) { + continue; + } + int intervalMaxConcurrency = getIntervalMaxConcurrency(creationTime, finishTime); + double numWaves = getWaves(numVertexTasks, intervalMaxConcurrency); + + step.notes.add("Vertex ran " + numVertexTasks + + " tasks in " + numWaves + + " waves with available concurrency of " + intervalMaxConcurrency); + if (numWaves > 1) { + if (numWaves%1 < 0.5) { + // more than 1 wave needed and last wave is small + step.notes.add("Last partial wave did not use full concurrency. "); + } + } + } + } + private void analyzeStragglers(DagInfo dag) { long dagStartTime = dag.getStartTime(); long dagTime = dag.getFinishTime() - dagStartTime; @@ -246,17 +391,18 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { // there were read errors. that could have delayed the attempt. ignore this continue; } - long avgExecutionTime = attempt.getTaskInfo().getVertexInfo() - .getAvgExecutionTimeInterval(); - if (avgExecutionTime <= 0) { + long avgPostDataExecutionTime = attempt.getTaskInfo().getVertexInfo() + .getAvgPostDataExecutionTimeInterval(); + if (avgPostDataExecutionTime <= 0) { continue; } - if (avgExecutionTime * 1.25 < attempt.getExecutionTimeInterval()) { + long attemptExecTime = attempt.getPostDataExecutionTimeInterval(); + if (avgPostDataExecutionTime * 1.25 < attemptExecTime) { step.notes - .add("Potential straggler. Execution time " + - SVGUtils.getTimeStr(attempt.getExecutionTimeInterval()) + .add("Potential straggler. Post Data Execution time " + + SVGUtils.getTimeStr(attemptExecTime) + " compared to vertex average of " + - SVGUtils.getTimeStr(avgExecutionTime)); + SVGUtils.getTimeStr(avgPostDataExecutionTime)); } } } @@ -267,7 +413,9 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { private void analyzeCriticalPath(DagInfo dag) { if (!criticalPath.isEmpty()) { + determineConcurrency(dag); analyzeStragglers(dag); + analyzeWaves(dag); analyzeAllocationOverhead(dag); } } @@ -281,7 +429,12 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { long currentAttemptStopCriticalPathTime = lastAttemptFinishTime; // add the commit step - currentStep.stopCriticalPathTime = dagInfo.getFinishTime(); + if (dagInfo.getFinishTime() > 0) { + currentStep.stopCriticalPathTime = dagInfo.getFinishTime(); + } else { + // AM crashed and no dag finished written + currentStep.stopCriticalPathTime = currentAttemptStopCriticalPathTime; + } currentStep.startCriticalPathTime = currentAttemptStopCriticalPathTime; currentStep.reason = CriticalPathDependency.COMMIT_DEPENDENCY; tempCP.add(currentStep); http://git-wip-us.apache.org/repos/asf/tez/blob/fede4c77/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java index 2e94ec0..78cb921 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java @@ -219,11 +219,15 @@ public class SVGUtils { } private void drawCritical(DagInfo dagInfo, List<CriticalPathStep> criticalPath) { - int duration = (int) dagInfo.getFinishTimeInterval(); - MAX_DAG_RUNTIME = duration; long dagStartTime = dagInfo.getStartTime(); int dagStartTimeInterval = 0; // this is 0 since we are offseting from the dag start time int dagFinishTimeInterval = (int) (dagInfo.getFinishTime() - dagStartTime); + if (dagInfo.getFinishTime() <= 0) { + // AM crashed. no dag finish time written + dagFinishTimeInterval =(int) (criticalPath.get(criticalPath.size()-1).getStopCriticalTime() + - dagStartTime); + } + MAX_DAG_RUNTIME = dagFinishTimeInterval; // draw grid addLineStr(dagStartTimeInterval, 0, dagFinishTimeInterval, 0, BORDER_COLOR, "", TICK);
