TEZ-2692. bugfixes & enhancements related to job parser and analyzer 
(rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ecd90dc1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ecd90dc1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ecd90dc1

Branch: refs/heads/master
Commit: ecd90dc1f79a4b1614174b75030b85feb8842793
Parents: eadbfec
Author: Rajesh Balamohan <[email protected]>
Authored: Tue Aug 11 18:19:55 2015 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Tue Aug 11 18:19:55 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../java/org/apache/tez/dag/utils/Graph.java    |  15 +-
 .../tez/history/parser/SimpleHistoryParser.java | 239 ++++++
 .../history/parser/datamodel/VertexInfo.java    |  65 +-
 .../apache/tez/history/TestATSFileParser.java   | 587 --------------
 .../apache/tez/history/TestHistoryParser.java   | 785 +++++++++++++++++++
 tez-tools/analyzers/job-analyzer/pom.xml        |   9 +
 .../java/org/apache/tez/analyzer/CSVResult.java |   5 +-
 .../analyzer/plugins/CriticalPathAnalyzer.java  |  44 +-
 .../tez/analyzer/plugins/LocalityAnalyzer.java  |   3 +-
 .../analyzer/plugins/ShuffleTimeAnalyzer.java   |  60 +-
 .../tez/analyzer/plugins/SkewAnalyzer.java      |   4 +
 .../analyzer/plugins/SlowTaskIdentifier.java    |  13 +-
 .../analyzer/plugins/SlowestVertexAnalyzer.java |  52 +-
 .../tez/analyzer/plugins/SpillAnalyzerImpl.java |  15 +-
 .../plugins/TaskConcurrencyAnalyzer.java        | 138 ++++
 .../org/apache/tez/analyzer/utils/SVGUtils.java | 264 +++++++
 .../org/apache/tez/analyzer/utils/Utils.java    | 100 +++
 18 files changed, 1751 insertions(+), 648 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b37eb9e..3de9fb7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,7 @@ INCOMPATIBLE CHANGES
   TEZ-2699. Internalize strings in ATF parser
 
 ALL CHANGES:
+  TEZ-2692. bugfixes & enhancements related to job parser and analyzer.
   TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException 
from a dying AM.
   TEZ-2630. TezChild receives IP address instead of FQDN.
   TEZ-2684. ShuffleVertexManager.parsePartitionStats throws 
IllegalStateException: Stats should be initialized.

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java 
b/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
index 6de9c59..eb2bd41 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
@@ -62,6 +62,7 @@ public class Graph {
     List<Edge> outs;
     String label;
     String shape;
+    String color;
 
     public Node(String id) {
       this(id, null);
@@ -104,6 +105,10 @@ public class Graph {
     public void setShape(String shape) {
       this.shape = shape;
     }
+
+    public void setColor(String color) {
+      this.color = color;
+    }
   }
 
   private String name;
@@ -196,17 +201,19 @@ public class Graph {
     for (Node n : nodes) {
       if (n.shape != null && !n.shape.isEmpty()) {
         sb.append(String.format(
-            "%s%s [ label = %s, shape = %s ];",
+            "%s%s [ label = %s, shape = %s , color= %s];",
             indent,
             wrapSafeString(n.getUniqueId()),
             wrapSafeString(n.getLabel()),
-            wrapSafeString(n.shape)));
+            wrapSafeString(n.shape),
+            wrapSafeString(n.color == null ? "black" : n.color)));
       } else {
         sb.append(String.format(
-            "%s%s [ label = %s ];",
+            "%s%s [ label = %s , color= %s ];",
             indent,
             wrapSafeString(n.getUniqueId()),
-            wrapSafeString(n.getLabel())));
+            wrapSafeString(n.getLabel()),
+            wrapSafeString(n.color == null ? "black" : n.color)));
       }
       sb.append(System.getProperty("line.separator"));
       List<Edge> combinedOuts = combineEdges(n.outs);

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
----------------------------------------------------------------------
diff --git 
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
 
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
new file mode 100644
index 0000000..09c010a
--- /dev/null
+++ 
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.history.parser;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.history.parser.datamodel.BaseParser;
+import org.apache.tez.history.parser.datamodel.Constants;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Scanner;
+
+/**
+ * Parser utility to parse data generated by SimpleHistoryLogging to in-memory 
datamodel provided
+ * in org.apache.tez.history.parser.datamodel
+ * <p/>
+ * <p/>
+ * Most of the information should be available. Minor info like VersionInfo 
may not be available,
+ * as it is not captured in SimpleHistoryLogging.
+ */
+public class SimpleHistoryParser extends BaseParser {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SimpleHistoryParser.class);
+  private static final String UTF8 = "UTF-8";
+  private final File historyFile;
+
+
+  public SimpleHistoryParser(File historyFile) {
+    super();
+    Preconditions.checkArgument(historyFile.exists(), historyFile + " does not 
exist");
+    this.historyFile = historyFile;
+  }
+
+  /**
+   * Get in-memory representation of DagInfo
+   *
+   * @return DagInfo
+   * @throws TezException
+   */
+  public DagInfo getDAGData(String dagId) throws TezException {
+    try {
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(dagId), "Please 
provide valid dagId");
+      dagId = dagId.trim();
+      parseContents(historyFile, dagId);
+      linkParsedContents();
+      return dagInfo;
+    } catch (IOException e) {
+      LOG.error("Error in reading DAG ", e);
+      throw new TezException(e);
+    } catch (JSONException e) {
+      LOG.error("Error in parsing DAG ", e);
+      throw new TezException(e);
+    }
+  }
+
+  private void populateOtherInfo(JSONObject source, JSONObject destination) 
throws JSONException {
+    if (source == null || destination == null) {
+      return;
+    }
+    for (Iterator it = source.keys(); it.hasNext(); ) {
+      String key = (String) it.next();
+      Object val = source.get(key);
+      destination.put(key, val);
+    }
+  }
+
+  private void populateOtherInfo(JSONObject source, String entityName,
+      Map<String, JSONObject> destMap) throws JSONException {
+    JSONObject destinationJson = destMap.get(entityName);
+    JSONObject destOtherInfo = 
destinationJson.getJSONObject(Constants.OTHER_INFO);
+    populateOtherInfo(source, destOtherInfo);
+  }
+
+  private void parseContents(File historyFile, String dagId)
+      throws JSONException, FileNotFoundException, TezException {
+    Scanner scanner = new Scanner(historyFile, UTF8);
+    scanner.useDelimiter(SimpleHistoryLoggingService.RECORD_SEPARATOR);
+    JSONObject dagJson = null;
+    Map<String, JSONObject> vertexJsonMap = Maps.newHashMap();
+    Map<String, JSONObject> taskJsonMap = Maps.newHashMap();
+    Map<String, JSONObject> attemptJsonMap = Maps.newHashMap();
+    TezDAGID tezDAGID = TezDAGID.fromString(dagId);
+    while (scanner.hasNext()) {
+      String line = scanner.next();
+      JSONObject jsonObject = new JSONObject(line);
+      String entity = jsonObject.getString(Constants.ENTITY);
+      String entityType = jsonObject.getString(Constants.ENTITY_TYPE);
+      switch (entityType) {
+      case Constants.TEZ_DAG_ID:
+        if (!dagId.equals(entity)) {
+          LOG.warn(dagId + " is not matching with " + entity);
+          continue;
+        }
+        // Club all DAG related information together (DAG_INIT, DAG_FINISH 
etc). Each of them
+        // would have a set of entities in otherinfo (e.g vertex mapping, 
dagPlan, start/finish
+        // time etc).
+        if (dagJson == null) {
+          dagJson = jsonObject;
+        }
+        JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+        JSONObject dagOtherInfo = dagJson.getJSONObject(Constants.OTHER_INFO);
+        populateOtherInfo(otherInfo, dagOtherInfo);
+        break;
+      case Constants.TEZ_VERTEX_ID:
+        String vertexName = entity;
+        TezVertexID tezVertexID = TezVertexID.fromString(vertexName);
+        if (!tezDAGID.equals(tezVertexID.getDAGId())) {
+          LOG.warn(vertexName + " does not belong to " + tezDAGID);
+          continue;
+        }
+        if (!vertexJsonMap.containsKey(vertexName)) {
+          vertexJsonMap.put(vertexName, jsonObject);
+        }
+        otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+        populateOtherInfo(otherInfo, vertexName, vertexJsonMap);
+        break;
+      case Constants.TEZ_TASK_ID:
+        String taskName = entity;
+        TezTaskID tezTaskID = TezTaskID.fromString(taskName);
+        if (!tezDAGID.equals(tezTaskID.getVertexID().getDAGId())) {
+          LOG.warn(taskName + " does not belong to " + tezDAGID);
+          continue;
+        }
+        if (!taskJsonMap.containsKey(taskName)) {
+          taskJsonMap.put(taskName, jsonObject);
+        }
+        otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+        populateOtherInfo(otherInfo, taskName, taskJsonMap);
+        break;
+      case Constants.TEZ_TASK_ATTEMPT_ID:
+        String taskAttemptName = entity;
+        TezTaskAttemptID tezAttemptId = 
TezTaskAttemptID.fromString(taskAttemptName);
+        if 
(!tezDAGID.equals(tezAttemptId.getTaskID().getVertexID().getDAGId())) {
+          LOG.warn(taskAttemptName + " does not belong to " + tezDAGID);
+          continue;
+        }
+        if (!attemptJsonMap.containsKey(taskAttemptName)) {
+          attemptJsonMap.put(taskAttemptName, jsonObject);
+        }
+        otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+        populateOtherInfo(otherInfo, taskAttemptName, attemptJsonMap);
+        break;
+      default:
+        break;
+      }
+    }
+    scanner.close();
+    if (dagJson != null) {
+      this.dagInfo = DagInfo.create(dagJson);
+    } else {
+      LOG.error("Dag is not yet parsed. Looks like partial file.");
+      throw new TezException(
+          "Please provide a valid/complete history log file containing " + 
dagId);
+    }
+    for (JSONObject jsonObject : vertexJsonMap.values()) {
+      VertexInfo vertexInfo = VertexInfo.create(jsonObject);
+      this.vertexList.add(vertexInfo);
+      LOG.debug("Parsed vertex {}", vertexInfo.getVertexName());
+    }
+    for (JSONObject jsonObject : taskJsonMap.values()) {
+      TaskInfo taskInfo = TaskInfo.create(jsonObject);
+      this.taskList.add(taskInfo);
+      LOG.debug("Parsed task {}", taskInfo.getTaskId());
+    }
+    for (JSONObject jsonObject : attemptJsonMap.values()) {
+      /**
+       * For converting SimpleHistoryLogging to in-memory representation
+       *
+       * We need to get 
"relatedEntities":[{"entity":"cn055-10.l42scl.hortonworks.com:58690",
+       * 
"entitytype":"nodeId"},{"entity":"container_1438652049951_0008_01_000152",
+       * "entitytype":"containerId"} and populate it in otherInfo object so 
that in-memory
+       * representation can parse it correctly
+       */
+    JSONObject subJsonObject = 
jsonObject.optJSONArray(Constants.RELATED_ENTITIES)
+        .optJSONObject(0);
+      if (subJsonObject != null) {
+        String nodeId = subJsonObject.optString(Constants.ENTITY_TYPE);
+        if (!Strings.isNullOrEmpty(nodeId) && 
nodeId.equalsIgnoreCase(Constants.NODE_ID)) {
+          //populate it in otherInfo
+          JSONObject otherInfo = 
jsonObject.optJSONObject(Constants.OTHER_INFO);
+          String nodeIdVal = subJsonObject.optString(Constants.ENTITY);
+          if (otherInfo != null && nodeIdVal != null) {
+            otherInfo.put(Constants.NODE_ID, nodeIdVal);
+          }
+        }
+      }
+
+      subJsonObject = jsonObject.optJSONArray(Constants.RELATED_ENTITIES)
+          .optJSONObject(1);
+      if (subJsonObject != null) {
+        String containerId = subJsonObject.optString(Constants.ENTITY_TYPE);
+        if (!Strings.isNullOrEmpty(containerId) && 
containerId.equalsIgnoreCase(Constants.CONTAINER_ID)) {
+          //populate it in otherInfo
+          JSONObject otherInfo = 
jsonObject.optJSONObject(Constants.OTHER_INFO);
+          String containerIdVal = subJsonObject.optString(Constants.ENTITY);
+          if (otherInfo != null && containerIdVal != null) {
+            otherInfo.put(Constants.CONTAINER_ID, containerIdVal);
+          }
+        }
+      }
+      TaskAttemptInfo attemptInfo = TaskAttemptInfo.create(jsonObject);
+      this.attemptList.add(attemptInfo);
+      LOG.debug("Parsed task attempt {}", attemptInfo.getTaskAttemptId());
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
----------------------------------------------------------------------
diff --git 
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
 
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
index d2dac7d..6e227a5 100644
--- 
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
+++ 
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
@@ -33,6 +33,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 
+import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -164,13 +165,25 @@ public class VertexInfo extends BaseInfo {
     updateEdgeInfo();
   }
 
+  public List<AdditionalInputOutputDetails> getAdditionalInputInfoList() {
+    return Collections.unmodifiableList(additionalInputInfoList);
+  }
+
+  public List<AdditionalInputOutputDetails> getAdditionalOutputInfoList() {
+    return Collections.unmodifiableList(additionalOutputInfoList);
+  }
+
   @Override
   public final long getStartTimeInterval() {
     return startTime - (dagInfo.getStartTime());
   }
 
   public final long getFirstTaskStartTimeInterval() {
-    return getFirstTaskToStart().getStartTimeInterval();
+    TaskInfo firstTask = getFirstTaskToStart();
+    if (firstTask == null) {
+      return 0;
+    }
+    return firstTask.getStartTimeInterval();
   }
 
   public final long getLastTaskFinishTimeInterval() {
@@ -270,14 +283,32 @@ public class VertexInfo extends BaseInfo {
 
   }
 
+
+  private List<TaskInfo> getTasksInternal() {
+    return Lists.newLinkedList(taskInfoMap.values());
+  }
+
   /**
    * Get all tasks
    *
    * @return list of taskInfo
    */
   public final List<TaskInfo> getTasks() {
-    List<TaskInfo> taskInfoList = Lists.newLinkedList(taskInfoMap.values());
-    Collections.sort(taskInfoList, orderingOnStartTime());
+    return Collections.unmodifiableList(getTasksInternal());
+  }
+
+  /**
+   * Get all tasks in sorted order
+   *
+   * @param sorted
+   * @param ordering
+   * @return list of TaskInfo
+   */
+  public final List<TaskInfo> getTasks(boolean sorted, @Nullable 
Ordering<TaskInfo> ordering) {
+    List<TaskInfo> taskInfoList = getTasksInternal();
+    if (sorted) {
+      Collections.sort(taskInfoList, ((ordering == null) ? 
orderingOnStartTime() : ordering));
+    }
     return Collections.unmodifiableList(taskInfoList);
   }
 
@@ -352,12 +383,36 @@ public class VertexInfo extends BaseInfo {
     return Collections.unmodifiableList(outputVertices);
   }
 
-  public List<TaskAttemptInfo> getTaskAttempts() {
+  private List<TaskAttemptInfo> getTaskAttemptsInternal() {
     List<TaskAttemptInfo> taskAttemptInfos = Lists.newLinkedList();
     for (TaskInfo taskInfo : getTasks()) {
       taskAttemptInfos.addAll(taskInfo.getTaskAttempts());
     }
-    Collections.sort(taskAttemptInfos, orderingOnAttemptStartTime());
+    return taskAttemptInfos;
+  }
+
+  /**
+   * Get all task attempts
+   *
+   * @return List<TaskAttemptInfo> list of attempts
+   */
+  public List<TaskAttemptInfo> getTaskAttempts() {
+    return Collections.unmodifiableList(getTaskAttemptsInternal());
+  }
+
+  /**
+   * Get all task attempts in sorted order
+   *
+   * @param sorted
+   * @param ordering
+   * @return list of TaskAttemptInfo
+   */
+  public final List<TaskAttemptInfo> getTaskAttempts(boolean sorted,
+      @Nullable Ordering<TaskAttemptInfo> ordering) {
+    List<TaskAttemptInfo> taskAttemptInfos = getTaskAttemptsInternal();
+    if (sorted) {
+      Collections.sort(taskAttemptInfos, ((ordering == null) ? 
orderingOnAttemptStartTime() : ordering));
+    }
     return Collections.unmodifiableList(taskAttemptInfos);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
----------------------------------------------------------------------
diff --git 
a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
 
b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
deleted file mode 100644
index 0d76e03..0000000
--- 
a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
+++ /dev/null
@@ -1,587 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.history;
-
-import com.google.common.collect.Sets;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.common.counters.DAGCounter;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.DataSinkDescriptor;
-import org.apache.tez.dag.api.DataSourceDescriptor;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.dag.api.event.VertexState;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.api.oldrecords.TaskState;
-import org.apache.tez.dag.app.dag.DAGState;
-import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.examples.WordCount;
-import org.apache.tez.history.parser.ATSFileParser;
-import org.apache.tez.history.parser.datamodel.DagInfo;
-import org.apache.tez.history.parser.datamodel.EdgeInfo;
-import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
-import org.apache.tez.history.parser.datamodel.TaskInfo;
-import org.apache.tez.history.parser.datamodel.VersionInfo;
-import org.apache.tez.history.parser.datamodel.VertexInfo;
-import org.apache.tez.mapreduce.input.MRInput;
-import org.apache.tez.mapreduce.output.MROutput;
-import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
-import org.apache.tez.runtime.api.ProcessorContext;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
-import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
-import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
-import org.apache.tez.runtime.library.partitioner.HashPartitioner;
-import org.apache.tez.tests.MiniTezClusterWithTimeline;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertTrue;
-
-public class TestATSFileParser {
-
-  private static MiniDFSCluster miniDFSCluster;
-  private static MiniTezClusterWithTimeline miniTezCluster;
-
-  //location within miniHDFS cluster's hdfs
-  private static Path inputLoc = new Path("/tmp/sample.txt");
-
-  private final static String INPUT = "Input";
-  private final static String OUTPUT = "Output";
-  private final static String TOKENIZER = "Tokenizer";
-  private final static String SUMMATION = "Summation";
-
-  private static Configuration conf = new Configuration();
-  private static FileSystem fs;
-  private static String TEST_ROOT_DIR =
-      "target" + Path.SEPARATOR + TestATSFileParser.class.getName() + 
"-tmpDir";
-  private static String TEZ_BASE_DIR =
-      "target" + Path.SEPARATOR + TestATSFileParser.class.getName() + "-tez";
-  private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + 
"download";
-
-  private static TezClient tezClient;
-
-  private static int dagNumber;
-
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    conf = new Configuration();
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH, 
false);
-    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
-    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
-    miniDFSCluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
-    fs = miniDFSCluster.getFileSystem();
-    conf.set("fs.defaultFS", fs.getUri().toString());
-
-    setupTezCluster();
-  }
-
-  @AfterClass
-  public static void shutdownCluster() {
-    try {
-      if (tezClient != null) {
-        try {
-          tezClient.stop();
-        } catch (TezException e) {
-          //ignore
-        } catch (IOException e) {
-          //ignore
-        }
-      }
-      if (miniDFSCluster != null) {
-        miniDFSCluster.shutdown();
-      }
-      if (miniTezCluster != null) {
-        miniTezCluster.stop();
-      }
-    } finally {
-      try {
-        FileUtils.deleteDirectory(new File(TEST_ROOT_DIR));
-        FileUtils.deleteDirectory(new File(TEZ_BASE_DIR));
-      } catch (IOException e) {
-        //safe to ignore
-      }
-    }
-  }
-
-  // @Before
-  public static void setupTezCluster() throws Exception {
-    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3 
* 1000);
-    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3 * 
1000);
-    
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 
2);
-
-    //Enable per edge counters
-    conf.setBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, true);
-    conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, 
true);
-    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
-    conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, 
ATSHistoryLoggingService
-        .class.getName());
-
-    miniTezCluster =
-        new MiniTezClusterWithTimeline(TEZ_BASE_DIR, 1, 1, 1, true);
-
-    miniTezCluster.init(conf);
-    miniTezCluster.start();
-
-    createSampleFile(inputLoc);
-
-    TezConfiguration tezConf = new 
TezConfiguration(miniTezCluster.getConfig());
-    tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
-    tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, 
"0.0.0.0:8188");
-    
tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, 
true);
-    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
-        ATSHistoryLoggingService.class.getName());
-
-    tezClient = TezClient.create("WordCount", tezConf, true);
-    tezClient.start();
-    tezClient.waitTillReady();
-  }
-
-
-  /**
-   * Run a word count example in mini cluster and check if it is possible to 
download
-   * data from ATS and parse it.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testParserWithSuccessfulJob() throws Exception {
-    //Run basic word count example.
-    String dagId = runWordCount(WordCount.TokenProcessor.class.getName(),
-        WordCount.SumProcessor.class.getName(), "WordCount");
-
-    //Export the data from ATS
-    String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
-
-    int result = ATSImportTool.process(args);
-    assertTrue(result == 0);
-
-    //Parse ATS data
-    DagInfo dagInfo = getDagInfo(dagId);
-
-    //Verify DAGInfo. Verifies vertex, task, taskAttempts in recursive manner
-    verifyDagInfo(dagInfo);
-
-    //Job specific
-    assertTrue(dagInfo.getNumVertices() == 2);
-    assertTrue(dagInfo.getName().equals("WordCount"));
-    assertTrue(dagInfo.getVertex(TOKENIZER).getProcessorClassName().equals(
-        WordCount.TokenProcessor.class.getName()));
-    assertTrue(dagInfo.getVertex(SUMMATION).getProcessorClassName()
-        .equals(WordCount.SumProcessor.class.getName()));
-    assertTrue(dagInfo.getEdges().size() == 1);
-    EdgeInfo edgeInfo = dagInfo.getEdges().iterator().next();
-    assertTrue(edgeInfo.getDataMovementType().
-        equals(EdgeProperty.DataMovementType.SCATTER_GATHER.toString()));
-    assertTrue(edgeInfo.getSourceVertex().getVertexName().equals(TOKENIZER));
-    
assertTrue(edgeInfo.getDestinationVertex().getVertexName().equals(SUMMATION));
-    assertTrue(edgeInfo.getInputVertexName().equals(TOKENIZER));
-    assertTrue(edgeInfo.getOutputVertexName().equals(SUMMATION));
-    
assertTrue(edgeInfo.getEdgeSourceClass().equals(OrderedPartitionedKVOutput.class.getName()));
-    
assertTrue(edgeInfo.getEdgeDestinationClass().equals(OrderedGroupedKVInput.class.getName()));
-    assertTrue(dagInfo.getVertices().size() == 2);
-    String lastSourceTA = null;
-    String lastDataEventSourceTA = null;
-    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
-      assertTrue(vertexInfo.getKilledTasksCount() == 0);
-      assertTrue(vertexInfo.getInitRequestedTime() > 0);
-      assertTrue(vertexInfo.getInitTime() > 0);
-      assertTrue(vertexInfo.getStartRequestedTime() > 0);
-      assertTrue(vertexInfo.getStartTime() > 0);
-      assertTrue(vertexInfo.getFinishTime() > 0);
-      long finishTime = 0;
-      for (TaskInfo taskInfo : vertexInfo.getTasks()) {
-        assertTrue(taskInfo.getNumberOfTaskAttempts() == 1);
-        assertTrue(taskInfo.getMaxTaskAttemptDuration() >= 0);
-        assertTrue(taskInfo.getMinTaskAttemptDuration() >= 0);
-        assertTrue(taskInfo.getAvgTaskAttemptDuration() >= 0);
-        assertTrue(taskInfo.getLastTaskAttemptToFinish() != null);
-        assertTrue(taskInfo.getContainersMapping().size() > 0);
-        assertTrue(taskInfo.getSuccessfulTaskAttempts().size() > 0);
-        assertTrue(taskInfo.getFailedTaskAttempts().size() == 0);
-        assertTrue(taskInfo.getKilledTaskAttempts().size() == 0);
-        List<TaskAttemptInfo> attempts = taskInfo.getTaskAttempts();
-        if (vertexInfo.getVertexName().equals(TOKENIZER)) {
-          // get the last task to finish and track its successful attempt
-          if (finishTime < taskInfo.getFinishTime()) {
-            finishTime = taskInfo.getFinishTime();
-            lastSourceTA = taskInfo.getSuccessfulAttemptId();
-          }
-        } else {
-          for (TaskAttemptInfo attempt : attempts) {
-            assertTrue(attempt.getLastDataEventTime() > 0);
-            if (lastDataEventSourceTA == null) {
-              lastDataEventSourceTA = attempt.getLastDataEventSourceTA();
-            } else {
-              // all attempts should have the same last data event source TA
-              
assertTrue(lastDataEventSourceTA.equals(attempt.getLastDataEventSourceTA()));
-            }
-          }
-        }
-        for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
-          assertTrue(attemptInfo.getStartTimeInterval() > 0);
-          assertTrue(attemptInfo.getScheduledTimeInterval() > 0);
-        }
-      }
-      assertTrue(vertexInfo.getLastTaskToFinish() != null);
-      if (vertexInfo.getVertexName().equals(TOKENIZER)) {
-        assertTrue(vertexInfo.getInputEdges().size() == 0);
-        assertTrue(vertexInfo.getOutputEdges().size() == 1);
-        assertTrue(vertexInfo.getOutputVertices().size() == 1);
-        assertTrue(vertexInfo.getInputVertices().size() == 0);
-      } else {
-        assertTrue(vertexInfo.getInputEdges().size() == 1);
-        assertTrue(vertexInfo.getOutputEdges().size() == 0);
-        assertTrue(vertexInfo.getOutputVertices().size() == 0);
-        assertTrue(vertexInfo.getInputVertices().size() == 1);
-      }
-    }
-    assertTrue(lastSourceTA.equals(lastDataEventSourceTA));
-  }
-
-  /**
-   * Run a word count example in mini cluster.
-   * Provide invalid URL for ATS.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testParserWithSuccessfulJob_InvalidATS() throws Exception {
-    //Run basic word count example.
-    String dagId =  runWordCount(WordCount.TokenProcessor.class.getName(),
-        WordCount.SumProcessor.class.getName(), "WordCount-With-WrongATS-URL");
-
-    //Export the data from ATS
-    String atsAddress = "--atsAddress=http://atsHost:8188";;
-    String[] args = { "--dagId=" + dagId,
-        "--downloadDir=" + DOWNLOAD_DIR,
-        atsAddress
-      };
-
-    int result = ATSImportTool.process(args);
-    assertTrue(result == -1);
-  }
-
-  /**
-   * Run a failed job and parse the data from ATS
-   */
-  @Test
-  public void testParserWithFailedJob() throws Exception {
-    //Run a job which would fail
-    String dagId = runWordCount(WordCount.TokenProcessor.class.getName(), 
FailProcessor.class
-        .getName(), "WordCount-With-Exception");
-
-    //Export the data from ATS
-    String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
-
-    int result = ATSImportTool.process(args);
-    assertTrue(result == 0);
-
-    //Parse ATS data
-    DagInfo dagInfo = getDagInfo(dagId);
-
-    //Verify DAGInfo. Verifies vertex, task, taskAttempts in recursive manner
-    verifyDagInfo(dagInfo);
-
-    //Dag specific
-    VertexInfo summationVertex = dagInfo.getVertex(SUMMATION);
-    assertTrue(summationVertex.getFailedTasks().size() == 1); //1 task, 4 
attempts failed
-    
assertTrue(summationVertex.getFailedTasks().get(0).getFailedTaskAttempts().size()
 == 4);
-    
assertTrue(summationVertex.getStatus().equals(VertexState.FAILED.toString()));
-
-    assertTrue(dagInfo.getFailedVertices().size() == 1);
-    
assertTrue(dagInfo.getFailedVertices().get(0).getVertexName().equals(SUMMATION));
-    assertTrue(dagInfo.getSuccessfullVertices().size() == 1);
-    
assertTrue(dagInfo.getSuccessfullVertices().get(0).getVertexName().equals(TOKENIZER));
-
-    assertTrue(dagInfo.getStatus().equals(DAGState.FAILED.toString()));
-
-    verifyCounter(dagInfo.getCounter(DAGCounter.NUM_FAILED_TASKS.toString()), 
null, 4);
-    
verifyCounter(dagInfo.getCounter(DAGCounter.NUM_SUCCEEDED_TASKS.toString()), 
null, 1);
-    
verifyCounter(dagInfo.getCounter(DAGCounter.TOTAL_LAUNCHED_TASKS.toString()), 
null, 5);
-
-    
verifyCounter(dagInfo.getCounter(TaskCounter.INPUT_RECORDS_PROCESSED.toString()),
-        "TaskCounter_Tokenizer_INPUT_Input", 10);
-    
verifyCounter(dagInfo.getCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ.toString()),
-        "TaskCounter_Tokenizer_OUTPUT_Summation", 0);
-    verifyCounter(dagInfo.getCounter(TaskCounter.OUTPUT_RECORDS.toString()),
-        "TaskCounter_Tokenizer_OUTPUT_Summation",
-        20); //Every line has 2 words. 10 lines x 2 words = 20
-    verifyCounter(dagInfo.getCounter(TaskCounter.SPILLED_RECORDS.toString()),
-        "TaskCounter_Tokenizer_OUTPUT_Summation", 20); //Same as above
-    
-    for (TaskInfo taskInfo : summationVertex.getTasks()) {
-      String lastAttemptId = null;
-      for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
-        if (lastAttemptId != null) {
-          // failed attempt should be causal TA of next attempt
-          
assertTrue(lastAttemptId.equals(attemptInfo.getSchedulingCausalTA()));
-        }
-        lastAttemptId = attemptInfo.getTaskAttemptId();
-      }
-    }
-
-    //TODO: Need to check for SUMMATION vertex counters. Since all attempts 
are failed, counters are not getting populated.
-    //TaskCounter.REDUCE_INPUT_RECORDS
-
-    //Verify if the processor exception is given in diagnostics
-    assertTrue(dagInfo.getDiagnostics().contains("Failing this processor for 
some reason"));
-
-  }
-
-  /**
-   * Create sample file for wordcount program
-   *
-   * @param inputLoc
-   * @throws IOException
-   */
-  private static void createSampleFile(Path inputLoc) throws IOException {
-    fs.deleteOnExit(inputLoc);
-    FSDataOutputStream out = fs.create(inputLoc);
-    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
-    for (int i = 0; i < 10; i++) {
-      writer.write("Sample " + RandomStringUtils.randomAlphanumeric(5));
-      writer.newLine();
-    }
-    writer.close();
-  }
-
-  private DagInfo getDagInfo(String dagId) throws TezException {
-    //Parse downloaded contents
-    File downloadedFile = new File(DOWNLOAD_DIR
-        + Path.SEPARATOR + dagId
-        + Path.SEPARATOR + dagId + ".zip");
-    ATSFileParser parser = new ATSFileParser(downloadedFile);
-    DagInfo dagInfo = parser.getDAGData(dagId);
-    assertTrue(dagInfo.getDagId().equals(dagId));
-    return dagInfo;
-  }
-
-  private void verifyCounter(Map<String, TezCounter> counterMap,
-      String counterGroupName, long expectedVal) {
-    //Iterate through group-->tezCounter
-    for (Map.Entry<String, TezCounter> entry : counterMap.entrySet()) {
-      if (counterGroupName != null) {
-        if (entry.getKey().equals(counterGroupName)) {
-          assertTrue(entry.getValue().getValue() == expectedVal);
-        }
-      } else {
-        assertTrue(entry.getValue().getValue() == expectedVal);
-      }
-    }
-  }
-
-  private String runWordCount(String tokenizerProcessor, String 
summationProcessor,
-      String dagName)
-      throws Exception {
-    dagNumber++;
-
-    //HDFS path
-    Path outputLoc = new Path("/tmp/outPath_" + System.currentTimeMillis());
-
-    DataSourceDescriptor dataSource = MRInput.createConfigBuilder(conf,
-        TextInputFormat.class, inputLoc.toString()).build();
-
-    DataSinkDescriptor dataSink =
-        MROutput.createConfigBuilder(conf, TextOutputFormat.class, 
outputLoc.toString()).build();
-
-    Vertex tokenizerVertex = Vertex.create(TOKENIZER, 
ProcessorDescriptor.create(
-        tokenizerProcessor)).addDataSource(INPUT, dataSource);
-
-    OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig
-        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
-            HashPartitioner.class.getName()).build();
-
-    Vertex summationVertex = Vertex.create(SUMMATION,
-        ProcessorDescriptor.create(summationProcessor), 1).addDataSink(OUTPUT, 
dataSink);
-
-    // Create DAG and add the vertices. Connect the producer and consumer 
vertices via the edge
-    DAG dag = DAG.create(dagName);
-    dag.addVertex(tokenizerVertex).addVertex(summationVertex).addEdge(
-        Edge.create(tokenizerVertex, summationVertex, 
edgeConf.createDefaultEdgeProperty()));
-
-    DAGClient client = tezClient.submitDAG(dag);
-    
client.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
-    TezDAGID tezDAGID = 
TezDAGID.getInstance(tezClient.getAppMasterApplicationId(), dagNumber);
-
-    return tezDAGID.toString();
-  }
-
-  /**
-   * Processor which would just throw exception.
-   */
-  public static class FailProcessor extends SimpleMRProcessor {
-    public FailProcessor(ProcessorContext context) {
-      super(context);
-    }
-
-    @Override
-    public void run() throws Exception {
-      throw new Exception("Failing this processor for some reason");
-    }
-  }
-
-  private void verifyDagInfo(DagInfo dagInfo) {
-    VersionInfo versionInfo = dagInfo.getVersionInfo();
-    assertTrue(versionInfo != null); //should be present post 0.5.4
-    assertTrue(versionInfo.getVersion() != null);
-    assertTrue(versionInfo.getRevision() != null);
-    assertTrue(versionInfo.getBuildTime() != null);
-
-    assertTrue(dagInfo.getStartTime() > 0);
-    assertTrue(dagInfo.getFinishTimeInterval() > 0);
-    assertTrue(dagInfo.getStartTimeInterval() == 0);
-    assertTrue(dagInfo.getStartTime() > 0);
-    if (dagInfo.getStatus().equalsIgnoreCase(DAGState.SUCCEEDED.toString())) {
-      assertTrue(dagInfo.getFinishTime() >= dagInfo.getStartTime());
-    }
-    assertTrue(dagInfo.getFinishTimeInterval() > 
dagInfo.getStartTimeInterval());
-
-    assertTrue(dagInfo.getStartTime() > dagInfo.getSubmitTime());
-    assertTrue(dagInfo.getTimeTaken() > 0);
-
-    //Verify all vertices
-    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
-      verifyVertex(vertexInfo, vertexInfo.getFailedTasksCount() > 0);
-    }
-
-    VertexInfo fastestVertex = dagInfo.getFastestVertex();
-    assertTrue(fastestVertex != null);
-
-    if (dagInfo.getStatus().equals(DAGState.SUCCEEDED)) {
-      assertTrue(dagInfo.getSlowestVertex() != null);
-    }
-  }
-
-  private void verifyVertex(VertexInfo vertexInfo, boolean hasFailedTasks) {
-    assertTrue(vertexInfo != null);
-    if (hasFailedTasks) {
-      assertTrue(vertexInfo.getFailedTasksCount() > 0);
-    }
-    assertTrue(vertexInfo.getStartTimeInterval() > 0);
-    assertTrue(vertexInfo.getStartTime() > 0);
-    assertTrue(vertexInfo.getFinishTimeInterval() > 0);
-    assertTrue(vertexInfo.getStartTimeInterval() < 
vertexInfo.getFinishTimeInterval());
-    assertTrue(vertexInfo.getVertexName() != null);
-    if (!hasFailedTasks) {
-      assertTrue(vertexInfo.getFinishTime() > 0);
-      assertTrue(vertexInfo.getFailedTasks().size() == 0);
-      assertTrue(vertexInfo.getSucceededTasksCount() == 
vertexInfo.getSuccessfulTasks().size());
-      assertTrue(vertexInfo.getFailedTasksCount() == 0);
-      assertTrue(vertexInfo.getAvgTaskDuration() > 0);
-      assertTrue(vertexInfo.getMaxTaskDuration() > 0);
-      assertTrue(vertexInfo.getMinTaskDuration() > 0);
-      assertTrue(vertexInfo.getTimeTaken() > 0);
-      
assertTrue(vertexInfo.getStatus().equalsIgnoreCase(VertexState.SUCCEEDED.toString()));
-      assertTrue(vertexInfo.getCompletedTasksCount() > 0);
-      assertTrue(vertexInfo.getFirstTaskToStart() != null);
-      assertTrue(vertexInfo.getSucceededTasksCount() > 0);
-      assertTrue(vertexInfo.getTasks().size() > 0);
-    }
-
-    for (TaskInfo taskInfo : vertexInfo.getTasks()) {
-      if (taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString())) {
-        verifyTask(taskInfo, false);
-      }
-    }
-
-    for (TaskInfo taskInfo : vertexInfo.getFailedTasks()) {
-      verifyTask(taskInfo, true);
-    }
-
-    assertTrue(vertexInfo.getProcessorClassName() != null);
-    assertTrue(vertexInfo.getStatus() != null);
-    assertTrue(vertexInfo.getDagInfo() != null);
-    assertTrue(vertexInfo.getInitTimeInterval() > 0);
-    assertTrue(vertexInfo.getNumTasks() > 0);
-  }
-
-  private void verifyTask(TaskInfo taskInfo, boolean hasFailedAttempts) {
-    assertTrue(taskInfo != null);
-    assertTrue(taskInfo.getStatus() != null);
-    assertTrue(taskInfo.getStartTimeInterval() > 0);
-
-    //Not testing for killed attempts. So if there are no failures, it should 
succeed
-    if (!hasFailedAttempts) {
-      assertTrue(taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString()));
-      assertTrue(taskInfo.getFinishTimeInterval() > 0 && 
taskInfo.getFinishTime() > taskInfo
-          .getFinishTimeInterval());
-      assertTrue(
-          taskInfo.getStartTimeInterval() > 0 && taskInfo.getStartTime() > 
taskInfo.getStartTimeInterval());
-      assertTrue(taskInfo.getSuccessfulAttemptId() != null);
-      assertTrue(taskInfo.getSuccessfulTaskAttempt() != null);
-    }
-    assertTrue(taskInfo.getTaskId() != null);
-
-    for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
-      verifyTaskAttemptInfo(attemptInfo);
-    }
-  }
-
-  private void verifyTaskAttemptInfo(TaskAttemptInfo attemptInfo) {
-    if (attemptInfo.getStatus() != null && attemptInfo.getStatus()
-        .equals(TaskAttemptState.SUCCEEDED)) {
-      assertTrue(attemptInfo.getStartTimeInterval() > 0);
-      assertTrue(attemptInfo.getFinishTimeInterval() > 0);
-      assertTrue(attemptInfo.getStartTime() > 0);
-      assertTrue(attemptInfo.getFinishTime() > 0);
-      assertTrue(attemptInfo.getFinishTime() > attemptInfo.getStartTime());
-      assertTrue(attemptInfo.getFinishTime() > 
attemptInfo.getFinishTimeInterval());
-      assertTrue(attemptInfo.getStartTime() > 
attemptInfo.getStartTimeInterval());
-      assertTrue(attemptInfo.getNodeId() != null);
-      assertTrue(attemptInfo.getTimeTaken() != -1);
-      assertTrue(attemptInfo.getEvents() != null);
-      assertTrue(attemptInfo.getTezCounters() != null);
-      assertTrue(attemptInfo.getContainer() != null);
-    }
-    assertTrue(attemptInfo.getTaskInfo() != null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
----------------------------------------------------------------------
diff --git 
a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
 
b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
new file mode 100644
index 0000000..c89acb2
--- /dev/null
+++ 
b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
@@ -0,0 +1,785 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
+import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.examples.WordCount;
+import org.apache.tez.history.parser.ATSFileParser;
+import org.apache.tez.history.parser.SimpleHistoryParser;
+import org.apache.tez.history.parser.datamodel.BaseInfo;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.EdgeInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VersionInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.tests.MiniTezClusterWithTimeline;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestHistoryParser {
+
+  private static MiniDFSCluster miniDFSCluster;
+  private static MiniTezClusterWithTimeline miniTezCluster;
+
+  //location within miniHDFS cluster's hdfs
+  private static Path inputLoc = new Path("/tmp/sample.txt");
+
+  private final static String INPUT = "Input";
+  private final static String OUTPUT = "Output";
+  private final static String TOKENIZER = "Tokenizer";
+  private final static String SUMMATION = "Summation";
+  private final static String SIMPLE_HISTORY_DIR = "/tmp/simplehistory/";
+  private final static String HISTORY_TXT = "history.txt";
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem fs;
+  private static String TEST_ROOT_DIR =
+      "target" + Path.SEPARATOR + TestHistoryParser.class.getName() + 
"-tmpDir";
+  private static String TEZ_BASE_DIR =
+      "target" + Path.SEPARATOR + TestHistoryParser.class.getName() + "-tez";
+  private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + 
"download";
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH, 
false);
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+    miniDFSCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    fs = miniDFSCluster.getFileSystem();
+    conf.set("fs.defaultFS", fs.getUri().toString());
+
+    setupTezCluster();
+  }
+
+  @AfterClass
+  public static void shutdownCluster() {
+    try {
+      if (miniDFSCluster != null) {
+        miniDFSCluster.shutdown();
+      }
+      if (miniTezCluster != null) {
+        miniTezCluster.stop();
+      }
+    } finally {
+      try {
+        FileUtils.deleteDirectory(new File(TEST_ROOT_DIR));
+        FileUtils.deleteDirectory(new File(TEZ_BASE_DIR));
+      } catch (IOException e) {
+        //safe to ignore
+      }
+    }
+  }
+
+  // @Before
+  public static void setupTezCluster() throws Exception {
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3 
* 1000);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3 * 
1000);
+    
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 
2);
+
+    //Enable per edge counters
+    conf.setBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, true);
+    conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, 
true);
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, 
ATSHistoryLoggingService
+        .class.getName());
+
+    conf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, 
SIMPLE_HISTORY_DIR);
+
+    miniTezCluster =
+        new MiniTezClusterWithTimeline(TEZ_BASE_DIR, 1, 1, 1, true);
+
+    miniTezCluster.init(conf);
+    miniTezCluster.start();
+
+    createSampleFile(inputLoc);
+
+    TezConfiguration tezConf = new 
TezConfiguration(miniTezCluster.getConfig());
+    tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, 
"0.0.0.0:8188");
+    
tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, 
true);
+    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+        ATSHistoryLoggingService.class.getName());
+
+  }
+
+
+  /**
+   * Run a word count example in mini cluster and check if it is possible to 
download
+   * data from ATS and parse it. Also, run with SimpleHistoryLogging option 
and verify
+   * if it matches with ATS data.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testParserWithSuccessfulJob() throws Exception {
+    //Run basic word count example.
+    String dagId = runWordCount(WordCount.TokenProcessor.class.getName(),
+        WordCount.SumProcessor.class.getName(), "WordCount", true);
+
+    //Export the data from ATS
+    String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
+
+    int result = ATSImportTool.process(args);
+    assertTrue(result == 0);
+
+    //Parse ATS data and verify results
+    DagInfo dagInfoFromATS = getDagInfo(dagId);
+    verifyDagInfo(dagInfoFromATS, true);
+    verifyJobSpecificInfo(dagInfoFromATS);
+
+    //Now run with SimpleHistoryLogging
+    dagId = runWordCount(WordCount.TokenProcessor.class.getName(),
+        WordCount.SumProcessor.class.getName(), "WordCount", false);
+    Thread.sleep(10000); //For all flushes to happen and to avoid half-cooked 
download.
+
+    DagInfo shDagInfo = getDagInfoFromSimpleHistory(dagId);
+    verifyDagInfo(shDagInfo, false);
+    verifyJobSpecificInfo(shDagInfo);
+
+    //Compare dagInfo by parsing ATS data with DagInfo obtained by parsing 
SimpleHistoryLog
+    isDAGEqual(dagInfoFromATS, shDagInfo);
+  }
+
+  private DagInfo getDagInfoFromSimpleHistory(String dagId) throws 
TezException, IOException {
+    TezDAGID tezDAGID = TezDAGID.fromString(dagId);
+    ApplicationAttemptId applicationAttemptId = 
ApplicationAttemptId.newInstance(tezDAGID
+        .getApplicationId(), 1);
+    Path historyPath = new Path(conf.get("fs.defaultFS")
+        + SIMPLE_HISTORY_DIR + HISTORY_TXT + "."
+        + applicationAttemptId);
+    FileSystem fs = historyPath.getFileSystem(conf);
+
+    Path localPath = new Path(DOWNLOAD_DIR, HISTORY_TXT);
+    fs.copyToLocalFile(historyPath, localPath);
+    File localFile = new File(DOWNLOAD_DIR, HISTORY_TXT);
+
+    //Now parse via SimpleHistory
+    SimpleHistoryParser parser = new SimpleHistoryParser(localFile);
+    DagInfo dagInfo = parser.getDAGData(dagId);
+    assertTrue(dagInfo.getDagId().equals(dagId));
+    return dagInfo;
+  }
+
+  private void verifyJobSpecificInfo(DagInfo dagInfo) {
+    //Job specific
+    assertTrue(dagInfo.getNumVertices() == 2);
+    assertTrue(dagInfo.getName().equals("WordCount"));
+    assertTrue(dagInfo.getVertex(TOKENIZER).getProcessorClassName().equals(
+        WordCount.TokenProcessor.class.getName()));
+    assertTrue(dagInfo.getVertex(SUMMATION).getProcessorClassName()
+        .equals(WordCount.SumProcessor.class.getName()));
+    assertTrue(dagInfo.getEdges().size() == 1);
+    EdgeInfo edgeInfo = dagInfo.getEdges().iterator().next();
+    assertTrue(edgeInfo.getDataMovementType().
+        equals(EdgeProperty.DataMovementType.SCATTER_GATHER.toString()));
+    assertTrue(edgeInfo.getSourceVertex().getVertexName().equals(TOKENIZER));
+    
assertTrue(edgeInfo.getDestinationVertex().getVertexName().equals(SUMMATION));
+    assertTrue(edgeInfo.getInputVertexName().equals(TOKENIZER));
+    assertTrue(edgeInfo.getOutputVertexName().equals(SUMMATION));
+    
assertTrue(edgeInfo.getEdgeSourceClass().equals(OrderedPartitionedKVOutput.class.getName()));
+    
assertTrue(edgeInfo.getEdgeDestinationClass().equals(OrderedGroupedKVInput.class.getName()));
+    assertTrue(dagInfo.getVertices().size() == 2);
+    String lastSourceTA = null;
+    String lastDataEventSourceTA = null;
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      assertTrue(vertexInfo.getKilledTasksCount() == 0);
+      assertTrue(vertexInfo.getInitRequestedTime() > 0);
+      assertTrue(vertexInfo.getInitTime() > 0);
+      assertTrue(vertexInfo.getStartRequestedTime() > 0);
+      assertTrue(vertexInfo.getStartTime() > 0);
+      assertTrue(vertexInfo.getFinishTime() > 0);
+      long finishTime = 0;
+      for (TaskInfo taskInfo : vertexInfo.getTasks()) {
+        assertTrue(taskInfo.getNumberOfTaskAttempts() == 1);
+        assertTrue(taskInfo.getMaxTaskAttemptDuration() >= 0);
+        assertTrue(taskInfo.getMinTaskAttemptDuration() >= 0);
+        assertTrue(taskInfo.getAvgTaskAttemptDuration() >= 0);
+        assertTrue(taskInfo.getLastTaskAttemptToFinish() != null);
+        assertTrue(taskInfo.getContainersMapping().size() > 0);
+        assertTrue(taskInfo.getSuccessfulTaskAttempts().size() > 0);
+        assertTrue(taskInfo.getFailedTaskAttempts().size() == 0);
+        assertTrue(taskInfo.getKilledTaskAttempts().size() == 0);
+        List<TaskAttemptInfo> attempts = taskInfo.getTaskAttempts();
+        if (vertexInfo.getVertexName().equals(TOKENIZER)) {
+          // get the last task to finish and track its successful attempt
+          if (finishTime < taskInfo.getFinishTime()) {
+            finishTime = taskInfo.getFinishTime();
+            lastSourceTA = taskInfo.getSuccessfulAttemptId();
+          }
+        } else {
+          for (TaskAttemptInfo attempt : attempts) {
+            assertTrue(attempt.getLastDataEventTime() > 0);
+            if (lastDataEventSourceTA == null) {
+              lastDataEventSourceTA = attempt.getLastDataEventSourceTA();
+            } else {
+              // all attempts should have the same last data event source TA
+              
assertTrue(lastDataEventSourceTA.equals(attempt.getLastDataEventSourceTA()));
+            }
+          }
+        }
+        for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
+          assertTrue(attemptInfo.getStartTimeInterval() > 0);
+          assertTrue(attemptInfo.getScheduledTimeInterval() > 0);
+        }
+      }
+      assertTrue(vertexInfo.getLastTaskToFinish() != null);
+      if (vertexInfo.getVertexName().equals(TOKENIZER)) {
+        assertTrue(vertexInfo.getInputEdges().size() == 0);
+        assertTrue(vertexInfo.getOutputEdges().size() == 1);
+        assertTrue(vertexInfo.getOutputVertices().size() == 1);
+        assertTrue(vertexInfo.getInputVertices().size() == 0);
+      } else {
+        assertTrue(vertexInfo.getInputEdges().size() == 1);
+        assertTrue(vertexInfo.getOutputEdges().size() == 0);
+        assertTrue(vertexInfo.getOutputVertices().size() == 0);
+        assertTrue(vertexInfo.getInputVertices().size() == 1);
+      }
+    }
+    assertTrue(lastSourceTA.equals(lastDataEventSourceTA));
+  }
+
+  /**
+   * Run a word count example in mini cluster.
+   * Provide invalid URL for ATS.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testParserWithSuccessfulJob_InvalidATS() throws Exception {
+    //Run basic word count example.
+    String dagId =  runWordCount(WordCount.TokenProcessor.class.getName(),
+        WordCount.SumProcessor.class.getName(), "WordCount-With-WrongATS-URL", 
true);
+
+    //Export the data from ATS
+    String atsAddress = "--atsAddress=http://atsHost:8188";;
+    String[] args = { "--dagId=" + dagId,
+        "--downloadDir=" + DOWNLOAD_DIR,
+        atsAddress
+      };
+
+    int result = ATSImportTool.process(args);
+    assertTrue(result == -1);
+  }
+
+  /**
+   * Run a failed job and parse the data from ATS
+   */
+  @Test
+  public void testParserWithFailedJob() throws Exception {
+    //Run a job which would fail
+    String dagId = runWordCount(WordCount.TokenProcessor.class.getName(), 
FailProcessor.class
+        .getName(), "WordCount-With-Exception", true);
+
+    //Export the data from ATS
+    String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
+
+    int result = ATSImportTool.process(args);
+    assertTrue(result == 0);
+
+    //Parse ATS data
+    DagInfo dagInfo = getDagInfo(dagId);
+
+    //Verify DAGInfo. Verifies vertex, task, taskAttempts in recursive manner
+    verifyDagInfo(dagInfo, true);
+
+    //Dag specific
+    VertexInfo summationVertex = dagInfo.getVertex(SUMMATION);
+    assertTrue(summationVertex.getFailedTasks().size() == 1); //1 task, 4 
attempts failed
+    
assertTrue(summationVertex.getFailedTasks().get(0).getFailedTaskAttempts().size()
 == 4);
+    
assertTrue(summationVertex.getStatus().equals(VertexState.FAILED.toString()));
+
+    assertTrue(dagInfo.getFailedVertices().size() == 1);
+    
assertTrue(dagInfo.getFailedVertices().get(0).getVertexName().equals(SUMMATION));
+    assertTrue(dagInfo.getSuccessfullVertices().size() == 1);
+    
assertTrue(dagInfo.getSuccessfullVertices().get(0).getVertexName().equals(TOKENIZER));
+
+    assertTrue(dagInfo.getStatus().equals(DAGState.FAILED.toString()));
+
+    verifyCounter(dagInfo.getCounter(DAGCounter.NUM_FAILED_TASKS.toString()), 
null, 4);
+    
verifyCounter(dagInfo.getCounter(DAGCounter.NUM_SUCCEEDED_TASKS.toString()), 
null, 1);
+    
verifyCounter(dagInfo.getCounter(DAGCounter.TOTAL_LAUNCHED_TASKS.toString()), 
null, 5);
+
+    
verifyCounter(dagInfo.getCounter(TaskCounter.INPUT_RECORDS_PROCESSED.toString()),
+        "TaskCounter_Tokenizer_INPUT_Input", 10);
+    
verifyCounter(dagInfo.getCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ.toString()),
+        "TaskCounter_Tokenizer_OUTPUT_Summation", 0);
+    verifyCounter(dagInfo.getCounter(TaskCounter.OUTPUT_RECORDS.toString()),
+        "TaskCounter_Tokenizer_OUTPUT_Summation",
+        20); //Every line has 2 words. 10 lines x 2 words = 20
+    verifyCounter(dagInfo.getCounter(TaskCounter.SPILLED_RECORDS.toString()),
+        "TaskCounter_Tokenizer_OUTPUT_Summation", 20); //Same as above
+
+    for (TaskInfo taskInfo : summationVertex.getTasks()) {
+      String lastAttemptId = null;
+      for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
+        if (lastAttemptId != null) {
+          // failed attempt should be causal TA of next attempt
+          
assertTrue(lastAttemptId.equals(attemptInfo.getSchedulingCausalTA()));
+        }
+        lastAttemptId = attemptInfo.getTaskAttemptId();
+      }
+    }
+
+    //TODO: Need to check for SUMMATION vertex counters. Since all attempts 
are failed, counters are not getting populated.
+    //TaskCounter.REDUCE_INPUT_RECORDS
+
+    //Verify if the processor exception is given in diagnostics
+    assertTrue(dagInfo.getDiagnostics().contains("Failing this processor for 
some reason"));
+
+  }
+
  /**
   * Asserts that two parsed DAGs are equivalent.
   *
   * Explicit equality checks are used here instead of overriding
   * equals/hashCode in DAG/Vertex/Edge (which would force hashCode changes
   * there); some comparisons are customized for unit testing.
   * Compares status, then recursively compares edges and vertices.
   */
  private void isDAGEqual(DagInfo dagInfo1, DagInfo dagInfo2) {
    assertNotNull(dagInfo1);
    assertNotNull(dagInfo2);
    assertEquals(dagInfo1.getStatus(), dagInfo2.getStatus());
    isEdgeEqual(dagInfo1.getEdges(), dagInfo2.getEdges());
    isVertexEqual(dagInfo1.getVertices(), dagInfo2.getVertices());
  }
+
+  private void isVertexEqual(VertexInfo vertexInfo1, VertexInfo vertexInfo2) {
+    assertTrue(vertexInfo1 != null);
+    assertTrue(vertexInfo2 != null);
+    
assertTrue(vertexInfo1.getVertexName().equals(vertexInfo2.getVertexName()));
+    
assertTrue(vertexInfo1.getProcessorClassName().equals(vertexInfo2.getProcessorClassName()));
+    assertTrue(vertexInfo1.getNumTasks() == vertexInfo2.getNumTasks());
+    assertTrue(vertexInfo1.getCompletedTasksCount() == 
vertexInfo2.getCompletedTasksCount());
+    assertTrue(vertexInfo1.getStatus().equals(vertexInfo2.getStatus()));
+
+    isEdgeEqual(vertexInfo1.getInputEdges(), vertexInfo2.getInputEdges());
+    isEdgeEqual(vertexInfo1.getOutputEdges(), vertexInfo2.getOutputEdges());
+
+    assertTrue(vertexInfo1.getInputVertices().size() == 
vertexInfo2.getInputVertices().size());
+    assertTrue(vertexInfo1.getOutputVertices().size() == 
vertexInfo2.getOutputVertices().size());
+
+    assertTrue(vertexInfo1.getNumTasks() == vertexInfo2.getNumTasks());
+    isTaskEqual(vertexInfo1.getTasks(), vertexInfo2.getTasks());
+  }
+
+  private void isVertexEqual(List<VertexInfo> vertexList1, List<VertexInfo> 
vertexList2) {
+    assertTrue("Vertices sizes should be the same", vertexList1.size() == 
vertexList2.size());
+    Iterator<VertexInfo> it1 = vertexList1.iterator();
+    Iterator<VertexInfo> it2 = vertexList2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      VertexInfo info1 = it1.next();
+      VertexInfo info2 = it2.next();
+      isVertexEqual(info1, info2);
+    }
+  }
+
+  private void isEdgeEqual(EdgeInfo edgeInfo1, EdgeInfo edgeInfo2) {
+    assertTrue(edgeInfo1 != null);
+    assertTrue(edgeInfo2 != null);
+    String info1 = edgeInfo1.toString();
+    String info2 = edgeInfo1.toString();
+    assertTrue(info1.equals(info2));
+  }
+
+  private void isEdgeEqual(Collection<EdgeInfo> info1, Collection<EdgeInfo> 
info2) {
+    assertTrue("sizes should be the same", info1.size() == info1.size());
+    Iterator<EdgeInfo> it1 = info1.iterator();
+    Iterator<EdgeInfo> it2 = info2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      isEdgeEqual(it1.next(), it2.next());
+    }
+  }
+
+  private void isTaskEqual(Collection<TaskInfo> info1, Collection<TaskInfo> 
info2) {
+    assertTrue("sizes should be the same", info1.size() == info1.size());
+    Iterator<TaskInfo> it1 = info1.iterator();
+    Iterator<TaskInfo> it2 = info2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      isTaskEqual(it1.next(), it2.next());
+    }
+  }
+
+  private void isTaskEqual(TaskInfo taskInfo1, TaskInfo taskInfo2) {
+    assertTrue(taskInfo1 != null);
+    assertTrue(taskInfo2 != null);
+    assertTrue(taskInfo1.getVertexInfo() != null);
+    assertTrue(taskInfo2.getVertexInfo() != null);
+    assertTrue(taskInfo1.getStatus().equals(taskInfo2.getStatus()));
+    assertTrue(
+        taskInfo1.getVertexInfo().getVertexName()
+            .equals(taskInfo2.getVertexInfo().getVertexName()));
+    isTaskAttemptEqual(taskInfo1.getTaskAttempts(), 
taskInfo2.getTaskAttempts());
+
+    //Verify counters
+    isCountersSame(taskInfo1, taskInfo2);
+  }
+
+  private void isCountersSame(BaseInfo info1, BaseInfo info2) {
+    isCounterSame(info1.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name()),
+        info2.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.SPILLED_RECORDS.name()),
+        info2.getCounter(TaskCounter.SPILLED_RECORDS.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.OUTPUT_RECORDS.name()),
+        info2.getCounter(TaskCounter.OUTPUT_RECORDS.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.OUTPUT_BYTES.name()),
+        info2.getCounter(TaskCounter.OUTPUT_BYTES.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.OUTPUT_RECORDS.name()),
+        info2.getCounter(TaskCounter.OUTPUT_RECORDS.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.REDUCE_INPUT_GROUPS.name()),
+        info2.getCounter(TaskCounter.REDUCE_INPUT_GROUPS.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.REDUCE_INPUT_RECORDS.name()),
+        info2.getCounter(TaskCounter.REDUCE_INPUT_RECORDS.name()));
+  }
+
+  private void isCounterSame(Map<String, TezCounter> counter1, Map<String, 
TezCounter> counter2) {
+    for (Map.Entry<String, TezCounter> entry : counter1.entrySet()) {
+      String source = entry.getKey();
+      long val = entry.getValue().getValue();
+
+      //check if other counter has the same value
+      assertTrue(counter2.containsKey(entry.getKey()));
+      assertTrue(counter2.get(entry.getKey()).getValue() == val);
+    }
+  }
+
+  private void isTaskAttemptEqual(Collection<TaskAttemptInfo> info1,
+      Collection<TaskAttemptInfo> info2) {
+    assertTrue("sizes should be the same", info1.size() == info1.size());
+    Iterator<TaskAttemptInfo> it1 = info1.iterator();
+    Iterator<TaskAttemptInfo> it2 = info2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      isTaskAttemptEqual(it1.next(), it2.next());
+    }
+  }
+
+  private void isTaskAttemptEqual(TaskAttemptInfo info1, TaskAttemptInfo 
info2) {
+    assertTrue(info1 != null);
+    assertTrue(info2 != null);
+    assertTrue(info1.getTaskInfo() != null);
+    assertTrue(info2.getTaskInfo() != null);
+    assertTrue(info1.getStatus().equals(info2.getStatus()));
+    
assertTrue(info1.getTaskInfo().getVertexInfo().getVertexName().equals(info2.getTaskInfo()
+        .getVertexInfo().getVertexName()));
+
+    //Verify counters
+    isCountersSame(info1, info2);
+  }
+
+
+  /**
+   * Create sample file for wordcount program
+   *
+   * @param inputLoc
+   * @throws IOException
+   */
+  private static void createSampleFile(Path inputLoc) throws IOException {
+    fs.deleteOnExit(inputLoc);
+    FSDataOutputStream out = fs.create(inputLoc);
+    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+    for (int i = 0; i < 10; i++) {
+      writer.write("Sample " + RandomStringUtils.randomAlphanumeric(5));
+      writer.newLine();
+    }
+    writer.close();
+  }
+
+  private DagInfo getDagInfo(String dagId) throws TezException {
+    //Parse downloaded contents
+    File downloadedFile = new File(DOWNLOAD_DIR
+        + Path.SEPARATOR + dagId
+        + Path.SEPARATOR + dagId + ".zip");
+    ATSFileParser parser = new ATSFileParser(downloadedFile);
+    DagInfo dagInfo = parser.getDAGData(dagId);
+    assertTrue(dagInfo.getDagId().equals(dagId));
+    return dagInfo;
+  }
+
+  private void verifyCounter(Map<String, TezCounter> counterMap,
+      String counterGroupName, long expectedVal) {
+    //Iterate through group-->tezCounter
+    for (Map.Entry<String, TezCounter> entry : counterMap.entrySet()) {
+      if (counterGroupName != null) {
+        if (entry.getKey().equals(counterGroupName)) {
+          assertTrue(entry.getValue().getValue() == expectedVal);
+        }
+      } else {
+        assertTrue(entry.getValue().getValue() == expectedVal);
+      }
+    }
+  }
+
  /**
   * Creates and starts a TezClient against the mini cluster.
   *
   * @param withTimeline true to log history to ATS (timeline service at
   *                     0.0.0.0:8188), false to use SimpleHistoryLoggingService
   * @return a started TezClient that is ready to accept DAGs
   * @throws Exception if client start/ready-wait fails
   */
  TezClient getTezClient(boolean withTimeline) throws Exception {
    TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
    if (withTimeline) {
      tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, withTimeline);
      tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "0.0.0.0:8188");
      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
          ATSHistoryLoggingService.class.getName());
    } else {
      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
          SimpleHistoryLoggingService.class.getName());
    }
    // Tests run without a secure timeline domain.
    tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);

    TezClient tezClient = TezClient.create("WordCount", tezConf, false);
    tezClient.start();
    tezClient.waitTillReady();
    return tezClient;
  }
