Repository: tez Updated Branches: refs/heads/master 501a351d5 -> ad7604dd6
TEZ-3547. Add TaskAssignment Analyzer (Dharmesh Kakadia via rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ad7604dd Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ad7604dd Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ad7604dd Branch: refs/heads/master Commit: ad7604dd6e69cb9afed04fbcfd3d75219ce5330c Parents: 501a351 Author: Rajesh Balamohan <[email protected]> Authored: Tue Nov 29 05:04:06 2016 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Tue Nov 29 05:04:06 2016 +0530 ---------------------------------------------------------------------- .../tez/analyzer/plugins/AnalyzerDriver.java | 2 + .../plugins/TaskAssignmentAnalyzer.java | 103 +++++++++++++++++++ 2 files changed, 105 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/ad7604dd/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java index 57b21cb..2273155 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java @@ -44,6 +44,8 @@ public class AnalyzerDriver { "Print slow task details in a DAG"); pgd.addClass("SpillAnalyzer", SpillAnalyzerImpl.class, "Print spill details in a DAG"); + pgd.addClass("TaskAssignmentAnalyzer", TaskAssignmentAnalyzer.class, + "Print task-to-node assignment details of a DAG"); pgd.addClass("TaskConcurrencyAnalyzer", TaskConcurrencyAnalyzer.class, "Print the task concurrency details in a DAG"); pgd.addClass("VertexLevelCriticalPathAnalyzer", VertexLevelCriticalPathAnalyzer.class, http://git-wip-us.apache.org/repos/asf/tez/blob/ad7604dd/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java new file mode 100644 index 0000000..ce6fa41 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer.plugins; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.analyzer.Result; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; + +import java.util.HashMap; +import java.util.Map; + +/** + * Get the Task assignments on different nodes of the cluster. + */ +public class TaskAssignmentAnalyzer extends TezAnalyzerBase + implements Analyzer { + private final String[] headers = { "vertex", "node", "numTasks", "load" }; + private final Configuration config; + private final CSVResult csvResult; + + public TaskAssignmentAnalyzer(Configuration config) { + this.config = config; + csvResult = new CSVResult(headers); + } + + @Override + public void analyze(DagInfo dagInfo) throws TezException { + Map<String, Integer> map = new HashMap<>(); + for (VertexInfo vertex : dagInfo.getVertices()) { + map.clear(); + for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) { + Integer previousValue = map.get(attempt.getNodeId()); + map.put(attempt.getNodeId(), + previousValue == null ? 1 : previousValue + 1); + } + double mean = vertex.getTaskAttempts().size() / Math.max(1.0, map.size()); + for (Map.Entry<String, Integer> assignment : map.entrySet()) { + addARecord(vertex.getVertexName(), assignment.getKey(), + assignment.getValue(), assignment.getValue() * 100 / mean); + } + } + } + + private void addARecord(String vertexName, String node, int numTasks, + double load) { + String[] record = new String[4]; + record[0] = vertexName; + record[1] = node; + record[2] = String.valueOf(numTasks); + record[3] = String.format("%.2f", load); + csvResult.addRecord(record); + } + + @Override + public Result getResult() throws TezException { + return csvResult; + } + + @Override + public String getName() { + return "Task Assignment Analyzer"; + } + + @Override + public String getDescription() { + return "Get the Task assignments on different nodes of the cluster"; + } + + @Override + public Configuration getConfiguration() { + return config; + } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + TaskAssignmentAnalyzer analyzer = new TaskAssignmentAnalyzer(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } +}
