Repository: tez
Updated Branches:
  refs/heads/master cf400882b -> e10d80fe6


TEZ-2645. Provide standard analyzers for job analysis (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e10d80fe
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e10d80fe
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e10d80fe

Branch: refs/heads/master
Commit: e10d80fe6008986900dadc51fc7823582d0dfb1a
Parents: cf40088
Author: Rajesh Balamohan <[email protected]>
Authored: Wed Jul 29 13:27:51 2015 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Wed Jul 29 13:27:51 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 pom.xml                                         |   5 +
 .../tez/common/counters/FileSystemCounter.java  |   4 +
 .../analyzers/job-analyzer/findbugs-exclude.xml |  28 ++
 tez-tools/analyzers/job-analyzer/pom.xml        |  47 +++
 .../java/org/apache/tez/analyzer/Analyzer.java  |  64 ++++
 .../java/org/apache/tez/analyzer/CSVResult.java | 113 +++++++
 .../java/org/apache/tez/analyzer/Result.java    |  39 +++
 .../plugins/ContainerReuseAnalyzer.java         |  87 ++++++
 .../analyzer/plugins/CriticalPathAnalyzer.java  | 110 +++++++
 .../tez/analyzer/plugins/LocalityAnalyzer.java  | 193 ++++++++++++
 .../analyzer/plugins/ShuffleTimeAnalyzer.java   | 195 ++++++++++++
 .../tez/analyzer/plugins/SkewAnalyzer.java      | 309 +++++++++++++++++++
 .../tez/analyzer/plugins/SlowNodeAnalyzer.java  | 188 +++++++++++
 .../analyzer/plugins/SlowTaskIdentifier.java    | 109 +++++++
 .../analyzer/plugins/SlowestVertexAnalyzer.java | 188 +++++++++++
 .../tez/analyzer/plugins/SpillAnalyzerImpl.java | 126 ++++++++
 tez-tools/analyzers/pom.xml                     |  31 ++
 tez-tools/pom.xml                               |   3 +
 19 files changed, 1840 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f7f2e76..687f996 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:
+  TEZ-2645. Provide standard analyzers for job analysis.
   TEZ-2627. Support for Tez Job Priorities.
   TEZ-2623. Fix module dependencies related to hadoop-auth.
   TEZ-2464. Move older releases to dist archive.

http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ddb7324..a0b82e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -180,6 +180,11 @@
         <version>${pig.version}</version>
       </dependency>
       <dependency>
+        <groupId>io.dropwizard.metrics</groupId>
+        <artifactId>metrics-core</artifactId>
+        <version>3.1.0</version>
+      </dependency>
+      <dependency>
         <groupId>org.roaringbitmap</groupId>
         <artifactId>RoaringBitmap</artifactId>
         <version>0.4.9</version>

http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
index 57d1053..73e3581 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
@@ -27,4 +27,8 @@ public enum FileSystemCounter {
   READ_OPS,
   LARGE_READ_OPS,
   WRITE_OPS,
+  HDFS_BYTES_READ,
+  HDFS_BYTES_WRITTEN,
+  FILE_BYTES_READ,
+  FILE_BYTES_WRITTEN
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/findbugs-exclude.xml b/tez-tools/analyzers/job-analyzer/findbugs-exclude.xml
new file mode 100644
index 0000000..5bebb05
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/findbugs-exclude.xml
@@ -0,0 +1,28 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<FindBugsFilter>
+
+
+  <Match>
+    <Class name="org.apache.tez.analyzer.CSVResult"/>
+    <Bug pattern="EI_EXPOSE_REP2"/>
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.analyzer.CSVResult"/>
+    <Bug pattern="EI_EXPOSE_REP"/>
+  </Match>
+
+
+</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/pom.xml
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/pom.xml b/tez-tools/analyzers/job-analyzer/pom.xml
new file mode 100644
index 0000000..6312a34
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez-perf-analyzer</artifactId>
+    <version>0.8.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>job-analyzer</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-history-parser</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>  
+</project>

http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java
new file mode 100644
index 0000000..6021c58
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+
+/** Contract for analyzing a {@link DagInfo} and exposing the findings as a {@link Result}. */
+public interface Analyzer {
+
+  /**
+   * Analyze the given DAG.
+   *
+   * @param dagInfo DAG to be analyzed
+   * @throws TezException declared for implementations that fail during analysis
+   */
+  public void analyze(DagInfo dagInfo) throws TezException;
+
+  /**
+   * Get the result of the analysis.
+   *
+   * @return analysis result
+   * @throws TezException declared for implementations that cannot produce a result
+   */
+  public Result getResult() throws TezException;
+
+  /**
+   * Get the name of the analyzer.
+   *
+   * @return name of the analyzer
+   */
+  public String getName();
+
+  /**
+   * Get the description of the analyzer.
+   *
+   * @return description of the analyzer
+   */
+  public String getDescription();
+
+  /**
+   * Get the config properties related to this analyzer.
+   *
+   * @return config related to the analyzer
+   */
+  public Configuration getConfiguration();
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
new file mode 100644
index 0000000..5e454b4
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.directory.api.util.Strings;
+import org.apache.tez.dag.api.TezException;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Simple placeholder for storing CSV results.
+ * Contains headers and records in string format.
+ */
+public class CSVResult implements Result {
+
+  private final String[] header;
+  private final List<String[]> recordsList;
+  private String comments;
+
+  /**
+   * Create an empty result for the given column headers.
+   *
+   * @param header column names; the array is stored as is (not copied)
+   */
+  public CSVResult(String[] header) {
+    this.header = header;
+    recordsList = Lists.newLinkedList();
+  }
+
+  /**
+   * @return the header array passed to the constructor (not a copy)
+   */
+  public String[] getHeaders() {
+    return header;
+  }
+
+  /**
+   * Append a record to the result.
+   *
+   * @param record column values; must contain exactly one entry per header column
+   * @throws IllegalArgumentException if record is null or its length differs from the header
+   */
+  public void addRecord(String[] record) {
+    Preconditions.checkArgument(record != null, "Record can't be null");
+    Preconditions.checkArgument(record.length == header.length,
+        "Record length " + record.length + " does not match header length " + header.length);
+    recordsList.add(record);
+  }
+
+  /**
+   * @return read-only iterator over the records added so far
+   */
+  public Iterator<String[]> getRecordsIterator() {
+    return Iterators.unmodifiableIterator(recordsList.iterator());
+  }
+
+  /**
+   * @param comments recommendation / comments to be returned by {@link #getComments()}
+   */
+  public void setComments(String comments) {
+    this.comments = comments;
+  }
+
+  /**
+   * JSON rendering is not implemented for CSV results.
+   *
+   * @return always an empty string
+   */
+  @Override public String toJson() throws TezException {
+    return "";
+  }
+
+  @Override public String getComments() {
+    return comments;
+  }
+
+  @Override public String toString() {
+    return "CSVResult{" +
+        "header=" + Arrays.toString(header) +
+        ", recordsList=" + recordsList +
+        '}';
+  }
+
+  /**
+   * Write all records to the given file in UTF-8, one comma separated line per record.
+   * Intended for testing: the header row is not written and values are not quoted or escaped.
+   * Values for which {@code Strings.isNotEmpty} is false are written as a single space.
+   *
+   * @param fileName path of the file to be written
+   * @throws IOException if the file cannot be opened or written
+   */
+  public void dumpToFile(String fileName) throws IOException {
+    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(
+        new FileOutputStream(new File(fileName)), Charset.forName("UTF-8").newEncoder()));
+    // try/finally so that the file handle is released even when a write fails midway
+    try {
+      for (String[] record : recordsList) {
+        if (record.length != header.length) {
+          continue; //LOG error msg?
+        }
+
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < record.length; i++) {
+          sb.append(Strings.isNotEmpty(record[i]) ? record[i] : " ");
+          if (i < record.length - 1) {
+            sb.append(",");
+          }
+        }
+        bw.write(sb.toString());
+        bw.newLine();
+      }
+      bw.flush();
+    } finally {
+      bw.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Result.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Result.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Result.java
new file mode 100644
index 0000000..d1881eb
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Result.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer;
+
+import org.apache.tez.dag.api.TezException;
+/** Outcome of an analysis: a JSON rendering plus optional comments. */
+public interface Result {
+
+  /**
+   * Convert the result to JSON format.
+   *
+   * @return json
+   * @throws TezException declared for implementations that fail to produce JSON
+   */
+  public String toJson() throws TezException;
+
+  /**
+   * Recommendation / comments about the analysis, if any.
+   *
+   * @return comments
+   */
+  public String getComments();
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java
new file mode 100644
index 0000000..905f966
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.Container;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+
+
+/**
+ * Reports, for every vertex, the containers it ran on and how many task attempts of that
+ * vertex were mapped to each container.
+ */
+public class ContainerReuseAnalyzer implements Analyzer {
+
+  private final Configuration config;
+
+  private static final String[] headers =
+      { "vertexName", "taskAttempts", "node", "containerId", "reuseCount" };
+
+  private final CSVResult csvResult;
+
+  public ContainerReuseAnalyzer(Configuration config) {
+    this.config = config;
+    this.csvResult = new CSVResult(headers);
+  }
+
+  /**
+   * Emit one CSV record per (vertex, container) pair.
+   *
+   * @param dagInfo DAG whose vertices are inspected
+   */
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    for (VertexInfo vertex : dagInfo.getVertices()) {
+      Multimap<Container, TaskAttemptInfo> attemptsByContainer = vertex.getContainersMapping();
+      for (Container usedContainer : attemptsByContainer.keySet()) {
+        // column order has to match the headers array
+        String[] row = {
+            vertex.getVertexName(),
+            String.valueOf(vertex.getTaskAttempts().size()),
+            usedContainer.getHost(),
+            usedContainer.getId(),
+            String.valueOf(attemptsByContainer.get(usedContainer).size())
+        };
+        csvResult.addRecord(row);
+      }
+    }
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Container Reuse Analyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Get details on container reuse analysis";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
new file mode 100644
index 0000000..6748f3f
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.base.Functions;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Identify a set of vertices which fall in the critical path in a DAG.
+ *
+ * Starting from the last vertex of the DAG, every chain of input vertices is walked and the
+ * time taken by the vertices on the chain is summed up; chains are reported by descending sum.
+ */
+public class CriticalPathAnalyzer implements Analyzer {
+  private final Configuration config;
+
+  private static final String[] headers = { "CriticalPath", "Score" };
+
+  private final CSVResult csvResult;
+
+  public CriticalPathAnalyzer(Configuration config) {
+    this.config = config;
+    this.csvResult = new CSVResult(headers);
+  }
+
+  /**
+   * Compute all paths ending in the last vertex of the DAG and record them, longest first.
+   * Each path is also printed to stdout. A DAG without vertices produces no records.
+   *
+   * @param dagInfo DAG to be analyzed
+   */
+  @Override public void analyze(DagInfo dagInfo) throws TezException {
+    if (dagInfo.getVertices().isEmpty()) {
+      // nothing to walk; also avoids get(-1) below
+      return;
+    }
+
+    Map<String, Long> result = Maps.newLinkedHashMap();
+    getCriticalPath("", dagInfo.getVertices().get(dagInfo.getVertices().size() - 1), 0, result);
+
+    System.out.println();
+    System.out.println();
+
+    for (Map.Entry<String, Long> entry : sortByValues(result).entrySet()) {
+      List<String> record = Lists.newLinkedList();
+      record.add(entry.getKey());
+      record.add(entry.getValue() + "");
+      csvResult.addRecord(record.toArray(new String[record.size()]));
+      System.out.println(entry.getKey() + ", " + entry.getValue());
+    }
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "CriticalPathAnalyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyze critical path of the DAG";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+  /**
+   * Return a copy of the map whose iteration order is by value, largest first.
+   *
+   * @param result path to accumulated time
+   * @return immutable map sorted by descending time; paths with equal time are ordered by name
+   */
+  private static Map<String, Long> sortByValues(Map<String, Long> result) {
+    //Sort result by time in reverse order
+    final Ordering<String> reverseValueOrdering =
+        Ordering.natural().reverse().nullsLast().onResultOf(Functions.forMap(result, null));
+    // ImmutableSortedMap.copyOf rejects keys which compare as equal, so two paths with the
+    // same time would fail the copy. Break ties on the (unique) path string.
+    final Ordering<String> valueThenPathOrdering =
+        reverseValueOrdering.compound(Ordering.<String>natural());
+    return ImmutableSortedMap.copyOf(result, valueThenPathOrdering);
+  }
+
+  /**
+   * Recursively walk from dest towards its inputs, recording every partial path on the way.
+   * Path strings start at the vertex the walk began with and are joined by "-->".
+   *
+   * @param predecessor path walked so far
+   * @param dest vertex to append to the path; null ends the walk
+   * @param time time accumulated on the path walked so far
+   * @param result receives path to accumulated time
+   */
+  private static void getCriticalPath(String predecessor, VertexInfo dest, long time,
+      Map<String, Long> result) {
+    if (dest == null) {
+      return;
+    }
+
+    time += dest.getTimeTaken();
+    predecessor += dest.getVertexName() + "-->";
+
+    for (VertexInfo incomingVertex : dest.getInputVertices()) {
+      getCriticalPath(predecessor, incomingVertex, time, result);
+    }
+
+    result.put(predecessor, time);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
new file mode 100644
index 0000000..67b4c51
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Get locality information for the tasks of each vertex along with their execution times.
+ * This is helpful to correlate whether the vertex runtime is related to data locality.
+ */
+public class LocalityAnalyzer implements Analyzer {
+
+  private final String[] headers = { "vertexName", "numTasks", "dataLocalRatio", "rackLocalRatio",
+      "otherRatio", "avgDataLocalTaskRuntime", "avgRackLocalTaskRuntime",
+      "avgOtherLocalTaskRuntime", "noOfInputs", "avgHDFSBytesRead_DataLocal",
+      "avgHDFSBytesRead_RackLocal", "avgHDFSBytesRead_Others", "recommendation" };
+
+  private static final String DATA_LOCAL_RATIO = "tez.locality-analyzer.data.local.ratio";
+  private static final float DATA_LOCAL_RATIO_DEFAULT = 0.5f;
+
+  private final Configuration config;
+
+  private final CSVResult csvResult;
+
+  public LocalityAnalyzer(Configuration config) {
+    this.config = config;
+    csvResult = new CSVResult(headers);
+  }
+
+  /**
+   * Emit one record for every vertex that reports at least one data local or rack local task.
+   *
+   * @param dagInfo DAG whose vertices are inspected
+   */
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    final String dagCounterGroup = DAGCounter.class.getName();
+
+    for (VertexInfo vertex : dagInfo.getVertices()) {
+      String name = vertex.getVertexName();
+
+      Map<String, TezCounter> dataLocalCounters =
+          vertex.getCounter(dagCounterGroup, DAGCounter.DATA_LOCAL_TASKS.toString());
+      Map<String, TezCounter> rackLocalCounters =
+          vertex.getCounter(dagCounterGroup, DAGCounter.RACK_LOCAL_TASKS.toString());
+
+      long dataLocal =
+          dataLocalCounters.isEmpty() ? 0 : dataLocalCounters.get(dagCounterGroup).getValue();
+      long rackLocal =
+          rackLocalCounters.isEmpty() ? 0 : rackLocalCounters.get(dagCounterGroup).getValue();
+
+      long numTasks = vertex.getNumTasks();
+
+      if (dataLocal <= 0 && rackLocal <= 0) {
+        //no locality details available for this vertex
+        continue;
+      }
+
+      float dataLocalRatio = dataLocal * 1.0f / numTasks;
+      float rackLocalRatio = rackLocal * 1.0f / numTasks;
+      float othersRatio = (numTasks - (dataLocal + rackLocal)) * 1.0f / numTasks;
+
+      List<String> row = Lists.newLinkedList();
+      row.add(name);
+      row.add(String.valueOf(numTasks));
+      row.add(String.valueOf(dataLocalRatio));
+      row.add(String.valueOf(rackLocalRatio));
+      row.add(String.valueOf(othersRatio));
+
+      TaskAttemptDetails dataLocalStats = computeAverages(vertex, DAGCounter.DATA_LOCAL_TASKS);
+      TaskAttemptDetails rackLocalStats = computeAverages(vertex, DAGCounter.RACK_LOCAL_TASKS);
+      TaskAttemptDetails otherStats = computeAverages(vertex, DAGCounter.OTHER_LOCAL_TASKS);
+
+      row.add(String.valueOf(dataLocalStats.avgRuntime));
+      row.add(String.valueOf(rackLocalStats.avgRuntime));
+      row.add(String.valueOf(otherStats.avgRuntime));
+
+      //number of inputs to this vertex
+      row.add(String.valueOf(vertex.getInputEdges().size()));
+
+      //avg HDFS bytes read in this vertex per locality type
+      row.add(String.valueOf(dataLocalStats.avgHDFSBytesRead));
+      row.add(String.valueOf(rackLocalStats.avgHDFSBytesRead));
+      row.add(String.valueOf(otherStats.avgHDFSBytesRead));
+
+      float minDataLocalRatio = config.getFloat(DATA_LOCAL_RATIO, DATA_LOCAL_RATIO_DEFAULT);
+      String recommendation = (dataLocalRatio < minDataLocalRatio)
+          ? "Data locality is poor for this vertex. Try tuning "
+              + TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS + ", "
+              + TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED + ", "
+              + TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED
+          : "";
+      row.add(recommendation);
+
+      csvResult.addRecord(row.toArray(new String[row.size()]));
+    }
+  }
+
+  /**
+   * Average the runtime and the HDFS bytes read over those task attempts of a vertex which
+   * report a positive value for the given locality counter.
+   *
+   * @param vertexInfo vertex whose task attempts are inspected
+   * @param counter locality counter selecting the task attempts
+   * @return averages over the selected attempts; both are 0 when no attempt matches
+   */
+  private TaskAttemptDetails computeAverages(VertexInfo vertexInfo, DAGCounter counter) {
+    long runtimeSum = 0;
+    long matchedAttempts = 0;
+    long hdfsBytesReadSum = 0;
+
+    for (TaskAttemptInfo attempt : vertexInfo.getTaskAttempts()) {
+      Map<String, TezCounter> locality =
+          attempt.getCounter(DAGCounter.class.getName(), counter.toString());
+
+      if (locality.isEmpty() || locality.get(DAGCounter.class.getName()).getValue() <= 0) {
+        continue;
+      }
+
+      runtimeSum += attempt.getTimeTaken();
+      matchedAttempts++;
+
+      //add up HDFS bytes read by this attempt
+      Map<String, TezCounter> hdfsBytesRead = attempt.getCounter(
+          FileSystemCounter.class.getName(), FileSystemCounter.HDFS_BYTES_READ.name());
+      for (TezCounter bytesRead : hdfsBytesRead.values()) {
+        hdfsBytesReadSum += bytesRead.getValue();
+      }
+    }
+
+    TaskAttemptDetails averages = new TaskAttemptDetails();
+    if (matchedAttempts > 0) {
+      averages.avgRuntime = (runtimeSum * 1.0f / matchedAttempts);
+      averages.avgHDFSBytesRead = (hdfsBytesReadSum * 1.0f / matchedAttempts);
+    }
+    return averages;
+  }
+
+  @Override public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override public String getName() {
+    return "Locality Analyzer";
+  }
+
+  @Override public String getDescription() {
+    return "Analyze for locality information (data local, rack local, off-rack)";
+  }
+
+  @Override public Configuration getConfiguration() {
+    return config;
+  }
+
+  /**
+   * Holder for the averages computed over a set of task attempts.
+   */
+  static class TaskAttemptDetails {
+    float avgHDFSBytesRead;
+    float avgRuntime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
new file mode 100644
index 0000000..8df40ba
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Analyze the time taken by merge phase, shuffle phase, time taken to do 
realistic work etc in
+ * tasks.
+ *
+ * Just dump REDUCE_INPUT_GROUPS, REDUCE_INPUT_RECORDS, its ratio and 
SHUFFLE_BYTES for tasks
+ * grouped by vertices. Provide time taken as well.  Just render it as a table 
for now.
+ *
+ */
+public class ShuffleTimeAnalyzer implements Analyzer {
+
+  private static final String SHUFFLE_TIME_RATIO = 
"tez.shuffle-time-analyzer.shuffle.ratio";
+  private static final float SHUFFLE_TIME_RATIO_DEFAULT = 0.5f;
+
+  private static final String MIN_SHUFFLE_RECORDS = 
"tez.shuffle-time-analyzer.shuffle.min.records";
+  private static final long MIN_SHUFFLE_RECORDS_DEFAULT = 10000;
+
+  private static final String[] headers = { "vertexName", "taskAttemptId", 
"Node", "counterGroup",
+      "Comments", "REDUCE_INPUT_GROUPS", "REDUCE_INPUT_RECORDS", "ratio", 
"SHUFFLE_BYTES",
+      "Time taken to receive all events", "MERGE_PHASE_TIME", 
"SHUFFLE_PHASE_TIME",
+      "TimeTaken_For_Real_Task", "FIRST_EVENT_RECEIVED", "LAST_EVENT_RECEIVED",
+      "SHUFFLE_BYTES_DISK_DIRECT" };
+
+  private final CSVResult csvResult = new CSVResult(headers);
+
+  private final Configuration config;
+
+  private final float shuffleTimeRatio;
+  private final long minShuffleRecords;
+
+
+  public ShuffleTimeAnalyzer(Configuration config) {
+    this.config = config;
+
+    shuffleTimeRatio = config.getFloat
+        (SHUFFLE_TIME_RATIO, SHUFFLE_TIME_RATIO_DEFAULT);
+    minShuffleRecords = config.getLong(MIN_SHUFFLE_RECORDS, 
MIN_SHUFFLE_RECORDS_DEFAULT);
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+        //counter_group (basically source) --> counter
+        Map<String, TezCounter> reduceInputGroups = 
attemptInfo.getCounter(TaskCounter
+            .REDUCE_INPUT_GROUPS.toString());
+        Map<String, TezCounter> reduceInputRecords = 
attemptInfo.getCounter(TaskCounter
+            .REDUCE_INPUT_RECORDS.toString());
+
+        if (reduceInputGroups == null) {
+          continue;
+        }
+
+        for (Map.Entry<String, TezCounter> entry : 
reduceInputGroups.entrySet()) {
+          String counterGroupName = entry.getKey();
+          long reduceInputGroupsVal = entry.getValue().getValue();
+          long reduceInputRecordsVal = 
(reduceInputRecords.get(counterGroupName) != null) ?
+          reduceInputRecords.get(counterGroupName).getValue() : 0;
+
+          if (reduceInputRecordsVal <= 0) {
+            continue;
+          }
+          float ratio = (reduceInputGroupsVal * 1.0f / reduceInputRecordsVal);
+
+          if (ratio > 0 && reduceInputRecordsVal > minShuffleRecords) {
+            List<String> result = Lists.newLinkedList();
+            result.add(vertexInfo.getVertexName());
+            result.add(attemptInfo.getTaskAttemptId());
+            result.add(attemptInfo.getNodeId());
+            result.add(counterGroupName);
+
+            //Real work done in the task
+            long timeTakenForRealWork = attemptInfo.getTimeTaken() -
+                Long.parseLong(getCounterValue(TaskCounter.MERGE_PHASE_TIME, 
counterGroupName,
+                    attemptInfo));
+
+            String comments = "";
+            if ((timeTakenForRealWork * 1.0f / attemptInfo.getTimeTaken()) < 
shuffleTimeRatio) {
+              comments = "Time taken in shuffle is more than the actual work 
being done in task. "
+                  + " Check if source/destination machine is a slow node. 
Check if merge phase "
+                  + "time is more to understand disk bottlenecks in this node. 
 Check for skew";
+            }
+            result.add(comments);
+
+            result.add(reduceInputGroupsVal + "");
+            result.add(reduceInputRecordsVal + "");
+            result.add("" + (1.0f * reduceInputGroupsVal / 
reduceInputRecordsVal));
+            result.add(getCounterValue(TaskCounter.SHUFFLE_BYTES, 
counterGroupName, attemptInfo));
+
+            //Total time taken for receiving all events from source tasks
+            result.add(getOverheadFromSourceTasks(counterGroupName, 
attemptInfo));
+            result.add(getCounterValue(TaskCounter.MERGE_PHASE_TIME, 
counterGroupName, attemptInfo));
+            result.add(getCounterValue(TaskCounter.SHUFFLE_PHASE_TIME, 
counterGroupName, attemptInfo));
+
+
+            result.add(Long.toString(timeTakenForRealWork));
+
+            result.add(getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED, 
counterGroupName, attemptInfo));
+            result.add(getCounterValue(TaskCounter.LAST_EVENT_RECEIVED, 
counterGroupName, attemptInfo));
+            result.add(getCounterValue(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT, 
counterGroupName, attemptInfo));
+
+            csvResult.addRecord(result.toArray(new String[result.size()]));
+          }
+        }
+      }
+    }
+
+  }
+
+  /**
+   * Time taken to receive all events from source tasks
+   *
+   * @param counterGroupName
+   * @param attemptInfo
+   * @return String
+   */
+  private String getOverheadFromSourceTasks(String counterGroupName, 
TaskAttemptInfo attemptInfo) {
+    long firstEventReceived = 
Long.parseLong(getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED,
+        counterGroupName, attemptInfo));
+    long lastEventReceived = 
Long.parseLong(getCounterValue(TaskCounter.LAST_EVENT_RECEIVED,
+        counterGroupName, attemptInfo));
+    return Long.toString(lastEventReceived - firstEventReceived);
+  }
+
+  private String getCounterValue(TaskCounter counter, String counterGroupName,
+      TaskAttemptInfo attemptInfo) {
+    Map<String, TezCounter> tezCounterMap = 
attemptInfo.getCounter(counter.toString());
+    if (tezCounterMap != null) {
+      for (Map.Entry<String, TezCounter> entry : tezCounterMap.entrySet()) {
+        String groupName = entry.getKey();
+        long val = entry.getValue().getValue();
+        if (groupName.equals(counterGroupName)) {
+          return Long.toString(val);
+        }
+      }
+    }
+    return "";
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Shuffle time analyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyze the time taken for shuffle, merge "
+        + "and the real work done in the task";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
new file mode 100644
index 0000000..8152344
--- /dev/null
+++ 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * <p/>
+ * Identify the skew (REDUCE_INPUT_GROUPS / REDUCE_INPUT_RECORDS) ratio for
+ * all task attempts and report if they cross the configured thresholds.
+ * <p/>
+ * Thresholds and their defaults:
+ * {@code tez.skew-analyzer.shuffle.bytes.per.source} (900 MB),
+ * {@code tez.skew-analyzer.shuffle.key.group.min.ratio} (0.2) and
+ * {@code tez.skew-analyzer.shuffle.key.group.max.ratio} (0.4).
+ * <p/>
+ * - Case 1: Ratio of (reduce_input_groups / reduce_input_records) is below
+ * the min ratio and SHUFFLE_BYTES per task attempt from a source is above
+ * the shuffle bytes limit. This means couple of keys having too many
+ * records. Either partitioning is wrong, or we need to increase memory
+ * limit for this vertex.
+ * <p/>
+ * - Case 2: numTasks in vertex is greater than 1 and the reduce input
+ * records of a task attempt from a source exceed both the max ratio and 60%
+ * of the vertex level reduce input records for that source. This means
+ * that partitioning is wrong (can also consider reducing number of tasks
+ * for that vertex). In some cases, too many reducers are launched and this
+ * can help find those.
+ * <p/>
+ * - Case 3: Ratio of (reduce_input_groups / reduce_input_records) is
+ * between the min and max ratio per task attempt and SHUFFLE_BYTES per task
+ * attempt from a source is above the shuffle bytes limit. This means, may
+ * be consider increasing parallelism based on the task attempt runtime.
+ * <p/>
+ */
+public class SkewAnalyzer implements Analyzer {
+
+  private static final String SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE =
+      "tez.skew-analyzer.shuffle.bytes.per.source";
+  private static final long SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT =
+      900L * 1024 * 1024;
+
+  //Min reducer input group : reducer keys ratio for computation
+  private static final String ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO =
+      "tez.skew-analyzer.shuffle.key.group.min.ratio";
+  private static final float ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO_DEFAULT =
+      0.2f;
+
+  //Max reducer input group : reducer keys ratio for computation
+  private static final String ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO =
+      "tez.skew-analyzer.shuffle.key.group.max.ratio";
+  private static final float ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO_DEFAULT =
+      0.4f;
+
+  private static final String[] headers = { "vertexName", "taskAttemptId",
+      "counterGroup", "node", "REDUCE_INPUT_GROUPS", "REDUCE_INPUT_RECORDS",
+      "ratio", "SHUFFLE_BYTES", "timeTaken", "observation" };
+
+  private final CSVResult csvResult = new CSVResult(headers);
+
+  private final Configuration config;
+
+  private final float minRatio;
+  private final float maxRatio;
+  private final long maxShuffleBytesPerSource;
+
+  /**
+   * Reads the analyzer thresholds from the given configuration.
+   *
+   * @param config source of {@code tez.skew-analyzer.*} settings
+   */
+  public SkewAnalyzer(Configuration config) {
+    this.config = config;
+    maxRatio = config.getFloat(ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO,
+        ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO_DEFAULT);
+    minRatio = config.getFloat(ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO,
+        ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO_DEFAULT);
+    maxShuffleBytesPerSource =
+        config.getLong(SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE,
+            SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT);
+  }
+
+  /**
+   * Runs all skew checks over every task attempt of the DAG.
+   *
+   * @param dagInfo DAG to inspect; must not be null
+   * @throws TezException per the overridden method signature
+   */
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    Preconditions.checkArgument(dagInfo != null, "DAG can't be null");
+    analyzeReducers(dagInfo);
+  }
+
+  /**
+   * Applies the three per-attempt checks, in a fixed order, to every task
+   * attempt of every vertex.
+   *
+   * @param dagInfo DAG to inspect
+   */
+  private void analyzeReducers(DagInfo dagInfo) {
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+        analyzeGroupSkewPerSource(attemptInfo);
+        analyzeRecordSkewPerSource(attemptInfo);
+        analyzeForParallelism(attemptInfo);
+      }
+    }
+  }
+
+  /**
+   * Analyze scenario where couple keys are having too many records per source
+   *
+   * @param attemptInfo task attempt whose counters are inspected
+   */
+  private void analyzeGroupSkewPerSource(TaskAttemptInfo attemptInfo) {
+
+    //counter_group (basically source) --> counter
+    Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(
+        TaskCounter.REDUCE_INPUT_GROUPS.toString());
+    if (reduceInputGroups == null) {
+      return;
+    }
+    Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(
+        TaskCounter.REDUCE_INPUT_RECORDS.toString());
+    Map<String, TezCounter> shuffleBytes = attemptInfo.getCounter(
+        TaskCounter.SHUFFLE_BYTES.toString());
+
+    //tez counter for every source
+    for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
+      String counterGroup = entry.getKey();
+      if (isTaskCounterGroup(counterGroup)) {
+        continue;
+      }
+
+      long inputGroupsCount = entry.getValue().getValue();
+      long inputRecordsCount = valueOf(reduceInputRecords, counterGroup);
+      long shuffleBytesPerSource = valueOf(shuffleBytes, counterGroup);
+
+      // No records means no meaningful ratio (would be NaN / Infinity).
+      if (inputRecordsCount <= 0) {
+        continue;
+      }
+      float ratio = (inputGroupsCount * 1.0f / inputRecordsCount);
+
+      //Case 1: Couple of keys having too many records per source.
+      if (shuffleBytesPerSource > maxShuffleBytesPerSource
+          && ratio < minRatio) {
+        addRecord(attemptInfo, counterGroup, inputGroupsCount,
+            inputRecordsCount, ratio, shuffleBytesPerSource,
+            "Please check partitioning. Otherwise consider increasing memLimit");
+      }
+    }
+  }
+
+  /**
+   * Analyze scenario where one task is getting > 60% of the vertex level
+   * records
+   *
+   * @param attemptInfo task attempt whose counters are inspected
+   */
+  private void analyzeRecordSkewPerSource(TaskAttemptInfo attemptInfo) {
+
+    VertexInfo vertexInfo = attemptInfo.getTaskInfo().getVertexInfo();
+    Map<String, TezCounter> vertexLevelReduceInputRecords =
+        vertexInfo.getCounter(TaskCounter.REDUCE_INPUT_RECORDS.toString());
+
+    int vertexNumTasks = vertexInfo.getNumTasks();
+
+    //counter_group (basically source) --> counter
+    Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(
+        TaskCounter.REDUCE_INPUT_GROUPS.toString());
+    if (reduceInputGroups == null || vertexNumTasks <= 1) {
+      // Nothing to iterate, or a single task cannot be skewed against
+      // its siblings.
+      return;
+    }
+    Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(
+        TaskCounter.REDUCE_INPUT_RECORDS.toString());
+    Map<String, TezCounter> shuffleBytes = attemptInfo.getCounter(
+        TaskCounter.SHUFFLE_BYTES.toString());
+
+    //tez counter for every source
+    for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
+      String counterGroup = entry.getKey();
+      if (isTaskCounterGroup(counterGroup)) {
+        continue;
+      }
+
+      long inputGroupsCount = entry.getValue().getValue();
+      long inputRecordsCount = valueOf(reduceInputRecords, counterGroup);
+      long shuffleBytesPerSource = valueOf(shuffleBytes, counterGroup);
+      long vertexLevelInputRecordsCount =
+          valueOf(vertexLevelReduceInputRecords, counterGroup);
+
+      // A missing/zero vertex level counter used to yield an infinite
+      // ratio and a bogus "skew" record; skip such groups instead.
+      if (vertexLevelInputRecordsCount <= 0) {
+        continue;
+      }
+      float ratio = (inputRecordsCount * 1.0f / vertexLevelInputRecordsCount);
+
+      //input records > 60% of vertex level record count
+      if (ratio > maxRatio
+          && inputRecordsCount > (vertexLevelInputRecordsCount * 0.60)) {
+        addRecord(attemptInfo, counterGroup, inputGroupsCount,
+            inputRecordsCount, ratio, shuffleBytesPerSource,
+            "Some task attempts are getting > 60% of reduce input records. "
+                + "Consider adjusting parallelism & check partition logic");
+      }
+    }
+  }
+
+  /**
+   * Analyze scenario where a vertex would need to increase parallelism
+   *
+   * @param attemptInfo task attempt whose counters are inspected
+   */
+  private void analyzeForParallelism(TaskAttemptInfo attemptInfo) {
+
+    //counter_group (basically source) --> counter
+    Map<String, TezCounter> reduceInputGroups = attemptInfo.getCounter(
+        TaskCounter.REDUCE_INPUT_GROUPS.toString());
+    if (reduceInputGroups == null) {
+      return;
+    }
+    Map<String, TezCounter> reduceInputRecords = attemptInfo.getCounter(
+        TaskCounter.REDUCE_INPUT_RECORDS.toString());
+    Map<String, TezCounter> shuffleBytes = attemptInfo.getCounter(
+        TaskCounter.SHUFFLE_BYTES.toString());
+
+    //tez counter for every source
+    for (Map.Entry<String, TezCounter> entry : reduceInputGroups.entrySet()) {
+      String counterGroup = entry.getKey();
+      if (isTaskCounterGroup(counterGroup)) {
+        continue;
+      }
+
+      long inputGroupsCount = entry.getValue().getValue();
+      long inputRecordsCount = valueOf(reduceInputRecords, counterGroup);
+      long shuffleBytesPerSource = valueOf(shuffleBytes, counterGroup);
+
+      // No records means no meaningful ratio (would be NaN / Infinity).
+      if (inputRecordsCount <= 0) {
+        continue;
+      }
+      float ratio = (inputGroupsCount * 1.0f / inputRecordsCount);
+
+      //Case 3: Shuffle bytes above the configured limit (not merely the
+      // built-in default) and ratio between min and max. Consider
+      // increasing parallelism based on task runtime.
+      if (shuffleBytesPerSource > maxShuffleBytesPerSource
+          && ratio > minRatio && ratio < maxRatio) {
+        addRecord(attemptInfo, counterGroup, inputGroupsCount,
+            inputRecordsCount, ratio, shuffleBytesPerSource,
+            "Consider increasing parallelism.");
+      }
+    }
+  }
+
+  /**
+   * Tells whether a counter group is the aggregate TaskCounter group.
+   * TODO: Tez counters always ends up adding fgroups and groups, due to
+   * which we end up getting TaskCounter details as well.
+   *
+   * @param counterGroup counter group name
+   * @return true when the name equals the TaskCounter class name
+   */
+  private static boolean isTaskCounterGroup(String counterGroup) {
+    return counterGroup.equals(TaskCounter.class.getName());
+  }
+
+  /**
+   * Reads the counter value of a counter group.
+   *
+   * @param counters counter group name --> counter; may be null
+   * @param counterGroup counter group (source) to look up
+   * @return the counter value, or 0 when the map or the entry is missing
+   */
+  private static long valueOf(Map<String, TezCounter> counters,
+      String counterGroup) {
+    if (counters == null) {
+      return 0;
+    }
+    TezCounter counter = counters.get(counterGroup);
+    return (counter != null) ? counter.getValue() : 0;
+  }
+
+  /**
+   * Appends one row, in header order, to the CSV result.
+   *
+   * @param attemptInfo task attempt the row is about
+   * @param counterGroup counter group (source) name
+   * @param inputGroupsCount REDUCE_INPUT_GROUPS of the group
+   * @param inputRecordsCount REDUCE_INPUT_RECORDS of the group
+   * @param ratio ratio computed by the calling check
+   * @param shuffleBytesPerSource SHUFFLE_BYTES of the group
+   * @param observation advice text for the last column
+   */
+  private void addRecord(TaskAttemptInfo attemptInfo, String counterGroup,
+      long inputGroupsCount, long inputRecordsCount, float ratio,
+      long shuffleBytesPerSource, String observation) {
+    List<String> result = Lists.newLinkedList();
+    result.add(attemptInfo.getTaskInfo().getVertexInfo().getVertexName());
+    result.add(attemptInfo.getTaskAttemptId());
+    result.add(counterGroup);
+    result.add(attemptInfo.getNodeId());
+    result.add(inputGroupsCount + "");
+    result.add(inputRecordsCount + "");
+    result.add(ratio + "");
+    result.add(shuffleBytesPerSource + "");
+    result.add(attemptInfo.getTimeTaken() + "");
+    result.add(observation);
+
+    csvResult.addRecord(result.toArray(new String[result.size()]));
+  }
+
+  /**
+   * @return the CSV table filled by {@link #analyze(DagInfo)}
+   * @throws TezException per the overridden method signature
+   */
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  /**
+   * @return display name of this analyzer
+   */
+  @Override
+  public String getName() {
+    return "Skew Analyzer";
+  }
+
+  /**
+   * @return short description of this analyzer
+   */
+  @Override
+  public String getDescription() {
+    return "Analyzer reducer skews by mining reducer task counters";
+  }
+
+  /**
+   * @return the configuration passed to the constructor
+   */
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java
 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java
new file mode 100644
index 0000000..407cf47
--- /dev/null
+++ 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+
+import java.util.Collection;
+import java.util.List;
+
+
+/**
+ * This will provide the set of nodes participated in the DAG in descending 
order of task execution
+ * time.
+ * <p/>
+ * Combine it with other counters to understand slow nodes better.
+ */
+public class SlowNodeAnalyzer implements Analyzer {
+
+  private static final Log LOG = LogFactory.getLog(SlowNodeAnalyzer.class);
+
+  private static final String[] headers = { "nodeName", "noOfTasksExecuted", 
"noOfKilledTasks",
+      "noOfFailedTasks", "avgSucceededTaskExecutionTime", 
"avgKilledTaskExecutionTime",
+      "avgFailedTaskExecutionTime", "avgHDFSBytesRead", "avgHDFSBytesWritten",
+      "avgFileBytesRead", "avgFileBytesWritten", "avgGCTimeMillis", 
"avgCPUTimeMillis" };
+
+  private final CSVResult csvResult = new CSVResult(headers);
+
+  private final Configuration config;
+
+  public SlowNodeAnalyzer(Configuration config) {
+    this.config = config;
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    Multimap<String, TaskAttemptInfo> nodeDetails = dagInfo.getNodeDetails();
+    for (String nodeName : nodeDetails.keySet()) {
+      List<String> record = Lists.newLinkedList();
+
+      Collection<TaskAttemptInfo> taskAttemptInfos = nodeDetails.get(nodeName);
+
+      record.add(nodeName);
+      record.add(taskAttemptInfos.size() + "");
+      record.add(getNumberOfTasks(taskAttemptInfos, TaskAttemptState.KILLED) + 
"");
+      record.add(getNumberOfTasks(taskAttemptInfos, TaskAttemptState.FAILED) + 
"");
+
+      Iterable<TaskAttemptInfo> succeedTasks = 
getFilteredTaskAttempts(taskAttemptInfos,
+          TaskAttemptState.SUCCEEDED);
+      record.add(getAvgTaskExecutionTime(succeedTasks) + "");
+
+      Iterable<TaskAttemptInfo> killedTasks = 
getFilteredTaskAttempts(taskAttemptInfos,
+          TaskAttemptState.KILLED);
+      record.add(getAvgTaskExecutionTime(killedTasks) + "");
+
+      Iterable<TaskAttemptInfo> failedTasks = 
getFilteredTaskAttempts(taskAttemptInfos,
+          TaskAttemptState.FAILED);
+      record.add(getAvgTaskExecutionTime(failedTasks) + "");
+
+      record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
+          .getName(), FileSystemCounter.HDFS_BYTES_READ.name()) + "");
+      record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
+          .getName(), FileSystemCounter.HDFS_BYTES_WRITTEN.name()) + "");
+      record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
+          .getName(), FileSystemCounter.FILE_BYTES_READ.name()) + "");
+      record.add(getAvgCounter(taskAttemptInfos, FileSystemCounter.class
+          .getName(), FileSystemCounter.FILE_BYTES_WRITTEN.name()) + "");
+      record.add(getAvgCounter(taskAttemptInfos, TaskCounter.class
+          .getName(), TaskCounter.GC_TIME_MILLIS.name()) + "");
+      record.add(getAvgCounter(taskAttemptInfos, TaskCounter.class
+              .getName(), TaskCounter.CPU_MILLISECONDS.name()) + "");
+
+          csvResult.addRecord(record.toArray(new String[record.size()]));
+    }
+  }
+
+  private Iterable<TaskAttemptInfo> 
getFilteredTaskAttempts(Collection<TaskAttemptInfo>
+      taskAttemptInfos, final TaskAttemptState status) {
+    return Iterables.filter(taskAttemptInfos, new
+        Predicate<TaskAttemptInfo>() {
+          @Override public boolean apply(TaskAttemptInfo input) {
+            return input.getStatus().equalsIgnoreCase(status.toString());
+          }
+        });
+  }
+
+  private float getAvgTaskExecutionTime(Iterable<TaskAttemptInfo> 
taskAttemptInfos) {
+    long totalTime = 0;
+    int size = 0;
+    for (TaskAttemptInfo attemptInfo : taskAttemptInfos) {
+      totalTime += attemptInfo.getTimeTaken();
+      size++;
+    }
+    return (size > 0) ? (totalTime * 1.0f / size) : 0;
+  }
+
+  private int getNumberOfTasks(Collection<TaskAttemptInfo> taskAttemptInfos, 
TaskAttemptState
+      status) {
+    int tasks = 0;
+    for (TaskAttemptInfo attemptInfo : taskAttemptInfos) {
+      if (attemptInfo.getStatus().equalsIgnoreCase(status.toString())) {
+        tasks++;
+      }
+    }
+    return tasks;
+  }
+
+  private float getAvgCounter(Collection<TaskAttemptInfo> taskAttemptInfos, 
String
+      counterGroupName, String counterName) {
+    long total = 0;
+    int taskCount = 0;
+    for (TaskAttemptInfo attemptInfo : taskAttemptInfos) {
+      TezCounters tezCounters = attemptInfo.getTezCounters();
+      TezCounter counter = tezCounters.findCounter(counterGroupName, 
counterName);
+      if (counter != null) {
+        total += counter.getValue();
+        taskCount++;
+      } else {
+        LOG.info("Could not find counterGroupName=" + counterGroupName + ", 
counter=" +
+            counterName + " in " + attemptInfo);
+      }
+    }
+    return (taskCount > 0) ? (total * 1.0f / taskCount) : 0;
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Slow Node Analyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("Analyze node details for the DAG.").append("\n");
+    sb.append("This could be used to find out the set of nodes where the tasks 
are taking more "
+        + "time on average.").append("\n");
+    sb.append("This could be used to find out the set of nodes where the tasks 
are taking more "
+        + "time on average and to understand whether too many tasks got 
scheduled on a node.")
+        .append("\n");
+    sb.append("One needs to combine the task execution time with other metrics 
like bytes "
+        + "read/written etc to get better idea of bad nodes. In order to 
understand the slow "
+        + "nodes due to network, it might be worthwhile to consider the 
shuffle performance "
+        + "analyzer tool in tez-tools").append("\n");
+    return sb.toString();
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
----------------------------------------------------------------------
diff --git 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
new file mode 100644
index 0000000..7c7f5c0
--- /dev/null
+++ 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+
+/**
+ * Analyze slow tasks in the DAG.
+ *
+ * <p>Task attempts of all vertices are ordered by time taken (longest first) and the leading
+ * entries are reported. The number of reported entries is read from
+ * {@code tez.slow-task-analyzer.task.count} and defaults to 100; fewer entries are reported
+ * when the DAG has fewer task attempts than that.
+ *
+ * <p>TODO: We do not get counters for killed task attempts yet.
+ */
+public class SlowTaskIdentifier implements Analyzer {
+
+  private static final String[] headers = { "vertexName", "taskAttemptId",
+      "Node", "taskDuration", "Status",
+      "NoOfInputs" };
+
+  private final CSVResult csvResult;
+
+  /** Configuration key holding the maximum number of task attempts to report. */
+  private static final String NO_OF_TASKS = "tez.slow-task-analyzer.task.count";
+  private static final int NO_OF_TASKS_DEFAULT = 100;
+
+  private final Configuration config;
+
+  /**
+   * @param config configuration from which the report size is read
+   */
+  public SlowTaskIdentifier(Configuration config) {
+    this.config = config;
+    this.csvResult = new CSVResult(headers);
+  }
+
+  /**
+   * Collects the task attempts of every vertex, sorts them by time taken in descending order
+   * and adds one CSV record per attempt, up to the configured limit.
+   *
+   * @param dagInfo DAG whose task attempts are inspected
+   * @throws TezException declared by the {@link Analyzer} contract
+   */
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    List<TaskAttemptInfo> taskAttempts = Lists.newArrayList();
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      taskAttempts.addAll(vertexInfo.getTaskAttempts());
+    }
+
+    //sort them by runtime in descending order
+    Collections.sort(taskAttempts, new Comparator<TaskAttemptInfo>() {
+      @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+        return (o1.getTimeTaken() > o2.getTimeTaken()) ? -1 :
+            ((o1.getTimeTaken() == o2.getTimeTaken()) ?
+                0 : 1);
+      }
+    });
+
+    // Never read past the end of the list: a DAG may have fewer attempts than the limit.
+    int limit = Math.min(config.getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT), taskAttempts.size());
+    for (int i = 0; i < limit; i++) {
+      TaskAttemptInfo attempt = taskAttempts.get(i);
+      VertexInfo vertex = attempt.getTaskInfo().getVertexInfo();
+
+      // Field order must match the headers array.
+      csvResult.addRecord(new String[] {
+          vertex.getVertexName(),
+          attempt.getTaskAttemptId(),
+          attempt.getContainer().getHost(),
+          String.valueOf(attempt.getTimeTaken()),
+          attempt.getStatus(),
+          String.valueOf(vertex.getInputEdges().size())
+      });
+    }
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Slow Task Identifier";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Identifies slow tasks in the DAG";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
new file mode 100644
index 0000000..7364506
--- /dev/null
+++ 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Identify the slowest vertex in the DAG.
+ *
+ * <p>One CSV record is produced per vertex. It contains the vertex run time, the latest
+ * SHUFFLE_PHASE_TIME and LAST_EVENT_RECEIVED counter values (both expressed relative to the
+ * vertex start time) together with the source they were reported for, percentiles of the task
+ * attempt run times, and a free-text observation/comment.
+ */
+public class SlowestVertexAnalyzer implements Analyzer {
+
+  private static final String[] headers = { "vertexName", "taskAttempts", "totalTime",
+      "shuffleTime", "shuffleTime_Max", "LastEventReceived", "LastEventReceivedFrom",
+      "TimeTaken_ForRealWork", "75thPercentile", "95thPercentile", "98thPercentile", "Median",
+      "observation", "comments" };
+
+  private final CSVResult csvResult = new CSVResult(headers);
+
+  private final Configuration config;
+  private final MetricRegistry metrics = new MetricRegistry();
+
+  /** Largest vertex-relative counter time seen so far and the source it was reported for. */
+  private static final class SlowestSource {
+    private long time = Long.MIN_VALUE;
+    private String name = "";
+  }
+
+  public SlowestVertexAnalyzer(Configuration config) {
+    this.config = config;
+  }
+
+  /**
+   * Adds one CSV record per vertex of the given DAG.
+   *
+   * @param dagInfo DAG whose vertices are inspected
+   * @throws TezException declared by the {@link Analyzer} contract
+   */
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      String vertexName = vertexInfo.getVertexName();
+      long totalTime = vertexInfo.getTimeTaken();
+
+      Histogram taskAttemptRuntimeHistogram = metrics.histogram(vertexName);
+      for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+        taskAttemptRuntimeHistogram.update(attemptInfo.getTimeTaken());
+      }
+
+      SlowestSource lastEvent = findSlowestSource(vertexInfo, TaskCounter.LAST_EVENT_RECEIVED);
+      SlowestSource shuffle = findSlowestSource(vertexInfo, TaskCounter.SHUFFLE_PHASE_TIME);
+
+      // Clamp before any arithmetic: when no counter was found the time is Long.MIN_VALUE and
+      // "totalTime - time" would overflow.
+      long max = Math.max(0, lastEvent.time);
+      long shuffleMax = Math.max(0, shuffle.time);
+
+      String comments = "";
+
+      List<String> record = Lists.newLinkedList();
+      record.add(vertexName);
+      record.add(vertexInfo.getTaskAttempts().size() + "");
+      record.add(totalTime + "");
+      record.add(shuffleMax + "");
+      record.add(shuffle.name);
+      record.add(max + "");
+      record.add(lastEvent.name);
+      record.add(Math.max(0, (totalTime - max)) + "");
+
+      StringBuilder sb = new StringBuilder();
+      double percentile75 = taskAttemptRuntimeHistogram.getSnapshot().get75thPercentile();
+      double percentile95 = taskAttemptRuntimeHistogram.getSnapshot().get95thPercentile();
+      double percentile98 = taskAttemptRuntimeHistogram.getSnapshot().get98thPercentile();
+      double percentile99 = taskAttemptRuntimeHistogram.getSnapshot().get99thPercentile();
+      double medianAttemptRuntime = taskAttemptRuntimeHistogram.getSnapshot().getMedian();
+
+      record.add("75th=" + percentile75);
+      record.add("95th=" + percentile95);
+      record.add("98th=" + percentile98);
+      record.add("median=" + medianAttemptRuntime);
+
+      if (percentile75 / percentile99 < 0.5) {
+        //looks like some straggler task is there.
+        sb.append("Looks like some straggler task is there");
+      }
+
+      record.add(sb.toString());
+
+      if (totalTime > 0 && vertexInfo.getTaskAttempts().size() > 0) {
+        if ((shuffleMax * 1.0f / totalTime) > 0.5) {
+          if ((max * 1.0f / totalTime) > 0.5) {
+            comments = "This vertex is slow due to its dependency on parent. Got a lot delayed last"
+                + " event received";
+          } else {
+            comments =
+                "Spending too much time on shuffle. Check shuffle bytes from previous vertex";
+          }
+        } else {
+          if (totalTime > 10000) { //greater than 10 seconds. //TODO: Configure it later.
+            comments = "Concentrate on this vertex (totalTime > 10 seconds)";
+          }
+        }
+      }
+
+      record.add(comments);
+      csvResult.addRecord(record.toArray(new String[record.size()]));
+    }
+  }
+
+  /**
+   * Scans all task attempts of a vertex for the given counter and returns the largest value,
+   * expressed relative to the vertex start time, along with the counter group (source) it
+   * belongs to.
+   *
+   * @param vertexInfo vertex whose task attempts are scanned
+   * @param counter counter to look up in every task attempt
+   * @return the largest vertex-relative time and its source; the time stays at
+   *         {@code Long.MIN_VALUE} and the name stays empty when no counter was found
+   */
+  private static SlowestSource findSlowestSource(VertexInfo vertexInfo, TaskCounter counter) {
+    SlowestSource slowest = new SlowestSource();
+    for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+      Map<String, TezCounter> countersBySource = attemptInfo.getCounter(counter.toString());
+
+      for (Map.Entry<String, TezCounter> entry : countersBySource.entrySet()) {
+        if (entry.getKey().equals(TaskCounter.class.getName())) {
+          //TODO: Tez counters always ends up adding fgroups and groups, due to which we end up
+          // getting TaskCounter details as well.
+          continue;
+        }
+        // Compare on the same scale the maximum is stored in, i.e. w.r.t vertex start time.
+        long sinceVertexStart = (attemptInfo.getStartTime() + entry.getValue().getValue())
+            - (vertexInfo.getStartTime());
+        if (sinceVertexStart > slowest.time) {
+          slowest.time = sinceVertexStart;
+          slowest.name = entry.getKey();
+        }
+      }
+    }
+    return slowest;
+  }
+
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "SlowVertexAnalyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Identify the slowest vertex in the DAG, which needs to be looked into first";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
new file mode 100644
index 0000000..c650104
--- /dev/null
+++ 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Find out tasks which have more than 1 spill (ADDITIONAL_SPILL_COUNT).
+ *
+ * <p>Accompany this with OUTPUT_BYTES (> 1 GB data written): a task attempt is reported only
+ * when both conditions hold for the same counter group.
+ */
+public class SpillAnalyzerImpl implements Analyzer {
+
+  private static final String[] headers = { "vertexName", "taskAttemptId",
+      "Node", "counterGroupName",
+      "spillCount", "taskDuration",
+      "OUTPUT_BYTES", "OUTPUT_RECORDS",
+      "SPILLED_RECORDS", "Recommendation" };
+
+  private final CSVResult csvResult;
+
+  /** Output size (1 GB) above which a spilling task attempt is reported. */
+  private static final long OUTPUT_BYTES_THRESHOLD = 1 * 1024 * 1024 * 1024L;
+
+  private final Configuration config;
+
+  public SpillAnalyzerImpl(Configuration config) {
+    this.config = config;
+    this.csvResult = new CSVResult(headers);
+  }
+
+  /**
+   * Adds one CSV record for every (task attempt, counter group) pair whose spill count is
+   * above 1 and whose output bytes exceed the threshold.
+   *
+   * @param dagInfo DAG whose task attempts are inspected
+   * @throws TezException declared by the {@link Analyzer} contract
+   */
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      String vertexName = vertexInfo.getVertexName();
+
+      for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts()) {
+        //Get ADDITIONAL_SPILL_COUNT, OUTPUT_BYTES for every source
+        Map<String, TezCounter> spillCountMap =
+            attemptInfo.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name());
+        Map<String, TezCounter> spilledRecordsMap =
+            attemptInfo.getCounter(TaskCounter.SPILLED_RECORDS.name());
+        Map<String, TezCounter> outputRecordsMap =
+            attemptInfo.getCounter(TaskCounter.OUTPUT_RECORDS.name());
+        Map<String, TezCounter> outputBytesMap =
+            attemptInfo.getCounter(TaskCounter.OUTPUT_BYTES.name());
+
+        for (Map.Entry<String, TezCounter> entry : spillCountMap.entrySet()) {
+          String source = entry.getKey();
+          long spillCount = entry.getValue().getValue();
+          long outBytes = counterValue(outputBytesMap, source);
+
+          if (spillCount <= 1 || outBytes <= OUTPUT_BYTES_THRESHOLD) {
+            continue;
+          }
+
+          // Field order must match the headers array.
+          List<String> record = Lists.newLinkedList();
+          record.add(vertexName);
+          record.add(attemptInfo.getTaskAttemptId());
+          record.add(attemptInfo.getNodeId());
+          record.add(source);
+          record.add(spillCount + "");
+          record.add(attemptInfo.getTimeTaken() + "");
+          record.add(outBytes + "");
+          record.add(counterValue(outputRecordsMap, source) + "");
+          record.add(counterValue(spilledRecordsMap, source) + "");
+          record.add("Consider increasing " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB
+              + ", try increasing container size.");
+
+          csvResult.addRecord(record.toArray(new String[record.size()]));
+        }
+      }
+    }
+  }
+
+  /**
+   * Reads a counter for the given source, treating a missing counter as zero.
+   *
+   * <p>A source present in one counter map is not necessarily present in the others, so the
+   * lookup must not be dereferenced blindly.
+   *
+   * @param counters counters keyed by source (counter group name)
+   * @param source key to look up
+   * @return the counter value, or 0 when the map has no entry for {@code source}
+   */
+  private static long counterValue(Map<String, TezCounter> counters, String source) {
+    TezCounter counter = counters.get(source);
+    return (counter == null) ? 0 : counter.getValue();
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "SpillAnalyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyze spill details in the task";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/analyzers/pom.xml
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/pom.xml b/tez-tools/analyzers/pom.xml
new file mode 100644
index 0000000..97ce541
--- /dev/null
+++ b/tez-tools/analyzers/pom.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez-tools</artifactId>
+    <version>0.8.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-perf-analyzer</artifactId>
+  <packaging>pom</packaging>
+
+  <modules>
+    <module>job-analyzer</module>
+  </modules>
+</project>

http://git-wip-us.apache.org/repos/asf/tez/blob/e10d80fe/tez-tools/pom.xml
----------------------------------------------------------------------
diff --git a/tez-tools/pom.xml b/tez-tools/pom.xml
index bf8fdf8..ed13143 100644
--- a/tez-tools/pom.xml
+++ b/tez-tools/pom.xml
@@ -26,6 +26,9 @@
   <artifactId>tez-tools</artifactId>
   <packaging>pom</packaging>
 
+  <modules>
+    <module>analyzers</module>
+  </modules>
   <build>
     <plugins>
       <plugin>

Reply via email to