http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java new file mode 100644 index 0000000..ec72df1 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer.plugins; + +import com.google.common.collect.Lists; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.common.counters.DAGCounter; +import org.apache.tez.common.counters.FileSystemCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; + +import java.util.List; +import java.util.Map; + + +/** + * Get locality information for the tasks of each vertex, along with their task execution times. + * This is helpful in correlating whether the vertex runtime is related to the data + * locality.
+ */ +public class LocalityAnalyzer extends TezAnalyzerBase implements Analyzer { + + private final String[] headers = { "vertexName", "numTasks", "dataLocalRatio", "rackLocalRatio", + "otherRatio", "avgDataLocalTaskRuntime", "avgRackLocalTaskRuntime", + "avgOtherLocalTaskRuntime", "noOfInputs", "avgHDFSBytesRead_DataLocal", + "avgHDFSBytesRead_RackLocal", "avgHDFSBytesRead_Others", "recommendation" }; + + private static final String DATA_LOCAL_RATIO = "tez.locality-analyzer.data.local.ratio"; + private static final float DATA_LOCAL_RATIO_DEFAULT = 0.5f; + + private final Configuration config; + + private final CSVResult csvResult; + + public LocalityAnalyzer(Configuration config) { + this.config = config; + csvResult = new CSVResult(headers); + } + + @Override + public void analyze(DagInfo dagInfo) throws TezException { + for (VertexInfo vertexInfo : dagInfo.getVertices()) { + String vertexName = vertexInfo.getVertexName(); + + Map<String, TezCounter> dataLocalTask = vertexInfo.getCounter(DAGCounter.class.getName(), + DAGCounter.DATA_LOCAL_TASKS.toString()); + Map<String, TezCounter> rackLocalTask = vertexInfo.getCounter(DAGCounter.class.getName(), + DAGCounter.RACK_LOCAL_TASKS.toString()); + + long dataLocalTasks = 0; + long rackLocalTasks = 0; + + if (!dataLocalTask.isEmpty()) { + dataLocalTasks = dataLocalTask.get(DAGCounter.class.getName()).getValue(); + } + + if (!rackLocalTask.isEmpty()) { + rackLocalTasks = rackLocalTask.get(DAGCounter.class.getName()).getValue(); + } + + long totalVertexTasks = vertexInfo.getNumTasks(); + + if (dataLocalTasks > 0 || rackLocalTasks > 0) { + //compute locality details. + float dataLocalRatio = dataLocalTasks * 1.0f / totalVertexTasks; + float rackLocalRatio = rackLocalTasks * 1.0f / totalVertexTasks; + float othersRatio = (totalVertexTasks - (dataLocalTasks + rackLocalTasks)) * 1.0f / + totalVertexTasks; + + List<String> record = Lists.newLinkedList(); + record.add(vertexName); + record.add(totalVertexTasks + ""); + record.add(dataLocalRatio + ""); + record.add(rackLocalRatio + ""); + record.add(othersRatio + ""); + + TaskAttemptDetails dataLocalResult = computeAverages(vertexInfo, + DAGCounter.DATA_LOCAL_TASKS); + TaskAttemptDetails rackLocalResult = computeAverages(vertexInfo, + DAGCounter.RACK_LOCAL_TASKS); + TaskAttemptDetails otherTaskResult = computeAverages(vertexInfo, + DAGCounter.OTHER_LOCAL_TASKS); + + record.add(dataLocalResult.avgRuntime + ""); + record.add(rackLocalResult.avgRuntime + ""); + record.add(otherTaskResult.avgRuntime + ""); + + //Get the number of inputs to this vertex + record.add(vertexInfo.getInputEdges().size() + + vertexInfo.getAdditionalInputInfoList().size() + ""); + + //Get the avg HDFS bytes read in this vertex for different type of locality + record.add(dataLocalResult.avgHDFSBytesRead + ""); + record.add(rackLocalResult.avgHDFSBytesRead + ""); + record.add(otherTaskResult.avgHDFSBytesRead + ""); + + String recommendation = ""; + if (dataLocalRatio < config.getFloat(DATA_LOCAL_RATIO, DATA_LOCAL_RATIO_DEFAULT)) { + recommendation = "Data locality is poor for this vertex. 
Try tuning " + + TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS + ", " + + TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED + ", " + + TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED; + } + + record.add(recommendation); + csvResult.addRecord(record.toArray(new String[record.size()])); + } + } + } + + /** + * Compute counter averages for specific vertex + * + * @param vertexInfo + * @param counter + * @return task attempt details + */ + private TaskAttemptDetails computeAverages(VertexInfo vertexInfo, DAGCounter counter) { + long totalTime = 0; + long totalTasks = 0; + long totalHDFSBytesRead = 0; + + TaskAttemptDetails result = new TaskAttemptDetails(); + + for(TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) { + Map<String, TezCounter> localityCounter = attemptInfo.getCounter(DAGCounter.class.getName(), + counter.toString()); + + if (!localityCounter.isEmpty() && + localityCounter.get(DAGCounter.class.getName()).getValue() > 0) { + totalTime += attemptInfo.getTimeTaken(); + totalTasks++; + + //get HDFSBytes read counter + Map<String, TezCounter> hdfsBytesReadCounter = attemptInfo.getCounter(FileSystemCounter + .class.getName(), FileSystemCounter.HDFS_BYTES_READ.name()); + for(Map.Entry<String, TezCounter> entry : hdfsBytesReadCounter.entrySet()) { + totalHDFSBytesRead += entry.getValue().getValue(); + } + } + } + if (totalTasks > 0) { + result.avgRuntime = (totalTime * 1.0f / totalTasks); + result.avgHDFSBytesRead = (totalHDFSBytesRead * 1.0f / totalTasks); + } + return result; + } + + @Override public CSVResult getResult() throws TezException { + return csvResult; + } + + @Override public String getName() { + return "Locality Analyzer"; + } + + @Override public String getDescription() { + return "Analyze for locality information (data local, rack local, off-rack)"; + } + + @Override public Configuration getConfiguration() { + return config; + } + + /** + * Placeholder for task attempt details + */ + static class TaskAttemptDetails { + float avgHDFSBytesRead; + float avgRuntime; + } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + LocalityAnalyzer analyzer = new LocalityAnalyzer(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } +}
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java new file mode 100644 index 0000000..57e91c6 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer.plugins; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; + +import java.util.List; +import java.util.Map; + + +/** + * Analyze the time taken by the merge phase and the shuffle phase, and the time spent doing + * real work in tasks. + * + * Just dump REDUCE_INPUT_GROUPS, REDUCE_INPUT_RECORDS, their ratio, and SHUFFLE_BYTES for tasks + * grouped by vertex. Provide the time taken as well. Just render it as a table for now. + * + */ +public class ShuffleTimeAnalyzer extends TezAnalyzerBase implements Analyzer { + + /** + * Ratio of (total time taken by task - shuffle time) / (total time taken by task) + */ + private static final String REAL_WORK_DONE_RATIO = "tez.shuffle-time-analyzer.real-work.done.ratio"; + private static final float REAL_WORK_DONE_RATIO_DEFAULT = 0.5f; + + /** + * Minimum number of reduce input records a task attempt should receive to be considered for analysis.
+ */ + private static final String MIN_SHUFFLE_RECORDS = "tez.shuffle-time-analyzer.shuffle.min.records"; + private static final long MIN_SHUFFLE_RECORDS_DEFAULT = 10000; + + private static final String[] headers = { "vertexName", "taskAttemptId", "Node", "counterGroup", + "Comments", "REDUCE_INPUT_GROUPS", "REDUCE_INPUT_RECORDS", "ratio", "SHUFFLE_BYTES", + "TotalTime", "Time_taken_to_receive_all_events", "MERGE_PHASE_TIME", "SHUFFLE_PHASE_TIME", + "TimeTaken_For_Real_Task", "FIRST_EVENT_RECEIVED", "LAST_EVENT_RECEIVED", + "SHUFFLE_BYTES_DISK_DIRECT" }; + + private final CSVResult csvResult = new CSVResult(headers); + + private final Configuration config; + + private final float realWorkDoneRatio; + private final long minShuffleRecords; + + + public ShuffleTimeAnalyzer(Configuration config) { + this.config = config; + + realWorkDoneRatio = config.getFloat + (REAL_WORK_DONE_RATIO, REAL_WORK_DONE_RATIO_DEFAULT); + minShuffleRecords = config.getLong(MIN_SHUFFLE_RECORDS, MIN_SHUFFLE_RECORDS_DEFAULT); + } + + @Override + public void analyze(DagInfo dagInfo) throws TezException { + + for (VertexInfo vertexInfo : dagInfo.getVertices()) { + for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) { + //counter_group (basically source) --> counter + Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter + .REDUCE_INPUT_GROUPS.toString()); + Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter + .REDUCE_INPUT_RECORDS.toString()); + + if (reduceInputGroups == null) { + continue; + } + + for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) { + String counterGroupName = entry.getKey(); + long reduceInputGroupsVal = entry.getValue().getValue(); + long reduceInputRecordsVal = (reduceInputRecords.get(counterGroupName) != null) ? + reduceInputRecords.get(counterGroupName).getValue() : 0; + + if (reduceInputRecordsVal <= 0) { + continue; + } + float ratio = (reduceInputGroupsVal * 1.0f / reduceInputRecordsVal); + + if (ratio > 0 && reduceInputRecordsVal > minShuffleRecords) { + List<String> result = Lists.newLinkedList(); + result.add(vertexInfo.getVertexName()); + result.add(attemptInfo.getTaskAttemptId()); + result.add(attemptInfo.getNodeId()); + result.add(counterGroupName); + + //Real work done in the task + String comments = ""; + String mergePhaseTime = getCounterValue(TaskCounter.MERGE_PHASE_TIME, + counterGroupName, attemptInfo); + String timeTakenForRealWork = ""; + if (!Strings.isNullOrEmpty(mergePhaseTime)) { + long realWorkDone = attemptInfo.getTimeTaken() - Long.parseLong(mergePhaseTime); + + if ((realWorkDone * 1.0f / attemptInfo.getTimeTaken()) < realWorkDoneRatio) { + comments = "Time spent in shuffle is more than the actual work done in the task. " + + " Check if the source/destination machine is a slow node. Check whether the merge " + + "phase time is high to understand disk bottlenecks on this node. 
Check for skew"; + } + + timeTakenForRealWork = Long.toString(realWorkDone); + } + result.add(comments); + + result.add(reduceInputGroupsVal + ""); + result.add(reduceInputRecordsVal + ""); + result.add("" + (1.0f * reduceInputGroupsVal / reduceInputRecordsVal)); + result.add(getCounterValue(TaskCounter.SHUFFLE_BYTES, counterGroupName, attemptInfo)); + + result.add(Long.toString(attemptInfo.getTimeTaken())); + + //Total time taken for receiving all events from source tasks + result.add(getOverheadFromSourceTasks(counterGroupName, attemptInfo)); + result.add(getCounterValue(TaskCounter.MERGE_PHASE_TIME, counterGroupName, attemptInfo)); + result.add(getCounterValue(TaskCounter.SHUFFLE_PHASE_TIME, counterGroupName, attemptInfo)); + + result.add(timeTakenForRealWork); + + result.add(getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED, counterGroupName, attemptInfo)); + result.add(getCounterValue(TaskCounter.LAST_EVENT_RECEIVED, counterGroupName, attemptInfo)); + result.add(getCounterValue(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT, counterGroupName, attemptInfo)); + + csvResult.addRecord(result.toArray(new String[result.size()])); + } + } + } + } + + } + + /** + * Time taken to receive all events from source tasks + * + * @param counterGroupName + * @param attemptInfo + * @return String + */ + private String getOverheadFromSourceTasks(String counterGroupName, TaskAttemptInfo attemptInfo) { + String firstEventReceived = getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED, + counterGroupName, attemptInfo); + String lastEventReceived = getCounterValue(TaskCounter.LAST_EVENT_RECEIVED, + counterGroupName, attemptInfo); + + if (!Strings.isNullOrEmpty(firstEventReceived) && !Strings.isNullOrEmpty(lastEventReceived)) { + return Long.toString(Long.parseLong(lastEventReceived) - Long.parseLong(firstEventReceived)); + } else { + return ""; + } + } + + private String getCounterValue(TaskCounter counter, String counterGroupName, + TaskAttemptInfo attemptInfo) { + Map<String, TezCounter> tezCounterMap = attemptInfo.getCounter(counter.toString()); + if (tezCounterMap != null) { + for (Map.Entry<String, TezCounter> entry : tezCounterMap.entrySet()) { + String groupName = entry.getKey(); + long val = entry.getValue().getValue(); + if (groupName.equals(counterGroupName)) { + return Long.toString(val); + } + } + } + return ""; + } + + @Override + public CSVResult getResult() throws TezException { + return csvResult; + } + + @Override + public String getName() { + return "Shuffle time analyzer"; + } + + @Override + public String getDescription() { + return "Analyze the time taken for shuffle, merge " + + "and the real work done in the task"; + } + + @Override + public Configuration getConfiguration() { + return config; + } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + ShuffleTimeAnalyzer analyzer = new ShuffleTimeAnalyzer(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java new file mode 100644 index 0000000..067d871 --- /dev/null +++ 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer.plugins; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; + +import java.util.List; +import java.util.Map; + + +/** + * <p/> + * Identify the skew (REDUCE_INPUT_GROUPS / REDUCE_INPUT_RECORDS) ratio for all task attempts + * and report if they are below a certain threshold. + * <p/> + * <p/> + * - Case 1: Ratio of (reduce_input_groups / reduce_input_records) < 0.2 && SHUFFLE_BYTES > 1 GB + * per task attempt from a source. This means a couple of keys have too many records. Either + * the partitioning is wrong, or we need to increase the memory limit for this vertex. + * <p/> + * - Case 2: Ratio of (reduce_input_groups / reduce_input_records) > 0.6 & the number of reduce input + * records in the task attempt is close to, say, 60% of the overall number of records + * at the vertex level & numTasks in the vertex is greater than 1. This can happen with any number of reducer + * groups. It means that the partitioning is wrong (one can also consider reducing the number of tasks + * for that vertex). In some cases too many reducers are launched, and this can help find those. + * <p/> + * - Case 3: Ratio of (reduce_input_groups / reduce_input_records) is between 0.2 & 0.6 per task + * attempt & numTasks is greater than 1 & SHUFFLE_BYTES > 1 GB per task attempt from a + * source. In this case, consider increasing parallelism based on the task attempt runtime. + * <p/> + */ +public class SkewAnalyzer extends TezAnalyzerBase implements Analyzer { + + /** + * Amount of data sent as shuffle bytes from a source. If it is below this threshold, + * the attempt is not considered for analysis.
+ */ + private static final String SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE = "tez.skew-analyzer.shuffle" + + ".bytes.per.source"; + private static final long SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT = 900 * 1024 * 1024l; + + //Min reducer input group : reducer keys ratio for computation + private static final String ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO = "tez.skew-analyzer.shuffle.key" + + ".group.min.ratio"; + private static final float ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO_DEFAULT = 0.2f; + + //Max reducer input group : reducer keys ratio for computation + private static final String ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO = "tez.skew-analyzer.shuffle.key" + + ".group.max.ratio"; + private static final float ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO_DEFAULT = 0.4f; + + + + private static final String[] headers = { "vertexName", "taskAttemptId", "counterGroup", "node", + "REDUCE_INPUT_GROUPS", "REDUCE_INPUT_RECORDS", "ratio", "SHUFFLE_BYTES", "timeTaken", + "observation" }; + + private final CSVResult csvResult = new CSVResult(headers); + + private final Configuration config; + + private final float minRatio; + private final float maxRatio; + private final long maxShuffleBytesPerSource; + + public SkewAnalyzer(Configuration config) { + this.config = config; + maxRatio = config.getFloat(ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO, + ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO_DEFAULT); + minRatio = config.getFloat(ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO, + ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO_DEFAULT); + maxShuffleBytesPerSource = config.getLong(SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE, + SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT); + } + + @Override + public void analyze(DagInfo dagInfo) throws TezException { + Preconditions.checkArgument(dagInfo != null, "DAG can't be null"); + analyzeReducers(dagInfo); + } + + private void analyzeReducers(DagInfo dagInfo) { + for (VertexInfo vertexInfo : dagInfo.getVertices()) { + for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) { + analyzeGroupSkewPerSource(attemptInfo); + analyzeRecordSkewPerSource(attemptInfo); + analyzeForParallelism(attemptInfo); + } + } + } + + /** + * Analyze scenario where couple keys are having too many records per source + * + * @param attemptInfo + */ + private void analyzeGroupSkewPerSource(TaskAttemptInfo attemptInfo) { + + //counter_group (basically source) --> counter + Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter + .REDUCE_INPUT_GROUPS.toString()); + Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter + .REDUCE_INPUT_RECORDS.toString()); + Map<String, TezCounter> shuffleBytes = attemptInfo.getCounter(TaskCounter.SHUFFLE_BYTES.toString()); + + + //tez counter for every source + for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) { + if (entry.getKey().equals(TaskCounter.class.getName())) { + //TODO: Tez counters always ends up adding fgroups and groups, due to which we end up + // getting TaskCounter details as well. + continue; + } + + String counterGroup = entry.getKey(); + long inputGroupsCount = entry.getValue().getValue(); + long inputRecordsCount = (reduceInputRecords.get(counterGroup) != null) ? reduceInputRecords + .get(counterGroup).getValue() : 0; + long shuffleBytesPerSource = (shuffleBytes.get(counterGroup) != null) ? shuffleBytes.get + (counterGroup).getValue() : 0; + + float ratio = (inputGroupsCount * 1.0f / inputRecordsCount); + + //Case 1: Couple of keys having too many records per source. 
+ if (shuffleBytesPerSource > maxShuffleBytesPerSource) { + if (ratio < minRatio) { + List<String> result = Lists.newLinkedList(); + result.add(attemptInfo.getTaskInfo().getVertexInfo().getVertexName()); + result.add(attemptInfo.getTaskAttemptId()); + result.add(counterGroup); + result.add(attemptInfo.getNodeId()); + result.add(inputGroupsCount + ""); + result.add(inputRecordsCount + ""); + result.add(ratio + ""); + result.add(shuffleBytesPerSource + ""); + result.add(attemptInfo.getTimeTaken() + ""); + result.add("Please check partitioning. Otherwise consider increasing memLimit"); + + csvResult.addRecord(result.toArray(new String[result.size()])); + } + } + } + } + + /** + * Analyze scenario where one task is getting > 60% of the vertex level records + * + * @param attemptInfo + */ + private void analyzeRecordSkewPerSource(TaskAttemptInfo attemptInfo) { + + Map<String, TezCounter> vertexLevelReduceInputRecords = + attemptInfo.getTaskInfo().getVertexInfo() + .getCounter(TaskCounter.REDUCE_INPUT_RECORDS.toString()); + + int vertexNumTasks = attemptInfo.getTaskInfo().getVertexInfo().getNumTasks(); + + //counter_group (basically source) --> counter + Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter + .REDUCE_INPUT_GROUPS.toString()); + Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter + .REDUCE_INPUT_RECORDS.toString()); + Map<String, TezCounter> shuffleBytes = attemptInfo.getCounter(TaskCounter.SHUFFLE_BYTES.toString()); + + + //tez counter for every source + for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) { + if (entry.getKey().equals(TaskCounter.class.getName())) { + //TODO: Tez counters always ends up adding fgroups and groups, due to which we end up + // getting TaskCounter details as well. + continue; + } + + String counterGroup = entry.getKey(); + long inputGroupsCount = entry.getValue().getValue(); + long inputRecordsCount = (reduceInputRecords.get(counterGroup) != null) ? reduceInputRecords + .get(counterGroup).getValue() : 0; + long shuffleBytesPerSource = (shuffleBytes.get(counterGroup) != null) ?shuffleBytes.get + (counterGroup).getValue() : 0; + long vertexLevelInputRecordsCount = (vertexLevelReduceInputRecords.get(counterGroup) != + null) ? + vertexLevelReduceInputRecords.get(counterGroup).getValue() : 0; + + float ratio = (inputRecordsCount * 1.0f / vertexLevelInputRecordsCount); + + if (vertexNumTasks > 1) { + if (ratio > maxRatio) { + //input records > 60% of vertex level record count + if (inputRecordsCount > (vertexLevelInputRecordsCount * 0.60)) { + List<String> result = Lists.newLinkedList(); + result.add(attemptInfo.getTaskInfo().getVertexInfo().getVertexName()); + result.add(attemptInfo.getTaskAttemptId()); + result.add(counterGroup); + result.add(attemptInfo.getNodeId()); + result.add(inputGroupsCount + ""); + result.add(inputRecordsCount + ""); + result.add(ratio + ""); + result.add(shuffleBytesPerSource + ""); + result.add(attemptInfo.getTimeTaken() + ""); + result.add("Some task attempts are getting > 60% of reduce input records. 
" + + "Consider adjusting parallelism & check partition logic"); + + csvResult.addRecord(result.toArray(new String[result.size()])); + + } + } + } + } + } + + /** + * Analyze scenario where a vertex would need to increase parallelism + * + * @param attemptInfo + */ + private void analyzeForParallelism(TaskAttemptInfo attemptInfo) { + + //counter_group (basically source) --> counter + Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter + .REDUCE_INPUT_GROUPS.toString()); + Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter + .REDUCE_INPUT_RECORDS.toString()); + Map<String, TezCounter> shuffleBytes = attemptInfo.getCounter(TaskCounter.SHUFFLE_BYTES.toString()); + + //tez counter for every source + for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) { + if (entry.getKey().equals(TaskCounter.class.getName())) { + //TODO: Tez counters always ends up adding fgroups and groups, due to which we end up + // getting TaskCounter details as well. + continue; + } + + String counterGroup = entry.getKey(); + long inputGroupsCount = entry.getValue().getValue(); + long inputRecordsCount = (reduceInputRecords.get(counterGroup) != null) ? reduceInputRecords + .get(counterGroup).getValue() : 0; + long shuffleBytesPerSource = (shuffleBytes.get(counterGroup) != null) ? shuffleBytes.get + (counterGroup).getValue() : 0; + + float ratio = (inputGroupsCount * 1.0f / inputRecordsCount); + + //Case 3: Shuffle_Bytes > 1 GB. Ratio between 0.2 & < 0.6. Consider increasing + // parallelism based on task runtime. + if (shuffleBytesPerSource > SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT) { + if (ratio > minRatio && ratio < maxRatio) { + //couple of keys have too many records. Classic case of partition issue. 
+ List<String> result = Lists.newLinkedList(); + result.add(attemptInfo.getTaskInfo().getVertexInfo().getVertexName()); + result.add(attemptInfo.getTaskAttemptId()); + result.add(counterGroup); + result.add(attemptInfo.getNodeId()); + result.add(inputGroupsCount + ""); + result.add(inputRecordsCount + ""); + result.add(ratio + ""); + result.add(shuffleBytesPerSource + ""); + result.add(attemptInfo.getTimeTaken() + ""); + result.add("Consider increasing parallelism."); + + csvResult.addRecord(result.toArray(new String[result.size()])); + } + } + } + + + } + + + @Override + public CSVResult getResult() throws TezException { + return csvResult; + } + + @Override + public String getName() { + return "Skew Analyzer"; + } + + @Override + public String getDescription() { + return "Analyzes reducer skew by mining reducer task counters"; + } + + @Override + public Configuration getConfiguration() { + return config; + } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + SkewAnalyzer analyzer = new SkewAnalyzer(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java new file mode 100644 index 0000000..a810a8a --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.analyzer.plugins; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.common.counters.FileSystemCounter; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; + +import java.util.Collection; +import java.util.List; + + +/** + * This provides the set of nodes that participated in the DAG, in descending order of task execution + * time. + * <p/> + * Combine it with other counters to understand slow nodes better. + */ +public class SlowNodeAnalyzer extends TezAnalyzerBase implements Analyzer { + + private static final Log LOG = LogFactory.getLog(SlowNodeAnalyzer.class); + + private static final String[] headers = { "nodeName", "noOfTasksExecuted", "noOfKilledTasks", + "noOfFailedTasks", "avgSucceededTaskExecutionTime", "avgKilledTaskExecutionTime", + "avgFailedTaskExecutionTime", "avgHDFSBytesRead", "avgHDFSBytesWritten", + "avgFileBytesRead", "avgFileBytesWritten", "avgGCTimeMillis", "avgCPUTimeMillis" }; + + private final CSVResult csvResult = new CSVResult(headers); + + private final Configuration config; + + public SlowNodeAnalyzer(Configuration config) { + this.config = config; + } + + @Override + public void analyze(DagInfo dagInfo) throws TezException { + Multimap<String, TaskAttemptInfo> nodeDetails = dagInfo.getNodeDetails(); + for (String nodeName : nodeDetails.keySet()) { + List<String> record = Lists.newLinkedList(); + + Collection<TaskAttemptInfo> taskAttemptInfos = nodeDetails.get(nodeName); + + record.add(nodeName); + record.add(taskAttemptInfos.size() + ""); + record.add(getNumberOfTasks(taskAttemptInfos, TaskAttemptState.KILLED) + ""); + record.add(getNumberOfTasks(taskAttemptInfos, TaskAttemptState.FAILED) + ""); + + Iterable<TaskAttemptInfo> succeedTasks = getFilteredTaskAttempts(taskAttemptInfos, + TaskAttemptState.SUCCEEDED); + record.add(getAvgTaskExecutionTime(succeedTasks) + ""); + + Iterable<TaskAttemptInfo> killedTasks = getFilteredTaskAttempts(taskAttemptInfos, + TaskAttemptState.KILLED); + record.add(getAvgTaskExecutionTime(killedTasks) + ""); + + Iterable<TaskAttemptInfo> failedTasks = getFilteredTaskAttempts(taskAttemptInfos, + TaskAttemptState.FAILED); + record.add(getAvgTaskExecutionTime(failedTasks) + ""); + + record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class + .getName(), FileSystemCounter.HDFS_BYTES_READ.name()) + ""); + record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class + .getName(), FileSystemCounter.HDFS_BYTES_WRITTEN.name()) + ""); + record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class + .getName(), FileSystemCounter.FILE_BYTES_READ.name()) + ""); + record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class + .getName(), FileSystemCounter.FILE_BYTES_WRITTEN.name()) + ""); + record.add(getAvgCounter(taskAttemptInfos, TaskCounter.class + .getName(), 
TaskCounter.GC_TIME_MILLIS.name()) + ""); + record.add(getAvgCounter(taskAttemptInfos, TaskCounter.class + .getName(), TaskCounter.CPU_MILLISECONDS.name()) + ""); + + csvResult.addRecord(record.toArray(new String[record.size()])); + } + } + + private Iterable<TaskAttemptInfo> getFilteredTaskAttempts(Collection<TaskAttemptInfo> + taskAttemptInfos, final TaskAttemptState status) { + return Iterables.filter(taskAttemptInfos, new + Predicate<TaskAttemptInfo>() { + @Override public boolean apply(TaskAttemptInfo input) { + return input.getStatus().equalsIgnoreCase(status.toString()); + } + }); + } + + private float getAvgTaskExecutionTime(Iterable<TaskAttemptInfo> taskAttemptInfos) { + long totalTime = 0; + int size = 0; + for (TaskAttemptInfo attemptInfo : taskAttemptInfos) { + totalTime += attemptInfo.getTimeTaken(); + size++; + } + return (size > 0) ? (totalTime * 1.0f / size) : 0; + } + + private int getNumberOfTasks(Collection<TaskAttemptInfo> taskAttemptInfos, TaskAttemptState + status) { + int tasks = 0; + for (TaskAttemptInfo attemptInfo : taskAttemptInfos) { + if (attemptInfo.getStatus().equalsIgnoreCase(status.toString())) { + tasks++; + } + } + return tasks; + } + + private float getAvgCounter(Collection<TaskAttemptInfo> taskAttemptInfos, String + counterGroupName, String counterName) { + long total = 0; + int taskCount = 0; + for (TaskAttemptInfo attemptInfo : taskAttemptInfos) { + TezCounters tezCounters = attemptInfo.getTezCounters(); + TezCounter counter = tezCounters.findCounter(counterGroupName, counterName); + if (counter != null) { + total += counter.getValue(); + taskCount++; + } else { + LOG.info("Could not find counterGroupName=" + counterGroupName + ", counter=" + + counterName + " in " + attemptInfo); + } + } + return (taskCount > 0) ? (total * 1.0f / taskCount) : 0; + } + + @Override + public CSVResult getResult() throws TezException { + return csvResult; + } + + @Override + public String getName() { + return "Slow Node Analyzer"; + } + + @Override + public String getDescription() { + StringBuilder sb = new StringBuilder(); + sb.append("Analyze node details for the DAG.").append("\n"); + sb.append("This could be used to find out the set of nodes where the tasks are taking more " + + "time on average and to understand whether too many tasks got scheduled on a node.") + .append("\n"); + sb.append("One needs to combine the task execution time with other metrics like bytes " + + "read/written etc. to get a better idea of bad nodes. 
In order to understand the slow " + + "nodes due to network, it might be worthwhile to consider the shuffle performance " + + "analyzer tool in tez-tools").append("\n"); + return sb.toString(); + } + + @Override + public Configuration getConfiguration() { + return config; + } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + SlowNodeAnalyzer analyzer = new SlowNodeAnalyzer(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java new file mode 100644 index 0000000..d2474ad --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer.plugins; + +import com.google.common.collect.Lists; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + + +/** + * Analyze slow tasks in the DAG. Top 100 tasks are listed by default. + * + * <p/> + * //TODO: We do not get counters for killed task attempts yet. 
+ */ +public class SlowTaskIdentifier extends TezAnalyzerBase implements Analyzer { + + private static final String[] headers = { "vertexName", "taskAttemptId", + "Node", "taskDuration", "Status", "diagnostics", + "NoOfInputs" }; + + private final CSVResult csvResult; + + private static final String NO_OF_TASKS = "tez.slow-task-analyzer.task.count"; + private static final int NO_OF_TASKS_DEFAULT = 100; + + private final Configuration config; + + public SlowTaskIdentifier(Configuration config) { + this.config = config; + this.csvResult = new CSVResult(headers); + } + + @Override + public void analyze(DagInfo dagInfo) throws TezException { + List<TaskAttemptInfo> taskAttempts = Lists.newArrayList(); + for(VertexInfo vertexInfo : dagInfo.getVertices()) { + taskAttempts.addAll(vertexInfo.getTaskAttempts()); + } + + //sort them by runtime in descending order + Collections.sort(taskAttempts, new Comparator<TaskAttemptInfo>() { + @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) { + return (o1.getTimeTaken() > o2.getTimeTaken()) ? -1 : + ((o1.getTimeTaken() == o2.getTimeTaken()) ? + 0 : 1); + } + }); + + int limit = Math.min(taskAttempts.size(), + Math.max(0, config.getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT))); + + if (limit == 0) { + return; + } + + for (int i = 0; i < limit; i++) { + List<String> record = Lists.newLinkedList(); + record.add(taskAttempts.get(i).getTaskInfo().getVertexInfo().getVertexName()); + record.add(taskAttempts.get(i).getTaskAttemptId()); + record.add(taskAttempts.get(i).getContainer().getHost()); + record.add(taskAttempts.get(i).getTimeTaken() + ""); + record.add(taskAttempts.get(i).getStatus()); + record.add(taskAttempts.get(i).getDiagnostics()); + record.add(taskAttempts.get(i).getTaskInfo().getVertexInfo().getInputEdges().size() + ""); + + csvResult.addRecord(record.toArray(new String[record.size()])); + } + + } + + @Override + public CSVResult getResult() throws TezException { + return csvResult; + } + + @Override + public String getName() { + return "Slow Task Identifier"; + } + + @Override + public String getDescription() { + return "Identifies slow tasks in the DAG"; + } + + @Override + public Configuration getConfiguration() { + return config; + } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + SlowTaskIdentifier analyzer = new SlowTaskIdentifier(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java new file mode 100644 index 0000000..33f2421 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer.plugins; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import com.google.common.collect.Lists; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.TaskInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; + +import java.util.List; +import java.util.Map; + +/** + * Identify the slowest vertex in the DAG. + */ +public class SlowestVertexAnalyzer extends TezAnalyzerBase implements Analyzer { + + private static final String[] headers = { "vertexName", "taskAttempts", "totalTime", + "shuffleTime_Max", "shuffleTime_Max_Source", "LastEventReceived", "LastEventReceivedFrom", + "75thPercentile", "95thPercentile", "98thPercentile", "Median", + "observation", "comments" }; + + private final CSVResult csvResult = new CSVResult(headers); + + private final Configuration config; + private final MetricRegistry metrics = new MetricRegistry(); + private Histogram taskAttemptRuntimeHistorgram; + + private final static String MAX_VERTEX_RUNTIME = "tez.slowest-vertex-analyzer.max.vertex.runtime"; + private final static long MAX_VERTEX_RUNTIME_DEFAULT = 100000; + + private final long vertexRuntimeThreshold; + + public SlowestVertexAnalyzer(Configuration config) { + this.config = config; + this.vertexRuntimeThreshold = Math.max(1, config.getLong(MAX_VERTEX_RUNTIME, + MAX_VERTEX_RUNTIME_DEFAULT)); + + } + + private long getTaskRuntime(VertexInfo vertexInfo) { + TaskInfo firstTaskToStart = vertexInfo.getFirstTaskToStart(); + TaskInfo lastTaskToFinish = vertexInfo.getLastTaskToFinish(); + + DagInfo dagInfo = vertexInfo.getDagInfo(); + long totalTime = ((lastTaskToFinish == null) ? + dagInfo.getFinishTime() : lastTaskToFinish.getFinishTime()) - + ((firstTaskToStart == null) ? 
dagInfo.getStartTime() : firstTaskToStart.getStartTime()); + return totalTime; + } + + @Override + public void analyze(DagInfo dagInfo) throws TezException { + + for (VertexInfo vertexInfo : dagInfo.getVertices()) { + String vertexName = vertexInfo.getVertexName(); + if (vertexInfo.getFirstTaskToStart() == null || vertexInfo.getLastTaskToFinish() == null) { + continue; + } + + long totalTime = getTaskRuntime(vertexInfo); + + long slowestLastEventTime = Long.MIN_VALUE; + String maxSourceName = ""; + taskAttemptRuntimeHistorgram = metrics.histogram(vertexName); + + + for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) { + + taskAttemptRuntimeHistorgram.update(attemptInfo.getTimeTaken()); + + //Get the last event received from the incoming vertices + Map<String, TezCounter> lastEventReceivedMap = attemptInfo.getCounter( + TaskCounter.LAST_EVENT_RECEIVED.toString()); + + for (Map.Entry<String, TezCounter> entry : lastEventReceivedMap.entrySet()) { + if (entry.getKey().equals(TaskCounter.class.getName())) { + //TODO: Tez counters always ends up adding fgroups and groups, due to which we end up + // getting TaskCounter details as well. + continue; + } + //Find the slowest last event received + if (entry.getValue().getValue() > slowestLastEventTime) { + slowestLastEventTime = entry.getValue().getValue(); + maxSourceName = entry.getKey(); + } + } + } + + long shuffleMax = Long.MIN_VALUE; + String shuffleMaxSource = ""; + for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) { + //Get the shuffle phase time from the incoming vertices + Map<String, TezCounter> shufflePhaseTimeMap = attemptInfo.getCounter( + TaskCounter.SHUFFLE_PHASE_TIME.toString()); + + for (Map.Entry<String, TezCounter> entry : shufflePhaseTimeMap.entrySet()) { + if (entry.getKey().equals(TaskCounter.class.getName())) { + //ignore. TODO: hack for taskcounter issue + continue; + } + //Find the max shuffle phase time and its source + if (entry.getValue().getValue() > shuffleMax) { + shuffleMax = entry.getValue().getValue(); + shuffleMaxSource = entry.getKey(); + } + } + } + + String comments = ""; + + List<String> record = Lists.newLinkedList(); + record.add(vertexName); + record.add(vertexInfo.getTaskAttempts().size() + ""); + record.add(totalTime + ""); + record.add(Math.max(0, shuffleMax) + ""); + record.add(shuffleMaxSource); + record.add(Math.max(0, slowestLastEventTime) + ""); + record.add(maxSourceName); + //Finding out real_work done at vertex level might be meaningless (as it is quite possible + // that it went to starvation). + + StringBuilder sb = new StringBuilder(); + double percentile75 = taskAttemptRuntimeHistorgram.getSnapshot().get75thPercentile(); + double percentile95 = taskAttemptRuntimeHistorgram.getSnapshot().get95thPercentile(); + double percentile98 = taskAttemptRuntimeHistorgram.getSnapshot().get98thPercentile(); + double percentile99 = taskAttemptRuntimeHistorgram.getSnapshot().get99thPercentile(); + double medianAttemptRuntime = taskAttemptRuntimeHistorgram.getSnapshot().getMedian(); + + record.add("75th=" + percentile75); + record.add("95th=" + percentile95); + record.add("98th=" + percentile98); + record.add("median=" + medianAttemptRuntime); + + if (percentile75 / percentile99 < 0.5) { + //looks like some straggler task is there. 
+ sb.append("Looks like some straggler task is there"); + } + + record.add(sb.toString()); + + if (totalTime > 0 && vertexInfo.getTaskAttempts().size() > 0) { + if ((shuffleMax * 1.0f / totalTime) > 0.5) { + if ((slowestLastEventTime * 1.0f / totalTime) > 0.5) { + comments = "This vertex is slow due to its dependency on parent. Got a lot delayed last" + + " event received"; + } else { + comments = + "Spending too much time on shuffle. Check shuffle bytes from previous vertex"; + } + } else { + if (totalTime > vertexRuntimeThreshold) { //greater than X seconds. + comments = "Concentrate on this vertex (totalTime > " + vertexRuntimeThreshold + + " seconds)"; + } + } + } + + record.add(comments); + csvResult.addRecord(record.toArray(new String[record.size()])); + } + } + + + @Override + public CSVResult getResult() throws TezException { + return csvResult; + } + + @Override + public String getName() { + return "SlowVertexAnalyzer"; + } + + @Override + public String getDescription() { + return "Identify the slowest vertex in the DAG, which needs to be looked into first"; + } + + @Override + public Configuration getConfiguration() { + return config; + } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + SlowestVertexAnalyzer analyzer = new SlowestVertexAnalyzer(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java new file mode 100644 index 0000000..d69ca23 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.analyzer.plugins; + +import com.google.common.collect.Lists; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; + +import java.util.List; +import java.util.Map; + + +/** + * Find tasks which have more than one spill (ADDITIONAL_SPILL_COUNT). + * <p/> + * Accompany this with OUTPUT_BYTES (> 1 GB of data written) + */ +public class SpillAnalyzerImpl extends TezAnalyzerBase implements Analyzer { + + private static final String[] headers = { "vertexName", "taskAttemptId", + "Node", "counterGroupName", + "spillCount", "taskDuration", + "OUTPUT_BYTES", "OUTPUT_RECORDS", + "SPILLED_RECORDS", "Recommendation" }; + + private final CSVResult csvResult; + + /** + * Minimum output bytes that should be churned out by a task + */ + private static final String OUTPUT_BYTES_THRESHOLD = "tez.spill-analyzer.min.output.bytes" + + ".threshold"; + private static final long OUTPUT_BYTES_THRESHOLD_DEFAULT = 1 * 1024 * 1024 * 1024l; + + private final long minOutputBytesPerTask; + + private final Configuration config; + + public SpillAnalyzerImpl(Configuration config) { + this.config = config; + minOutputBytesPerTask = Math.max(0, config.getLong(OUTPUT_BYTES_THRESHOLD, + OUTPUT_BYTES_THRESHOLD_DEFAULT)); + this.csvResult = new CSVResult(headers); + } + + @Override + public void analyze(DagInfo dagInfo) throws TezException { + for (VertexInfo vertexInfo : dagInfo.getVertices()) { + String vertexName = vertexInfo.getVertexName(); + + for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) { + //Get ADDITIONAL_SPILL_COUNT, OUTPUT_BYTES for every source + Map<String, TezCounter> spillCountMap = + attemptInfo.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name()); + Map<String, TezCounter> spilledRecordsMap = + attemptInfo.getCounter(TaskCounter.SPILLED_RECORDS.name()); + Map<String, TezCounter> outputRecordsMap = + attemptInfo.getCounter(TaskCounter.OUTPUT_RECORDS.name()); + + Map<String, TezCounter> outputBytesMap = + attemptInfo.getCounter(TaskCounter.OUTPUT_BYTES.name()); + + for (Map.Entry<String, TezCounter> entry : spillCountMap.entrySet()) { + String source = entry.getKey(); + long spillCount = entry.getValue().getValue(); + long outBytes = outputBytesMap.get(source).getValue(); + + long outputRecords = outputRecordsMap.get(source).getValue(); + long spilledRecords = spilledRecordsMap.get(source).getValue(); + + if (spillCount > 1 && outBytes > minOutputBytesPerTask) { + List<String> recordList = Lists.newLinkedList(); + recordList.add(vertexName); + recordList.add(attemptInfo.getTaskAttemptId()); + recordList.add(attemptInfo.getNodeId()); + recordList.add(source); + recordList.add(spillCount + ""); + recordList.add(attemptInfo.getTimeTaken() + ""); + recordList.add(outBytes + ""); + recordList.add(outputRecords + ""); + recordList.add(spilledRecords + ""); + recordList.add("Consider increasing " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + + ". 
Try increasing container size."); + + csvResult.addRecord(recorList.toArray(new String[recorList.size()])); + } + } + } + } + } + + @Override + public CSVResult getResult() throws TezException { + return csvResult; + } + + @Override + public String getName() { + return "SpillAnalyzer"; + } + + @Override + public String getDescription() { + return "Analyze spill details in the task"; + } + + @Override + public Configuration getConfiguration() { + return config; + } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + SpillAnalyzerImpl analyzer = new SpillAnalyzerImpl(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java new file mode 100644 index 0000000..070294f --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer.plugins; + +import com.google.common.collect.Lists; +import com.google.common.collect.TreeMultiset; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; + +import java.util.Comparator; +import java.util.List; + +/** + * Analyze concurrent tasks running in every vertex at regular intervals. 
+ */
+public class TaskConcurrencyAnalyzer extends TezAnalyzerBase implements Analyzer {
+
+  private static final String[] headers = { "time", "vertexName", "concurrentTasksRunning" };
+
+  private final CSVResult csvResult;
+  private final Configuration config;
+
+  public TaskConcurrencyAnalyzer(Configuration conf) {
+    this.csvResult = new CSVResult(headers);
+    this.config = conf;
+  }
+
+  private enum EventType {START, FINISH}
+
+  static class TimeInfo {
+    EventType eventType;
+    long timestamp;
+    int concurrentTasks;
+
+    public TimeInfo(EventType eventType, long timestamp) {
+      this.eventType = eventType;
+      this.timestamp = timestamp;
+    }
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+
+    //For each vertex find the concurrent tasks running at any point
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      List<TaskAttemptInfo> taskAttempts =
+          Lists.newLinkedList(vertexInfo.getTaskAttempts(true, null));
+
+      String vertexName = vertexInfo.getVertexName();
+
+      /*
+       * - Get a sorted multiset of timestamps (S1, S2,...E1, E2..). Multiple tasks can
+       *   start or finish at the same time.
+       * - Walk through the set
+       * - Increment concurrent tasks when a start event is encountered
+       * - Decrement concurrent tasks when a finish event is encountered
+       */
+      TreeMultiset<TimeInfo> timeInfoSet = TreeMultiset.create(new Comparator<TimeInfo>() {
+        @Override public int compare(TimeInfo o1, TimeInfo o2) {
+          if (o1.timestamp < o2.timestamp) {
+            return -1;
+          }
+          if (o1.timestamp > o2.timestamp) {
+            return 1;
+          }
+          //Break ties on event type so that a START and a FINISH sharing a timestamp
+          //stay distinct elements instead of being merged by the multiset.
+          return o1.eventType.compareTo(o2.eventType);
+        }
+      });
+
+      for (TaskAttemptInfo attemptInfo : taskAttempts) {
+        TimeInfo startTimeInfo = new TimeInfo(EventType.START, attemptInfo.getStartTime());
+        TimeInfo stopTimeInfo = new TimeInfo(EventType.FINISH, attemptInfo.getFinishTime());
+
+        timeInfoSet.add(startTimeInfo);
+        timeInfoSet.add(stopTimeInfo);
+      }
+
+      //Compute concurrent tasks in the list now.
+      int concurrentTasks = 0;
+      for (TimeInfo timeInfo : timeInfoSet.elementSet()) {
+        switch (timeInfo.eventType) {
+        case START:
+          concurrentTasks += timeInfoSet.count(timeInfo);
+          break;
+        case FINISH:
+          concurrentTasks -= timeInfoSet.count(timeInfo);
+          break;
+        default:
+          break;
+        }
+        timeInfo.concurrentTasks = concurrentTasks;
+        addToResult(vertexName, timeInfo.timestamp, timeInfo.concurrentTasks);
+      }
+    }
+  }
+
+  private void addToResult(String vertexName, long currentTime, int concurrentTasks) {
+    String[] record = { currentTime + "", vertexName, concurrentTasks + "" };
+    csvResult.addRecord(record);
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "TaskConcurrencyAnalyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyze how many tasks were running in every vertex at a given point in time. "
+        + "This helps in understanding whether there was any task starvation.";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration config = new Configuration();
+    TaskConcurrencyAnalyzer analyzer = new TaskConcurrencyAnalyzer(config);
+    int res = ToolRunner.run(config, analyzer, args);
+    analyzer.printResults();
+    System.exit(res);
+  }
+}
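The analyze() method above is a sweep over start/finish events. Here is a simplified, self-contained sketch of the same counting idea; unlike the analyzer, it nets out a finish and a start that share a timestamp, and the attempt times are made up.

    import java.util.Map;
    import java.util.TreeMap;

    public class ConcurrencySweepSketch {
      public static void main(String[] args) {
        // Three attempts as (start, finish) millis.
        long[][] attempts = { { 0, 10 }, { 0, 20 }, { 15, 30 } };
        // Net delta per distinct timestamp: +1 per start, -1 per finish.
        TreeMap<Long, Integer> delta = new TreeMap<>();
        for (long[] a : attempts) {
          delta.merge(a[0], 1, Integer::sum);
          delta.merge(a[1], -1, Integer::sum);
        }
        int running = 0;
        for (Map.Entry<Long, Integer> e : delta.entrySet()) {
          running += e.getValue();
          // Prints concurrency 2, 1, 2, 1, 0 at t = 0, 10, 15, 20, 30.
          System.out.println("t=" + e.getKey() + " concurrent=" + running);
        }
      }
    }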
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
new file mode 100644
index 0000000..73e731a
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import java.io.File;
+import java.util.Iterator;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Tool;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.Result;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.ATSImportTool;
+import org.apache.tez.history.parser.ATSFileParser;
+import org.apache.tez.history.parser.SimpleHistoryParser;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+
+import com.google.common.base.Preconditions;
+
+public abstract class TezAnalyzerBase extends Configured implements Tool, Analyzer {
+
+  private static final String EVENT_FILE_NAME = "eventFileName";
+  private static final String OUTPUT_DIR = "outputDir";
+  private static final String SAVE_RESULTS = "saveResults";
+  private static final String DAG_ID = "dagId";
+  private static final String FROM_SIMPLE_HISTORY = "fromSimpleHistory";
+  private static final String HELP = "help";
+
+  private static final int SEPARATOR_WIDTH = 80;
+  private static final int MIN_COL_WIDTH = 12;
+
+  private String outputDir;
+  private boolean saveResults = false;
+
+  @SuppressWarnings("static-access")
+  private static Options buildOptions() {
+    Option dagIdOption = OptionBuilder.withArgName(DAG_ID).withLongOpt(DAG_ID)
+        .withDescription("DagId that needs to be analyzed").hasArg().isRequired(true).create();
+
+    Option outputDirOption = OptionBuilder.withArgName(OUTPUT_DIR).withLongOpt(OUTPUT_DIR)
+        .withDescription("Directory to write outputs to.").hasArg().isRequired(false).create();
+
+    Option saveResults = OptionBuilder.withArgName(SAVE_RESULTS).withLongOpt(SAVE_RESULTS)
+        .withDescription("Saves results to output directory (optional)")
+        .hasArg(false).isRequired(false).create();
+
+    Option eventFileNameOption = OptionBuilder.withArgName(EVENT_FILE_NAME)
+        .withLongOpt(EVENT_FILE_NAME)
+        .withDescription("File with event data for the DAG").hasArg()
+        .isRequired(false).create();
+
+    Option fromSimpleHistoryOption = OptionBuilder.withArgName(FROM_SIMPLE_HISTORY)
+        .withLongOpt(FROM_SIMPLE_HISTORY)
+        .withDescription("Event data from Simple History logging. Must also specify event file")
+        .isRequired(false).create();
+
+    Option help = OptionBuilder.withArgName(HELP).withLongOpt(HELP)
+        .withDescription("print help")
+        .isRequired(false).create();
+
+    Options opts = new Options();
+    opts.addOption(dagIdOption);
+    opts.addOption(outputDirOption);
+    opts.addOption(saveResults);
+    opts.addOption(eventFileNameOption);
+    opts.addOption(fromSimpleHistoryOption);
+    opts.addOption(help);
+    return opts;
+  }
+
+  protected String getOutputDir() {
+    return outputDir;
+  }
+
+  private void printUsage() {
+    System.err.println("Analyzer base options are");
+    Options options = buildOptions();
+    for (Object obj : options.getOptions()) {
+      Option option = (Option) obj;
+      System.err.println(option.getArgName() + " : " + option.getDescription());
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    CommandLine cmdLine = null;
+    try {
+      cmdLine = new GnuParser().parse(buildOptions(), args);
+    } catch (ParseException e) {
+      System.err.println("Invalid options on command line");
+      printUsage();
+      return -1;
+    }
+    saveResults = cmdLine.hasOption(SAVE_RESULTS);
+
+    if (cmdLine.hasOption(HELP)) {
+      printUsage();
+      return 0;
+    }
+
+    outputDir = cmdLine.getOptionValue(OUTPUT_DIR);
+    if (outputDir == null) {
+      outputDir = System.getProperty("user.dir");
+    }
+
+    File file = null;
+    if (cmdLine.hasOption(EVENT_FILE_NAME)) {
+      file = new File(cmdLine.getOptionValue(EVENT_FILE_NAME));
+    }
+
+    String dagId = cmdLine.getOptionValue(DAG_ID);
+
+    DagInfo dagInfo = null;
+
+    if (file == null) {
+      if (cmdLine.hasOption(FROM_SIMPLE_HISTORY)) {
+        System.err.println("Event file name must be specified when using simple history");
+        printUsage();
+        return -2;
+      }
+      // using ATS - try to download directly
+      String[] importArgs = { "--dagId=" + dagId, "--downloadDir=" + outputDir };
+
+      int result = ATSImportTool.process(importArgs);
+      if (result != 0) {
+        System.err.println("Error downloading data from ATS");
+        return -3;
+      }
+
+      //Parse downloaded contents
+      file = new File(outputDir
+          + Path.SEPARATOR + dagId
+          + Path.SEPARATOR + dagId + ".zip");
+    }
+
+    Preconditions.checkState(file != null);
+    if (!cmdLine.hasOption(FROM_SIMPLE_HISTORY)) {
+      ATSFileParser parser = new ATSFileParser(file);
+      dagInfo = parser.getDAGData(dagId);
+    } else {
+      SimpleHistoryParser parser = new SimpleHistoryParser(file);
+      dagInfo = parser.getDAGData(dagId);
+    }
+    Preconditions.checkState(dagInfo.getDagId().equals(dagId));
+    analyze(dagInfo);
+    Result result = getResult();
+    if (saveResults && (result instanceof CSVResult)) {
+      String fileName = outputDir + File.separator
+          + this.getClass().getName() + "_" + dagInfo.getDagId() + ".csv";
+      ((CSVResult) result).dumpToFile(fileName);
+      System.out.println("Saved results in " + fileName);
+    }
+    return 0;
+  }
+
+  public void printResults() throws TezException {
+    Result result = getResult();
+    if (result instanceof CSVResult) {
+      String[] headers = ((CSVResult) result).getHeaders();
+
+      StringBuilder formatBuilder = new StringBuilder();
+      int size = Math.max(MIN_COL_WIDTH, SEPARATOR_WIDTH / headers.length);
+      for (int i = 0; i < headers.length; i++) {
+        formatBuilder.append("%-").append(size).append("s ");
+      }
+      String format = formatBuilder.toString();
+
+      StringBuilder separator = new StringBuilder();
+      for (int i = 0; i < SEPARATOR_WIDTH; i++) {
+        separator.append("-");
+      }
+
+      System.out.println(separator);
+      System.out.println(String.format(format, (Object[]) headers));
+      System.out.println(separator);
+
+      Iterator<String[]> recordsIterator = ((CSVResult) result).getRecordsIterator();
+      while (recordsIterator.hasNext()) {
+        String line = String.format(format, (Object[]) recordsIterator.next());
+        System.out.println(line);
+      }
+      System.out.println(separator);
+    }
+  }
+}
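Concretely, run() above accepts two data sources: a zip downloaded from ATS (the default) or a local Simple History log. Below is a sketch of the latter path, driving one of the analyzers programmatically; the DAG id, log path, and output directory are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.tez.analyzer.plugins.TaskConcurrencyAnalyzer;

    public class SimpleHistoryRunSketch {
      public static void main(String[] args) throws Exception {
        // --fromSimpleHistory makes the base class parse --eventFileName with
        // SimpleHistoryParser instead of downloading a zip from ATS.
        String[] analyzerArgs = {
            "--dagId=dag_1111111111111_0001_1",
            "--eventFileName=/tmp/history.log",
            "--fromSimpleHistory",
            "--outputDir=/tmp/analysis",
            "--saveResults" };
        Configuration conf = new Configuration();
        TaskConcurrencyAnalyzer analyzer = new TaskConcurrencyAnalyzer(conf);
        int res = ToolRunner.run(conf, analyzer, analyzerArgs);
        analyzer.printResults();
        System.exit(res);
      }
    }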
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java
new file mode 100644
index 0000000..06b8983
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.utils.Utils;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Identify the set of vertices that fall on the critical path of a DAG.
+ */
+public class VertexLevelCriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer {
+  private final Configuration config;
+
+  private static final String[] headers = { "CriticalPath", "Score" };
+
+  private final CSVResult csvResult;
+
+  private static final String DOT_FILE_DIR = "tez.critical-path.analyzer.dot.output.loc";
+  private static final String DOT_FILE_DIR_DEFAULT = "."; //current directory
+
+  private final String dotFileLocation;
+
+  private static final String CONNECTOR = "-->";
+
+  public VertexLevelCriticalPathAnalyzer(Configuration config) {
+    this.config = config;
+    this.csvResult = new CSVResult(headers);
+    this.dotFileLocation = config.get(DOT_FILE_DIR, DOT_FILE_DIR_DEFAULT);
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    Map<String, Long> result = Maps.newLinkedHashMap();
+    getCriticalPath("", dagInfo.getVertices().get(dagInfo.getVertices().size() - 1), 0, result);
+
+    Map<String, Long> sortedByValues = sortByValues(result);
+    for (Map.Entry<String, Long> entry : sortedByValues.entrySet()) {
+      List<String> record = Lists.newLinkedList();
+      record.add(entry.getKey());
+      record.add(entry.getValue() + "");
+      csvResult.addRecord(record.toArray(new String[record.size()]));
+    }
+
+    String dotFile = dotFileLocation + File.separator + dagInfo.getDagId() + ".dot";
+    try {
+      List<String> criticalVertices = null;
+      if (!sortedByValues.isEmpty()) {
+        String criticalPath = sortedByValues.keySet().iterator().next();
+        criticalVertices = getVertexNames(criticalPath);
+      } else {
+        criticalVertices = Lists.newLinkedList();
+      }
+      Utils.generateDAGVizFile(dagInfo, dotFile, criticalVertices);
+    } catch (IOException e) {
+      throw new TezException(e);
+    }
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "CriticalPathAnalyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyze vertex level critical path of the DAG";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+  private static Map<String, Long> sortByValues(Map<String, Long> result) {
+    //Sort result by time in reverse order
+    final Ordering<String> reverseValueOrdering =
+        Ordering.natural().reverse().nullsLast().onResultOf(Functions.forMap(result, null));
+    Map<String, Long> orderedMap = ImmutableSortedMap.copyOf(result, reverseValueOrdering);
+    return orderedMap;
+  }
+
+  private static void getCriticalPath(String predecessor, VertexInfo dest, long time,
+      Map<String, Long> result) {
+    String destVertexName = (dest != null) ? (dest.getVertexName()) : "";
+
+    if (dest != null) {
+      time += dest.getTimeTaken();
+      predecessor += destVertexName + CONNECTOR;
+
+      for (VertexInfo incomingVertex : dest.getInputVertices()) {
+        getCriticalPath(predecessor, incomingVertex, time, result);
+      }
+
+      result.put(predecessor, time);
+    }
+  }
+
+  private static List<String> getVertexNames(String criticalPath) {
+    if (Strings.isNullOrEmpty(criticalPath)) {
+      return Lists.newLinkedList();
+    }
+    return Lists.newLinkedList(
+        Splitter.on(CONNECTOR).trimResults().omitEmptyStrings().split(criticalPath));
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration config = new Configuration();
+    VertexLevelCriticalPathAnalyzer analyzer = new VertexLevelCriticalPathAnalyzer(config);
+    int res = ToolRunner.run(config, analyzer, args);
+    analyzer.printResults();
+    System.exit(res);
+  }
+}
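getCriticalPath() above walks backwards from the DAG's final vertex, accumulating getTimeTaken() along every chain of input vertices; each path prefix is scored, and the highest total sorts first. Here is a toy standalone version of that recursion over a hand-built DAG; all vertex names and runtimes are invented.

    import java.util.*;

    public class CriticalPathSketch {
      static Map<String, List<String>> inputs = new HashMap<>();
      static Map<String, Long> runtime = new HashMap<>();

      static void walk(String path, String vertex, long time, Map<String, Long> result) {
        time += runtime.get(vertex);
        path += vertex + "-->";
        for (String in : inputs.getOrDefault(vertex, Collections.<String>emptyList())) {
          walk(path, in, time, result);
        }
        result.put(path, time);
      }

      public static void main(String[] args) {
        // Map1 (20ms) -> Reduce1 (30ms) -> Reduce2 (40ms); Map2 (5ms) -> Reduce2.
        runtime.put("Reduce2", 40L);
        runtime.put("Reduce1", 30L);
        runtime.put("Map1", 20L);
        runtime.put("Map2", 5L);
        inputs.put("Reduce2", Arrays.asList("Reduce1", "Map2"));
        inputs.put("Reduce1", Arrays.asList("Map1"));

        Map<String, Long> result = new HashMap<>();
        walk("", "Reduce2", 0, result);
        // Reduce2-->Reduce1-->Map1--> scores 90 and wins.
        result.entrySet().stream()
            .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
            .forEach(e -> System.out.println(e.getKey() + " = " + e.getValue()));
      }
    }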
