Repository: tez Updated Branches: refs/heads/master cf400882b -> e10d80fe6
TEZ-2645. Provide standard analyzers for job analysis (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e10d80fe Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e10d80fe Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e10d80fe Branch: refs/heads/master Commit: e10d80fe6008986900dadc51fc7823582d0dfb1a Parents: cf40088 Author: Rajesh Balamohan <[email protected]> Authored: Wed Jul 29 13:27:51 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Wed Jul 29 13:27:51 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + pom.xml | 5 + .../tez/common/counters/FileSystemCounter.java | 4 + .../analyzers/job-analyzer/findbugs-exclude.xml | 28 ++ tez-tools/analyzers/job-analyzer/pom.xml | 47 +++ .../java/org/apache/tez/analyzer/Analyzer.java | 64 ++++ .../java/org/apache/tez/analyzer/CSVResult.java | 113 +++++++ .../java/org/apache/tez/analyzer/Result.java | 39 +++ .../plugins/ContainerReuseAnalyzer.java | 87 ++++++ .../analyzer/plugins/CriticalPathAnalyzer.java | 110 +++++++ .../tez/analyzer/plugins/LocalityAnalyzer.java | 193 ++++++++++++ .../analyzer/plugins/ShuffleTimeAnalyzer.java | 195 ++++++++++++ .../tez/analyzer/plugins/SkewAnalyzer.java | 309 +++++++++++++++++++ .../tez/analyzer/plugins/SlowNodeAnalyzer.java | 188 +++++++++++ .../analyzer/plugins/SlowTaskIdentifier.java | 109 +++++++ .../analyzer/plugins/SlowestVertexAnalyzer.java | 188 +++++++++++ .../tez/analyzer/plugins/SpillAnalyzerImpl.java | 126 ++++++++ tez-tools/analyzers/pom.xml | 31 ++ tez-tools/pom.xml | 3 + 19 files changed, 1840 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f7f2e76..687f996 100644 --- a/CHANGES.txt +++ b/CHANGES.txt 
@@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES TEZ-2468. Change the minimum Java version to Java 7. ALL CHANGES: + TEZ-2645. Provide standard analyzers for job analysis. TEZ-2627. Support for Tez Job Priorities. TEZ-2623. Fix module dependencies related to hadoop-auth. TEZ-2464. Move older releases to dist archive. http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ddb7324..a0b82e4 100644 --- a/pom.xml +++ b/pom.xml @@ -180,6 +180,11 @@ <version>${pig.version}</version> </dependency> <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>3.1.0</version> + </dependency> + <dependency> <groupId>org.roaringbitmap</groupId> <artifactId>RoaringBitmap</artifactId> <version>0.4.9</version> http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java index 57d1053..73e3581 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java @@ -27,4 +27,8 @@ public enum FileSystemCounter { READ_OPS, LARGE_READ_OPS, WRITE_OPS, + HDFS_BYTES_READ, + HDFS_BYTES_WRITTEN, + FILE_BYTES_READ, + FILE_BYTES_WRITTEN } http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/findbugs-exclude.xml b/tez-tools/analyzers/job-analyzer/findbugs-exclude.xml new file mode 100644 index 0000000..5bebb05 --- /dev/null +++ 
b/tez-tools/analyzers/job-analyzer/findbugs-exclude.xml @@ -0,0 +1,28 @@ +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> +<FindBugsFilter> + + + <Match> + <Class name="org.apache.tez.analyzer.CSVResult"/> + <Bug pattern="EI_EXPOSE_REP2"/> + </Match> + + <Match> + <Class name="org.apache.tez.analyzer.CSVResult"/> + <Bug pattern="EI_EXPOSE_REP"/> + </Match> + + +</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/pom.xml ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/pom.xml b/tez-tools/analyzers/job-analyzer/pom.xml new file mode 100644 index 0000000..6312a34 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/pom.xml @@ -0,0 +1,47 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.tez</groupId> + <artifactId>tez-perf-analyzer</artifactId> + <version>0.8.0-SNAPSHOT</version> + </parent> + <artifactId>job-analyzer</artifactId> + + <dependencies> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-history-parser</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java new file mode 100644 index 0000000..6021c58 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.DagInfo; + + +public interface Analyzer { + + /** + * Analyze Dag + * + * @param dagInfo + * @throws TezException + */ + public void analyze(DagInfo dagInfo) throws TezException; + + /** + * Get the result of analysis + * + * @return analysis result + * @throws TezException + */ + public Result getResult() throws TezException; + + /** + * Get name of the analyzer + * + * @return name of analyze + */ + public String getName(); + + /** + * Get description of the analyzer + * + * @return description of analyzer + */ + public String getDescription(); + + /** + * Get config properties related to this analyzer + * + * @return config related to analyzer + */ + public Configuration getConfiguration(); +} http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java new file mode 100644 index 0000000..5e454b4 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import org.apache.directory.api.util.Strings; +import org.apache.tez.dag.api.TezException; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +/** + * Simple placeholder for storing CSV results. + * Contains headers and records in string format. 
+ */ +public class CSVResult implements Result { + + private final String[] header; + private final List<String[]> recordsList; + private String comments; + + public CSVResult(String[] header) { + this.header = header; + recordsList = Lists.newLinkedList(); + } + + public String[] getHeaders() { + return header; + } + + public void addRecord(String[] record) { + Preconditions.checkArgument(record != null, "Record can't be null"); + Preconditions.checkArgument(record.length == header.length, "Record length" + record.length + + " does not match header length " + header.length); + recordsList.add(record); + } + + public Iterator<String[]> getRecordsIterator() { + return Iterators.unmodifiableIterator(recordsList.iterator()); + } + + + public void setComments(String comments) { + this.comments = comments; + } + + @Override public String toJson() throws TezException { + return ""; + } + + @Override public String getComments() { + return comments; + } + + @Override public String toString() { + return "CSVResult{" + + "header=" + Arrays.toString(header) + + ", recordsList=" + recordsList + + '}'; + } + + //For testing + public void dumpToFile(String fileName) throws IOException { + OutputStreamWriter writer = new OutputStreamWriter( + new FileOutputStream(new File(fileName)), + Charset.forName("UTF-8").newEncoder()); + BufferedWriter bw = new BufferedWriter(writer); + for (String[] record : recordsList) { + + if (record.length != header.length) { + continue; //LOG error msg? + } + + StringBuilder sb = new StringBuilder(); + for(int i=0;i<record.length;i++) { + sb.append(Strings.isNotEmpty(record[i]) ? 
record[i] : " "); + if (i < record.length - 1) { + sb.append(","); + } + } + bw.write(sb.toString()); + bw.newLine(); + } + bw.flush(); + bw.close(); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Result.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Result.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Result.java new file mode 100644 index 0000000..d1881eb --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Result.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer; + +import org.apache.tez.dag.api.TezException; + +public interface Result { + + /** + * Convert result to json format + * + * @return json + * @throws TezException + */ + public String toJson() throws TezException; + + /** + * Recommendation / comments about the analysis if any. 
+ * + * @return comments + */ + public String getComments(); +} http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java new file mode 100644 index 0000000..905f966 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.analyzer.plugins; + +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.Container; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; + +import java.util.List; + + +/** + * Get container reuse information at a per vertex level basis. + */ +public class ContainerReuseAnalyzer implements Analyzer { + + private final Configuration config; + + private static final String[] headers = + { "vertexName", "taskAttempts", "node", "containerId", "reuseCount" }; + + private final CSVResult csvResult; + + public ContainerReuseAnalyzer(Configuration config) { + this.config = config; + this.csvResult = new CSVResult(headers); + } + + @Override + public void analyze(DagInfo dagInfo) throws TezException { + for (VertexInfo vertexInfo : dagInfo.getVertices()) { + Multimap<Container, TaskAttemptInfo> containers = vertexInfo.getContainersMapping(); + for (Container container : containers.keySet()) { + List<String> record = Lists.newLinkedList(); + record.add(vertexInfo.getVertexName()); + record.add(vertexInfo.getTaskAttempts().size() + ""); + record.add(container.getHost()); + record.add(container.getId()); + record.add(Integer.toString(containers.get(container).size())); + csvResult.addRecord(record.toArray(new String[record.size()])); + } + } + } + + @Override + public CSVResult getResult() throws TezException { + return csvResult; + } + + @Override + public String getName() { + return "Container Reuse Analyzer"; + } + + @Override + public String getDescription() { + return "Get details on container reuse analysis"; + } + + @Override + public Configuration 
getConfiguration() { + return config; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java new file mode 100644 index 0000000..6748f3f --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.analyzer.plugins; + +import com.google.common.base.Functions; +import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; + +import java.util.List; +import java.util.Map; + +/** + * Identify a set of vertices which fall in the critical path in a DAG. + */ +public class CriticalPathAnalyzer implements Analyzer { + private final Configuration config; + + private static final String[] headers = { "CriticalPath", "Score" }; + + private final CSVResult csvResult; + + public CriticalPathAnalyzer(Configuration config) { + this.config = config; + this.csvResult = new CSVResult(headers); + } + + @Override public void analyze(DagInfo dagInfo) throws TezException { + Map<String, Long> result = Maps.newLinkedHashMap(); + getCriticalPath("", dagInfo.getVertices().get(dagInfo.getVertices().size() - 1), 0, result); + + System.out.println(); + System.out.println(); + + for (Map.Entry<String, Long> entry : sortByValues(result).entrySet()) { + List<String> record = Lists.newLinkedList(); + record.add(entry.getKey()); + record.add(entry.getValue() + ""); + csvResult.addRecord(record.toArray(new String[record.size()])); + System.out.println(entry.getKey() + ", " + entry.getValue()); + } + } + + @Override + public CSVResult getResult() throws TezException { + return csvResult; + } + + @Override + public String getName() { + return "CriticalPathAnalyzer"; + } + + @Override + public String getDescription() { + return "Analyze critical path of the DAG"; + } + + @Override + public Configuration getConfiguration() { + return config; + } + + private static 
Map<String, Long> sortByValues(Map<String, Long> result) { + //Sort result by time in reverse order + final Ordering<String> reversValueOrdering = + Ordering.natural().reverse().nullsLast().onResultOf(Functions.forMap(result, null)); + Map<String, Long> orderedMap = ImmutableSortedMap.copyOf(result, reversValueOrdering); + return orderedMap; + } + + private static void getCriticalPath(String predecessor, VertexInfo dest, long time, + Map<String, Long> result) { + String destVertexName = (dest != null) ? (dest.getVertexName()) : ""; + + if (dest != null) { + time += dest.getTimeTaken(); + predecessor += destVertexName + "-->"; + + for (VertexInfo incomingVertex : dest.getInputVertices()) { + getCriticalPath(predecessor, incomingVertex, time, result); + } + + result.put(predecessor, time); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java new file mode 100644 index 0000000..67b4c51 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer.plugins; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.common.counters.DAGCounter; +import org.apache.tez.common.counters.FileSystemCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; + +import java.util.List; +import java.util.Map; + + +/** + * Get locality information for tasks for vertices and get their task execution times. + * This would be helpeful to co-relate if the vertex runtime is anyways related to the data + * locality. 
+ */ +public class LocalityAnalyzer implements Analyzer { + + private final String[] headers = { "vertexName", "numTasks", "dataLocalRatio", "rackLocalRatio", + "otherRatio", "avgDataLocalTaskRuntime", "avgRackLocalTaskRuntime", + "avgOtherLocalTaskRuntime", "noOfInputs", "avgHDFSBytesRead_DataLocal", + "avgHDFSBytesRead_RackLocal", "avgHDFSBytesRead_Others", "recommendation" }; + + private static final String DATA_LOCAL_RATIO = "tez.locality-analyzer.data.local.ratio"; + private static final float DATA_LOCAL_RATIO_DEFAULT = 0.5f; + + private final Configuration config; + + private final CSVResult csvResult; + + public LocalityAnalyzer(Configuration config) { + this.config = config; + csvResult = new CSVResult(headers); + } + + @Override + public void analyze(DagInfo dagInfo) throws TezException { + for (VertexInfo vertexInfo : dagInfo.getVertices()) { + String vertexName = vertexInfo.getVertexName(); + + Map<String, TezCounter> dataLocalTask = vertexInfo.getCounter(DAGCounter.class.getName(), + DAGCounter.DATA_LOCAL_TASKS.toString()); + Map<String, TezCounter> rackLocalTask = vertexInfo.getCounter(DAGCounter.class.getName(), + DAGCounter.RACK_LOCAL_TASKS.toString()); + + long dataLocalTasks = 0; + long rackLocalTasks = 0; + + if (!dataLocalTask.isEmpty()) { + dataLocalTasks = dataLocalTask.get(DAGCounter.class.getName()).getValue(); + } + + if (!rackLocalTask.isEmpty()) { + rackLocalTasks = rackLocalTask.get(DAGCounter.class.getName()).getValue(); + } + + long totalVertexTasks = vertexInfo.getNumTasks(); + + if (dataLocalTasks > 0 || rackLocalTasks > 0) { + //compute locality details. 
+ float dataLocalRatio = dataLocalTasks * 1.0f / totalVertexTasks; + float rackLocalRatio = rackLocalTasks * 1.0f / totalVertexTasks; + float othersRatio = (totalVertexTasks - (dataLocalTasks + rackLocalTasks)) * 1.0f / + totalVertexTasks; + + List<String> record = Lists.newLinkedList(); + record.add(vertexName); + record.add(totalVertexTasks + ""); + record.add(dataLocalRatio + ""); + record.add(rackLocalRatio + ""); + record.add(othersRatio + ""); + + TaskAttemptDetails dataLocalResult = computeAverages(vertexInfo, + DAGCounter.DATA_LOCAL_TASKS); + TaskAttemptDetails rackLocalResult = computeAverages(vertexInfo, + DAGCounter.RACK_LOCAL_TASKS); + TaskAttemptDetails otherTaskResult = computeAverages(vertexInfo, + DAGCounter.OTHER_LOCAL_TASKS); + + record.add(dataLocalResult.avgRuntime + ""); + record.add(rackLocalResult.avgRuntime + ""); + record.add(otherTaskResult.avgRuntime + ""); + + //Get the number of inputs to this vertex + record.add(vertexInfo.getInputEdges().size() + ""); + + //Get the avg HDFS bytes read in this vertex for different type of locality + record.add(dataLocalResult.avgHDFSBytesRead + ""); + record.add(rackLocalResult.avgHDFSBytesRead + ""); + record.add(otherTaskResult.avgHDFSBytesRead + ""); + + String recommendation = ""; + if (dataLocalRatio < config.getFloat(DATA_LOCAL_RATIO, DATA_LOCAL_RATIO_DEFAULT)) { + recommendation = "Data locality is poor for this vertex. 
Try tuning " + + TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS + ", " + + TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED + ", " + + TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED; + } + + record.add(recommendation); + csvResult.addRecord(record.toArray(new String[record.size()])); + } + } + } + + /** + * Compute counter averages for specific vertex + * + * @param vertexInfo + * @param counter + * @return task attempt details + */ + private TaskAttemptDetails computeAverages(VertexInfo vertexInfo, DAGCounter counter) { + long totalTime = 0; + long totalTasks = 0; + long totalHDFSBytesRead = 0; + + TaskAttemptDetails result = new TaskAttemptDetails(); + + for(TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) { + Map<String, TezCounter> localityCounter = attemptInfo.getCounter(DAGCounter.class.getName(), + counter.toString()); + + if (!localityCounter.isEmpty() && + localityCounter.get(DAGCounter.class.getName()).getValue() > 0) { + totalTime += attemptInfo.getTimeTaken(); + totalTasks++; + + //get HDFSBytes read counter + Map<String, TezCounter> hdfsBytesReadCounter = attemptInfo.getCounter(FileSystemCounter + .class.getName(), FileSystemCounter.HDFS_BYTES_READ.name()); + for(Map.Entry<String, TezCounter> entry : hdfsBytesReadCounter.entrySet()) { + totalHDFSBytesRead += entry.getValue().getValue(); + } + } + } + if (totalTasks > 0) { + result.avgRuntime = (totalTime * 1.0f / totalTasks); + result.avgHDFSBytesRead = (totalHDFSBytesRead * 1.0f / totalTasks); + } + return result; + } + + @Override public CSVResult getResult() throws TezException { + return csvResult; + } + + @Override public String getName() { + return "Locality Analyzer"; + } + + @Override public String getDescription() { + return "Analyze for locality information (data local, rack local, off-rack)"; + } + + @Override public Configuration getConfiguration() { + return config; + } + + /** + * Placeholder for task attempt details + 
*/ + static class TaskAttemptDetails { + float avgHDFSBytesRead; + float avgRuntime; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java new file mode 100644 index 0000000..8df40ba --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tez.analyzer.plugins;

import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.analyzer.Analyzer;
import org.apache.tez.analyzer.CSVResult;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
import org.apache.tez.history.parser.datamodel.VertexInfo;

import java.util.List;
import java.util.Map;


/**
 * Analyze the time taken by merge phase, shuffle phase, and the time taken to do the real work
 * in tasks.
 *
 * Dumps REDUCE_INPUT_GROUPS, REDUCE_INPUT_RECORDS, their ratio and SHUFFLE_BYTES for task
 * attempts, grouped by vertex and source (counter group). Rendered as a CSV table.
 */
public class ShuffleTimeAnalyzer implements Analyzer {

  /** Threshold for (real-work time / total time); below it the attempt is flagged. */
  private static final String SHUFFLE_TIME_RATIO = "tez.shuffle-time-analyzer.shuffle.ratio";
  private static final float SHUFFLE_TIME_RATIO_DEFAULT = 0.5f;

  /** Attempts with fewer shuffled records than this are ignored as noise. */
  private static final String MIN_SHUFFLE_RECORDS = "tez.shuffle-time-analyzer.shuffle.min.records";
  private static final long MIN_SHUFFLE_RECORDS_DEFAULT = 10000;

  private static final String[] headers = { "vertexName", "taskAttemptId", "Node", "counterGroup",
      "Comments", "REDUCE_INPUT_GROUPS", "REDUCE_INPUT_RECORDS", "ratio", "SHUFFLE_BYTES",
      "Time taken to receive all events", "MERGE_PHASE_TIME", "SHUFFLE_PHASE_TIME",
      "TimeTaken_For_Real_Task", "FIRST_EVENT_RECEIVED", "LAST_EVENT_RECEIVED",
      "SHUFFLE_BYTES_DISK_DIRECT" };

  private final CSVResult csvResult = new CSVResult(headers);

  private final Configuration config;

  private final float shuffleTimeRatio;
  private final long minShuffleRecords;


  public ShuffleTimeAnalyzer(Configuration config) {
    this.config = config;

    shuffleTimeRatio = config.getFloat(SHUFFLE_TIME_RATIO, SHUFFLE_TIME_RATIO_DEFAULT);
    minShuffleRecords = config.getLong(MIN_SHUFFLE_RECORDS, MIN_SHUFFLE_RECORDS_DEFAULT);
  }

  @Override
  public void analyze(DagInfo dagInfo) throws TezException {

    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
      for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
        //counter_group (basically source) --> counter
        Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter
            .REDUCE_INPUT_GROUPS.toString());
        Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter
            .REDUCE_INPUT_RECORDS.toString());

        // Counters are absent for non-reduce attempts (e.g. map-only vertices); skip those.
        if (reduceInputGroups == null || reduceInputRecords == null) {
          continue;
        }

        for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
          String counterGroupName = entry.getKey();
          long reduceInputGroupsVal = entry.getValue().getValue();
          long reduceInputRecordsVal = (reduceInputRecords.get(counterGroupName) != null) ?
              reduceInputRecords.get(counterGroupName).getValue() : 0;

          if (reduceInputRecordsVal <= 0) {
            continue;
          }
          float ratio = (reduceInputGroupsVal * 1.0f / reduceInputRecordsVal);

          if (ratio > 0 && reduceInputRecordsVal > minShuffleRecords) {
            List<String> result = Lists.newLinkedList();
            result.add(vertexInfo.getVertexName());
            result.add(attemptInfo.getTaskAttemptId());
            result.add(attemptInfo.getNodeId());
            result.add(counterGroupName);

            // Real work done in the task = total time minus merge-phase time. A missing
            // MERGE_PHASE_TIME counter is treated as 0 rather than blowing up on parseLong("").
            long timeTakenForRealWork = attemptInfo.getTimeTaken() -
                parseLongOrZero(getCounterValue(TaskCounter.MERGE_PHASE_TIME, counterGroupName,
                    attemptInfo));

            String comments = "";
            if ((timeTakenForRealWork * 1.0f / attemptInfo.getTimeTaken()) < shuffleTimeRatio) {
              comments = "Time taken in shuffle is more than the actual work being done in task. "
                  + " Check if source/destination machine is a slow node. Check if merge phase "
                  + "time is more to understand disk bottlenecks in this node.  Check for skew";
            }
            result.add(comments);

            result.add(reduceInputGroupsVal + "");
            result.add(reduceInputRecordsVal + "");
            result.add(Float.toString(ratio));
            result.add(getCounterValue(TaskCounter.SHUFFLE_BYTES, counterGroupName, attemptInfo));

            //Total time taken for receiving all events from source tasks
            result.add(getOverheadFromSourceTasks(counterGroupName, attemptInfo));
            result.add(getCounterValue(TaskCounter.MERGE_PHASE_TIME, counterGroupName, attemptInfo));
            result.add(getCounterValue(TaskCounter.SHUFFLE_PHASE_TIME, counterGroupName, attemptInfo));

            result.add(Long.toString(timeTakenForRealWork));

            result.add(getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED, counterGroupName, attemptInfo));
            result.add(getCounterValue(TaskCounter.LAST_EVENT_RECEIVED, counterGroupName, attemptInfo));
            result.add(getCounterValue(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT, counterGroupName, attemptInfo));

            csvResult.addRecord(result.toArray(new String[result.size()]));
          }
        }
      }
    }

  }

  /**
   * Parse a counter value string produced by {@link #getCounterValue}, treating the
   * empty string (counter not present) as 0 instead of throwing NumberFormatException.
   *
   * @param value counter value string, possibly empty
   * @return parsed long, or 0 for an empty string
   */
  private static long parseLongOrZero(String value) {
    return value.isEmpty() ? 0L : Long.parseLong(value);
  }

  /**
   * Time taken to receive all events from source tasks
   *
   * @param counterGroupName
   * @param attemptInfo
   * @return String
   */
  private String getOverheadFromSourceTasks(String counterGroupName, TaskAttemptInfo attemptInfo) {
    // Missing counters are treated as 0 so attempts without shuffle events don't throw.
    long firstEventReceived = parseLongOrZero(getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED,
        counterGroupName, attemptInfo));
    long lastEventReceived = parseLongOrZero(getCounterValue(TaskCounter.LAST_EVENT_RECEIVED,
        counterGroupName, attemptInfo));
    return Long.toString(lastEventReceived - firstEventReceived);
  }

  /**
   * Look up a counter value for a specific counter group (source) of a task attempt.
   *
   * @param counter counter to look up
   * @param counterGroupName counter group (source vertex) to match
   * @param attemptInfo attempt whose counters are searched
   * @return the value as a string, or "" when the counter/group is absent
   */
  private String getCounterValue(TaskCounter counter, String counterGroupName,
      TaskAttemptInfo attemptInfo) {
    Map<String, TezCounter> tezCounterMap = attemptInfo.getCounter(counter.toString());
    if (tezCounterMap != null) {
      for (Map.Entry<String, TezCounter> entry : tezCounterMap.entrySet()) {
        String groupName = entry.getKey();
        long val = entry.getValue().getValue();
        if (groupName.equals(counterGroupName)) {
          return Long.toString(val);
        }
      }
    }
    return "";
  }

  @Override
  public CSVResult getResult() throws TezException {
    return csvResult;
  }

  @Override
  public String getName() {
    return "Shuffle time analyzer";
  }

  @Override
  public String getDescription() {
    return "Analyze the time taken for shuffle, merge "
        + "and the real work done in the task";
  }

  @Override
  public Configuration getConfiguration() {
    return config;
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tez.analyzer.plugins;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.analyzer.Analyzer;
import org.apache.tez.analyzer.CSVResult;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
import org.apache.tez.history.parser.datamodel.VertexInfo;

import java.util.List;
import java.util.Map;


/**
 * <p/>
 * Identify the skew (RECORD_INPUT_GROUPS / REDUCE_INPUT_RECORDS) ratio for all task attempts
 * and report if they are below a certain threshold.
 * <p/>
 * <p/>
 * - Case 1: Ratio of (reduce_input_groups / reduce_input_records) < 0.2 && SHUFFLE_BYTES > 1 GB
 * per task attempt from a source. This means couple of keys having too many records. Either
 * partitioning is wrong, or we need to increase memory limit for this vertex.
 * <p/>
 * - Case 2: Ratio of (reduce_input_groups / reduce_input_records) > 0.6 & Number of reduce input
 * records in task attempt is closer to say 60% of overall number of records
 * in vertex level & numTasks in vertex is greater than 1. This might have any number of reducer
 * groups. This means that, partitioning is wrong (can also consider reducing number of tasks
 * for that vertex). In some cases, too many reducers are launched and this can help find those.
 * <p/>
 * - Case 3: Ratio of (reduce_input_groups / reduce_input_records) is between 0.2 & 0.6 per task
 * attempt & numTasks is greater than 1 & SHUFFLE_BYTES > 1 GB per task attempt from a
 * source. This means, may be consider increasing parallelism based on the task attempt runtime.
 * <p/>
 */
public class SkewAnalyzer implements Analyzer {

  private static final String SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE = "tez.skew-analyzer.shuffle"
      + ".bytes.per.source";
  private static final long SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT = 900 * 1024 * 1024l;

  //Min reducer input group : reducer keys ratio for computation
  private static final String ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO = "tez.skew-analyzer.shuffle.key"
      + ".group.min.ratio";
  private static final float ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO_DEFAULT = 0.2f;

  //Max reducer input group : reducer keys ratio for computation
  private static final String ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO = "tez.skew-analyzer.shuffle.key"
      + ".group.max.ratio";
  private static final float ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO_DEFAULT = 0.4f;

  private static final String[] headers = { "vertexName", "taskAttemptId", "counterGroup", "node",
      "REDUCE_INPUT_GROUPS", "REDUCE_INPUT_RECORDS", "ratio", "SHUFFLE_BYTES", "timeTaken",
      "observation" };

  private final CSVResult csvResult = new CSVResult(headers);

  private final Configuration config;

  private final float minRatio;
  private final float maxRatio;
  private final long maxShuffleBytesPerSource;

  public SkewAnalyzer(Configuration config) {
    this.config = config;
    maxRatio = config.getFloat(ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO,
        ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO_DEFAULT);
    minRatio = config.getFloat(ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO,
        ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO_DEFAULT);
    maxShuffleBytesPerSource = config.getLong(SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE,
        SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT);
  }

  @Override
  public void analyze(DagInfo dagInfo) throws TezException {
    Preconditions.checkArgument(dagInfo != null, "DAG can't be null");
    analyzeReducers(dagInfo);
  }

  private void analyzeReducers(DagInfo dagInfo) {
    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
      for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
        analyzeGroupSkewPerSource(attemptInfo);
        analyzeRecordSkewPerSource(attemptInfo);
        analyzeForParallelism(attemptInfo);
      }
    }
  }

  /**
   * Null-safe lookup of a per-source counter value.
   *
   * @param counterMap map from counter group (source) to counter; may be null when the
   *                   attempt does not have the counter at all (e.g. map-only vertices)
   * @param counterGroup counter group to look up
   * @return the counter value, or 0 when the map or entry is absent
   */
  private static long getValue(Map<String, TezCounter> counterMap, String counterGroup) {
    if (counterMap == null) {
      return 0;
    }
    TezCounter counter = counterMap.get(counterGroup);
    return (counter != null) ? counter.getValue() : 0;
  }

  /**
   * Analyze scenario where couple keys are having too many records per source (Case 1).
   *
   * @param attemptInfo
   */
  private void analyzeGroupSkewPerSource(TaskAttemptInfo attemptInfo) {

    //counter_group (basically source) --> counter
    Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter
        .REDUCE_INPUT_GROUPS.toString());
    Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter
        .REDUCE_INPUT_RECORDS.toString());
    Map<String, TezCounter> shuffleBytes = attemptInfo.getCounter(TaskCounter.SHUFFLE_BYTES.toString());

    // Non-reduce attempts have no such counters.
    if (reduceInputGroups == null) {
      return;
    }

    //tez counter for every source
    for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
      if (entry.getKey().equals(TaskCounter.class.getName())) {
        //TODO: Tez counters always ends up adding fgroups and groups, due to which we end up
        // getting TaskCounter details as well.
        continue;
      }

      String counterGroup = entry.getKey();
      long inputGroupsCount = entry.getValue().getValue();
      long inputRecordsCount = getValue(reduceInputRecords, counterGroup);
      long shuffleBytesPerSource = getValue(shuffleBytes, counterGroup);

      if (inputRecordsCount <= 0) {
        // avoid a meaningless Infinity/NaN ratio
        continue;
      }
      float ratio = (inputGroupsCount * 1.0f / inputRecordsCount);

      //Case 1: Couple of keys having too many records per source.
      if (shuffleBytesPerSource > maxShuffleBytesPerSource) {
        if (ratio < minRatio) {
          List<String> result = Lists.newLinkedList();
          result.add(attemptInfo.getTaskInfo().getVertexInfo().getVertexName());
          result.add(attemptInfo.getTaskAttemptId());
          result.add(counterGroup);
          result.add(attemptInfo.getNodeId());
          result.add(inputGroupsCount + "");
          result.add(inputRecordsCount + "");
          result.add(ratio + "");
          result.add(shuffleBytesPerSource + "");
          result.add(attemptInfo.getTimeTaken() + "");
          result.add("Please check partitioning. Otherwise consider increasing memLimit");

          csvResult.addRecord(result.toArray(new String[result.size()]));
        }
      }
    }
  }

  /**
   * Analyze scenario where one task is getting > 60% of the vertex level records (Case 2).
   *
   * @param attemptInfo
   */
  private void analyzeRecordSkewPerSource(TaskAttemptInfo attemptInfo) {

    Map<String, TezCounter> vertexLevelReduceInputRecords =
        attemptInfo.getTaskInfo().getVertexInfo()
            .getCounter(TaskCounter.REDUCE_INPUT_RECORDS.toString());

    int vertexNumTasks = attemptInfo.getTaskInfo().getVertexInfo().getNumTasks();

    //counter_group (basically source) --> counter
    Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter
        .REDUCE_INPUT_GROUPS.toString());
    Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter
        .REDUCE_INPUT_RECORDS.toString());
    Map<String, TezCounter> shuffleBytes = attemptInfo.getCounter(TaskCounter.SHUFFLE_BYTES.toString());

    // Non-reduce attempts have no such counters.
    if (reduceInputGroups == null) {
      return;
    }

    //tez counter for every source
    for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
      if (entry.getKey().equals(TaskCounter.class.getName())) {
        //TODO: Tez counters always ends up adding fgroups and groups, due to which we end up
        // getting TaskCounter details as well.
        continue;
      }

      String counterGroup = entry.getKey();
      long inputGroupsCount = entry.getValue().getValue();
      long inputRecordsCount = getValue(reduceInputRecords, counterGroup);
      long shuffleBytesPerSource = getValue(shuffleBytes, counterGroup);
      long vertexLevelInputRecordsCount = getValue(vertexLevelReduceInputRecords, counterGroup);

      if (vertexLevelInputRecordsCount <= 0) {
        // avoid a meaningless Infinity/NaN ratio
        continue;
      }
      float ratio = (inputRecordsCount * 1.0f / vertexLevelInputRecordsCount);

      if (vertexNumTasks > 1) {
        if (ratio > maxRatio) {
          //input records > 60% of vertex level record count
          if (inputRecordsCount > (vertexLevelInputRecordsCount * 0.60)) {
            List<String> result = Lists.newLinkedList();
            result.add(attemptInfo.getTaskInfo().getVertexInfo().getVertexName());
            result.add(attemptInfo.getTaskAttemptId());
            result.add(counterGroup);
            result.add(attemptInfo.getNodeId());
            result.add(inputGroupsCount + "");
            result.add(inputRecordsCount + "");
            result.add(ratio + "");
            result.add(shuffleBytesPerSource + "");
            result.add(attemptInfo.getTimeTaken() + "");
            result.add("Some task attempts are getting > 60% of reduce input records.  "
                + "Consider adjusting parallelism & check partition logic");

            csvResult.addRecord(result.toArray(new String[result.size()]));
          }
        }
      }
    }
  }

  /**
   * Analyze scenario where a vertex would need to increase parallelism (Case 3).
   *
   * @param attemptInfo
   */
  private void analyzeForParallelism(TaskAttemptInfo attemptInfo) {

    //counter_group (basically source) --> counter
    Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(TaskCounter
        .REDUCE_INPUT_GROUPS.toString());
    Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(TaskCounter
        .REDUCE_INPUT_RECORDS.toString());
    Map<String, TezCounter> shuffleBytes = attemptInfo.getCounter(TaskCounter.SHUFFLE_BYTES.toString());

    // Non-reduce attempts have no such counters.
    if (reduceInputGroups == null) {
      return;
    }

    //tez counter for every source
    for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
      if (entry.getKey().equals(TaskCounter.class.getName())) {
        //TODO: Tez counters always ends up adding fgroups and groups, due to which we end up
        // getting TaskCounter details as well.
        continue;
      }

      String counterGroup = entry.getKey();
      long inputGroupsCount = entry.getValue().getValue();
      long inputRecordsCount = getValue(reduceInputRecords, counterGroup);
      long shuffleBytesPerSource = getValue(shuffleBytes, counterGroup);

      if (inputRecordsCount <= 0) {
        // avoid a meaningless Infinity/NaN ratio
        continue;
      }
      float ratio = (inputGroupsCount * 1.0f / inputRecordsCount);

      //Case 3: Shuffle_Bytes > 1 GB. Ratio between 0.2 & < 0.6. Consider increasing
      // parallelism based on task runtime.
      // Use the configured threshold, not the compile-time default.
      if (shuffleBytesPerSource > maxShuffleBytesPerSource) {
        if (ratio > minRatio && ratio < maxRatio) {
          List<String> result = Lists.newLinkedList();
          result.add(attemptInfo.getTaskInfo().getVertexInfo().getVertexName());
          result.add(attemptInfo.getTaskAttemptId());
          result.add(counterGroup);
          result.add(attemptInfo.getNodeId());
          result.add(inputGroupsCount + "");
          result.add(inputRecordsCount + "");
          result.add(ratio + "");
          result.add(shuffleBytesPerSource + "");
          result.add(attemptInfo.getTimeTaken() + "");
          result.add("Consider increasing parallelism.");

          csvResult.addRecord(result.toArray(new String[result.size()]));
        }
      }
    }
  }

  @Override
  public CSVResult getResult() throws TezException {
    return csvResult;
  }

  @Override
  public String getName() {
    return "Skew Analyzer";
  }

  @Override
  public String getDescription() {
    return "Analyzes reducer skews by mining reducer task counters";
  }

  @Override
  public Configuration getConfiguration() {
    // Was returning null, unlike every other analyzer; return the injected config.
    return config;
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tez.analyzer.plugins;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.analyzer.Analyzer;
import org.apache.tez.analyzer.CSVResult;
import org.apache.tez.common.counters.FileSystemCounter;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;

import java.util.Collection;
import java.util.List;


/**
 * This will provide the set of nodes participated in the DAG in descending order of task execution
 * time.
 * <p/>
 * Combine it with other counters to understand slow nodes better.
 */
public class SlowNodeAnalyzer implements Analyzer {

  private static final Log LOG = LogFactory.getLog(SlowNodeAnalyzer.class);

  private static final String[] headers = { "nodeName", "noOfTasksExecuted", "noOfKilledTasks",
      "noOfFailedTasks", "avgSucceededTaskExecutionTime", "avgKilledTaskExecutionTime",
      "avgFailedTaskExecutionTime", "avgHDFSBytesRead", "avgHDFSBytesWritten",
      "avgFileBytesRead", "avgFileBytesWritten", "avgGCTimeMillis", "avgCPUTimeMillis" };

  private final CSVResult csvResult = new CSVResult(headers);

  private final Configuration config;

  public SlowNodeAnalyzer(Configuration config) {
    this.config = config;
  }

  /**
   * Emit one CSV record per node: attempt counts per final state, average runtimes
   * per final state, and averaged I/O / GC / CPU counters.
   */
  @Override
  public void analyze(DagInfo dagInfo) throws TezException {
    Multimap<String, TaskAttemptInfo> nodeDetails = dagInfo.getNodeDetails();
    for (String nodeName : nodeDetails.keySet()) {
      List<String> record = Lists.newLinkedList();

      Collection<TaskAttemptInfo> taskAttemptInfos = nodeDetails.get(nodeName);

      record.add(nodeName);
      record.add(taskAttemptInfos.size() + "");
      record.add(getNumberOfTasks(taskAttemptInfos, TaskAttemptState.KILLED) + "");
      record.add(getNumberOfTasks(taskAttemptInfos, TaskAttemptState.FAILED) + "");

      Iterable<TaskAttemptInfo> succeedTasks = getFilteredTaskAttempts(taskAttemptInfos,
          TaskAttemptState.SUCCEEDED);
      record.add(getAvgTaskExecutionTime(succeedTasks) + "");

      Iterable<TaskAttemptInfo> killedTasks = getFilteredTaskAttempts(taskAttemptInfos,
          TaskAttemptState.KILLED);
      record.add(getAvgTaskExecutionTime(killedTasks) + "");

      Iterable<TaskAttemptInfo> failedTasks = getFilteredTaskAttempts(taskAttemptInfos,
          TaskAttemptState.FAILED);
      record.add(getAvgTaskExecutionTime(failedTasks) + "");

      record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
          .getName(), FileSystemCounter.HDFS_BYTES_READ.name()) + "");
      record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
          .getName(), FileSystemCounter.HDFS_BYTES_WRITTEN.name()) + "");
      record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
          .getName(), FileSystemCounter.FILE_BYTES_READ.name()) + "");
      record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
          .getName(), FileSystemCounter.FILE_BYTES_WRITTEN.name()) + "");
      record.add(getAvgCounter(taskAttemptInfos, TaskCounter.class
          .getName(), TaskCounter.GC_TIME_MILLIS.name()) + "");
      record.add(getAvgCounter(taskAttemptInfos, TaskCounter.class
          .getName(), TaskCounter.CPU_MILLISECONDS.name()) + "");

      csvResult.addRecord(record.toArray(new String[record.size()]));
    }
  }

  /**
   * Lazily filter attempts by their final state.
   *
   * @param taskAttemptInfos attempts on a node
   * @param status final state to keep
   * @return view of attempts whose status matches (case-insensitive)
   */
  private Iterable<TaskAttemptInfo> getFilteredTaskAttempts(Collection<TaskAttemptInfo>
      taskAttemptInfos, final TaskAttemptState status) {
    return Iterables.filter(taskAttemptInfos, new
        Predicate<TaskAttemptInfo>() {
          @Override public boolean apply(TaskAttemptInfo input) {
            return input.getStatus().equalsIgnoreCase(status.toString());
          }
        });
  }

  /**
   * Average wall-clock runtime over the given attempts; 0 when empty.
   */
  private float getAvgTaskExecutionTime(Iterable<TaskAttemptInfo> taskAttemptInfos) {
    long totalTime = 0;
    int size = 0;
    for (TaskAttemptInfo attemptInfo : taskAttemptInfos) {
      totalTime += attemptInfo.getTimeTaken();
      size++;
    }
    return (size > 0) ? (totalTime * 1.0f / size) : 0;
  }

  /**
   * Count attempts that ended in the given state.
   */
  private int getNumberOfTasks(Collection<TaskAttemptInfo> taskAttemptInfos, TaskAttemptState
      status) {
    int tasks = 0;
    for (TaskAttemptInfo attemptInfo : taskAttemptInfos) {
      if (attemptInfo.getStatus().equalsIgnoreCase(status.toString())) {
        tasks++;
      }
    }
    return tasks;
  }

  /**
   * Average a named counter across attempts; attempts missing the counter are
   * excluded from the denominator (and logged), so the average reflects only
   * attempts that reported it. Returns 0 when no attempt has the counter.
   */
  private float getAvgCounter(Collection<TaskAttemptInfo> taskAttemptInfos, String
      counterGroupName, String counterName) {
    long total = 0;
    int taskCount = 0;
    for (TaskAttemptInfo attemptInfo : taskAttemptInfos) {
      TezCounters tezCounters = attemptInfo.getTezCounters();
      TezCounter counter = tezCounters.findCounter(counterGroupName, counterName);
      if (counter != null) {
        total += counter.getValue();
        taskCount++;
      } else {
        LOG.info("Could not find counterGroupName=" + counterGroupName + ", counter=" +
            counterName + " in " + attemptInfo);
      }
    }
    return (taskCount > 0) ? (total * 1.0f / taskCount) : 0;
  }

  @Override
  public CSVResult getResult() throws TezException {
    return csvResult;
  }

  @Override
  public String getName() {
    return "Slow Node Analyzer";
  }

  @Override
  public String getDescription() {
    StringBuilder sb = new StringBuilder();
    sb.append("Analyze node details for the DAG.").append("\n");
    sb.append("This could be used to find out the set of nodes where the tasks are taking more "
        + "time on average and to understand whether too many tasks got scheduled on a node.")
        .append("\n");
    sb.append("One needs to combine the task execution time with other metrics like bytes "
        + "read/written etc to get better idea of bad nodes.  In order to understand the slow "
        + "nodes due to network, it might be worthwhile to consider the shuffle performance "
        + "analyzer tool in tez-tools").append("\n");
    return sb.toString();
  }

  @Override
  public Configuration getConfiguration() {
    return config;
  }

}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tez.analyzer.plugins;