+
+  private String runWordCount(String tokenizerProcessor, String 
summationProcessor,
+      String dagName, boolean withTimeline)
+      throws Exception {
+    //HDFS path
+    Path outputLoc = new Path("/tmp/outPath_" + System.currentTimeMillis());
+
+    DataSourceDescriptor dataSource = MRInput.createConfigBuilder(conf,
+        TextInputFormat.class, inputLoc.toString()).build();
+
+    DataSinkDescriptor dataSink =
+        MROutput.createConfigBuilder(conf, TextOutputFormat.class, 
outputLoc.toString()).build();
+
+    Vertex tokenizerVertex = Vertex.create(TOKENIZER, 
ProcessorDescriptor.create(
+        tokenizerProcessor)).addDataSource(INPUT, dataSource);
+
+    OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig
+        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
+            HashPartitioner.class.getName()).build();
+
+    Vertex summationVertex = Vertex.create(SUMMATION,
+        ProcessorDescriptor.create(summationProcessor), 1).addDataSink(OUTPUT, 
dataSink);
+
+    // Create DAG and add the vertices. Connect the producer and consumer 
vertices via the edge
+    DAG dag = DAG.create(dagName);
+    dag.addVertex(tokenizerVertex).addVertex(summationVertex).addEdge(
+        Edge.create(tokenizerVertex, summationVertex, 
edgeConf.createDefaultEdgeProperty()));
+
+    TezClient tezClient = getTezClient(withTimeline);
+    DAGClient client = tezClient.submitDAG(dag);
+    
client.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+    TezDAGID tezDAGID = 
TezDAGID.getInstance(tezClient.getAppMasterApplicationId(), 1);
+
+    if (tezClient != null) {
+      tezClient.stop();
+    }
+    return tezDAGID.toString();
+  }
+
  /**
   * Processor which would just throw exception. Used by tests to produce
   * failed tasks/attempts so the parser's failure paths can be verified.
   */
  public static class FailProcessor extends SimpleMRProcessor {
    public FailProcessor(ProcessorContext context) {
      super(context);
    }

    @Override
    public void run() throws Exception {
      // Always fail; the message is asserted against DAG diagnostics in tests.
      throw new Exception("Failing this processor for some reason");
    }
  }
