Repository: tez Updated Branches: refs/heads/master 2e62e98ec -> 24e17a4e5
TEZ-2690. Add critical path analyser (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/24e17a4e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/24e17a4e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/24e17a4e Branch: refs/heads/master Commit: 24e17a4e5af7d81a08ca2019052896cf3b97f6a6 Parents: 2e62e98 Author: Bikas Saha <[email protected]> Authored: Tue Aug 25 11:10:10 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Tue Aug 25 11:10:10 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../parser/datamodel/TaskAttemptInfo.java | 43 ++ .../tez/history/parser/datamodel/TaskInfo.java | 9 + .../history/parser/datamodel/VertexInfo.java | 20 +- tez-tools/analyzers/job-analyzer/pom.xml | 16 +- .../tez/analyzer/plugins/AnalyzerDriver.java | 39 ++ .../analyzer/plugins/CriticalPathAnalyzer.java | 398 ++++++++++++---- .../tez/analyzer/plugins/TezAnalyzerBase.java | 117 +++++ .../VertexLevelCriticalPathAnalyzer.java | 142 ++++++ .../org/apache/tez/analyzer/utils/SVGUtils.java | 461 ++++++++++--------- 10 files changed, 953 insertions(+), 293 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9fa3d33..d2c39e9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES TEZ-2468. Change the minimum Java version to Java 7. ALL CHANGES: + TEZ-2690. Add critical path analyser TEZ-2734. Add a test to verify the filename generated by OnDiskMerge. TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers TEZ-2687. 
ATS History shutdown happens before the min-held containers are released http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java index ba676a2..ccec0db 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java @@ -19,7 +19,9 @@ package org.apache.tez.history.parser.datamodel; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; import org.apache.hadoop.util.StringInterner; import org.apache.tez.common.ATSConstants; @@ -29,6 +31,7 @@ import org.apache.tez.common.counters.TezCounter; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; +import java.util.Comparator; import java.util.Map; import static org.apache.hadoop.classification.InterfaceStability.Evolving; @@ -53,6 +56,7 @@ public class TaskAttemptInfo extends BaseInfo { private final long lastDataEventTime; private final String lastDataEventSourceTA; private final String terminationCause; + private final long executionTimeInterval; private TaskInfo taskInfo; @@ -88,6 +92,17 @@ public class TaskAttemptInfo extends BaseInfo { otherInfoNode.optString(ATSConstants.LAST_DATA_EVENT_SOURCE_TA)); terminationCause = StringInterner .weakIntern(otherInfoNode.optString(ATSConstants.TASK_ATTEMPT_ERROR_ENUM)); + executionTimeInterval = (endTime > startTime) ? 
(endTime - startTime) : 0; + } + + public static Ordering<TaskAttemptInfo> orderingOnAllocationTime() { + return Ordering.from(new Comparator<TaskAttemptInfo>() { + @Override + public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) { + return (o1.getAllocationTime() < o2.getAllocationTime() ? -1 + : o1.getAllocationTime() > o2.getAllocationTime() ? 1 : 0); + } + }); } void setTaskInfo(TaskInfo taskInfo) { @@ -105,6 +120,22 @@ public class TaskAttemptInfo extends BaseInfo { public final long getFinishTimeInterval() { return endTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime()); } + + public final long getExecutionTimeInterval() { + return executionTimeInterval; + } + + public final long getAllocationToEndTimeInterval() { + return (endTime - allocationTime); + } + + public final long getAllocationToStartTimeInterval() { + return (startTime - allocationTime); + } + + public final long getCreationToAllocationTimeInterval() { + return (allocationTime - creationTime); + } public final long getStartTime() { return startTime; @@ -141,6 +172,11 @@ public class TaskAttemptInfo extends BaseInfo { public final long getAllocationTime() { return allocationTime; } + + public final String getShortName() { + return getTaskInfo().getVertexInfo().getVertexName() + " : " + + taskAttemptId.substring(taskAttemptId.lastIndexOf('_', taskAttemptId.lastIndexOf('_') - 1) + 1); + } @Override public final String getDiagnostics() { @@ -169,6 +205,13 @@ public class TaskAttemptInfo extends BaseInfo { } return false; } + + public final String getDetailedStatus() { + if (!Strings.isNullOrEmpty(getTerminationCause())) { + return getStatus() + ":" + getTerminationCause(); + } + return getStatus(); + } public final TezCounter getLocalityInfo() { Map<String, TezCounter> dataLocalTask = getCounter(DAGCounter.class.getName(), http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java 
---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java index 5e63efa..a30d311 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java @@ -28,6 +28,7 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import com.google.common.collect.Ordering; +import com.google.common.base.Strings; import org.apache.hadoop.util.StringInterner; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.codehaus.jettison.json.JSONException; @@ -204,6 +205,14 @@ public class TaskInfo extends BaseInfo { * @return TaskAttemptInfo */ public final TaskAttemptInfo getSuccessfulTaskAttempt() { + if (!Strings.isNullOrEmpty(getSuccessfulAttemptId())) { + for (TaskAttemptInfo attemptInfo : getTaskAttempts()) { + if (attemptInfo.getTaskAttemptId().equals(getSuccessfulAttemptId())) { + return attemptInfo; + } + } + } + // fall back to checking status if successful attempt id is not available for (TaskAttemptInfo attemptInfo : getTaskAttempts()) { if (attemptInfo.getStatus().equalsIgnoreCase(TaskAttemptState.SUCCEEDED.toString())) { return attemptInfo; http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java index 6e227a5..35da2d4 100644 ---
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java @@ -74,6 +74,8 @@ public class VertexInfo extends BaseInfo { private final List<AdditionalInputOutputDetails> additionalInputInfoList; private final List<AdditionalInputOutputDetails> additionalOutputInfoList; + + private long avgExecutionTimeInterval = -1; private DagInfo dagInfo; @@ -143,7 +145,7 @@ public class VertexInfo extends BaseInfo { this.additionalInputInfoList.clear(); this.additionalInputInfoList.addAll(additionalInputInfoList); } - + void setAdditionalOutputInfoList(List<AdditionalInputOutputDetails> additionalOutputInfoList) { this.additionalOutputInfoList.clear(); this.additionalOutputInfoList.addAll(additionalOutputInfoList); @@ -192,6 +194,28 @@ public class VertexInfo extends BaseInfo { } return getLastTaskToFinish().getFinishTimeInterval(); } + + /** + * Returns the rounded mean execution time of the tasks' successful attempts, or -1 if no task has one. + */ + public final long getAvgExecutionTimeInterval() { + if (avgExecutionTimeInterval == -1) { + long totalExecutionTime = 0; + long totalAttempts = 0; + for (TaskInfo task : getTasks()) { + TaskAttemptInfo attempt = task.getSuccessfulTaskAttempt(); + if (attempt == null) { + continue; // task never succeeded (e.g. failed DAG); exclude it from the average + } + totalExecutionTime += attempt.getExecutionTimeInterval(); + totalAttempts++; + } + if (totalAttempts > 0) { + avgExecutionTimeInterval = Math.round(totalExecutionTime*1.0/totalAttempts); + } + } + return avgExecutionTimeInterval; + } public final long getStartTime() { return startTime; http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/tez-tools/analyzers/job-analyzer/pom.xml ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/pom.xml b/tez-tools/analyzers/job-analyzer/pom.xml index 36b12fe..543ba1b 100644 --- a/tez-tools/analyzers/job-analyzer/pom.xml +++ b/tez-tools/analyzers/job-analyzer/pom.xml @@ -42,15 +42,21 @@ <groupId>com.google.guava</groupId> <artifactId>guava</artifactId>
</dependency> - <dependency> - <groupId>org.plutext</groupId> - <artifactId>jaxb-svg11</artifactId> - <version>1.0.2</version> - </dependency> </dependencies> <build> <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <archive> + <manifest> + <mainClass>org.apache.tez.analyzer.plugins.AnalyzerDriver</mainClass> + </manifest> + </archive> + </configuration> + </plugin> <plugin> <groupId>org.apache.rat</groupId> <artifactId>apache-rat-plugin</artifactId> http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java new file mode 100644 index 0000000..33dbead --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.analyzer.plugins; + +import org.apache.hadoop.util.ProgramDriver; + +public class AnalyzerDriver { + + public static void main(String argv[]){ + int exitCode = -1; + ProgramDriver pgd = new ProgramDriver(); + try { + pgd.addClass("CriticalPath", CriticalPathAnalyzer.class, + "Find the critical path of a DAG"); + exitCode = pgd.run(argv); + } catch(Throwable e){ + e.printStackTrace(); + } + + System.exit(exitCode); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java index 88d45f3..448e785 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java @@ -18,78 +18,342 @@ package org.apache.tez.analyzer.plugins; -import com.google.common.base.Functions; -import com.google.common.base.Splitter; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableSortedMap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Ordering; +import java.io.File; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringInterner; +import org.apache.hadoop.util.ToolRunner; import org.apache.tez.analyzer.Analyzer; import org.apache.tez.analyzer.CSVResult; -import org.apache.tez.analyzer.utils.Utils; +import 
org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.EntityType; +import org.apache.tez.analyzer.utils.SVGUtils; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.history.parser.datamodel.Container; import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; import org.apache.tez.history.parser.datamodel.VertexInfo; -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * Identify a set of vertices which fall in the critical path in a DAG. - */ -public class CriticalPathAnalyzer implements Analyzer { - private final Configuration config; - - private static final String[] headers = { "CriticalPath", "Score" }; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; - private final CSVResult csvResult; +public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { - private static final String DOT_FILE_DIR = "tez.critical-path.analyzer.dot.output.loc"; - private static final String DOT_FILE_DIR_DEFAULT = "."; //current directory + String succeededState = StringInterner.weakIntern(TaskAttemptState.SUCCEEDED.name()); + String failedState = StringInterner.weakIntern(TaskAttemptState.FAILED.name()); - private final String dotFileLocation; + private final static String DATA_DEPENDENCY = "Data-Dependency"; + private final static String INIT_DEPENDENCY = "Init-Dependency"; + private final static String COMMIT_DEPENDENCY = "Commit-Dependency"; + private final static String NON_DATA_DEPENDENCY = "Non-Data-Dependency"; + private final static String OUTPUT_LOST = "Previous version outputs lost"; - private static final String CONNECTOR = "-->"; + public static class CriticalPathStep { + public enum EntityType { 
+ ATTEMPT, + VERTEX_INIT, + DAG_COMMIT + } - public CriticalPathAnalyzer(Configuration config) { - this.config = config; - this.csvResult = new CSVResult(headers); - this.dotFileLocation = config.get(DOT_FILE_DIR, DOT_FILE_DIR_DEFAULT); + EntityType type; + TaskAttemptInfo attempt; + String reason; // reason linking this to the previous step on the critical path + long startCriticalPathTime; // time at which attempt is on critical path + long stopCriticalPathTime; // time at which attempt is off critical path + List<String> notes = Lists.newLinkedList(); + + public CriticalPathStep(TaskAttemptInfo attempt, EntityType type) { + this.type = type; + this.attempt = attempt; + } + public EntityType getType() { + return type; + } + public TaskAttemptInfo getAttempt() { + return attempt; + } + public long getStartCriticalTime() { + return startCriticalPathTime; + } + public long getStopCriticalTime() { + return stopCriticalPathTime; + } + public String getReason() { + return reason; + } + public List<String> getNotes() { + return notes; + } } + + List<CriticalPathStep> criticalPath = Lists.newLinkedList(); + + Map<String, TaskAttemptInfo> attempts = Maps.newHashMap(); - @Override public void analyze(DagInfo dagInfo) throws TezException { - Map<String, Long> result = Maps.newLinkedHashMap(); - getCriticalPath("", dagInfo.getVertices().get(dagInfo.getVertices().size() - 1), 0, result); + public CriticalPathAnalyzer() { + } - Map<String, Long> sortedByValues = sortByValues(result); - for (Map.Entry<String, Long> entry : sortedByValues.entrySet()) { - List<String> record = Lists.newLinkedList(); - record.add(entry.getKey()); - record.add(entry.getValue() + ""); - csvResult.addRecord(record.toArray(new String[record.size()])); + @Override + public void analyze(DagInfo dagInfo) throws TezException { + // get all attempts in the dag and find the last failed/succeeded attempt. 
+ // ignore killed attempt to handle kills that happen upon dag completion + TaskAttemptInfo lastAttempt = null; + long lastAttemptFinishTime = 0; + for (VertexInfo vertex : dagInfo.getVertices()) { + for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) { + attempts.put(attempt.getTaskAttemptId(), attempt); + if (attempt.getStatus().equals(succeededState) || + attempt.getStatus().equals(failedState)) { + if (lastAttemptFinishTime < attempt.getFinishTime()) { + lastAttempt = attempt; + lastAttemptFinishTime = attempt.getFinishTime(); + } + } + } + } + + if (lastAttempt == null) { + System.out.println("Cannot find last attempt to finish in DAG " + dagInfo.getDagId()); + return; } + + createCriticalPath(dagInfo, lastAttempt, lastAttemptFinishTime, attempts); + + analyzeCriticalPath(dagInfo); - String dotFile = dotFileLocation + File.separator + dagInfo.getDagId() + ".dot"; - try { - List<String> criticalVertices = null; - if (!sortedByValues.isEmpty()) { - String criticalPath = sortedByValues.keySet().iterator().next(); - criticalVertices = getVertexNames(criticalPath); - } else { - criticalVertices = Lists.newLinkedList(); + saveCriticalPathAsSVG(dagInfo); + } + + private void saveCriticalPathAsSVG(DagInfo dagInfo) { + SVGUtils svg = new SVGUtils(); + String outputFileName = getOutputDir() + File.separator + dagInfo.getDagId() + ".svg"; + System.out.println("Writing output to: " + outputFileName); + svg.saveCriticalPathAsSVG(dagInfo, outputFileName, criticalPath); + } + + private void analyzeCriticalPath(DagInfo dag) { + if (!criticalPath.isEmpty()) { + System.out.println("Walking critical path for dag " + dag.getDagId()); + long dagStartTime = dag.getStartTime(); + long dagTime = dag.getFinishTime() - dagStartTime; + long totalAttemptCriticalTime = 0; + for (int i = 0; i < criticalPath.size(); ++i) { + CriticalPathStep step = criticalPath.get(i); + totalAttemptCriticalTime += (step.stopCriticalPathTime - step.startCriticalPathTime); + TaskAttemptInfo attempt = 
step.attempt; + if (step.getType() == EntityType.ATTEMPT) { + // analyze execution overhead (only when a positive vertex average is available) + long avgExecutionTime = attempt.getTaskInfo().getVertexInfo() + .getAvgExecutionTimeInterval(); + if (avgExecutionTime > 0 && avgExecutionTime * 1.25 < attempt.getExecutionTimeInterval()) { + step.notes + .add("Potential straggler. Execution time " + attempt.getExecutionTimeInterval() + + " compared to vertex average of " + avgExecutionTime); + } + + if (attempt.getStartTime() > step.startCriticalPathTime) { + // the attempt is critical before launching. So allocation overhead needs analysis + // analyze allocation overhead + Container container = attempt.getContainer(); + if (container != null) { + Collection<TaskAttemptInfo> attempts = dag.getContainerMapping().get(container); + if (attempts != null && !attempts.isEmpty()) { + // arrange attempts by allocation time + List<TaskAttemptInfo> attemptsList = Lists.newArrayList(attempts); + Collections.sort(attemptsList, TaskAttemptInfo.orderingOnAllocationTime()); + // walk the list to record allocation time before the current attempt + long containerPreviousAllocatedTime = 0; + for (TaskAttemptInfo containerAttempt : attemptsList) { + if (containerAttempt.getTaskAttemptId().equals(attempt.getTaskAttemptId())) { + break; + } + System.out.println("Container: " + container.getId() + " running att: " + + containerAttempt.getTaskAttemptId() + " wait att: " + attempt.getTaskAttemptId()); + containerPreviousAllocatedTime += containerAttempt.getAllocationToEndTimeInterval(); + } + if (containerPreviousAllocatedTime == 0) { + step.notes.add("Container " + container.getId() + " newly allocated."); + } else { + if (containerPreviousAllocatedTime >= attempt.getCreationToAllocationTimeInterval()) { + step.notes.add("Container " + container.getId() + " was fully allocated"); + } else { + step.notes.add("Container " + container.getId() + " allocated for " + + SVGUtils.getTimeStr(containerPreviousAllocatedTime) + " out of " +
SVGUtils.getTimeStr(attempt.getCreationToAllocationTimeInterval()) + + " of allocation wait time"); + } + } + } + } + } + } } - Utils.generateDAGVizFile(dagInfo, dotFile, criticalVertices); - } catch (IOException e) { - throw new TezException(e); + System.out + .println("DAG time taken: " + dagTime + " TotalAttemptTime: " + totalAttemptCriticalTime + + " DAG finish time: " + dag.getFinishTime() + " DAG start time: " + dagStartTime); } } + + private void createCriticalPath(DagInfo dagInfo, TaskAttemptInfo lastAttempt, + long lastAttemptFinishTime, Map<String, TaskAttemptInfo> attempts) { + List<CriticalPathStep> tempCP = Lists.newLinkedList(); + if (lastAttempt != null) { + TaskAttemptInfo currentAttempt = lastAttempt; + CriticalPathStep currentStep = new CriticalPathStep(currentAttempt, EntityType.DAG_COMMIT); + long currentAttemptStopCriticalPathTime = lastAttemptFinishTime; + + // add the commit step + currentStep.stopCriticalPathTime = dagInfo.getFinishTime(); + currentStep.startCriticalPathTime = currentAttemptStopCriticalPathTime; + currentStep.reason = COMMIT_DEPENDENCY; + tempCP.add(currentStep); + while (true) { + Preconditions.checkState(currentAttempt != null); + Preconditions.checkState(currentAttemptStopCriticalPathTime > 0); + System.out.println( + "Step: " + tempCP.size() + " Attempt: " + currentAttempt.getTaskAttemptId()); + currentStep = new CriticalPathStep(currentAttempt, EntityType.ATTEMPT); + currentStep.stopCriticalPathTime = currentAttemptStopCriticalPathTime; + tempCP.add(currentStep); + + // find the next attempt on the critical path + boolean dataDependency = false; + // find out predecessor dependency + if (currentAttempt.getLastDataEventTime() > currentAttempt.getCreationTime()) { + dataDependency = true; + } + + long startCriticalPathTime = 0; + String nextAttemptId = null; + String reason = null; + if (dataDependency) { + // last data event was produced after the attempt was scheduled. 
use + // data dependency + // typically case when scheduling ahead of time + System.out.println("Has data dependency"); + if (!Strings.isNullOrEmpty(currentAttempt.getLastDataEventSourceTA())) { + // there is a valid data causal TA. Use it. + nextAttemptId = currentAttempt.getLastDataEventSourceTA(); + reason = DATA_DEPENDENCY; + startCriticalPathTime = currentAttempt.getLastDataEventTime(); + System.out.println("Using data dependency " + nextAttemptId); + } else { + // there is no valid data causal TA. This means data event came from the same vertex + VertexInfo vertex = currentAttempt.getTaskInfo().getVertexInfo(); + Preconditions.checkState(!vertex.getAdditionalInputInfoList().isEmpty(), + "Vertex: " + vertex.getVertexId() + " has no external inputs but the last data event " + + "TA is null for " + currentAttempt.getTaskAttemptId()); + nextAttemptId = null; + reason = INIT_DEPENDENCY; + System.out.println("Using init dependency"); + } + } else { + // attempt was scheduled after last data event. use scheduling dependency + // typically happens for retries + System.out.println("Has scheduling dependency"); + if (!Strings.isNullOrEmpty(currentAttempt.getCreationCausalTA())) { + // there is a scheduling causal TA. Use it. + nextAttemptId = currentAttempt.getCreationCausalTA(); + reason = NON_DATA_DEPENDENCY; + TaskAttemptInfo nextAttempt = attempts.get(nextAttemptId); + if (nextAttempt != null) { + VertexInfo currentVertex = currentAttempt.getTaskInfo().getVertexInfo(); + VertexInfo nextVertex = nextAttempt.getTaskInfo().getVertexInfo(); + if (!nextVertex.getVertexName().equals(currentVertex.getVertexName())){ + // cause from different vertex. 
Might be rerun to re-generate outputs + for (VertexInfo outVertex : currentVertex.getOutputVertices()) { + if (nextVertex.getVertexName().equals(outVertex.getVertexName())) { + // next vertex is an output vertex + reason = OUTPUT_LOST; + break; + } + } + } + } + startCriticalPathTime = currentAttempt.getCreationTime(); + System.out.println("Using scheduling dependency " + nextAttemptId); + } else { + // there is no scheduling causal TA. + if (!Strings.isNullOrEmpty(currentAttempt.getLastDataEventSourceTA())) { + // there is a data event going to the vertex. Count the time between data event and + // scheduling time as Initializer/Manager overhead and follow data dependency + nextAttemptId = currentAttempt.getLastDataEventSourceTA(); + reason = DATA_DEPENDENCY; + startCriticalPathTime = currentAttempt.getLastDataEventTime(); + long overhead = currentAttempt.getCreationTime() + - currentAttempt.getLastDataEventTime(); + currentStep.notes + .add("Initializer/VertexManager scheduling overhead " + overhead + " ms"); + System.out.println("Using data dependency " + nextAttemptId); + } else { + // there is no scheduling causal TA and no data event causal TA. + // the vertex has external input that sent the last data events + // or the vertex has external input but does not use events + // or the vertex has no external inputs or edges + nextAttemptId = null; + reason = INIT_DEPENDENCY; + System.out.println("Using init dependency"); + } + } + } + + currentStep.startCriticalPathTime = startCriticalPathTime; + currentStep.reason = reason; + + if (Strings.isNullOrEmpty(nextAttemptId)) { + Preconditions.checkState(reason.equals(INIT_DEPENDENCY)); + Preconditions.checkState(startCriticalPathTime == 0); + // no predecessor attempt found. this is the last step in the critical path + // assume the attempt's start critical path time is when it's scheduled.
before that is + // vertex initialization time + currentStep.startCriticalPathTime = currentStep.attempt.getCreationTime(); + + // add vertex init step + long initStepStopCriticalTime = currentStep.startCriticalPathTime; + currentStep = new CriticalPathStep(currentAttempt, EntityType.VERTEX_INIT); + currentStep.stopCriticalPathTime = initStepStopCriticalTime; + currentStep.startCriticalPathTime = dagInfo.getStartTime(); + currentStep.reason = INIT_DEPENDENCY; + tempCP.add(currentStep); + + if (!tempCP.isEmpty()) { + for (int i=tempCP.size() - 1; i>=0; --i) { + criticalPath.add(tempCP.get(i)); + } + } + return; + } + + currentAttempt = attempts.get(nextAttemptId); + currentAttemptStopCriticalPathTime = startCriticalPathTime; + } + } + } + @Override public CSVResult getResult() throws TezException { + String[] headers = { "Entity", "PathReason", "Status", "CriticalStartTime", + "CriticalStopTime", "Notes" }; + + CSVResult csvResult = new CSVResult(headers); + for (CriticalPathStep step : criticalPath) { + String entity = (step.getType() == EntityType.ATTEMPT ? step.getAttempt().getTaskAttemptId() + : (step.getType() == EntityType.VERTEX_INIT + ? 
step.attempt.getTaskInfo().getVertexInfo().getVertexName() : "DAG COMMIT")); + String [] record = {entity, step.getReason(), + step.getAttempt().getDetailedStatus(), String.valueOf(step.getStartCriticalTime()), + String.valueOf(step.getStopCriticalTime()), + Joiner.on(";").join(step.getNotes())}; + csvResult.addRecord(record); + } return csvResult; } @@ -105,38 +369,12 @@ public class CriticalPathAnalyzer implements Analyzer { @Override public Configuration getConfiguration() { - return config; + return getConf(); } - - private static Map<String, Long> sortByValues(Map<String, Long> result) { - //Sort result by time in reverse order - final Ordering<String> reversValueOrdering = - Ordering.natural().reverse().nullsLast().onResultOf(Functions.forMap(result, null)); - Map<String, Long> orderedMap = ImmutableSortedMap.copyOf(result, reversValueOrdering); - return orderedMap; + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new CriticalPathAnalyzer(), args); + System.exit(res); } - private static void getCriticalPath(String predecessor, VertexInfo dest, long time, - Map<String, Long> result) { - String destVertexName = (dest != null) ? 
(dest.getVertexName()) : ""; - - if (dest != null) { - time += dest.getTimeTaken(); - predecessor += destVertexName + CONNECTOR; - - for (VertexInfo incomingVertex : dest.getInputVertices()) { - getCriticalPath(predecessor, incomingVertex, time, result); - } - - result.put(predecessor, time); - } - } - - private static List<String> getVertexNames(String criticalPath) { - if (Strings.isNullOrEmpty(criticalPath)) { - return Lists.newLinkedList(); - } - return Lists.newLinkedList(Splitter.on(CONNECTOR).trimResults().omitEmptyStrings().split - (criticalPath)); - } } http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java new file mode 100644 index 0000000..3eb2f57 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer.plugins; + +import java.io.File; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.util.Tool; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.history.parser.ATSFileParser; +import org.apache.tez.history.parser.datamodel.DagInfo; + +import com.google.common.base.Preconditions; + +/** + * Base class for analyzers that are run from the command line. It parses the --dagId, + * --atsFileName and optional --outputDir options, loads the DAG from the ATS file and + * passes it to {@link #analyze(DagInfo)}. + */ +public abstract class TezAnalyzerBase extends Configured implements Tool, Analyzer { + + + private static final String ATS_FILE_NAME = "atsFileName"; + private static final String OUTPUT_DIR = "outputDir"; + private static final String DAG_ID = "dagId"; + private static final String HELP = "help"; + + private String outputDir; + + @SuppressWarnings("static-access") + private static Options buildOptions() { + Option dagIdOption = OptionBuilder.withArgName(DAG_ID).withLongOpt(DAG_ID) + .withDescription("DagId that needs to be analyzed").hasArg().isRequired(true).create(); + + Option outputDirOption = OptionBuilder.withArgName(OUTPUT_DIR).withLongOpt(OUTPUT_DIR) + .withDescription("Directory to write outputs to.").hasArg().isRequired(false).create(); + + Option inputATSFileNameOption = OptionBuilder.withArgName(ATS_FILE_NAME).withLongOpt + (ATS_FILE_NAME) + .withDescription("File with ATS data for the DAG").hasArg() + .isRequired(true).create(); + Option help = OptionBuilder.withArgName(HELP).withLongOpt + (HELP) + .withDescription("print help") + .isRequired(false).create(); + + Options opts = new Options(); + opts.addOption(dagIdOption); + opts.addOption(outputDirOption); + opts.addOption(inputATSFileNameOption); + opts.addOption(help); + return opts; + } + 
+ /** + * Returns the directory outputs should be written to: the --outputDir option if given, + * otherwise the {@code user.dir} system property. Set by {@link #run(String[])}. + */ + protected String getOutputDir() { + return outputDir; + } + + private void printUsage() { + System.err.println("Analyzer base options are"); + Options options = buildOptions(); + for (Object obj : options.getOptions()) { + Option option = (Option) obj; + System.err.println(option.getArgName() + " : " + option.getDescription()); + } + } + + /** + * Parses the command line, loads the requested DAG from the ATS file and runs + * {@link #analyze(DagInfo)} on it. + * + * @param args command line arguments + * @return 0 on success or when help was requested, -1 if the command line is invalid + */ + @Override + public int run(String[] args) throws Exception { + // Handle help before parsing: dagId and atsFileName are required options, so a bare + // --help would otherwise be rejected by the parser as an invalid command line. + for (String arg : args) { + if (("-" + HELP).equals(arg) || ("--" + HELP).equals(arg)) { + printUsage(); + return 0; + } + } + + // Parse command line arguments + CommandLine cmdLine = null; + try { + cmdLine = new GnuParser().parse(buildOptions(), args); + } catch (ParseException e) { + // Report why parsing failed (e.g. which required option is missing). + System.err.println("Invalid options on command line: " + e.getMessage()); + printUsage(); + return -1; + } + + if(cmdLine.hasOption(HELP)) { + printUsage(); + return 0; + } + + outputDir = cmdLine.getOptionValue(OUTPUT_DIR); + if (outputDir == null) { + outputDir = System.getProperty("user.dir"); + } + + File file = new File(cmdLine.getOptionValue(ATS_FILE_NAME)); + String dagId = cmdLine.getOptionValue(DAG_ID); + + ATSFileParser parser = new ATSFileParser(file); + DagInfo dagInfo = parser.getDAGData(dagId); + Preconditions.checkState(dagInfo.getDagId().equals(dagId), + "DAG id %s found in ATS data does not match requested DAG id %s", dagInfo.getDagId(), + dagId); + analyze(dagInfo); + return 0; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java new file mode 100644 index 0000000..9661ea3 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer.plugins; + +import com.google.common.base.Functions; +import com.google.common.base.Splitter; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.analyzer.utils.Utils; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Identify a set of vertices which fall in the critical path in a DAG. 
+ */ + +public class VertexLevelCriticalPathAnalyzer implements Analyzer { + private final Configuration config; + + private static final String[] headers = { "CriticalPath", "Score" }; + + private final CSVResult csvResult; + + private static final String DOT_FILE_DIR = "tez.critical-path.analyzer.dot.output.loc"; + private static final String DOT_FILE_DIR_DEFAULT = "."; //current directory + + private final String dotFileLocation; + + private static final String CONNECTOR = "-->"; + + public VertexLevelCriticalPathAnalyzer(Configuration config) { + this.config = config; + this.csvResult = new CSVResult(headers); + this.dotFileLocation = config.get(DOT_FILE_DIR, DOT_FILE_DIR_DEFAULT); + } + + @Override public void analyze(DagInfo dagInfo) throws TezException { + Map<String, Long> result = Maps.newLinkedHashMap(); + // NOTE(review): the last vertex returned by getVertices() is used as the DAG sink; + // confirm that DagInfo guarantees this ordering. + if (!dagInfo.getVertices().isEmpty()) { + getCriticalPath("", dagInfo.getVertices().get(dagInfo.getVertices().size() - 1), 0, result); + } + + Map<String, Long> sortedByValues = sortByValues(result); + for (Map.Entry<String, Long> entry : sortedByValues.entrySet()) { + List<String> record = Lists.newLinkedList(); + record.add(entry.getKey()); + record.add(entry.getValue() + ""); + csvResult.addRecord(record.toArray(new String[record.size()])); + } + + String dotFile = dotFileLocation + File.separator + dagInfo.getDagId() + ".dot"; + try { + List<String> criticalVertices = null; + if (!sortedByValues.isEmpty()) { + String criticalPath = sortedByValues.keySet().iterator().next(); + criticalVertices = getVertexNames(criticalPath); + } else { + criticalVertices = Lists.newLinkedList(); + } + Utils.generateDAGVizFile(dagInfo, dotFile, criticalVertices); + } catch (IOException e) { + throw new TezException(e); + } + } + + @Override + public CSVResult getResult() throws TezException { + return csvResult; + } + + @Override + public String getName() { + return "CriticalPathAnalyzer"; + } + + @Override + public String getDescription() { + return "Analyze vertex level critical path of the DAG"; + } + 
@Override + public Configuration getConfiguration() { + return config; + } + + /** + * Returns the entries of {@code result} ordered by descending time. Entries with equal time + * are ordered by their path string: ImmutableSortedMap.copyOf rejects keys that the + * comparator considers equal, so ordering by value alone would throw + * IllegalArgumentException as soon as two paths have the same total time. + */ + private static Map<String, Long> sortByValues(Map<String, Long> result) { + final Ordering<String> reversValueOrdering = + Ordering.natural().reverse().nullsLast().onResultOf(Functions.forMap(result, null)) + .compound(Ordering.<String>natural()); + Map<String, Long> orderedMap = ImmutableSortedMap.copyOf(result, reversValueOrdering); + return orderedMap; + } + + /** + * Walks backwards from {@code dest} through its input vertices, adding each vertex's time + * taken to {@code time} and appending its name followed by CONNECTOR to {@code predecessor}. + * Every path prefix visited is recorded in {@code result} with its accumulated time. + */ + private static void getCriticalPath(String predecessor, VertexInfo dest, long time, + Map<String, Long> result) { + String destVertexName = (dest != null) ? (dest.getVertexName()) : ""; + + if (dest != null) { + time += dest.getTimeTaken(); + predecessor += destVertexName + CONNECTOR; + + for (VertexInfo incomingVertex : dest.getInputVertices()) { + getCriticalPath(predecessor, incomingVertex, time, result); + } + + result.put(predecessor, time); + } + } + + private static List<String> getVertexNames(String criticalPath) { + if (Strings.isNullOrEmpty(criticalPath)) { + return Lists.newLinkedList(); + } + return Lists.newLinkedList(Splitter.on(CONNECTOR).trimResults().omitEmptyStrings().split + (criticalPath)); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/24e17a4e/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java index 4a582bb..50fe033 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java @@ -18,247 +18,294 @@ package org.apache.tez.analyzer.utils; import org.apache.commons.io.IOUtils; -import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import 
org.apache.commons.io.output.FileWriterWithEncoding; +import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep; +import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.EntityType; import org.apache.tez.history.parser.datamodel.DagInfo; import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; -import org.apache.tez.history.parser.datamodel.TaskInfo; -import org.apache.tez.history.parser.datamodel.VertexInfo; -import org.plutext.jaxb.svg11.Line; -import org.plutext.jaxb.svg11.ObjectFactory; -import org.plutext.jaxb.svg11.Svg; -import org.plutext.jaxb.svg11.Title; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBElement; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Marshaller; -import javax.xml.namespace.QName; -import java.io.BufferedReader; +import com.google.common.base.Joiner; + import java.io.BufferedWriter; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.util.Collection; -import java.util.Comparator; -import java.util.TreeSet; +import java.text.DecimalFormat; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; public class SVGUtils { - private static final String UTF8 = "UTF-8"; - - private static final Logger LOG = LoggerFactory.getLogger(SVGUtils.class); - - - private final ObjectFactory objectFactory; - private final Svg svg; - private final QName titleName = new QName("title"); - private static int MAX_DAG_RUNTIME = 0; private static final int SCREEN_WIDTH = 1800; - private final DagInfo dagInfo; - - //Gap between various components - private static final int DAG_GAP = 70; - private static final int VERTEX_GAP = 50; - private static final int TASK_GAP = 5; - private static final int STROKE_WIDTH = 5; - - //To compute the size of the
graph. - private long MIN_X = Long.MAX_VALUE; - private long MAX_X = Long.MIN_VALUE; + public SVGUtils() { + } - private int x1 = 0; - private int y1 = 0; - private int y2 = 0; + private int Y_MAX; + private int X_MAX; + private static final DecimalFormat secondFormat = new DecimalFormat("#.##"); + private static final int X_BASE = 100; + private static final int Y_BASE = 100; + private static final int TICK = 1; + private static final int STEP_GAP = 50; + private static final int TEXT_SIZE = 20; + private static final String RUNTIME_COLOR = "LightGreen"; + private static final String ALLOCATION_OVERHEAD_COLOR = "GoldenRod"; + private static final String LAUNCH_OVERHEAD_COLOR = "DarkSalmon"; + private static final String BORDER_COLOR = "Sienna"; + private static final String VERTEX_INIT_COMMIT_COLOR = "orange"; + private static final String CRITICAL_COLOR = "IndianRed"; + private static final float RECT_OPACITY = 1.0f; + private static final String TITLE_BR = " "; - public SVGUtils(DagInfo dagInfo) { - this.dagInfo = dagInfo; - this.objectFactory = new ObjectFactory(); - this.svg = objectFactory.createSvg(); + /** + * Formats a duration in milliseconds, e.g. "1h2m3.45s". The hour and minute parts are + * omitted when zero; seconds are always shown with up to two decimals. + */ + public static String getTimeStr(final long millis) { + long minutes = TimeUnit.MILLISECONDS.toMinutes(millis) + - TimeUnit.HOURS.toMinutes(TimeUnit.MILLISECONDS.toHours(millis)); + long hours = TimeUnit.MILLISECONDS.toHours(millis); + StringBuilder b = new StringBuilder(); + b.append(hours == 0 ? "" : String.valueOf(hours) + "h"); + b.append(minutes == 0 ? 
"" : String.valueOf(minutes) + "m"); + long seconds = millis - TimeUnit.MINUTES.toMillis( + TimeUnit.MILLISECONDS.toMinutes(millis)); + b.append(secondFormat.format(seconds/1000.0) + "s"); + + return b.toString(); } - - private Line createLine(int x1, int y1, int x2, int y2) { - Line line = objectFactory.createLine(); - line.setX1(scaleDown(x1) + ""); - line.setY1(y1 + ""); - line.setX2(scaleDown(x2) + ""); - line.setY2(y2 + ""); - return line; + + List<String> svgLines = new LinkedList<>(); + + private final int addOffsetX(int x) { + int xOff = x + X_BASE; + X_MAX = Math.max(X_MAX, xOff); + return xOff; } - - private Title createTitle(String msg) { - Title t = objectFactory.createTitle(); - t.setContent(msg); - return t; + + private final int addOffsetY(int y) { + int yOff = y + Y_BASE; + Y_MAX = Math.max(Y_MAX, yOff); + return yOff; } - - private Title createTitleForVertex(VertexInfo vertex) { - String titleStr = vertex.getVertexName() + ":" - + (vertex.getFinishTimeInterval()) - + " ms, RelativeTimeToDAG:" - + (vertex.getInitTime() - this.dagInfo.getStartTime()) - + " ms, counters:" + vertex.getTezCounters(); - Title title = createTitle(titleStr); - return title; + + private int scaleDown(int len) { + // Avoid dividing by zero (which yields NaN or Infinity) for a DAG with no measurable runtime. + if (MAX_DAG_RUNTIME <= 0) { + return 0; + } + return Math.round((len * 1.0f / MAX_DAG_RUNTIME) * SCREEN_WIDTH); } - - private Title createTitleForTaskAttempt(TaskAttemptInfo taskAttemptInfo) { - String titleStr = "RelativeTimeToVertex:" - + (taskAttemptInfo.getStartTime() - - taskAttemptInfo.getTaskInfo().getVertexInfo().getInitTime()) + - " ms, " + taskAttemptInfo.toString() + ", counters:" + taskAttemptInfo.getTezCounters(); - Title title = createTitle(titleStr); - return title; + + private void addRectStr(int x, int width, int y, int height, + String fillColor, String borderColor, float opacity, String title) { + String rectStyle = "stroke: " + borderColor + "; fill: " + fillColor + "; opacity: " + opacity; + String rectStr = "<rect x=\"" + addOffsetX(scaleDown(x)) + "\"" + + " y=\"" + addOffsetY(y) + "\"" 
+ + " width=\"" + scaleDown(width) + "\"" + + " height=\"" + height + "\"" + + " style=\"" + rectStyle + "\"" + + " >" + + " <title>" + title +"</title>" + + " </rect>"; + svgLines.add(rectStr); } - - /** - * Draw DAG from dagInfo - * - * @param dagInfo - */ - private void drawDAG(DagInfo dagInfo) { - Title title = createTitle(dagInfo.getDagId() + " : " + dagInfo.getTimeTaken() + " ms"); - int duration = (int) dagInfo.getFinishTimeInterval(); - MAX_DAG_RUNTIME = duration; - MIN_X = Math.min(dagInfo.getStartTimeInterval(), MIN_X); - MAX_X = Math.max(dagInfo.getFinishTimeInterval(), MAX_X); - Line line = createLine(x1, y1, x1 + duration, y2); - line.getSVGDescriptionClass().add(new JAXBElement<Title>(titleName, Title.class, title)); - line.setStyle("stroke: black; stroke-width:20"); - line.setOpacity("0.3"); - svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass().add(line); - drawVertex(); + + private void addTextStr(int x, int y, String text, String anchor, int size, String title) { + String textStyle = "text-anchor: " + anchor + "; font-size: " + size + "px;"; + String textStr = "<text x=\"" + addOffsetX(scaleDown(x)) + "\" " + + "y=\"" + addOffsetY(y) + "\" " + + "style=\"" + textStyle + "\" transform=\"\">" + + escapeXmlText(text) + + " <title>" + title +"</title>" + + "</text>"; + svgLines.add(textStr); } - - private Collection<VertexInfo> getSortedVertices() { - Collection<VertexInfo> vertices = this.dagInfo.getVertices(); - // Add corresponding vertex details - TreeSet<VertexInfo> vertexSet = new TreeSet<VertexInfo>( - new Comparator<VertexInfo>() { - @Override - public int compare(VertexInfo o1, VertexInfo o2) { - return (int) (o1.getFirstTaskStartTimeInterval() - o2.getFirstTaskStartTimeInterval()); - } - }); - vertexSet.addAll(vertices); - return vertexSet; + + /** + * Escapes the characters that are special in XML character data so that arbitrary strings + * (such as DAG names) can be embedded as SVG text content. + */ + private static String escapeXmlText(String str) { + return String.valueOf(str).replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;"); + } + + private void addLineStr(int x1, int y1, int x2, int y2, String color, String title, int width) { + String style = "stroke: " + color + "; stroke-width:" + width; + String str = "<line x1=\"" +
addOffsetX(scaleDown(x1)) + "\"" + + " y1=\"" + addOffsetY(y1) + "\"" + + " x2=\"" + addOffsetX(scaleDown(x2)) + "\"" + + " y2=\"" + addOffsetY(y2) + "\"" + + " style=\"" + style + "\"" + + " >" + + " <title>" + title +"</title>" + + " </line>"; + svgLines.add(str); } - - private Collection<TaskInfo> getSortedTasks(VertexInfo vertexInfo) { - Collection<TaskInfo> tasks = vertexInfo.getTasks(); - // Add corresponding task details - TreeSet<TaskInfo> taskSet = new TreeSet<TaskInfo>(new Comparator<TaskInfo>() { - @Override - public int compare(TaskInfo o1, TaskInfo o2) { - return (int) (o1.getSuccessfulTaskAttempt().getStartTimeInterval() - - o2.getSuccessfulTaskAttempt().getStartTimeInterval()); + + /** + * Draws one critical path step as a row at y = {@code yOffset * STEP_GAP}. Vertex init and + * output commit steps are drawn as a single rectangle; attempt steps are split into + * allocation overhead, launch overhead and run time rectangles. + */ + public void drawStep(CriticalPathStep step, long dagStartTime, int yOffset) { + if (step.getType() != EntityType.ATTEMPT) { + // draw initial vertex or final commit overhead + StringBuilder title = new StringBuilder(); + String text = null; + if (step.getType() == EntityType.VERTEX_INIT) { + String vertex = step.getAttempt().getTaskInfo().getVertexInfo().getVertexName(); + text = vertex + " : Init"; + title.append(text).append(TITLE_BR); + } else { + text = "Output Commit"; + title.append(text).append(TITLE_BR); } - }); - taskSet.addAll(tasks); - return taskSet; - } + title.append("Critical Path Dependency: " + step.getReason()).append(TITLE_BR); + title.append( + "Critical Time: " + getTimeStr(step.getStopCriticalTime() - step.getStartCriticalTime())) + .append(TITLE_BR); + title.append(Joiner.on(TITLE_BR).join(step.getNotes())); + String titleStr = title.toString(); + int stopTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime); + int startTimeInterval = (int) (step.getStartCriticalTime() - dagStartTime); + addRectStr(startTimeInterval, + (stopTimeInterval - startTimeInterval), yOffset * STEP_GAP, STEP_GAP, + VERTEX_INIT_COMMIT_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr); + addTextStr((stopTimeInterval + startTimeInterval) / 2, + (yOffset * STEP_GAP + STEP_GAP /
2), + text, "middle", + TEXT_SIZE, titleStr); + } else { + TaskAttemptInfo attempt = step.getAttempt(); + int startCriticalTimeInterval = (int) (step.getStartCriticalTime() - dagStartTime); + int stopCriticalTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime); + int creationTimeInterval = (int) (attempt.getCreationTime() - dagStartTime); + int allocationTimeInterval = (int) (attempt.getAllocationTime() - dagStartTime); + int launchTimeInterval = (int) (attempt.getStartTime() - dagStartTime); + int finishTimeInterval = (int) (attempt.getFinishTime() - dagStartTime); + + StringBuilder title = new StringBuilder(); + title.append("Attempt: " + attempt.getTaskAttemptId()).append(TITLE_BR); + title.append("Critical Path Dependency: " + step.getReason()).append(TITLE_BR); + title.append("Completion Status: " + attempt.getDetailedStatus()).append(TITLE_BR); + title.append( + "Critical Time Contribution: " + + getTimeStr(step.getStopCriticalTime() - step.getStartCriticalTime())).append(TITLE_BR); + title.append("Critical start at: " + getTimeStr(startCriticalTimeInterval)).append(TITLE_BR); + title.append("Critical stop at: " + getTimeStr(stopCriticalTimeInterval)).append(TITLE_BR); + title.append("Created at: " + getTimeStr(creationTimeInterval)).append(TITLE_BR); + title.append("Allocated at: " + getTimeStr(allocationTimeInterval)).append(TITLE_BR); + title.append("Launched at: " + getTimeStr(launchTimeInterval)).append(TITLE_BR); + title.append("Finished at: " + getTimeStr(finishTimeInterval)).append(TITLE_BR); + title.append(Joiner.on(TITLE_BR).join(step.getNotes())); + String titleStr = title.toString(); - /** - * Draw the vertices - * - */ - public void drawVertex() { - Collection<VertexInfo> vertices = getSortedVertices(); - for (VertexInfo vertex : vertices) { - //Set vertex start time as
the one when its first task attempt started executing - x1 = (int) vertex.getStartTimeInterval(); - y1 += VERTEX_GAP; - int duration = ((int) (vertex.getTimeTaken())); - Line line = createLine(x1, y1, x1 + duration, y1); - line.setStyle("stroke: red; stroke-width:" + STROKE_WIDTH); - line.setOpacity("0.3"); + addRectStr(creationTimeInterval, allocationTimeInterval - creationTimeInterval, + yOffset * STEP_GAP, STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, + titleStr); - Title vertexTitle = createTitleForVertex(vertex); - line.getSVGDescriptionClass().add( - new JAXBElement<Title>(titleName, Title.class, vertexTitle)); - svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass().add(line); - // For each vertex, draw the tasks - drawTask(vertex); + addRectStr(allocationTimeInterval, launchTimeInterval - allocationTimeInterval, + yOffset * STEP_GAP, STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, + titleStr); + + addRectStr(launchTimeInterval, finishTimeInterval - launchTimeInterval, yOffset * STEP_GAP, + STEP_GAP, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr); + + addTextStr((finishTimeInterval + creationTimeInterval) / 2, + (yOffset * STEP_GAP + STEP_GAP / 2), attempt.getShortName(), "middle", TEXT_SIZE, titleStr); } - x1 = x1 + (int) dagInfo.getFinishTimeInterval(); - y1 = y1 + DAG_GAP; - y2 = y1; } - /** - * Draw tasks - * - * @param vertex - */ - public void drawTask(VertexInfo vertex) { - Collection<TaskInfo> tasks = getSortedTasks(vertex); - for (TaskInfo task : tasks) { - for (TaskAttemptInfo taskAttemptInfo : task.getTaskAttempts()) { - x1 = (int) taskAttemptInfo.getStartTimeInterval(); - y1 += TASK_GAP; - int duration = (int) taskAttemptInfo.getTimeTaken(); - Line line = createLine(x1, y1, x1 + duration, y1); - String color = - taskAttemptInfo.getStatus().equalsIgnoreCase(TaskAttemptState.SUCCEEDED.name()) - ?
"green" : "red"; - line.setStyle("stroke: " + color + "; stroke-width:" + STROKE_WIDTH); - Title title = createTitleForTaskAttempt(taskAttemptInfo); - line.getSVGDescriptionClass().add( - new JAXBElement<Title>(titleName, Title.class, title)); - svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass() - .add(line); + private void drawCritical(DagInfo dagInfo, List<CriticalPathStep> criticalPath) { + int duration = (int) dagInfo.getFinishTimeInterval(); + MAX_DAG_RUNTIME = duration; + long dagStartTime = dagInfo.getStartTime(); + int dagStartTimeInterval = 0; // this is 0 since we are offseting from the dag start time + int dagFinishTimeInterval = (int) (dagInfo.getFinishTime() - dagStartTime); + + // draw grid + addLineStr(dagStartTimeInterval, 0, dagFinishTimeInterval, 0, BORDER_COLOR, "", TICK); + int yGrid = (criticalPath.size() + 2)*STEP_GAP; + for (int i=0; i<11; ++i) { + int x = Math.round(((dagFinishTimeInterval - dagStartTimeInterval)/10.0f)*i); + addLineStr(x, 0, x, yGrid, BORDER_COLOR, "", TICK); + addTextStr(x, 0, getTimeStr(x), "start", TEXT_SIZE, ""); + } + addLineStr(dagStartTimeInterval, yGrid, dagFinishTimeInterval, yGrid, BORDER_COLOR, "", TICK); + addTextStr((dagFinishTimeInterval + dagStartTimeInterval) / 2, yGrid + STEP_GAP, + "Critical Path for " + dagInfo.getName() + " (" + dagInfo.getDagId() + ")", "middle", + TEXT_SIZE, ""); + + // draw steps + for (int i=1; i<=criticalPath.size(); ++i) { + CriticalPathStep step = criticalPath.get(i-1); + drawStep(step, dagStartTime, i); + } + + // draw critical path on top + for (int i=1; i<=criticalPath.size(); ++i) { + CriticalPathStep step = criticalPath.get(i-1); + boolean isLast = i == criticalPath.size(); + + // draw critical path for step + int startCriticalTimeInterval = (int) (step.getStartCriticalTime() - dagStartTime); + int stopCriticalTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime); + addLineStr(startCriticalTimeInterval, (i + 1) * STEP_GAP, stopCriticalTimeInterval, +
(i + 1) * STEP_GAP, CRITICAL_COLOR, "Critical Time " + step.getAttempt().getShortName(), TICK*5); + + if (isLast) { + // last step. add commit overhead + int stepStopCriticalTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime); + addLineStr(stepStopCriticalTimeInterval, (i + 1) * STEP_GAP, dagFinishTimeInterval, + (i + 1) * STEP_GAP, CRITICAL_COLOR, + "Critical Time " + step.getAttempt().getTaskInfo().getVertexInfo().getVertexName(), TICK*5); + } else { + // connect to next step in critical path + addLineStr(stopCriticalTimeInterval, (i + 1) * STEP_GAP, stopCriticalTimeInterval, + (i + 2) * STEP_GAP, CRITICAL_COLOR, "Critical Time " + step.getAttempt().getShortName(), TICK*5); } } + + // draw legend + int legendX = 0; + int legendY = (criticalPath.size() + 2) * STEP_GAP; + int legendWidth = 10000; + + addRectStr(legendX, legendWidth, legendY, STEP_GAP, VERTEX_INIT_COMMIT_COLOR, BORDER_COLOR, RECT_OPACITY, ""); + addTextStr(legendX, legendY + STEP_GAP/2, "Vertex Init/Commit Overhead", "start", TEXT_SIZE, ""); + legendY += STEP_GAP; + addRectStr(legendX, legendWidth, legendY, STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, ""); + addTextStr(legendX, legendY + STEP_GAP/2, "Task Allocation Overhead", "start", TEXT_SIZE, ""); + legendY += STEP_GAP; + addRectStr(legendX, legendWidth, legendY, STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, ""); + addTextStr(legendX, legendY + STEP_GAP/2, "Task Launch Overhead", "start", TEXT_SIZE, ""); + legendY += STEP_GAP; + addRectStr(legendX, legendWidth, legendY, STEP_GAP, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, ""); + addTextStr(legendX, legendY + STEP_GAP/2, "Task Execution Time", "start", TEXT_SIZE, ""); + + Y_MAX += Y_BASE*2; + X_MAX += X_BASE*2; } - - /** - * Convert DAG to graph - * - * @throws java.io.IOException - * @throws javax.xml.bind.JAXBException - */ - public void saveAsSVG(String fileName) throws IOException, JAXBException { - drawDAG(dagInfo); - svg.setHeight("" + y2); -
svg.setWidth("" + (MAX_X - MIN_X)); - String tempFileName = System.nanoTime() + ".svg"; - File file = new File(tempFileName); - JAXBContext jaxbContext = JAXBContext.newInstance(Svg.class); - Marshaller jaxbMarshaller = jaxbContext.createMarshaller(); - jaxbMarshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true); - jaxbMarshaller.marshal(svg, file); - //TODO: dirty workaround to get rid of XMLRootException issue - BufferedReader reader = new BufferedReader( - new InputStreamReader(new FileInputStream(file), UTF8)); - BufferedWriter writer = new BufferedWriter( - new OutputStreamWriter(new FileOutputStream(fileName), UTF8)); + + /** + * Draws the critical path of {@code dagInfo} and writes it as a UTF-8 SVG document to + * {@code fileName}. I/O failures are rethrown as RuntimeException. + */ + public void saveCriticalPathAsSVG(DagInfo dagInfo, + String fileName, List<CriticalPathStep> criticalPath) { + drawCritical(dagInfo, criticalPath); + saveFileStr(fileName); + } + + private void saveFileStr(String fileName) { + String header = "<?xml version=\"1.0\" standalone=\"no\"?> " + + "<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\" " + + "\"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">" + + "<svg xmlns=\"http://www.w3.org/2000/svg\" version=\"1.1\" " + + "xmlns:xlink=\"http://www.w3.org/1999/xlink\" " + + "height=\"" + Y_MAX + "\" " + + "width=\"" + X_MAX + "\"> " + + "<script type=\"text/ecmascript\" " + + "xlink:href=\"http://code.jquery.com/jquery-2.1.1.min.js\" />"; + String footer = "</svg>"; + String newline = System.getProperty("line.separator"); + BufferedWriter writer = null; try { - while (reader.ready()) { - String line = reader.readLine(); - if (line != null) { - line = line.replaceAll( - " xmlns:ns3=\"http://www.w3.org/2000/svg\" xmlns=\"\"", ""); - writer.write(line); - writer.newLine(); - } + writer = new BufferedWriter(new FileWriterWithEncoding(fileName, "UTF-8")); + writer.write(header); + writer.write(newline); + for (String str : svgLines) { + writer.write(str); + writer.write(newline); } + writer.write(footer); + // Close explicitly so that a failure while flushing the buffered output is reported + // instead of being swallowed by closeQuietly in the finally block. + writer.close(); + } catch (IOException e) { + throw new RuntimeException(e); } finally { -
IOUtils.closeQuietly(reader); - IOUtils.closeQuietly(writer); - if (file.exists()) { - boolean deleted = file.delete(); - LOG.debug("Deleted {}" + file.getAbsolutePath()); + if (writer != null) { + IOUtils.closeQuietly(writer); } } - } - private float scaleDown(int len) { - return (len * 1.0f / MAX_DAG_RUNTIME) * SCREEN_WIDTH; } + } \ No newline at end of file
