Repository: tez Updated Branches: refs/heads/master eadbfec44 -> ecd90dc1f
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java index 8df40ba..a570493 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java @@ -18,6 +18,7 @@ package org.apache.tez.analyzer.plugins; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.tez.analyzer.Analyzer; @@ -43,15 +44,21 @@ import java.util.Map; */ public class ShuffleTimeAnalyzer implements Analyzer { - private static final String SHUFFLE_TIME_RATIO = "tez.shuffle-time-analyzer.shuffle.ratio"; - private static final float SHUFFLE_TIME_RATIO_DEFAULT = 0.5f; + /** + * ratio of (total time taken by task - shuffle time) / (total time taken by task) + */ + private static final String REAL_WORK_DONE_RATIO = "tez.shuffle-time-analyzer.real-work.done.ratio"; + private static final float REAL_WORK_DONE_RATIO_DEFAULT = 0.5f; + /** + * Number of min records that needs to get in as reduce input records. + */ private static final String MIN_SHUFFLE_RECORDS = "tez.shuffle-time-analyzer.shuffle.min.records"; private static final long MIN_SHUFFLE_RECORDS_DEFAULT = 10000; private static final String[] headers = { "vertexName", "taskAttemptId", "Node", "counterGroup", "Comments", "REDUCE_INPUT_GROUPS", "REDUCE_INPUT_RECORDS", "ratio", "SHUFFLE_BYTES", - "Time taken to receive all events", "MERGE_PHASE_TIME", "SHUFFLE_PHASE_TIME", + "TotalTime", "Time_taken_to_receive_all_events", "MERGE_PHASE_TIME", "SHUFFLE_PHASE_TIME", "TimeTaken_For_Real_Task", "FIRST_EVENT_RECEIVED", "LAST_EVENT_RECEIVED", "SHUFFLE_BYTES_DISK_DIRECT" }; @@ -59,15 +66,15 @@ public class ShuffleTimeAnalyzer implements Analyzer { private final Configuration config; - private final float shuffleTimeRatio; + private final float realWorkDoneRatio; private final long minShuffleRecords; public ShuffleTimeAnalyzer(Configuration config) { this.config = config; - shuffleTimeRatio = config.getFloat - (SHUFFLE_TIME_RATIO, SHUFFLE_TIME_RATIO_DEFAULT); + realWorkDoneRatio = config.getFloat + (REAL_WORK_DONE_RATIO, REAL_WORK_DONE_RATIO_DEFAULT); minShuffleRecords = config.getLong(MIN_SHUFFLE_RECORDS, MIN_SHUFFLE_RECORDS_DEFAULT); } @@ -105,15 +112,20 @@ public class ShuffleTimeAnalyzer implements Analyzer { result.add(counterGroupName); //Real work done in the task - long timeTakenForRealWork = attemptInfo.getTimeTaken() - - Long.parseLong(getCounterValue(TaskCounter.MERGE_PHASE_TIME, counterGroupName, - attemptInfo)); - String comments = ""; - if ((timeTakenForRealWork * 1.0f / attemptInfo.getTimeTaken()) < shuffleTimeRatio) { - comments = "Time taken in shuffle is more than the actual work being done in task. " - + " Check if source/destination machine is a slow node. Check if merge phase " - + "time is more to understand disk bottlenecks in this node. Check for skew"; + String mergePhaseTime = getCounterValue(TaskCounter.MERGE_PHASE_TIME, + counterGroupName, attemptInfo); + String timeTakenForRealWork = ""; + if (!Strings.isNullOrEmpty(mergePhaseTime)) { + long realWorkDone = attemptInfo.getTimeTaken() - Long.parseLong(mergePhaseTime); + + if ((realWorkDone * 1.0f / attemptInfo.getTimeTaken()) < realWorkDoneRatio) { + comments = "Time taken in shuffle is more than the actual work being done in task. " + + " Check if source/destination machine is a slow node. Check if merge phase " + + "time is more to understand disk bottlenecks in this node. Check for skew"; + } + + timeTakenForRealWork = Long.toString(realWorkDone); } result.add(comments); @@ -122,13 +134,14 @@ public class ShuffleTimeAnalyzer implements Analyzer { result.add("" + (1.0f * reduceInputGroupsVal / reduceInputRecordsVal)); result.add(getCounterValue(TaskCounter.SHUFFLE_BYTES, counterGroupName, attemptInfo)); + result.add(Long.toString(attemptInfo.getTimeTaken())); + //Total time taken for receiving all events from source tasks result.add(getOverheadFromSourceTasks(counterGroupName, attemptInfo)); result.add(getCounterValue(TaskCounter.MERGE_PHASE_TIME, counterGroupName, attemptInfo)); result.add(getCounterValue(TaskCounter.SHUFFLE_PHASE_TIME, counterGroupName, attemptInfo)); - - result.add(Long.toString(timeTakenForRealWork)); + result.add(timeTakenForRealWork); result.add(getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED, counterGroupName, attemptInfo)); result.add(getCounterValue(TaskCounter.LAST_EVENT_RECEIVED, counterGroupName, attemptInfo)); @@ -150,11 +163,16 @@ public class ShuffleTimeAnalyzer implements Analyzer { * @return String */ private String getOverheadFromSourceTasks(String counterGroupName, TaskAttemptInfo attemptInfo) { - long firstEventReceived = Long.parseLong(getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED, - counterGroupName, attemptInfo)); - long lastEventReceived = Long.parseLong(getCounterValue(TaskCounter.LAST_EVENT_RECEIVED, - counterGroupName, attemptInfo)); - return Long.toString(lastEventReceived - firstEventReceived); + String firstEventReceived = getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED, + counterGroupName, attemptInfo); + String lastEventReceived = getCounterValue(TaskCounter.LAST_EVENT_RECEIVED, + counterGroupName, attemptInfo); + + if (!Strings.isNullOrEmpty(firstEventReceived) && !Strings.isNullOrEmpty(lastEventReceived)) { + return Long.toString(Long.parseLong(lastEventReceived) - Long.parseLong(firstEventReceived)); + } else { + return ""; + } } private String getCounterValue(TaskCounter counter, String counterGroupName, http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java index 8152344..f09380d 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java @@ -57,6 +57,10 @@ import java.util.Map; */ public class SkewAnalyzer implements Analyzer { + /** + * Amount of bytes that was sent as shuffle bytes from source. If it is below this threshold, + * it would not be considered for analysis. + */ private static final String SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE = "tez.skew-analyzer.shuffle" + ".bytes.per.source"; private static final long SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT = 900 * 1024 * 1024l; http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java index 7c7f5c0..1a8d9d3 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java @@ -41,7 +41,7 @@ import java.util.List; public class SlowTaskIdentifier implements Analyzer { private static final String[] headers = { "vertexName", "taskAttemptId", - "Node", "taskDuration", "Status", + "Node", "taskDuration", "Status", "diagnostics", "NoOfInputs" }; private final CSVResult csvResult; @@ -72,14 +72,21 @@ public class SlowTaskIdentifier implements Analyzer { } }); - int limit = config.getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT); - for(int i=0;i<limit;i++) { + int limit = Math.min(taskAttempts.size(), + Math.max(0, config.getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT))); + + if (limit == 0) { + return; + } + + for (int i = 0; i < limit - 1; i++) { List<String> record = Lists.newLinkedList(); record.add(taskAttempts.get(i).getTaskInfo().getVertexInfo().getVertexName()); record.add(taskAttempts.get(i).getTaskAttemptId()); record.add(taskAttempts.get(i).getContainer().getHost()); record.add(taskAttempts.get(i).getTimeTaken() + ""); record.add(taskAttempts.get(i).getStatus()); + record.add(taskAttempts.get(i).getDiagnostics()); record.add(taskAttempts.get(i).getTaskInfo().getVertexInfo().getInputEdges().size() + ""); csvResult.addRecord(record.toArray(new String[record.size()])); http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java index b7fca0b..c8d9695 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java @@ -29,6 +29,7 @@ import org.apache.tez.common.counters.TezCounter; import org.apache.tez.dag.api.TezException; import org.apache.tez.history.parser.datamodel.DagInfo; import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.TaskInfo; import org.apache.tez.history.parser.datamodel.VertexInfo; import java.util.List; @@ -41,7 +42,7 @@ public class SlowestVertexAnalyzer implements Analyzer { private static final String[] headers = { "vertexName", "taskAttempts", "totalTime", "shuffleTime", "shuffleTime_Max", "LastEventReceived", "LastEventReceivedFrom", - "TimeTaken_ForRealWork", "75thPercentile", "95thPercentile", "98thPercentile", "Median", + "75thPercentile", "95thPercentile", "98thPercentile", "Median", "observation", "comments" }; private final CSVResult csvResult = new CSVResult(headers); @@ -50,8 +51,27 @@ public class SlowestVertexAnalyzer implements Analyzer { private final MetricRegistry metrics = new MetricRegistry(); private Histogram taskAttemptRuntimeHistorgram; + private final static String MAX_VERTEX_RUNTIME = "tez.slowest-vertex-analyzer.max.vertex.runtime"; + private final static long MAX_VERTEX_RUNTIME_DEFAULT = 100000; + + private final long vertexRuntimeThreshold; + public SlowestVertexAnalyzer(Configuration config) { this.config = config; + this.vertexRuntimeThreshold = Math.max(1, config.getLong(MAX_VERTEX_RUNTIME, + MAX_VERTEX_RUNTIME_DEFAULT)); + + } + + private long getTaskRuntime(VertexInfo vertexInfo) { + TaskInfo firstTaskToStart = vertexInfo.getFirstTaskToStart(); + TaskInfo lastTaskToFinish = vertexInfo.getLastTaskToFinish(); + + DagInfo dagInfo = vertexInfo.getDagInfo(); + long totalTime = ((lastTaskToFinish == null) ? + dagInfo.getFinishTime() : lastTaskToFinish.getFinishTime()) - + ((firstTaskToStart == null) ? dagInfo.getStartTime() : firstTaskToStart.getStartTime()); + return totalTime; } @Override @@ -59,9 +79,13 @@ public class SlowestVertexAnalyzer implements Analyzer { for (VertexInfo vertexInfo : dagInfo.getVertices()) { String vertexName = vertexInfo.getVertexName(); - long totalTime = vertexInfo.getTimeTaken(); + if (vertexInfo.getFirstTaskToStart() == null || vertexInfo.getLastTaskToFinish() == null) { + continue; + } + + long totalTime = getTaskRuntime(vertexInfo); - long max = Long.MIN_VALUE; + long slowestLastEventTime = Long.MIN_VALUE; String maxSourceName = ""; taskAttemptRuntimeHistorgram = metrics.histogram(vertexName); @@ -81,10 +105,8 @@ public class SlowestVertexAnalyzer implements Analyzer { continue; } //Find the slowest last event received - if (entry.getValue().getValue() > max) { - //w.r.t vertex start time. - max =(attemptInfo.getStartTimeInterval() + entry.getValue().getValue()) - - (vertexInfo.getStartTimeInterval()); + if (entry.getValue().getValue() > slowestLastEventTime) { + slowestLastEventTime = entry.getValue().getValue(); maxSourceName = entry.getKey(); } } @@ -104,9 +126,7 @@ public class SlowestVertexAnalyzer implements Analyzer { } //Find the slowest last event received if (entry.getValue().getValue() > shuffleMax) { - //w.r.t vertex start time. - shuffleMax =(attemptInfo.getStartTimeInterval() + entry.getValue().getValue()) - - (vertexInfo.getStartTimeInterval()); + shuffleMax = entry.getValue().getValue(); shuffleMaxSource = entry.getKey(); } } @@ -120,9 +140,10 @@ public class SlowestVertexAnalyzer implements Analyzer { record.add(totalTime + ""); record.add(Math.max(0, shuffleMax) + ""); record.add(shuffleMaxSource); - record.add(Math.max(0, max) + ""); + record.add(Math.max(0, slowestLastEventTime) + ""); record.add(maxSourceName); - record.add(Math.max(0,(totalTime - max)) + ""); + //Finding out real_work done at vertex level might be meaningless (as it is quite posisble + // that it went to starvation). StringBuilder sb = new StringBuilder(); double percentile75 = taskAttemptRuntimeHistorgram.getSnapshot().get75thPercentile(); @@ -145,7 +166,7 @@ public class SlowestVertexAnalyzer implements Analyzer { if (totalTime > 0 && vertexInfo.getTaskAttempts().size() > 0) { if ((shuffleMax * 1.0f / totalTime) > 0.5) { - if ((max * 1.0f / totalTime) > 0.5) { + if ((slowestLastEventTime * 1.0f / totalTime) > 0.5) { comments = "This vertex is slow due to its dependency on parent. Got a lot delayed last" + " event received"; } else { @@ -153,8 +174,9 @@ public class SlowestVertexAnalyzer implements Analyzer { "Spending too much time on shuffle. Check shuffle bytes from previous vertex"; } } else { - if (totalTime > 10000) { //greater than 10 seconds. //TODO: Configure it later. - comments = "Concentrate on this vertex (totalTime > 10 seconds)"; + if (totalTime > vertexRuntimeThreshold) { //greater than X seconds. + comments = "Concentrate on this vertex (totalTime > " + vertexRuntimeThreshold + + " seconds)"; } } } http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java index c650104..83b1bb0 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java @@ -49,12 +49,21 @@ public class SpillAnalyzerImpl implements Analyzer { private final CSVResult csvResult; - private static long OUTPUT_BYTES_THRESHOLD = 1 * 1024 * 1024 * 1024l; + /** + * Minimum output bytes that should be chunrned out by a task + */ + private static final String OUTPUT_BYTES_THRESHOLD = "tez.spill-analyzer.min.output.bytes" + + ".threshold"; + private static long OUTPUT_BYTES_THRESHOLD_DEFAULT = 1 * 1024 * 1024 * 1024l; + + private final long minOutputBytesPerTask; private final Configuration config; public SpillAnalyzerImpl(Configuration config) { this.config = config; + minOutputBytesPerTask = Math.max(0, config.getLong(OUTPUT_BYTES_THRESHOLD, + OUTPUT_BYTES_THRESHOLD_DEFAULT)); this.csvResult = new CSVResult(headers); } @@ -83,7 +92,7 @@ public class SpillAnalyzerImpl implements Analyzer { long outputRecords = outputRecordsMap.get(source).getValue(); long spilledRecords = spilledRecordsMap.get(source).getValue(); - if (spillCount > 1 && outBytes > OUTPUT_BYTES_THRESHOLD) { + if (spillCount > 1 && outBytes > minOutputBytesPerTask) { List<String> recorList = Lists.newLinkedList(); recorList.add(vertexName); recorList.add(attemptInfo.getTaskAttemptId()); @@ -95,7 +104,7 @@ public class SpillAnalyzerImpl implements Analyzer { recorList.add(outputRecords + ""); recorList.add(spilledRecords + ""); recorList.add("Consider increasing " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB - + ", try increasing container size."); + + ". Try increasing container size."); csvResult.addRecord(recorList.toArray(new String[recorList.size()])); } http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java new file mode 100644 index 0000000..c07ff83 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer.plugins; + +import com.google.common.collect.Lists; +import com.google.common.collect.TreeMultiset; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; + +import java.util.Comparator; +import java.util.List; + +/** + * Analyze concurrent tasks running in every vertex at regular intervals. + */ +public class TaskConcurrencyAnalyzer implements Analyzer { + + private static final String[] headers = { "time", "vertexName", "concurrentTasksRunning" }; + + private final CSVResult csvResult; + private final Configuration config; + + public TaskConcurrencyAnalyzer(Configuration conf) { + this.csvResult = new CSVResult(headers); + this.config = conf; + } + + private enum EventType {START, FINISH} + + static class TimeInfo { + EventType eventType; + long timestamp; + int concurrentTasks; + + public TimeInfo(EventType eventType, long timestamp) { + this.eventType = eventType; + this.timestamp = timestamp; + } + } + + @Override + public void analyze(DagInfo dagInfo) throws TezException { + + //For each vertex find the concurrent tasks running at any point + for (VertexInfo vertexInfo : dagInfo.getVertices()) { + List<TaskAttemptInfo> taskAttempts = + Lists.newLinkedList(vertexInfo.getTaskAttempts(true, null)); + + String vertexName = vertexInfo.getVertexName(); + + /** + * - Get sorted multi-set of timestamps (S1, S2,...E1, E2..). Possible to have multiple + * tasks starting/ending at same time. + * - Walk through the set + * - Increment concurrent tasks when start event is encountered + * - Decrement concurrent tasks when start event is encountered + */ + TreeMultiset<TimeInfo> timeInfoSet = TreeMultiset.create(new Comparator<TimeInfo>() { + @Override public int compare(TimeInfo o1, TimeInfo o2) { + return (o1.timestamp < o2.timestamp) ? -1 : + ((o1.timestamp == o2.timestamp) ? 0 : 1); + } + }); + + for (TaskAttemptInfo attemptInfo : taskAttempts) { + TimeInfo startTimeInfo = new TimeInfo(EventType.START, attemptInfo.getStartTime()); + TimeInfo stopTimeInfo = new TimeInfo(EventType.FINISH, attemptInfo.getFinishTime()); + + timeInfoSet.add(startTimeInfo); + timeInfoSet.add(stopTimeInfo); + } + + //Compute concurrent tasks in the list now. + int concurrentTasks = 0; + for(TimeInfo timeInfo : timeInfoSet.elementSet()) { + switch (timeInfo.eventType) { + case START: + concurrentTasks += timeInfoSet.count(timeInfo); + break; + case FINISH: + concurrentTasks -= timeInfoSet.count(timeInfo); + break; + default: + break; + } + timeInfo.concurrentTasks = concurrentTasks; + addToResult(vertexName, timeInfo.timestamp, timeInfo.concurrentTasks); + } + } + } + + private void addToResult(String vertexName, long currentTime, int concurrentTasks) { + String[] record = { currentTime + "", vertexName, concurrentTasks + "" }; + csvResult.addRecord(record); + } + + @Override + public CSVResult getResult() throws TezException { + return csvResult; + } + + @Override + public String getName() { + return "TaskConcurrencyAnalyzer"; + } + + @Override + public String getDescription() { + return "Analyze how many tasks were running in every vertex at given point in time. This " + + "would be helpful in understanding whether any starvation was there or not."; + } + + @Override + public Configuration getConfiguration() { + return config; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java new file mode 100644 index 0000000..4a582bb --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.analyzer.utils; + +import org.apache.commons.io.IOUtils; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.TaskInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; +import org.plutext.jaxb.svg11.Line; +import org.plutext.jaxb.svg11.ObjectFactory; +import org.plutext.jaxb.svg11.Svg; +import org.plutext.jaxb.svg11.Title; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBElement; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; +import javax.xml.namespace.QName; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.util.Collection; +import java.util.Comparator; +import java.util.TreeSet; + +public class SVGUtils { + + private static final String UTF8 = "UTF-8"; + + private static final Logger LOG = LoggerFactory.getLogger(SVGUtils.class); + + + private final ObjectFactory objectFactory; + private final Svg svg; + private final QName titleName = new QName("title"); + + private static int MAX_DAG_RUNTIME = 0; + private static final int SCREEN_WIDTH = 1800; + + private final DagInfo dagInfo; + + //Gap between various components + private static final int DAG_GAP = 70; + private static final int VERTEX_GAP = 50; + private static final int TASK_GAP = 5; + private static final int STROKE_WIDTH = 5; + + //To compute the size of the graph. + private long MIN_X = Long.MAX_VALUE; + private long MAX_X = Long.MIN_VALUE; + + private int x1 = 0; + private int y1 = 0; + private int y2 = 0; + + public SVGUtils(DagInfo dagInfo) { + this.dagInfo = dagInfo; + this.objectFactory = new ObjectFactory(); + this.svg = objectFactory.createSvg(); + } + + private Line createLine(int x1, int y1, int x2, int y2) { + Line line = objectFactory.createLine(); + line.setX1(scaleDown(x1) + ""); + line.setY1(y1 + ""); + line.setX2(scaleDown(x2) + ""); + line.setY2(y2 + ""); + return line; + } + + private Title createTitle(String msg) { + Title t = objectFactory.createTitle(); + t.setContent(msg); + return t; + } + + private Title createTitleForVertex(VertexInfo vertex) { + String titleStr = vertex.getVertexName() + ":" + + (vertex.getFinishTimeInterval()) + + " ms, RelativeTimeToDAG:" + + (vertex.getInitTime() - this.dagInfo.getStartTime()) + + " ms, counters:" + vertex.getTezCounters(); + Title title = createTitle(titleStr); + return title; + } + + private Title createTitleForTaskAttempt(TaskAttemptInfo taskAttemptInfo) { + String titleStr = "RelativeTimeToVertex:" + + (taskAttemptInfo.getStartTime() - + taskAttemptInfo.getTaskInfo().getVertexInfo().getInitTime()) + + " ms, " + taskAttemptInfo.toString() + ", counters:" + taskAttemptInfo.getTezCounters(); + Title title = createTitle(titleStr); + return title; + } + + /** + * Draw DAG from dagInfo + * + * @param dagInfo + */ + private void drawDAG(DagInfo dagInfo) { + Title title = createTitle(dagInfo.getDagId() + " : " + dagInfo.getTimeTaken() + " ms"); + int duration = (int) dagInfo.getFinishTimeInterval(); + MAX_DAG_RUNTIME = duration; + MIN_X = Math.min(dagInfo.getStartTimeInterval(), MIN_X); + MAX_X = Math.max(dagInfo.getFinishTimeInterval(), MAX_X); + Line line = createLine(x1, y1, x1 + duration, y2); + line.getSVGDescriptionClass().add(new JAXBElement<Title>(titleName, Title.class, title)); + line.setStyle("stroke: black; stroke-width:20"); + line.setOpacity("0.3"); + svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass().add(line); + drawVertex(); + } + + private Collection<VertexInfo> getSortedVertices() { + Collection<VertexInfo> vertices = this.dagInfo.getVertices(); + // Add corresponding vertex details + TreeSet<VertexInfo> vertexSet = new TreeSet<VertexInfo>( + new Comparator<VertexInfo>() { + @Override + public int compare(VertexInfo o1, VertexInfo o2) { + return (int) (o1.getFirstTaskStartTimeInterval() - o2.getFirstTaskStartTimeInterval()); + } + }); + vertexSet.addAll(vertices); + return vertexSet; + } + + private Collection<TaskInfo> getSortedTasks(VertexInfo vertexInfo) { + Collection<TaskInfo> tasks = vertexInfo.getTasks(); + // Add corresponding task details + TreeSet<TaskInfo> taskSet = new TreeSet<TaskInfo>(new Comparator<TaskInfo>() { + @Override + public int compare(TaskInfo o1, TaskInfo o2) { + return (int) (o1.getSuccessfulTaskAttempt().getStartTimeInterval() + - o2.getSuccessfulTaskAttempt().getStartTimeInterval()); + } + }); + taskSet.addAll(tasks); + return taskSet; + } + + /** + * Draw the vertices + * + */ + public void drawVertex() { + Collection<VertexInfo> vertices = getSortedVertices(); + for (VertexInfo vertex : vertices) { + //Set vertex start time as the one when its first task attempt started executing + x1 = (int) vertex.getStartTimeInterval(); + y1 += VERTEX_GAP; + int duration = ((int) (vertex.getTimeTaken())); + Line line = createLine(x1, y1, x1 + duration, y1); + line.setStyle("stroke: red; stroke-width:" + STROKE_WIDTH); + line.setOpacity("0.3"); + + Title vertexTitle = createTitleForVertex(vertex); + line.getSVGDescriptionClass().add( + new JAXBElement<Title>(titleName, Title.class, vertexTitle)); + svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass().add(line); + // For each vertex, draw the tasks + drawTask(vertex); + } + x1 = x1 + (int) dagInfo.getFinishTimeInterval(); + y1 = y1 + DAG_GAP; + y2 = y1; + } + + /** + * Draw tasks + * + * @param vertex + */ + public void drawTask(VertexInfo vertex) { + Collection<TaskInfo> tasks = getSortedTasks(vertex); + for (TaskInfo task : tasks) { + for (TaskAttemptInfo taskAttemptInfo : task.getTaskAttempts()) { + x1 = (int) taskAttemptInfo.getStartTimeInterval(); + y1 += TASK_GAP; + int duration = (int) taskAttemptInfo.getTimeTaken(); + Line line = createLine(x1, y1, x1 + duration, y1); + String color = + taskAttemptInfo.getStatus().equalsIgnoreCase(TaskAttemptState.SUCCEEDED.name()) + ? "green" : "red"; + line.setStyle("stroke: " + color + "; stroke-width:" + STROKE_WIDTH); + Title title = createTitleForTaskAttempt(taskAttemptInfo); + line.getSVGDescriptionClass().add( + new JAXBElement<Title>(titleName, Title.class, title)); + svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass() + .add(line); + } + } + } + + /** + * Convert DAG to graph + * + * @throws java.io.IOException + * @throws javax.xml.bind.JAXBException + */ + public void saveAsSVG(String fileName) throws IOException, JAXBException { + drawDAG(dagInfo); + svg.setHeight("" + y2); + svg.setWidth("" + (MAX_X - MIN_X)); + String tempFileName = System.nanoTime() + ".svg"; + File file = new File(tempFileName); + JAXBContext jaxbContext = JAXBContext.newInstance(Svg.class); + Marshaller jaxbMarshaller = jaxbContext.createMarshaller(); + jaxbMarshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true); + jaxbMarshaller.marshal(svg, file); + //TODO: dirty workaround to get rid of XMLRootException issue + BufferedReader reader = new BufferedReader( + new InputStreamReader(new FileInputStream(file), UTF8)); + BufferedWriter writer = new BufferedWriter( + new OutputStreamWriter(new FileOutputStream(fileName), UTF8)); + try { + while (reader.ready()) { + String line = reader.readLine(); + if (line != null) { + line = line.replaceAll( + " xmlns:ns3=\"http://www.w3.org/2000/svg\" xmlns=\"\"", ""); + writer.write(line); + writer.newLine(); + } + } + } finally { + IOUtils.closeQuietly(reader); + IOUtils.closeQuietly(writer); + if (file.exists()) { + boolean deleted = file.delete(); + LOG.debug("Deleted {}" + file.getAbsolutePath()); + } + } + } + + private float scaleDown(int len) { + return (len * 1.0f / MAX_DAG_RUNTIME) * SCREEN_WIDTH; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java new file mode 100644 index 0000000..8bcf265 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer.utils; + +import com.sun.istack.Nullable; +import org.apache.tez.dag.utils.Graph; +import org.apache.tez.history.parser.datamodel.AdditionalInputOutputDetails; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.EdgeInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; + +import java.io.IOException; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class Utils { + + private static Pattern sanitizeLabelPattern = Pattern.compile("[:\\-\\W]+"); + + public static String getShortClassName(String className) { + int pos = className.lastIndexOf("."); + if (pos != -1 && pos < className.length() - 1) { + return className.substring(pos + 1); + } + return className; + } + + public static String sanitizeLabelForViz(String label) { + Matcher m = sanitizeLabelPattern.matcher(label); + return m.replaceAll("_"); + } + + public static void generateDAGVizFile(DagInfo dagInfo, String fileName, + @Nullable List<String> criticalVertices) throws IOException { + Graph graph = new Graph(sanitizeLabelForViz(dagInfo.getName())); + + for (VertexInfo v : dagInfo.getVertices()) { + String nodeLabel = sanitizeLabelForViz(v.getVertexName()) + + "[" + getShortClassName(v.getProcessorClassName() + + ", tasks=" + v.getTasks().size() + ", time=" + v.getTimeTaken() +" ms]"); + Graph.Node n = graph.newNode(sanitizeLabelForViz(v.getVertexName()), nodeLabel); + + boolean criticalVertex = (criticalVertices != null) ? criticalVertices.contains(v + .getVertexName()) : false; + if (criticalVertex) { + n.setColor("red"); + } + + + for (AdditionalInputOutputDetails input : v.getAdditionalInputInfoList()) { + Graph.Node inputNode = graph.getNode(sanitizeLabelForViz(v.getVertexName()) + + "_" + sanitizeLabelForViz(input.getName())); + inputNode.setLabel(sanitizeLabelForViz(v.getVertexName()) + + "[" + sanitizeLabelForViz(input.getName()) + "]"); + inputNode.setShape("box"); + inputNode.addEdge(n, "Input name=" + input.getName() + + " [inputClass=" + getShortClassName(input.getClazz()) + + ", initializer=" + getShortClassName(input.getInitializer()) + "]"); + } + for (AdditionalInputOutputDetails output : v.getAdditionalOutputInfoList()) { + Graph.Node outputNode = graph.getNode(sanitizeLabelForViz(v.getVertexName()) + + "_" + sanitizeLabelForViz(output.getName())); + outputNode.setLabel(sanitizeLabelForViz(v.getVertexName()) + + "[" + sanitizeLabelForViz(output.getName()) + "]"); + outputNode.setShape("box"); + n.addEdge(outputNode, "Output name=" + output.getName() + + " [outputClass=" + getShortClassName(output.getClazz()) + + ", committer=" + getShortClassName(output.getInitializer()) + "]"); + } + + } + + for (EdgeInfo e : dagInfo.getEdges()) { + Graph.Node n = graph.getNode(sanitizeLabelForViz(e.getInputVertexName())); + n.addEdge(graph.getNode(sanitizeLabelForViz(e.getOutputVertexName())), + "[input=" + getShortClassName(e.getEdgeSourceClass()) + + ", output=" + getShortClassName(e.getEdgeDestinationClass()) + + ", dataMovement=" + e.getDataMovementType().trim() + "]"); + } + + graph.save(fileName); + } +}