+
+  private void verifyDagInfo(DagInfo dagInfo, boolean ats) {
+    if (ats) {
+      VersionInfo versionInfo = dagInfo.getVersionInfo();
+      assertTrue(versionInfo != null); //should be present post 0.5.4
+      assertTrue(versionInfo.getVersion() != null);
+      assertTrue(versionInfo.getRevision() != null);
+      assertTrue(versionInfo.getBuildTime() != null);
+    }
+
+    assertTrue(dagInfo.getStartTime() > 0);
+    assertTrue(dagInfo.getFinishTimeInterval() > 0);
+    assertTrue(dagInfo.getStartTimeInterval() == 0);
+    assertTrue(dagInfo.getStartTime() > 0);
+    if (dagInfo.getStatus().equalsIgnoreCase(DAGState.SUCCEEDED.toString())) {
+      assertTrue(dagInfo.getFinishTime() >= dagInfo.getStartTime());
+    }
+    assertTrue(dagInfo.getFinishTimeInterval() > 
dagInfo.getStartTimeInterval());
+
+    assertTrue(dagInfo.getStartTime() > dagInfo.getSubmitTime());
+    assertTrue(dagInfo.getTimeTaken() > 0);
+
+    //Verify all vertices
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      verifyVertex(vertexInfo, vertexInfo.getFailedTasksCount() > 0);
+    }
+
+    VertexInfo fastestVertex = dagInfo.getFastestVertex();
+    assertTrue(fastestVertex != null);
+
+    if (dagInfo.getStatus().equals(DAGState.SUCCEEDED)) {
+      assertTrue(dagInfo.getSlowestVertex() != null);
+    }
+  }
+
+  private void verifyVertex(VertexInfo vertexInfo, boolean hasFailedTasks) {
+    assertTrue(vertexInfo != null);
+    if (hasFailedTasks) {
+      assertTrue(vertexInfo.getFailedTasksCount() > 0);
+    }
+    assertTrue(vertexInfo.getStartTimeInterval() > 0);
+    assertTrue(vertexInfo.getStartTime() > 0);
+    assertTrue(vertexInfo.getFinishTimeInterval() > 0);
+    assertTrue(vertexInfo.getStartTimeInterval() < 
vertexInfo.getFinishTimeInterval());
+    assertTrue(vertexInfo.getVertexName() != null);
+    if (!hasFailedTasks) {
+      assertTrue(vertexInfo.getFinishTime() > 0);
+      assertTrue(vertexInfo.getFailedTasks().size() == 0);
+      assertTrue(vertexInfo.getSucceededTasksCount() == 
vertexInfo.getSuccessfulTasks().size());
+      assertTrue(vertexInfo.getFailedTasksCount() == 0);
+      assertTrue(vertexInfo.getAvgTaskDuration() > 0);
+      assertTrue(vertexInfo.getMaxTaskDuration() > 0);
+      assertTrue(vertexInfo.getMinTaskDuration() > 0);
+      assertTrue(vertexInfo.getTimeTaken() > 0);
+      
assertTrue(vertexInfo.getStatus().equalsIgnoreCase(VertexState.SUCCEEDED.toString()));
+      assertTrue(vertexInfo.getCompletedTasksCount() > 0);
+      assertTrue(vertexInfo.getFirstTaskToStart() != null);
+      assertTrue(vertexInfo.getSucceededTasksCount() > 0);
+      assertTrue(vertexInfo.getTasks().size() > 0);
+    }
+
+    for (TaskInfo taskInfo : vertexInfo.getTasks()) {
+      if (taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString())) {
+        verifyTask(taskInfo, false);
+      }
+    }
+
+    for (TaskInfo taskInfo : vertexInfo.getFailedTasks()) {
+      verifyTask(taskInfo, true);
+    }
+
+    assertTrue(vertexInfo.getProcessorClassName() != null);
+    assertTrue(vertexInfo.getStatus() != null);
+    assertTrue(vertexInfo.getDagInfo() != null);
+    assertTrue(vertexInfo.getInitTimeInterval() > 0);
+    assertTrue(vertexInfo.getNumTasks() > 0);
+  }
+
+  private void verifyTask(TaskInfo taskInfo, boolean hasFailedAttempts) {
+    assertTrue(taskInfo != null);
+    assertTrue(taskInfo.getStatus() != null);
+    assertTrue(taskInfo.getStartTimeInterval() > 0);
+
+    //Not testing for killed attempts. So if there are no failures, it should 
succeed
+    if (!hasFailedAttempts) {
+      assertTrue(taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString()));
+      assertTrue(taskInfo.getFinishTimeInterval() > 0 && 
taskInfo.getFinishTime() > taskInfo
+          .getFinishTimeInterval());
+      assertTrue(
+          taskInfo.getStartTimeInterval() > 0 && taskInfo.getStartTime() > 
taskInfo.getStartTimeInterval());
+      assertTrue(taskInfo.getSuccessfulAttemptId() != null);
+      assertTrue(taskInfo.getSuccessfulTaskAttempt() != null);
+    }
+    assertTrue(taskInfo.getTaskId() != null);
+
+    for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
+      verifyTaskAttemptInfo(attemptInfo);
+    }
+  }
+
+  private void verifyTaskAttemptInfo(TaskAttemptInfo attemptInfo) {
+    if (attemptInfo.getStatus() != null && attemptInfo.getStatus()
+        .equals(TaskAttemptState.SUCCEEDED)) {
+      assertTrue(attemptInfo.getStartTimeInterval() > 0);
+      assertTrue(attemptInfo.getFinishTimeInterval() > 0);
+      assertTrue(attemptInfo.getStartTime() > 0);
+      assertTrue(attemptInfo.getFinishTime() > 0);
+      assertTrue(attemptInfo.getFinishTime() > attemptInfo.getStartTime());
+      assertTrue(attemptInfo.getFinishTime() > 
attemptInfo.getFinishTimeInterval());
+      assertTrue(attemptInfo.getStartTime() > 
attemptInfo.getStartTimeInterval());
+      assertTrue(attemptInfo.getNodeId() != null);
+      assertTrue(attemptInfo.getTimeTaken() != -1);
+      assertTrue(attemptInfo.getEvents() != null);
+      assertTrue(attemptInfo.getTezCounters() != null);
+      assertTrue(attemptInfo.getContainer() != null);
+    }
+    assertTrue(attemptInfo.getTaskInfo() != null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/pom.xml
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/pom.xml 
b/tez-tools/analyzers/job-analyzer/pom.xml
index fe28b14..36b12fe 100644
--- a/tez-tools/analyzers/job-analyzer/pom.xml
+++ b/tez-tools/analyzers/job-analyzer/pom.xml
@@ -38,6 +38,15 @@
       <artifactId>tez-history-parser</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.plutext</groupId>
+      <artifactId>jaxb-svg11</artifactId>
+      <version>1.0.2</version>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
----------------------------------------------------------------------
diff --git 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
index 4151a90..27ad95e 100644
--- 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
+++ 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
@@ -19,15 +19,14 @@
 package org.apache.tez.analyzer;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.common.base.Strings;
 import org.apache.tez.dag.api.TezException;
 
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.nio.charset.Charset;
@@ -99,7 +98,7 @@ public class CSVResult implements Result {
 
       StringBuilder sb = new StringBuilder();
       for(int i=0;i<record.length;i++) {
-        sb.append(Strings.isNullOrEmpty(record[i]) ? record[i] : " ");
+        sb.append(!Strings.isNullOrEmpty(record[i]) ? record[i] : " ");
         if (i < record.length - 1) {
           sb.append(",");
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
index 6748f3f..88d45f3 100644
--- 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
+++ 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
@@ -19,6 +19,8 @@
 package org.apache.tez.analyzer.plugins;
 
 import com.google.common.base.Functions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -26,10 +28,13 @@ import com.google.common.collect.Ordering;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.analyzer.Analyzer;
 import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.utils.Utils;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.history.parser.datamodel.DagInfo;
 import org.apache.tez.history.parser.datamodel.VertexInfo;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -43,24 +48,43 @@ public class CriticalPathAnalyzer implements Analyzer {
 
   private final CSVResult csvResult;
 
+  private static final String DOT_FILE_DIR = 
"tez.critical-path.analyzer.dot.output.loc";
+  private static final String DOT_FILE_DIR_DEFAULT = "."; //current directory
+
+  private final String dotFileLocation;
+
+  private static final String CONNECTOR = "-->";
+
   public CriticalPathAnalyzer(Configuration config) {
     this.config = config;
     this.csvResult = new CSVResult(headers);
+    this.dotFileLocation = config.get(DOT_FILE_DIR, DOT_FILE_DIR_DEFAULT);
   }
 
   @Override public void analyze(DagInfo dagInfo) throws TezException {
     Map<String, Long> result = Maps.newLinkedHashMap();
     getCriticalPath("", dagInfo.getVertices().get(dagInfo.getVertices().size() 
- 1), 0, result);
 
-    System.out.println();
-    System.out.println();
-
-    for (Map.Entry<String, Long> entry : sortByValues(result).entrySet()) {
+    Map<String, Long> sortedByValues = sortByValues(result);
+    for (Map.Entry<String, Long> entry : sortedByValues.entrySet()) {
       List<String> record = Lists.newLinkedList();
       record.add(entry.getKey());
       record.add(entry.getValue() + "");
       csvResult.addRecord(record.toArray(new String[record.size()]));
-      System.out.println(entry.getKey() + ", " + entry.getValue());
+    }
+
+    String dotFile = dotFileLocation + File.separator + dagInfo.getDagId() + 
".dot";
+    try {
+      List<String> criticalVertices = null;
+      if (!sortedByValues.isEmpty()) {
+        String criticalPath = sortedByValues.keySet().iterator().next();
+        criticalVertices = getVertexNames(criticalPath);
+      } else {
+        criticalVertices = Lists.newLinkedList();
+      }
+      Utils.generateDAGVizFile(dagInfo, dotFile, criticalVertices);
+    } catch (IOException e) {
+      throw new TezException(e);
     }
   }
 
@@ -98,7 +122,7 @@ public class CriticalPathAnalyzer implements Analyzer {
 
     if (dest != null) {
       time += dest.getTimeTaken();
-      predecessor += destVertexName + "-->";
+      predecessor += destVertexName + CONNECTOR;
 
       for (VertexInfo incomingVertex : dest.getInputVertices()) {
         getCriticalPath(predecessor, incomingVertex, time, result);
@@ -107,4 +131,12 @@ public class CriticalPathAnalyzer implements Analyzer {
       result.put(predecessor, time);
     }
   }
+
+  private static List<String> getVertexNames(String criticalPath) {
+    if (Strings.isNullOrEmpty(criticalPath)) {
+      return Lists.newLinkedList();
+    }
+    return 
Lists.newLinkedList(Splitter.on(CONNECTOR).trimResults().omitEmptyStrings().split
+        (criticalPath));
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
index 67b4c51..7ed52da 100644
--- 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
+++ 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java
@@ -108,7 +108,8 @@ public class LocalityAnalyzer implements Analyzer {
         record.add(otherTaskResult.avgRuntime + "");
 
         //Get the number of inputs to this vertex
-        record.add(vertexInfo.getInputEdges().size() + "");
+        record.add(vertexInfo.getInputEdges().size()
+            + vertexInfo.getAdditionalInputInfoList().size() + "");
 
         //Get the avg HDFS bytes read in this vertex for different type of 
locality
         record.add(dataLocalResult.avgHDFSBytesRead + "");

Reply via email to