import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.analyzer.Analyzer;
import org.apache.tez.analyzer.CSVResult;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
import org.apache.tez.history.parser.datamodel.VertexInfo;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;


/**
 * Analyze slow tasks in the DAG. Top 100 tasks are listed by default.
 *
 * <p/>
 * //TODO: We do not get counters for killed task attempts yet.
 */
public class SlowTaskIdentifier implements Analyzer {

  private static final String[] headers = { "vertexName", "taskAttemptId",
      "Node", "taskDuration", "Status",
      "NoOfInputs" };

  private final CSVResult csvResult;

  /** Number of slowest attempts to report. */
  private static final String NO_OF_TASKS = "tez.slow-task-analyzer.task.count";
  private static final int NO_OF_TASKS_DEFAULT = 100;

  private final Configuration config;

  public SlowTaskIdentifier(Configuration config) {
    this.config = config;
    this.csvResult = new CSVResult(headers);
  }

  @Override
  public void analyze(DagInfo dagInfo) throws TezException {
    List<TaskAttemptInfo> taskAttempts = Lists.newArrayList();
    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
      taskAttempts.addAll(vertexInfo.getTaskAttempts());
    }

    //sort them by runtime in descending order
    Collections.sort(taskAttempts, new Comparator<TaskAttemptInfo>() {
      @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
        return Long.compare(o2.getTimeTaken(), o1.getTimeTaken());
      }
    });

    // Bound by the number of attempts actually present; the configured limit (default 100)
    // may exceed it, which previously threw IndexOutOfBoundsException on small DAGs.
    int limit = Math.min(taskAttempts.size(),
        config.getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT));
    for (int i = 0; i < limit; i++) {
      TaskAttemptInfo attempt = taskAttempts.get(i);
      List<String> record = Lists.newLinkedList();
      record.add(attempt.getTaskInfo().getVertexInfo().getVertexName());
      record.add(attempt.getTaskAttemptId());
      record.add(attempt.getContainer().getHost());
      record.add(attempt.getTimeTaken() + "");
      record.add(attempt.getStatus());
      record.add(attempt.getTaskInfo().getVertexInfo().getInputEdges().size() + "");

      csvResult.addRecord(record.toArray(new String[record.size()]));
    }

  }

  @Override
  public CSVResult getResult() throws TezException {
    return csvResult;
  }

  @Override
  public String getName() {
    return "Slow Task Identifier";
  }

  @Override
  public String getDescription() {
    return "Identifies slow tasks in the DAG";
  }

  @Override
  public Configuration getConfiguration() {
    return config;
  }
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer.plugins; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; + +import java.util.List; +import java.util.Map; + +/** + * Identify the slowest vertex in the DAG. 
+ */ +public class SlowestVertexAnalyzer implements Analyzer { + + private static final String[] headers = { "vertexName", "taskAttempts", "totalTime", + "shuffleTime", "shuffleTime_Max", "LastEventReceived", "LastEventReceivedFrom", + "TimeTaken_ForRealWork", "75thPercentile", "95thPercentile", "98thPercentile", "Median", + "observation", "comments" }; + + private final CSVResult csvResult = new CSVResult(headers); + + private final Configuration config; + private final MetricRegistry metrics = new MetricRegistry(); + private Histogram taskAttemptRuntimeHistorgram; + + public SlowestVertexAnalyzer(Configuration config) { + this.config = config; + } + + @Override + public void analyze(DagInfo dagInfo) throws TezException { + + for (VertexInfo vertexInfo : dagInfo.getVertices()) { + String vertexName = vertexInfo.getVertexName(); + long totalTime = vertexInfo.getTimeTaken(); + + long max = Long.MIN_VALUE; + String maxSourceName = ""; + taskAttemptRuntimeHistorgram = metrics.histogram(vertexName); + + + for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) { + + taskAttemptRuntimeHistorgram.update(attemptInfo.getTimeTaken()); + + //Get the last event received from the incoming vertices + Map<String, TezCounter> lastEventReceivedMap = attemptInfo.getCounter( + TaskCounter.LAST_EVENT_RECEIVED.toString()); + + for (Map.Entry<String, TezCounter> entry : lastEventReceivedMap.entrySet()) { + if (entry.getKey().equals(TaskCounter.class.getName())) { + //TODO: Tez counters always ends up adding fgroups and groups, due to which we end up + // getting TaskCounter details as well. + continue; + } + //Find the slowest last event received + if (entry.getValue().getValue() > max) { + //w.r.t vertex start time. 
+          max =(attemptInfo.getStartTime() + entry.getValue().getValue()) -
+              (vertexInfo.getStartTime());
+          maxSourceName = entry.getKey();
+        }
+      }
+      }
+
+      long shuffleMax = Long.MIN_VALUE;
+      String shuffleMaxSource = "";
+      for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+        //Get the shuffle phase time from the incoming vertices
+        Map<String, TezCounter> lastEventReceivedMap = attemptInfo.getCounter(
+            TaskCounter.SHUFFLE_PHASE_TIME.toString());
+
+        for (Map.Entry<String, TezCounter> entry : lastEventReceivedMap.entrySet()) {
+          if (entry.getKey().equals(TaskCounter.class.getName())) {
+            //ignore. TODO: hack for taskcounter issue
+            continue;
+          }
+          //Find the largest shuffle phase time
+          if (entry.getValue().getValue() > shuffleMax) {
+            //w.r.t vertex start time.
+            shuffleMax =(attemptInfo.getStartTime() + entry.getValue().getValue()) -
+                (vertexInfo.getStartTime());
+            shuffleMaxSource = entry.getKey();
+          }
+        }
+      }
+
+      String comments = "";
+
+      List<String> record = Lists.newLinkedList();
+      record.add(vertexName);
+      record.add(vertexInfo.getTaskAttempts().size() + "");
+      record.add(totalTime + "");
+      record.add(Math.max(0, shuffleMax) + "");
+      record.add(shuffleMaxSource);
+      record.add(Math.max(0, max) + "");
+      record.add(maxSourceName);
+      record.add(Math.max(0,(totalTime - max)) + "");
+
+      StringBuilder sb = new StringBuilder();
+      double percentile75 = taskAttemptRuntimeHistorgram.getSnapshot().get75thPercentile();
+      double percentile95 = taskAttemptRuntimeHistorgram.getSnapshot().get95thPercentile();
+      double percentile98 = taskAttemptRuntimeHistorgram.getSnapshot().get98thPercentile();
+      double percentile99 = taskAttemptRuntimeHistorgram.getSnapshot().get99thPercentile();
+      double medianAttemptRuntime = taskAttemptRuntimeHistorgram.getSnapshot().getMedian();
+
+      record.add("75th=" + percentile75);
+      record.add("95th=" + percentile95);
+      record.add("98th=" + percentile98);
+      record.add("median=" + medianAttemptRuntime);
+
+      if (percentile75 /
percentile99 < 0.5) { + //looks like some straggler task is there. + sb.append("Looks like some straggler task is there"); + } + + record.add(sb.toString()); + + if (totalTime > 0 && vertexInfo.getTaskAttempts().size() > 0) { + if ((shuffleMax * 1.0f / totalTime) > 0.5) { + if ((max * 1.0f / totalTime) > 0.5) { + comments = "This vertex is slow due to its dependency on parent. Got a lot delayed last" + + " event received"; + } else { + comments = + "Spending too much time on shuffle. Check shuffle bytes from previous vertex"; + } + } else { + if (totalTime > 10000) { //greater than 10 seconds. //TODO: Configure it later. + comments = "Concentrate on this vertex (totalTime > 10 seconds)"; + } + } + } + + record.add(comments); + csvResult.addRecord(record.toArray(new String[record.size()])); + } + } + + + @Override + public CSVResult getResult() throws TezException { + return csvResult; + } + + @Override + public String getName() { + return "SlowVertexAnalyzer"; + } + + @Override + public String getDescription() { + return "Identify the slowest vertex in the DAG, which needs to be looked into first"; + } + + @Override + public Configuration getConfiguration() { + return config; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java new file mode 100644 index 0000000..c650104 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer.plugins; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; + +import java.util.List; +import java.util.Map; + + +/** + * Find out tasks which have more than 1 spill (ADDITIONAL_SPILL_COUNT). 
+ * <p/> + * Accompany this with OUTPUT_BYTES (> 1 GB data written) + */ +public class SpillAnalyzerImpl implements Analyzer { + + private static final String[] headers = { "vertexName", "taskAttemptId", + "Node", "counterGroupName", + "spillCount", "taskDuration", + "OUTPUT_BYTES", "OUTPUT_RECORDS", + "SPILLED_RECORDS", "Recommendation" }; + + private final CSVResult csvResult; + + private static long OUTPUT_BYTES_THRESHOLD = 1 * 1024 * 1024 * 1024l; + + private final Configuration config; + + public SpillAnalyzerImpl(Configuration config) { + this.config = config; + this.csvResult = new CSVResult(headers); + } + + @Override + public void analyze(DagInfo dagInfo) throws TezException { + for (VertexInfo vertexInfo : dagInfo.getVertices()) { + String vertexName = vertexInfo.getVertexName(); + + for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) { + //Get ADDITIONAL_SPILL_COUNT, OUTPUT_BYTES for every source + Map<String, TezCounter> spillCountMap = + attemptInfo.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name()); + Map<String, TezCounter> spilledRecordsMap = + attemptInfo.getCounter(TaskCounter.SPILLED_RECORDS.name()); + Map<String, TezCounter> outputRecordsMap = + attemptInfo.getCounter(TaskCounter.OUTPUT_RECORDS.name()); + + Map<String, TezCounter> outputBytesMap = + attemptInfo.getCounter(TaskCounter.OUTPUT_BYTES.name()); + + for (Map.Entry<String, TezCounter> entry : spillCountMap.entrySet()) { + String source = entry.getKey(); + long spillCount = entry.getValue().getValue(); + long outBytes = outputBytesMap.get(source).getValue(); + + long outputRecords = outputRecordsMap.get(source).getValue(); + long spilledRecords = spilledRecordsMap.get(source).getValue(); + + if (spillCount > 1 && outBytes > OUTPUT_BYTES_THRESHOLD) { + List<String> recorList = Lists.newLinkedList(); + recorList.add(vertexName); + recorList.add(attemptInfo.getTaskAttemptId()); + recorList.add(attemptInfo.getNodeId()); + recorList.add(source); + recorList.add(spillCount + 
""); + recorList.add(attemptInfo.getTimeTaken() + ""); + recorList.add(outBytes + ""); + recorList.add(outputRecords + ""); + recorList.add(spilledRecords + ""); + recorList.add("Consider increasing " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + + ", try increasing container size."); + + csvResult.addRecord(recorList.toArray(new String[recorList.size()])); + } + } + } + } + } + + @Override + public CSVResult getResult() throws TezException { + return csvResult; + } + + @Override + public String getName() { + return "SpillAnalyzer"; + } + + @Override + public String getDescription() { + return "Analyze spill details in the task"; + } + + @Override + public Configuration getConfiguration() { + return config; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/pom.xml ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/pom.xml b/tez-tools/analyzers/pom.xml new file mode 100644 index 0000000..97ce541 --- /dev/null +++ b/tez-tools/analyzers/pom.xml @@ -0,0 +1,31 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.tez</groupId> + <artifactId>tez-tools</artifactId> + <version>0.8.0-SNAPSHOT</version> + </parent> + <artifactId>tez-perf-analyzer</artifactId> + <packaging>pom</packaging> + + <modules> + <module>job-analyzer</module> + </modules> +</project> http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/pom.xml ---------------------------------------------------------------------- diff --git a/tez-tools/pom.xml b/tez-tools/pom.xml index bf8fdf8..ed13143 100644 --- a/tez-tools/pom.xml +++ b/tez-tools/pom.xml @@ -26,6 +26,9 @@ <artifactId>tez-tools</artifactId> <packaging>pom</packaging> + <modules> + <module>analyzers</module> + </modules> <build> <plugins> <plugin>
