This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 6487834  TEZ-4105: Tez job-analyzer tool to support proto logging 
history (László Bodor reviewed by Jonathan Turner Eagles)
6487834 is described below

commit 6487834281f8ef5f8afd9f5ba9fba68736d5b44a
Author: László Bodor <[email protected]>
AuthorDate: Wed Jun 3 13:30:01 2020 +0200

    TEZ-4105: Tez job-analyzer tool to support proto logging history (László 
Bodor reviewed by Jonathan Turner Eagles)
    
    Signed-off-by: Laszlo Bodor <[email protected]>
---
 pom.xml                                            |   5 +
 tez-plugins/tez-history-parser/pom.xml             |   4 +
 .../apache/tez/history/parser/ATSFileParser.java   |  13 +-
 .../tez/history/parser/ProtoHistoryParser.java     | 128 ++++
 .../tez/history/parser/SimpleHistoryParser.java    | 244 ++++---
 .../tez/history/parser/datamodel/BaseParser.java   |  21 +
 .../tez/history/parser/datamodel/DagInfo.java      |  23 +-
 .../history/parser/datamodel/TaskAttemptInfo.java  |   2 +-
 .../org/apache/tez/history/TestHistoryParser.java  |   5 +-
 .../logging/proto/HistoryEventProtoConverter.java  |   4 +
 .../proto/HistoryEventProtoJsonConversion.java     | 766 +++++++++++++++++++++
 .../history/logging/proto/ProtoMessageReader.java  |   2 +-
 .../proto/TestHistoryEventProtoConverter.java      |   4 +-
 .../java/org/apache/tez/analyzer/CSVResult.java    |   6 +
 .../tez/analyzer/plugins/AnalyzerDriver.java       |   2 +
 .../TaskAttemptResultStatisticsAnalyzer.java       | 125 ++++
 .../tez/analyzer/plugins/TezAnalyzerBase.java      |  95 ++-
 .../java/org/apache/tez/analyzer/TestAnalyzer.java |   5 +-
 18 files changed, 1323 insertions(+), 131 deletions(-)

diff --git a/pom.xml b/pom.xml
index 7de5322..fe67b50 100644
--- a/pom.xml
+++ b/pom.xml
@@ -198,6 +198,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.tez</groupId>
+        <artifactId>tez-protobuf-history-plugin</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tez</groupId>
         <artifactId>tez-yarn-timeline-history</artifactId>
         <version>${project.version}</version>
         <type>test-jar</type>
diff --git a/tez-plugins/tez-history-parser/pom.xml 
b/tez-plugins/tez-history-parser/pom.xml
index 7937fa4..1b1d4e8 100644
--- a/tez-plugins/tez-history-parser/pom.xml
+++ b/tez-plugins/tez-history-parser/pom.xml
@@ -55,6 +55,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tez</groupId>
+      <artifactId>tez-protobuf-history-plugin</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
       <artifactId>tez-yarn-timeline-history</artifactId>
       <scope>test</scope>
       <type>test-jar</type>
diff --git 
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
 
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
index e4720d4..e64fb43 100644
--- 
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
+++ 
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
@@ -41,6 +41,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Enumeration;
 import java.util.Iterator;
+import java.util.List;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
 
@@ -60,10 +61,10 @@ public class ATSFileParser extends BaseParser implements 
ATSData {
 
   private final File atsZipFile;
 
-  public ATSFileParser(File atsZipFile) throws TezException {
+  public ATSFileParser(List<File> files) throws TezException {
     super();
-    Preconditions.checkArgument(atsZipFile.exists(), "Zipfile " + atsZipFile + 
" does not exist");
-    this.atsZipFile = atsZipFile;
+    Preconditions.checkArgument(checkFiles(files), "Zipfile " + files + " are 
empty or they don't exist");
+    this.atsZipFile = files.get(0); //doesn't support multiple files at the 
moment
   }
 
   @Override
@@ -72,14 +73,12 @@ public class ATSFileParser extends BaseParser implements 
ATSData {
       parseATSZipFile(atsZipFile);
 
       linkParsedContents();
+      addRawDataToDagInfo();
 
       return dagInfo;
-    } catch (IOException e) {
+    } catch (IOException | JSONException e) {
       LOG.error("Error in reading DAG ", e);
       throw new TezException(e);
-    } catch (JSONException e) {
-      LOG.error("Error in parsing DAG ", e);
-      throw new TezException(e);
     } catch (InterruptedException e) {
       throw new TezException(e);
     }
diff --git 
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ProtoHistoryParser.java
 
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ProtoHistoryParser.java
new file mode 100644
index 0000000..d28fd67
--- /dev/null
+++ 
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ProtoHistoryParser.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.history.parser;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.Preconditions;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import 
org.apache.tez.dag.history.logging.proto.HistoryEventProtoJsonConversion;
+import 
org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.HistoryEventProto;
+import org.apache.tez.dag.history.logging.proto.ProtoMessageReader;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+
+/**
+ * Parser utility to parse data generated by ProtoHistoryLoggingService.
+ */
+public class ProtoHistoryParser extends SimpleHistoryParser {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ProtoHistoryParser.class);
+  private List<File> protoFiles;
+
+  public ProtoHistoryParser(List<File> files) {
+    super(files);
+    this.protoFiles = files;
+  }
+
+  /**
+   * Get in-memory representation of DagInfo.
+   *
+   * @return DagInfo
+   * @throws TezException
+   */
+  public DagInfo getDAGData(String dagId) throws TezException {
+    try {
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(dagId), "Please 
provide valid dagId");
+      dagId = dagId.trim();
+      parseContents(dagId);
+      linkParsedContents();
+      addRawDataToDagInfo();
+      return dagInfo;
+    } catch (IOException | JSONException e) {
+      LOG.error("Error in reading DAG ", e);
+      throw new TezException(e);
+    }
+  }
+
+  private void parseContents(String dagId)
+      throws JSONException, FileNotFoundException, TezException, IOException {
+    JSONObjectSource source = getJsonSource();
+    parse(dagId, source);
+  }
+
+  private JSONObjectSource getJsonSource() throws IOException {
+    final TezConfiguration conf = new TezConfiguration();
+
+    Iterator<File> fileIt = protoFiles.iterator();
+
+    JSONObjectSource source = new JSONObjectSource() {
+      private HistoryEventProto message = null;
+      private ProtoMessageReader<HistoryEventProto> reader = new 
ProtoMessageReader<>(conf,
+          new Path(fileIt.next().getPath()), HistoryEventProto.PARSER);
+
+      @Override
+      public JSONObject next() throws JSONException {
+        return HistoryEventProtoJsonConversion.convertToJson(message);
+      }
+
+      @Override
+      public boolean hasNext() throws IOException {
+        try {
+          message = (HistoryEventProto) reader.readEvent();
+          return message != null;
+        } catch (java.io.EOFException e) {
+          reader.close();
+
+          if (!fileIt.hasNext()) {
+            return false;
+          } else {
+            reader = new ProtoMessageReader<>(conf, new 
Path(fileIt.next().getPath()),
+                HistoryEventProto.PARSER);
+            try {
+              message = (HistoryEventProto) reader.readEvent();
+              return message != null;
+            } catch (java.io.EOFException e2) {
+              return false;
+            }
+          }
+        }
+      }
+
+      @Override
+      public void close() {
+        try {
+          reader.close();
+        } catch (IOException e) {
+          LOG.warn("error while closing ProtoMessageReader", e);
+        }
+      }
+    };
+    return source;
+  }
+}
\ No newline at end of file
diff --git 
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
 
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
index 3516de7..db3f648 100644
--- 
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
+++ 
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
@@ -17,9 +17,16 @@
  */
 package org.apache.tez.history.parser;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+
+import org.apache.tez.common.ATSConstants;
 import org.apache.tez.common.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Maps;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
 import org.apache.tez.dag.records.TezDAGID;
@@ -38,33 +45,34 @@ import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Scanner;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
 
 /**
- * Parser utility to parse data generated by SimpleHistoryLogging to in-memory 
datamodel provided
- * in org.apache.tez.history.parser.datamodel
+ * Parser utility to parse data generated by SimpleHistoryLogging to in-memory 
datamodel provided in
+ * org.apache.tez.history.parser.datamodel.
  * <p/>
  * <p/>
- * Most of the information should be available. Minor info like VersionInfo 
may not be available,
- * as it is not captured in SimpleHistoryLogging.
+ * Most of the information should be available. Minor info like VersionInfo 
may not be available, as
+ * it is not captured in SimpleHistoryLogging.
  */
 public class SimpleHistoryParser extends BaseParser {
   private static final Logger LOG = 
LoggerFactory.getLogger(SimpleHistoryParser.class);
-  private static final String UTF8 = "UTF-8";
+  protected static final String UTF8 = "UTF-8";
   private final File historyFile;
 
-
-  public SimpleHistoryParser(File historyFile) {
+  public SimpleHistoryParser(List<File> files) {
     super();
-    Preconditions.checkArgument(historyFile.exists(), historyFile + " does not 
exist");
-    this.historyFile = historyFile;
+    Preconditions.checkArgument(checkFiles(files), files + " are empty or they 
don't exist");
+    this.historyFile = files.get(0); //doesn't support multiple files at the 
moment
   }
 
+  protected interface JSONObjectSource {
+    boolean hasNext() throws IOException;
+    JSONObject next() throws JSONException;
+    void close();
+  };
+
   /**
    * Get in-memory representation of DagInfo
    *
@@ -77,13 +85,11 @@ public class SimpleHistoryParser extends BaseParser {
       dagId = dagId.trim();
       parseContents(historyFile, dagId);
       linkParsedContents();
+      addRawDataToDagInfo();
       return dagInfo;
-    } catch (IOException e) {
+    } catch (IOException | JSONException e) {
       LOG.error("Error in reading DAG ", e);
       throw new TezException(e);
-    } catch (JSONException e) {
-      LOG.error("Error in parsing DAG ", e);
-      throw new TezException(e);
     }
   }
 
@@ -106,18 +112,117 @@ public class SimpleHistoryParser extends BaseParser {
   }
 
   private void parseContents(File historyFile, String dagId)
-      throws JSONException, FileNotFoundException, TezException {
-    Scanner scanner = new Scanner(historyFile, UTF8);
+      throws JSONException, FileNotFoundException, TezException, IOException {
+    JSONObjectSource source = getJsonSource();
+
+    parse(dagId, source);
+  }
+
+  private JSONObjectSource getJsonSource() throws FileNotFoundException {
+    final Scanner scanner = new Scanner(historyFile, UTF8);
     scanner.useDelimiter(SimpleHistoryLoggingService.RECORD_SEPARATOR);
-    JSONObject dagJson = null;
+
+    JSONObjectSource source = new JSONObjectSource() {
+      @Override
+      public JSONObject next() throws JSONException {
+        String line = scanner.next();
+        return new JSONObject(line);
+      }
+
+      @Override
+      public boolean hasNext() throws IOException {
+        return scanner.hasNext();
+      }
+
+      @Override
+      public void close() {
+        scanner.close();
+      }
+    };
+    return source;
+  }
+
+  protected void parse(String dagId, JSONObjectSource source)
+      throws JSONException, TezException, IOException {
     Map<String, JSONObject> vertexJsonMap = Maps.newHashMap();
     Map<String, JSONObject> taskJsonMap = Maps.newHashMap();
     Map<String, JSONObject> attemptJsonMap = Maps.newHashMap();
+
+    readEventsFromSource(dagId, source, vertexJsonMap, taskJsonMap, 
attemptJsonMap);
+    postProcessMaps(vertexJsonMap, taskJsonMap, attemptJsonMap);
+  }
+
+  protected void postProcessMaps(Map<String, JSONObject> vertexJsonMap,
+      Map<String, JSONObject> taskJsonMap, Map<String, JSONObject> 
attemptJsonMap)
+      throws JSONException {
+    for (JSONObject jsonObject : vertexJsonMap.values()) {
+      VertexInfo vertexInfo = VertexInfo.create(jsonObject);
+      this.vertexList.add(vertexInfo);
+      LOG.debug("Parsed vertex {}", vertexInfo.getVertexName());
+    }
+    for (JSONObject jsonObject : taskJsonMap.values()) {
+      TaskInfo taskInfo = TaskInfo.create(jsonObject);
+      this.taskList.add(taskInfo);
+      LOG.debug("Parsed task {}", taskInfo.getTaskId());
+    }
+    for (JSONObject jsonObject : attemptJsonMap.values()) {
+      /**
+       * For converting SimpleHistoryLogging to in-memory representation
+       *
+       * We need to get 
"relatedEntities":[{"entity":"cn055-10.l42scl.hortonworks.com:58690",
+       * 
"entitytype":"nodeId"},{"entity":"container_1438652049951_0008_01_000152",
+       * "entitytype":"containerId"} and populate it in otherInfo object so 
that in-memory
+       * representation can parse it correctly
+       */
+      JSONArray relatedEntities = 
jsonObject.optJSONArray(Constants.RELATED_ENTITIES);
+      if (relatedEntities == null) {
+        //This can happen when CONTAINER_EXITED abruptly. (e.g Container 
failed, exitCode=1)
+        LOG.debug("entity {} did not have related entities",
+            jsonObject.optJSONObject(Constants.ENTITY));
+      } else {
+        JSONObject subJsonObject = relatedEntities.optJSONObject(0);
+        if (subJsonObject != null) {
+          String nodeId = subJsonObject.optString(Constants.ENTITY_TYPE);
+          if (!Strings.isNullOrEmpty(nodeId) && 
nodeId.equalsIgnoreCase(Constants.NODE_ID)) {
+            //populate it in otherInfo
+            JSONObject otherInfo = 
jsonObject.optJSONObject(Constants.OTHER_INFO);
+            String nodeIdVal = subJsonObject.optString(Constants.ENTITY);
+            if (otherInfo != null && nodeIdVal != null) {
+              otherInfo.put(Constants.NODE_ID, nodeIdVal);
+            }
+          }
+        }
+
+        subJsonObject = relatedEntities.optJSONObject(1);
+        if (subJsonObject != null) {
+          String containerId = subJsonObject.optString(Constants.ENTITY_TYPE);
+          if (!Strings.isNullOrEmpty(containerId) && containerId
+              .equalsIgnoreCase(Constants.CONTAINER_ID)) {
+            //populate it in otherInfo
+            JSONObject otherInfo = 
jsonObject.optJSONObject(Constants.OTHER_INFO);
+            String containerIdVal = subJsonObject.optString(Constants.ENTITY);
+            if (otherInfo != null && containerIdVal != null) {
+              otherInfo.put(Constants.CONTAINER_ID, containerIdVal);
+            }
+          }
+        }
+      }
+      TaskAttemptInfo attemptInfo = TaskAttemptInfo.create(jsonObject);
+      this.attemptList.add(attemptInfo);
+      LOG.debug("Parsed task attempt {}", attemptInfo.getTaskAttemptId());
+    }
+  }
+
+  protected void readEventsFromSource(String dagId, JSONObjectSource source,
+      Map<String, JSONObject> vertexJsonMap, Map<String, JSONObject> 
taskJsonMap,
+      Map<String, JSONObject> attemptJsonMap) throws JSONException, 
TezException, IOException{
+    JSONObject dagJson = null;
     TezDAGID tezDAGID = TezDAGID.fromString(dagId);
     String userName = null;
-    while (scanner.hasNext()) {
-      String line = scanner.next();
-      JSONObject jsonObject = new JSONObject(line);
+
+    while (source.hasNext()) {
+      JSONObject jsonObject = source.next();
+
       String entity = jsonObject.getString(Constants.ENTITY);
       String entityType = jsonObject.getString(Constants.ENTITY_TYPE);
       switch (entityType) {
@@ -131,9 +236,12 @@ public class SimpleHistoryParser extends BaseParser {
         // time etc).
         if (dagJson == null) {
           dagJson = jsonObject;
+        } else if (dagJson.optJSONObject(ATSConstants.OTHER_INFO)
+            .optJSONObject(ATSConstants.DAG_PLAN) == null) {
+          // if DAG_PLAN is not filled already, let's try to fetch it from 
other
+          
dagJson.getJSONObject(ATSConstants.OTHER_INFO).put(ATSConstants.DAG_PLAN, 
jsonObject
+              
.getJSONObject(ATSConstants.OTHER_INFO).getJSONObject(ATSConstants.DAG_PLAN));
         }
-        JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
-        JSONObject dagOtherInfo = dagJson.getJSONObject(Constants.OTHER_INFO);
         JSONArray relatedEntities = dagJson.optJSONArray(Constants
             .RELATED_ENTITIES);
         //UserName is present in related entities
@@ -148,52 +256,52 @@ public class SimpleHistoryParser extends BaseParser {
             }
           }
         }
-        populateOtherInfo(otherInfo, dagOtherInfo);
+        populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO),
+            dagJson.getJSONObject(Constants.OTHER_INFO));
         break;
       case Constants.TEZ_VERTEX_ID:
         String vertexName = entity;
         TezVertexID tezVertexID = TezVertexID.fromString(vertexName);
         if (!tezDAGID.equals(tezVertexID.getDAGId())) {
-          LOG.warn(vertexName + " does not belong to " + tezDAGID);
+          LOG.warn("{} does not belong to {} ('{}' != '{}')}", vertexName, 
tezDAGID, tezDAGID, tezVertexID.getDAGId());
           continue;
         }
         if (!vertexJsonMap.containsKey(vertexName)) {
           vertexJsonMap.put(vertexName, jsonObject);
         }
-        otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
-        populateOtherInfo(otherInfo, vertexName, vertexJsonMap);
+        populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), 
vertexName, vertexJsonMap);
         break;
       case Constants.TEZ_TASK_ID:
         String taskName = entity;
         TezTaskID tezTaskID = TezTaskID.fromString(taskName);
         if (!tezDAGID.equals(tezTaskID.getVertexID().getDAGId())) {
-          LOG.warn(taskName + " does not belong to " + tezDAGID);
+          LOG.warn("{} does not belong to {} ('{}' != '{}')}", taskName, 
tezDAGID, tezDAGID,
+              tezTaskID.getVertexID().getDAGId());
           continue;
         }
         if (!taskJsonMap.containsKey(taskName)) {
           taskJsonMap.put(taskName, jsonObject);
         }
-        otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
-        populateOtherInfo(otherInfo, taskName, taskJsonMap);
+        populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), 
taskName, taskJsonMap);
         break;
       case Constants.TEZ_TASK_ATTEMPT_ID:
         String taskAttemptName = entity;
         TezTaskAttemptID tezAttemptId = 
TezTaskAttemptID.fromString(taskAttemptName);
         if 
(!tezDAGID.equals(tezAttemptId.getTaskID().getVertexID().getDAGId())) {
-          LOG.warn(taskAttemptName + " does not belong to " + tezDAGID);
+          LOG.warn("{} does not belong to {} ('{}' != '{}')}", 
taskAttemptName, tezDAGID, tezDAGID,
+              tezAttemptId.getTaskID().getVertexID().getDAGId());
           continue;
         }
         if (!attemptJsonMap.containsKey(taskAttemptName)) {
           attemptJsonMap.put(taskAttemptName, jsonObject);
         }
-        otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
-        populateOtherInfo(otherInfo, taskAttemptName, attemptJsonMap);
+        populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), 
taskAttemptName, attemptJsonMap);
         break;
       default:
         break;
       }
     }
-    scanner.close();
+    source.close();
     if (dagJson != null) {
       this.dagInfo = DagInfo.create(dagJson);
       setUserName(userName);
@@ -202,61 +310,5 @@ public class SimpleHistoryParser extends BaseParser {
       throw new TezException(
           "Please provide a valid/complete history log file containing " + 
dagId);
     }
-    for (JSONObject jsonObject : vertexJsonMap.values()) {
-      VertexInfo vertexInfo = VertexInfo.create(jsonObject);
-      this.vertexList.add(vertexInfo);
-      LOG.debug("Parsed vertex {}", vertexInfo.getVertexName());
-    }
-    for (JSONObject jsonObject : taskJsonMap.values()) {
-      TaskInfo taskInfo = TaskInfo.create(jsonObject);
-      this.taskList.add(taskInfo);
-      LOG.debug("Parsed task {}", taskInfo.getTaskId());
-    }
-    for (JSONObject jsonObject : attemptJsonMap.values()) {
-      /**
-       * For converting SimpleHistoryLogging to in-memory representation
-       *
-       * We need to get 
"relatedEntities":[{"entity":"cn055-10.l42scl.hortonworks.com:58690",
-       * 
"entitytype":"nodeId"},{"entity":"container_1438652049951_0008_01_000152",
-       * "entitytype":"containerId"} and populate it in otherInfo object so 
that in-memory
-       * representation can parse it correctly
-       */
-      JSONArray relatedEntities = 
jsonObject.optJSONArray(Constants.RELATED_ENTITIES);
-      if (relatedEntities == null) {
-        //This can happen when CONTAINER_EXITED abruptly. (e.g Container 
failed, exitCode=1)
-        LOG.debug("entity {} did not have related entities",
-            jsonObject.optJSONObject(Constants.ENTITY));
-      } else {
-        JSONObject subJsonObject = relatedEntities.optJSONObject(0);
-        if (subJsonObject != null) {
-          String nodeId = subJsonObject.optString(Constants.ENTITY_TYPE);
-          if (!Strings.isNullOrEmpty(nodeId) && 
nodeId.equalsIgnoreCase(Constants.NODE_ID)) {
-            //populate it in otherInfo
-            JSONObject otherInfo = 
jsonObject.optJSONObject(Constants.OTHER_INFO);
-            String nodeIdVal = subJsonObject.optString(Constants.ENTITY);
-            if (otherInfo != null && nodeIdVal != null) {
-              otherInfo.put(Constants.NODE_ID, nodeIdVal);
-            }
-          }
-        }
-
-        subJsonObject = relatedEntities.optJSONObject(1);
-        if (subJsonObject != null) {
-          String containerId = subJsonObject.optString(Constants.ENTITY_TYPE);
-          if (!Strings.isNullOrEmpty(containerId) && containerId
-              .equalsIgnoreCase(Constants.CONTAINER_ID)) {
-            //populate it in otherInfo
-            JSONObject otherInfo = 
jsonObject.optJSONObject(Constants.OTHER_INFO);
-            String containerIdVal = subJsonObject.optString(Constants.ENTITY);
-            if (otherInfo != null && containerIdVal != null) {
-              otherInfo.put(Constants.CONTAINER_ID, containerIdVal);
-            }
-          }
-        }
-      }
-      TaskAttemptInfo attemptInfo = TaskAttemptInfo.create(jsonObject);
-      this.attemptList.add(attemptInfo);
-      LOG.debug("Parsed task attempt {}", attemptInfo.getTaskAttemptId());
-    }
   }
 }
\ No newline at end of file
diff --git 
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java
 
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java
index 59ec03d..af8e292 100644
--- 
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java
+++ 
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 
+import java.io.File;
 import java.util.List;
 import java.util.Map;
 
@@ -44,6 +45,26 @@ public abstract class BaseParser {
     attemptList = Lists.newLinkedList();
   }
 
+
+  protected boolean checkFiles(List<File> files) {
+    if (files.isEmpty()) {
+      return false;
+    }
+    for (File file : files) {
+      if (!file.exists()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+
+  protected void addRawDataToDagInfo() {
+    dagInfo.addMeta("vertices", vertexList);
+    dagInfo.addMeta("tasks", taskList);
+    dagInfo.addMeta("taskAttempts", attemptList);
+  }
+
   /**
    * link the parsed contents, so that it becomes easier to iterate from 
DAG-->Task and Task--DAG.
    * e.g Link vertex to dag, task to vertex, attempt to task etc
diff --git 
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
 
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
index 64ddcf8..85fcfcf 100644
--- 
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
+++ 
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
@@ -44,6 +44,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -85,6 +86,8 @@ public class DagInfo extends BaseInfo {
   private Multimap<Container, TaskAttemptInfo> containerMapping;
   private Map<String, String> config;
 
+  private Map<String, Object> meta = new HashMap<String, Object>();
+
   DagInfo(JSONObject jsonObject) throws JSONException {
     super(jsonObject);
 
@@ -168,6 +171,10 @@ public class DagInfo extends BaseInfo {
     return dagInfo;
   }
 
+  public void addMeta(String key, Object value) {
+    meta.put(key, value);
+  }
+
   private void parseDAGPlan(JSONObject dagPlan) throws JSONException {
     int version = dagPlan.optInt(Constants.VERSION, 1);
     parseEdges(dagPlan.optJSONArray(Constants.EDGES));
@@ -320,7 +327,7 @@ public class DagInfo extends BaseInfo {
     BasicVertexInfo basicVertexInfo = 
basicVertexInfoMap.get(vertexInfo.getVertexName());
 
     Preconditions.checkArgument(basicVertexInfo != null,
-        "VerteName " + vertexInfo.getVertexName()
+        "VertexName " + vertexInfo.getVertexName()
             + " not present in DAG's vertices " + 
basicVertexInfoMap.entrySet());
 
     //populate additional information in VertexInfo
@@ -387,6 +394,19 @@ public class DagInfo extends BaseInfo {
     return sb.toString();
   }
 
+  public String toExtendedString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(toString());
+
+    try {
+      sb.append("\nmeta=").append(new JSONObject(meta).toString(3));
+    } catch (JSONException e) {
+      throw new RuntimeException(e);
+    }
+
+    return sb.toString();
+  }
+
   public Multimap<Container, TaskAttemptInfo> getContainerMapping() {
     return Multimaps.unmodifiableMultimap(containerMapping);
   }
@@ -630,5 +650,4 @@ public class DagInfo extends BaseInfo {
   final void setUserName(String userName) {
     this.userName = userName;
   }
-
 }
diff --git 
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
 
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
index 3f39310..3ce39bd 100644
--- 
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
+++ 
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
@@ -405,7 +405,7 @@ public class TaskAttemptInfo extends BaseInfo {
     sb.append("container=").append(getContainer()).append(", ");
     sb.append("nodeId=").append(getNodeId()).append(", ");
     sb.append("logURL=").append(getLogURL()).append(", ");
-    sb.append("status=").append(getStatus());
+    sb.append("status=").append(getDetailedStatus());
     sb.append("]");
     return sb.toString();
   }
diff --git 
a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
 
b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
index 92c4ad8..8a05465 100644
--- 
a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
+++ 
b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
@@ -88,6 +88,7 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -239,7 +240,7 @@ public class TestHistoryParser {
     File localFile = new File(DOWNLOAD_DIR, HISTORY_TXT);
 
     //Now parse via SimpleHistory
-    SimpleHistoryParser parser = new SimpleHistoryParser(localFile);
+    SimpleHistoryParser parser = new 
SimpleHistoryParser(Arrays.asList(localFile));
     DagInfo dagInfo = parser.getDAGData(dagId);
     assertTrue(dagInfo.getDagId().equals(dagId));
     return dagInfo;
@@ -603,7 +604,7 @@ public class TestHistoryParser {
     //Parse downloaded contents
     File downloadedFile = new File(DOWNLOAD_DIR
         + Path.SEPARATOR + dagId + ".zip");
-    ATSFileParser parser = new ATSFileParser(downloadedFile);
+    ATSFileParser parser = new ATSFileParser(Arrays.asList(downloadedFile));
     DagInfo dagInfo = parser.getDAGData(dagId);
     assertTrue(dagInfo.getDagId().equals(dagId));
     return dagInfo;
diff --git 
a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoConverter.java
 
b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoConverter.java
index 44dccb6..09079bd 100644
--- 
a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoConverter.java
+++ 
b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoConverter.java
@@ -351,11 +351,14 @@ public class HistoryEventProtoConverter {
     addEventData(builder, ATSConstants.CREATION_TIME, event.getCreationTime());
     addEventData(builder, ATSConstants.ALLOCATION_TIME, 
event.getAllocationTime());
     addEventData(builder, ATSConstants.START_TIME, event.getStartTime());
+
     if (event.getCreationCausalTA() != null) {
       addEventData(builder, ATSConstants.CREATION_CAUSAL_ATTEMPT,
           event.getCreationCausalTA().toString());
     }
     addEventData(builder, ATSConstants.TIME_TAKEN, (event.getFinishTime() - 
event.getStartTime()));
+    addEventData(builder, ATSConstants.STATUS, event.getState().name());
+
     if (event.getTaskAttemptError() != null) {
       addEventData(builder, ATSConstants.TASK_ATTEMPT_ERROR_ENUM,
           event.getTaskAttemptError().name());
@@ -447,6 +450,7 @@ public class HistoryEventProtoConverter {
         null, null, null, event.getVertexID(), null, null, null);
     addEventData(builder, ATSConstants.VERTEX_NAME, event.getVertexName());
     addEventData(builder, ATSConstants.INIT_REQUESTED_TIME, 
event.getInitRequestedTime());
+    addEventData(builder, ATSConstants.INIT_TIME, event.getInitedTime());
     addEventData(builder, ATSConstants.NUM_TASKS, event.getNumTasks());
     addEventData(builder, ATSConstants.PROCESSOR_CLASS_NAME, 
event.getProcessorName());
     if (event.getServicePluginInfo() != null) {
diff --git 
a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoJsonConversion.java
 
b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoJsonConversion.java
new file mode 100644
index 0000000..26e20ab
--- /dev/null
+++ 
b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoJsonConversion.java
@@ -0,0 +1,766 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.proto;
+
+import java.util.Iterator;
+import java.util.Optional;
+
+import org.apache.tez.common.ATSConstants;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.logging.EntityTypes;
+import 
org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.HistoryEventProto;
+import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.KVPair;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+/**
+ * Convert HistoryEventProto into JSONObject for analyzers, which can already 
consume the output of
+ * SimpleHistoryLoggingService's JSONs. This class is based on 
HistoryEventJsonConversion, and all
+ * of the specific HistoryEvent calls were transformed info HistoryEventProto 
calls by taking the
+ * corresponding HistoryEventProtoConverter methods into consideration.
+ */
+public final class HistoryEventProtoJsonConversion {
+
  // Utility class: all converters are static; prevent instantiation.
  private HistoryEventProtoJsonConversion() {
  }
+
+  public static JSONObject convertToJson(HistoryEventProto historyEvent) 
throws JSONException {
+    JSONObject jsonObject = null;
+
+    switch (historyEvent.getEventType()) {
+    case "APP_LAUNCHED":
+      jsonObject = convertAppLaunchedEvent(historyEvent);
+      break;
+    case "AM_LAUNCHED":
+      jsonObject = convertAMLaunchedEvent(historyEvent);
+      break;
+    case "AM_STARTED":
+      jsonObject = convertAMStartedEvent(historyEvent);
+      break;
+    case "CONTAINER_LAUNCHED":
+      jsonObject = convertContainerLaunchedEvent(historyEvent);
+      break;
+    case "CONTAINER_STOPPED":
+      jsonObject = convertContainerStoppedEvent(historyEvent);
+      break;
+    case "DAG_SUBMITTED":
+      jsonObject = convertDAGSubmittedEvent(historyEvent);
+      break;
+    case "DAG_INITIALIZED":
+      jsonObject = convertDAGInitializedEvent(historyEvent);
+      break;
+    case "DAG_STARTED":
+      jsonObject = convertDAGStartedEvent(historyEvent);
+      break;
+    case "DAG_FINISHED":
+      jsonObject = convertDAGFinishedEvent(historyEvent);
+      break;
+    case "VERTEX_INITIALIZED":
+      jsonObject = convertVertexInitializedEvent(historyEvent);
+      break;
+    case "VERTEX_STARTED":
+      jsonObject = convertVertexStartedEvent(historyEvent);
+      break;
+    case "VERTEX_FINISHED":
+      jsonObject = convertVertexFinishedEvent(historyEvent);
+      break;
+    case "TASK_STARTED":
+      jsonObject = convertTaskStartedEvent(historyEvent);
+      break;
+    case "TASK_FINISHED":
+      jsonObject = convertTaskFinishedEvent(historyEvent);
+      break;
+    case "TASK_ATTEMPT_STARTED":
+      jsonObject = convertTaskAttemptStartedEvent(historyEvent);
+      break;
+    case "TASK_ATTEMPT_FINISHED":
+      jsonObject = convertTaskAttemptFinishedEvent(historyEvent);
+      break;
+    case "VERTEX_CONFIGURE_DONE":
+      jsonObject = convertVertexReconfigureDoneEvent(historyEvent);
+      break;
+    case "DAG_RECOVERED":
+      jsonObject = convertDAGRecoveredEvent(historyEvent);
+      break;
+    case "VERTEX_COMMIT_STARTED":
+    case "VERTEX_GROUP_COMMIT_STARTED":
+    case "VERTEX_GROUP_COMMIT_FINISHED":
+    case "DAG_COMMIT_STARTED":
+      throw new UnsupportedOperationException(
+          "Invalid Event, does not support history" + ", eventType=" + 
historyEvent.getEventType());
+    default:
+      throw new UnsupportedOperationException(
+          "Unhandled Event" + ", eventType=" + historyEvent.getEventType());
+    }
+    return jsonObject;
+  }
+
+  private static JSONObject convertDAGRecoveredEvent(HistoryEventProto event) 
throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getDagId());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
+
+    // Related Entities not needed as should have been done in
+    // dag submission event
+
+    JSONArray events = new JSONArray();
+    JSONObject recoverEvent = new JSONObject();
+    recoverEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
+    recoverEvent.put(ATSConstants.EVENT_TYPE, 
HistoryEventType.DAG_RECOVERED.name());
+
+    JSONObject recoverEventInfo = new JSONObject();
+    recoverEventInfo.put(ATSConstants.APPLICATION_ATTEMPT_ID, 
event.getAppAttemptId().toString());
+    recoverEventInfo.put(ATSConstants.DAG_STATE, getDataValueByKey(event, 
ATSConstants.DAG_STATE));
+    recoverEventInfo.put(ATSConstants.RECOVERY_FAILURE_REASON,
+        getDataValueByKey(event, ATSConstants.RECOVERY_FAILURE_REASON));
+
+    recoverEvent.put(ATSConstants.EVENT_INFO, recoverEventInfo);
+    events.put(recoverEvent);
+
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertAppLaunchedEvent(HistoryEventProto event) 
throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, "tez_" + event.getAppId().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, 
EntityTypes.TEZ_APPLICATION.name());
+
+    // Other info to tag with Tez App
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.USER, event.getUser());
+    otherInfo.put(ATSConstants.CONFIG, new JSONObject()); // TODO: config from 
proto?
+
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertAMLaunchedEvent(HistoryEventProto event) 
throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, "tez_" + 
event.getAppAttemptId().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, 
EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    // Related Entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject appEntity = new JSONObject();
+    appEntity.put(ATSConstants.ENTITY, event.getAppId().toString());
+    appEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.APPLICATION_ID);
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY, 
event.getAppAttemptId().toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE, 
ATSConstants.APPLICATION_ATTEMPT_ID);
+    relatedEntities.put(appEntity);
+    relatedEntities.put(appAttemptEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject initEvent = new JSONObject();
+    initEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
+    initEvent.put(ATSConstants.EVENT_TYPE, 
HistoryEventType.AM_LAUNCHED.name());
+    events.put(initEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info to tag with Tez AM
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.APP_SUBMIT_TIME,
+        getDataValueByKey(event, ATSConstants.APP_SUBMIT_TIME));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertAMStartedEvent(HistoryEventProto event) 
throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, "tez_" + 
event.getAppAttemptId().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, 
EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    // Related Entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject appEntity = new JSONObject();
+    appEntity.put(ATSConstants.ENTITY, event.getAppId().toString());
+    appEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.APPLICATION_ID);
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY, 
event.getAppAttemptId().toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE, 
ATSConstants.APPLICATION_ATTEMPT_ID);
+    relatedEntities.put(appEntity);
+    relatedEntities.put(appAttemptEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject startEvent = new JSONObject();
+    startEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
+    startEvent.put(ATSConstants.EVENT_TYPE, 
HistoryEventType.AM_STARTED.name());
+    events.put(startEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertContainerLaunchedEvent(HistoryEventProto 
event)
+      throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        "tez_" + getDataValueByKey(event, ATSConstants.CONTAINER_ID));
+    jsonObject.put(ATSConstants.ENTITY_TYPE, 
EntityTypes.TEZ_CONTAINER_ID.name());
+
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY, 
event.getAppAttemptId().toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE, 
EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    JSONObject containerEntity = new JSONObject();
+    containerEntity.put(ATSConstants.ENTITY, getDataValueByKey(event, 
ATSConstants.CONTAINER_ID));
+    containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
+
+    relatedEntities.put(appAttemptEntity);
+    relatedEntities.put(containerEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject launchEvent = new JSONObject();
+    launchEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
+    launchEvent.put(ATSConstants.EVENT_TYPE, 
HistoryEventType.CONTAINER_LAUNCHED.name());
+    events.put(launchEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // TODO add other container info here? or assume AHS will have this?
+    // TODO container logs?
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertContainerStoppedEvent(HistoryEventProto 
event)
+      throws JSONException {
+    // structure is identical to ContainerLaunchedEvent
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY,
+        "tez_" + getDataValueByKey(event, ATSConstants.CONTAINER_ID));
+    jsonObject.put(ATSConstants.ENTITY_TYPE, 
EntityTypes.TEZ_CONTAINER_ID.name());
+
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY, 
event.getAppAttemptId().toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE, 
EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+
+    JSONObject containerEntity = new JSONObject();
+    containerEntity.put(ATSConstants.ENTITY, getDataValueByKey(event, 
ATSConstants.CONTAINER_ID));
+    containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
+
+    relatedEntities.put(appAttemptEntity);
+    relatedEntities.put(containerEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject stopEvent = new JSONObject();
+    stopEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
+    stopEvent.put(ATSConstants.EVENT_TYPE, 
HistoryEventType.CONTAINER_STOPPED.name());
+    events.put(stopEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // TODO add other container info here? or assume AHS will have this?
+    // TODO container logs?
+
+    // Other info
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.EXIT_STATUS, getDataValueByKey(event, 
ATSConstants.EXIT_STATUS));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertDAGFinishedEvent(HistoryEventProto event) 
throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getDagId());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
+
+    // Related Entities not needed as should have been done in
+    // dag submission event
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject finishEvent = new JSONObject();
+    finishEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
+    finishEvent.put(ATSConstants.EVENT_TYPE, 
HistoryEventType.DAG_FINISHED.name());
+    events.put(finishEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    JSONObject otherInfo = new JSONObject();
+
+    long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME);
+
+    otherInfo.put(ATSConstants.START_TIME, startTime);
+    otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
+    otherInfo.put(ATSConstants.TIME_TAKEN, event.getEventTime() - startTime);
+    otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, 
ATSConstants.STATUS));
+    otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, 
ATSConstants.DIAGNOSTICS));
+    otherInfo.put(ATSConstants.COUNTERS, getJSONDataValueByKey(event, 
ATSConstants.COUNTERS));
+    otherInfo.put(ATSConstants.COMPLETION_APPLICATION_ATTEMPT_ID,
+        event.getAppAttemptId().toString());
+
+    // added all info to otherInfo in order to cover
+    // all key/value pairs added from event.getDagTaskStats()
+    Iterator<KVPair> it = event.getEventDataList().iterator();
+    while (it.hasNext()) {
+      KVPair pair = it.next();
+      otherInfo.put(pair.getKey(), pair.getValue());
+    }
+
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertDAGInitializedEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getDagId());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
+
+    // Related Entities not needed as should have been done in
+    // dag submission event
+
+    JSONArray events = new JSONArray();
+    JSONObject initEvent = new JSONObject();
+    initEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
+    initEvent.put(ATSConstants.EVENT_TYPE, 
HistoryEventType.DAG_INITIALIZED.name());
+    events.put(initEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.VERTEX_NAME_ID_MAPPING,
+        getJSONDataValueByKey(event, ATSConstants.VERTEX_NAME_ID_MAPPING));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertDAGStartedEvent(HistoryEventProto event) 
throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getDagId());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
+
+    // Related Entities not needed as should have been done in
+    // dag submission event
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject startEvent = new JSONObject();
+    startEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
+    startEvent.put(ATSConstants.EVENT_TYPE, 
HistoryEventType.DAG_STARTED.name());
+    events.put(startEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertDAGSubmittedEvent(HistoryEventProto event) 
throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getDagId());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
+
+    // Related Entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject tezAppEntity = new JSONObject();
+    tezAppEntity.put(ATSConstants.ENTITY, "tez_" + 
event.getAppId().toString());
+    tezAppEntity.put(ATSConstants.ENTITY_TYPE, 
EntityTypes.TEZ_APPLICATION.name());
+    JSONObject tezAppAttemptEntity = new JSONObject();
+    tezAppAttemptEntity.put(ATSConstants.ENTITY, "tez_" + 
event.getAppAttemptId().toString());
+    tezAppAttemptEntity.put(ATSConstants.ENTITY_TYPE, 
EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
+    JSONObject appEntity = new JSONObject();
+    appEntity.put(ATSConstants.ENTITY, event.getAppId().toString());
+    appEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.APPLICATION_ID);
+    JSONObject appAttemptEntity = new JSONObject();
+    appAttemptEntity.put(ATSConstants.ENTITY, 
event.getAppAttemptId().toString());
+    appAttemptEntity.put(ATSConstants.ENTITY_TYPE, 
ATSConstants.APPLICATION_ATTEMPT_ID);
+    JSONObject userEntity = new JSONObject();
+    userEntity.put(ATSConstants.ENTITY, event.getUser());
+    userEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.USER);
+
+    relatedEntities.put(tezAppEntity);
+    relatedEntities.put(tezAppAttemptEntity);
+    relatedEntities.put(appEntity);
+    relatedEntities.put(appAttemptEntity);
+    relatedEntities.put(userEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // filters
+    JSONObject primaryFilters = new JSONObject();
+    primaryFilters.put(ATSConstants.DAG_NAME, getDataValueByKey(event, 
ATSConstants.DAG_NAME));
+    primaryFilters.put(ATSConstants.CALLER_CONTEXT_ID,
+        getDataValueByKey(event, ATSConstants.CALLER_CONTEXT_ID));
+    primaryFilters.put(ATSConstants.CALLER_CONTEXT_TYPE,
+        getDataValueByKey(event, ATSConstants.CALLER_CONTEXT_TYPE));
+    primaryFilters.put(ATSConstants.DAG_QUEUE_NAME,
+        getDataValueByKey(event, ATSConstants.DAG_QUEUE_NAME));
+
+    jsonObject.put(ATSConstants.PRIMARY_FILTERS, primaryFilters);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    JSONArray events = new JSONArray();
+    JSONObject submitEvent = new JSONObject();
+    submitEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
+    submitEvent.put(ATSConstants.EVENT_TYPE, 
HistoryEventType.DAG_SUBMITTED.name());
+    events.put(submitEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info such as dag plan
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.DAG_PLAN, getJSONDataValueByKey(event, 
ATSConstants.DAG_PLAN));
+
+    otherInfo.put(ATSConstants.CALLER_CONTEXT_ID,
+        getDataValueByKey(event, ATSConstants.CALLER_CONTEXT_ID));
+    otherInfo.put(ATSConstants.CALLER_CONTEXT_TYPE,
+        getDataValueByKey(event, ATSConstants.CALLER_CONTEXT_TYPE));
+    otherInfo.put(ATSConstants.DAG_QUEUE_NAME,
+        getDataValueByKey(event, ATSConstants.DAG_QUEUE_NAME));
+
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertTaskAttemptFinishedEvent(HistoryEventProto 
event)
+      throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getTaskAttemptId());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, 
EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject finishEvent = new JSONObject();
+    finishEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
+    finishEvent.put(ATSConstants.EVENT_TYPE, 
HistoryEventType.TASK_ATTEMPT_FINISHED.name());
+    events.put(finishEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    JSONObject otherInfo = new JSONObject();
+    long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME);
+
+    otherInfo.put(ATSConstants.CREATION_TIME, getDataValueByKey(event, 
ATSConstants.CREATION_TIME));
+    otherInfo.put(ATSConstants.ALLOCATION_TIME,
+        getDataValueByKey(event, ATSConstants.ALLOCATION_TIME));
+    otherInfo.put(ATSConstants.START_TIME, startTime);
+    otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
+    otherInfo.put(ATSConstants.TIME_TAKEN, event.getEventTime() - startTime);
+
+    otherInfo.put(ATSConstants.CREATION_CAUSAL_ATTEMPT,
+        getDataValueByKey(event, ATSConstants.CREATION_CAUSAL_ATTEMPT));
+    otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, 
ATSConstants.STATUS));
+
+    otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, 
ATSConstants.STATUS));
+    otherInfo.put(ATSConstants.TASK_ATTEMPT_ERROR_ENUM,
+        getDataValueByKey(event, ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
+    otherInfo.put(ATSConstants.TASK_FAILURE_TYPE,
+        getDataValueByKey(event, ATSConstants.TASK_FAILURE_TYPE));
+    otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, 
ATSConstants.DIAGNOSTICS));
+    otherInfo.put(ATSConstants.COUNTERS, getJSONDataValueByKey(event, 
ATSConstants.COUNTERS));
+    otherInfo.put(ATSConstants.LAST_DATA_EVENTS,
+        getJSONDataValueByKey(event, ATSConstants.LAST_DATA_EVENTS));
+    otherInfo.put(ATSConstants.NODE_ID, getDataValueByKey(event, 
ATSConstants.NODE_ID));
+    otherInfo.put(ATSConstants.CONTAINER_ID, getDataValueByKey(event, 
ATSConstants.CONTAINER_ID));
+    otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL,
+        getDataValueByKey(event, ATSConstants.IN_PROGRESS_LOGS_URL));
+    otherInfo.put(ATSConstants.COMPLETED_LOGS_URL,
+        getDataValueByKey(event, ATSConstants.COMPLETED_LOGS_URL));
+    otherInfo.put(ATSConstants.NODE_HTTP_ADDRESS,
+        getDataValueByKey(event, ATSConstants.NODE_HTTP_ADDRESS));
+
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertTaskAttemptStartedEvent(HistoryEventProto 
event)
+      throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getTaskAttemptId());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, 
EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
+
+    // Related entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject nodeEntity = new JSONObject();
+    nodeEntity.put(ATSConstants.ENTITY, getDataValueByKey(event, 
ATSConstants.NODE_ID));
+    nodeEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.NODE_ID);
+
+    JSONObject containerEntity = new JSONObject();
+    containerEntity.put(ATSConstants.ENTITY, getDataValueByKey(event, 
ATSConstants.CONTAINER_ID));
+    containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
+
+    JSONObject taskEntity = new JSONObject();
+    taskEntity.put(ATSConstants.ENTITY, event.getTaskAttemptId());
+    taskEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
+
+    relatedEntities.put(nodeEntity);
+    relatedEntities.put(containerEntity);
+    relatedEntities.put(taskEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject startEvent = new JSONObject();
+    startEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
+    startEvent.put(ATSConstants.EVENT_TYPE, 
HistoryEventType.TASK_ATTEMPT_STARTED.name());
+    events.put(startEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL,
+        getDataValueByKey(event, ATSConstants.IN_PROGRESS_LOGS_URL));
+    otherInfo.put(ATSConstants.COMPLETED_LOGS_URL,
+        getDataValueByKey(event, ATSConstants.COMPLETED_LOGS_URL));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertTaskFinishedEvent(HistoryEventProto event) 
throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getTaskId());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject finishEvent = new JSONObject();
+    finishEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
+    finishEvent.put(ATSConstants.EVENT_TYPE, 
HistoryEventType.TASK_FINISHED.name());
+    events.put(finishEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME);
+
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.START_TIME, startTime);
+    otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
+    otherInfo.put(ATSConstants.TIME_TAKEN, event.getEventTime() - startTime);
+
+    otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, 
ATSConstants.STATUS));
+    otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, 
ATSConstants.DIAGNOSTICS));
+    otherInfo.put(ATSConstants.COUNTERS, getJSONDataValueByKey(event, 
ATSConstants.COUNTERS));
+    otherInfo.put(ATSConstants.SUCCESSFUL_ATTEMPT_ID,
+        getDataValueByKey(event, ATSConstants.SUCCESSFUL_ATTEMPT_ID));
+
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertTaskStartedEvent(HistoryEventProto event) 
throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getTaskId());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
+
+    // Related entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject vertexEntity = new JSONObject();
+    vertexEntity.put(ATSConstants.ENTITY, event.getVertexId());
+    vertexEntity.put(ATSConstants.ENTITY_TYPE, 
EntityTypes.TEZ_VERTEX_ID.name());
+    relatedEntities.put(vertexEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject startEvent = new JSONObject();
+    startEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
+    startEvent.put(ATSConstants.EVENT_TYPE, 
HistoryEventType.TASK_STARTED.name());
+    events.put(startEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info
+    // TODO fix schedule/launch time to be events
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.START_TIME, event.getEventTime());
+    otherInfo.put(ATSConstants.SCHEDULED_TIME,
+        getDataValueByKey(event, ATSConstants.SCHEDULED_TIME));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertVertexFinishedEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getVertexId());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject finishEvent = new JSONObject();
+    finishEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
+    finishEvent.put(ATSConstants.EVENT_TYPE, 
HistoryEventType.VERTEX_FINISHED.name());
+    events.put(finishEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME);
+
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
+    otherInfo.put(ATSConstants.TIME_TAKEN, (event.getEventTime() - startTime));
+    otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, 
ATSConstants.STATUS));
+    otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, 
ATSConstants.DIAGNOSTICS));
+    otherInfo.put(ATSConstants.COUNTERS, getJSONDataValueByKey(event, 
ATSConstants.COUNTERS));
+
+    otherInfo.put(ATSConstants.STATS, getJSONDataValueByKey(event, 
ATSConstants.STATS));
+
+    // added all info to otherInfo in order to cover
+    // all key/value pairs added from event.getVertexTaskStats()
+    Iterator<KVPair> it = event.getEventDataList().iterator();
+    while (it.hasNext()) {
+      KVPair pair = it.next();
+      otherInfo.put(pair.getKey(), pair.getValue());
+    }
+
+    otherInfo.put(ATSConstants.SERVICE_PLUGIN,
+        getJSONDataValueByKey(event, ATSConstants.SERVICE_PLUGIN));
+
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject 
convertVertexReconfigureDoneEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getVertexId());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject updateEvent = new JSONObject();
+    updateEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
+    updateEvent.put(ATSConstants.EVENT_TYPE, 
HistoryEventType.VERTEX_CONFIGURE_DONE.name());
+
+    JSONObject eventInfo = new JSONObject();
+    eventInfo.put(ATSConstants.NUM_TASKS, getDataValueByKey(event, 
ATSConstants.NUM_TASKS));
+    eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS,
+        getJSONDataValueByKey(event, ATSConstants.UPDATED_EDGE_MANAGERS));
+    updateEvent.put(ATSConstants.EVENT_INFO, eventInfo);
+    events.put(updateEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info
+    JSONObject otherInfo = new JSONObject();
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    // TODO add more on all other updated information
+    return jsonObject;
+  }
+
+  private static JSONObject convertVertexInitializedEvent(HistoryEventProto 
event)
+      throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getVertexId());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+
+    // Related entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject vertexEntity = new JSONObject();
+    vertexEntity.put(ATSConstants.ENTITY, event.getDagId());
+    vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
+    relatedEntities.put(vertexEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject initEvent = new JSONObject();
+    initEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
+    initEvent.put(ATSConstants.EVENT_TYPE, 
HistoryEventType.VERTEX_INITIALIZED.name());
+    events.put(initEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info
+    // TODO fix requested times to be events
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.VERTEX_NAME, getDataValueByKey(event, 
ATSConstants.VERTEX_NAME));
+    otherInfo.put(ATSConstants.INIT_REQUESTED_TIME,
+        getDataValueByKey(event, ATSConstants.INIT_REQUESTED_TIME));
+    otherInfo.put(ATSConstants.INIT_TIME, getDataValueByKey(event, 
ATSConstants.INIT_TIME));
+    otherInfo.put(ATSConstants.NUM_TASKS, getDataValueByKey(event, 
ATSConstants.NUM_TASKS));
+    otherInfo.put(ATSConstants.PROCESSOR_CLASS_NAME,
+        getDataValueByKey(event, ATSConstants.PROCESSOR_CLASS_NAME));
+    otherInfo.put(ATSConstants.SERVICE_PLUGIN,
+        getJSONDataValueByKey(event, ATSConstants.SERVICE_PLUGIN));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertVertexStartedEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getVertexId());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+
+    // Related entities
+    JSONArray relatedEntities = new JSONArray();
+    JSONObject vertexEntity = new JSONObject();
+    vertexEntity.put(ATSConstants.ENTITY, event.getDagId());
+    vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
+    relatedEntities.put(vertexEntity);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject startEvent = new JSONObject();
+    startEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
+    startEvent.put(ATSConstants.EVENT_TYPE, 
HistoryEventType.VERTEX_STARTED.name());
+    events.put(startEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info
+    // TODO fix requested times to be events
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.START_REQUESTED_TIME,
+        getDataValueByKey(event, ATSConstants.START_REQUESTED_TIME));
+    otherInfo.put(ATSConstants.START_TIME, event.getEventTime());
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static String getDataValueByKey(HistoryEventProto event, String key) 
{
+    Optional<KVPair> pair =
+        event.getEventDataList().stream().filter(p -> 
p.getKey().equals(key)).findAny();
+    return pair.isPresent() ? pair.get().getValue() : null;
+  }
+
+  private static long getLongDataValueByKey(HistoryEventProto event, String 
key) {
+    String value = getDataValueByKey(event, key);
+    return (value == null || value.isEmpty()) ? 0 : Long.parseLong(value);
+  }
+
+  private static JSONObject getJSONDataValueByKey(HistoryEventProto event, 
String key)
+      throws JSONException {
+    String value = getDataValueByKey(event, key);
+    return (value == null || value.isEmpty()) ? null : new JSONObject(value);
+  }
+}
diff --git 
a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
 
b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
index d736fea..2cac4d8 100644
--- 
a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
+++ 
b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
@@ -34,7 +34,7 @@ public class ProtoMessageReader<T extends MessageLite> 
implements Closeable {
   private final Reader reader;
   private final ProtoMessageWritable<T> writable;
 
-  ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) 
throws IOException {
+  public ProtoMessageReader(Configuration conf, Path filePath, Parser<T> 
parser) throws IOException {
     this.filePath = filePath;
     // The writer does not flush the length during hflush. Using length 
options lets us read
     // past length in the FileStatus but it will throw EOFException during a 
read instead
diff --git 
a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestHistoryEventProtoConverter.java
 
b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestHistoryEventProtoConverter.java
index 92d3e30..64f66bc 100644
--- 
a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestHistoryEventProtoConverter.java
+++ 
b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestHistoryEventProtoConverter.java
@@ -409,7 +409,7 @@ public class TestHistoryEventProtoConverter {
         "logsURL", "nodeHttpAddress");
     HistoryEventProto proto = converter.convert(event);
     assertCommon(proto, HistoryEventType.TASK_ATTEMPT_FINISHED, finishTime,
-        EntityTypes.TEZ_DAG_ID, null, null, 16);
+        EntityTypes.TEZ_DAG_ID, null, null, 17);
 
     assertEventData(proto, ATSConstants.STATUS, state.name());
     assertEventData(proto, ATSConstants.CREATION_CAUSAL_ATTEMPT, 
tezTaskAttemptID.toString());
@@ -499,7 +499,7 @@ public class TestHistoryEventProtoConverter {
 
     HistoryEventProto proto = converter.convert(event);
     assertCommon(proto, HistoryEventType.VERTEX_INITIALIZED, initedTime,
-        EntityTypes.TEZ_VERTEX_ID, null, null, 5);
+        EntityTypes.TEZ_VERTEX_ID, null, null, 6);
 
     assertEventData(proto, ATSConstants.VERTEX_NAME, "v1");
     assertEventData(proto, ATSConstants.PROCESSOR_CLASS_NAME, "proc");
diff --git 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
index 1da281c..0e167b2 100644
--- 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
+++ 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
@@ -32,6 +32,8 @@ import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.nio.charset.Charset;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 
@@ -65,6 +67,10 @@ public class CSVResult implements Result {
     return Iterators.unmodifiableIterator(recordsList.iterator());
   }
 
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void sort(Comparator comparator) {
+    Collections.sort(recordsList, comparator);
+  }
 
   public void setComments(String comments) {
     this.comments = comments;
diff --git 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
index cf600c5..cad0d98 100644
--- 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
+++ 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
@@ -46,6 +46,8 @@ public class AnalyzerDriver {
           "Print spill details in a DAG");
       pgd.addClass("TaskAssignmentAnalyzer", TaskAssignmentAnalyzer.class,
           "Print task-to-node assignment details of a DAG");
+      pgd.addClass("TaskAttemptResultStatisticsAnalyzer", 
TaskAttemptResultStatisticsAnalyzer.class,
+          "Print vertex:node:status level details of task attempt results");
       pgd.addClass("TaskConcurrencyAnalyzer", TaskConcurrencyAnalyzer.class,
           "Print the task concurrency details in a DAG");
       pgd.addClass("VertexLevelCriticalPathAnalyzer", 
VertexLevelCriticalPathAnalyzer.class,
diff --git 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAttemptResultStatisticsAnalyzer.java
 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAttemptResultStatisticsAnalyzer.java
new file mode 100644
index 0000000..df2f95c
--- /dev/null
+++ 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAttemptResultStatisticsAnalyzer.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.Result;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+/**
+ * Get simple count of task attempt states on vertex:node:status level, like 
below.
+ *
+ * vertex (+task stats: all/succeeded/failed/killed),node,status,numAttempts
+ * Map 1 (vertex_x_y_z) (216/153/0/63),node1,KILLED:INTERNAL_PREEMPTION,1185
+ * Map 1 (vertex_x_y_z) (216/153/0/63),node1,KILLED:TERMINATED_AT_SHUTDOWN,22
+ * Map 1 (vertex_x_y_z) (216/153/0/63),node1,KILLED:EXTERNAL_PREEMPTION,3349
+ * Map 1 (vertex_x_y_z) (216/153/0/63),node1,SUCCEEDED,1
+ */
+public class TaskAttemptResultStatisticsAnalyzer extends TezAnalyzerBase 
implements Analyzer {
+  private final String[] headers =
+      { "vertex (+task stats: all/succeeded/failed/killed)", "node", "status", 
"numAttempts" };
+  private final Configuration config;
+  private final CSVResult csvResult;
+
+  public TaskAttemptResultStatisticsAnalyzer(Configuration config) {
+    this.config = config;
+    csvResult = new CSVResult(headers);
+  }
+
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    Map<String, Integer> map = new HashMap<>();
+
+    for (VertexInfo vertex : dagInfo.getVertices()) {
+      String taskStatsInVertex =
+          String.format("%s/%s/%s/%s", vertex.getNumTasks(), 
vertex.getSucceededTasksCount(),
+              vertex.getFailedTasksCount(), vertex.getKilledTasksCount());
+      for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) {
+        String key = String.format("%s#%s#%s",
+            String.format("%s (%s) (%s)", vertex.getVertexName(), 
vertex.getVertexId(),
+                taskStatsInVertex),
+            attempt.getNodeId(), attempt.getDetailedStatus());
+        Integer previousValue = (Integer) map.get(key);
+        map.put(key, previousValue == null ? 1 : previousValue + 1);
+      }
+    }
+
+    map.forEach((key, value) -> {
+      addARecord(key.split("#")[0], key.split("#")[1], key.split("#")[2], 
value);
+    });
+
+    csvResult.sort(new Comparator<String[]>() {
+      public int compare(String[] first, String[] second) {
+        int vertexOrder = first[0].compareTo(second[0]);
+        int nodeOrder = first[1].compareTo(second[1]);
+        int statusOrder = first[2].compareTo(second[2]);
+
+        return vertexOrder == 0 ? (nodeOrder == 0 ? statusOrder : nodeOrder) : 
vertexOrder;
+      }
+    });
+  }
+
+  private void addARecord(String vertexData, String node, String status,
+      int numAttempts) {
+    String[] record = new String[4];
+    record[0] = vertexData;
+    record[1] = node;
+    record[2] = status;
+    record[3] = Integer.toString(numAttempts);
+    csvResult.addRecord(record);
+  }
+
+  @Override
+  public Result getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "Task Attempt Result Statistics Analyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Get statistics about task attempts states in vertex:node:status 
level";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration config = new Configuration();
+    TaskAttemptResultStatisticsAnalyzer analyzer = new 
TaskAttemptResultStatisticsAnalyzer(config);
+    int res = ToolRunner.run(config, analyzer, args);
+    analyzer.printResults();
+    System.exit(res);
+  }
+}
diff --git 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
index 59ae4a3..75a55a7 100644
--- 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
+++ 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
@@ -19,7 +19,12 @@
 package org.apache.tez.analyzer.plugins;
 
 import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
@@ -27,6 +32,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.Tool;
@@ -36,6 +42,7 @@ import org.apache.tez.analyzer.Result;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.history.ATSImportTool;
 import org.apache.tez.history.parser.ATSFileParser;
+import org.apache.tez.history.parser.ProtoHistoryParser;
 import org.apache.tez.history.parser.SimpleHistoryParser;
 import org.apache.tez.history.parser.datamodel.DagInfo;
 
@@ -52,6 +59,7 @@ public abstract class TezAnalyzerBase extends Configured 
implements Tool, Analyz
   private static final String SAVE_RESULTS = "saveResults";
   private static final String DAG_ID = "dagId";
   private static final String FROM_SIMPLE_HISTORY = "fromSimpleHistory";
+  private static final String FROM_PROTO_HISTORY = "fromProtoHistory";
   private static final String HELP = "help";
 
   private static final int SEPARATOR_WIDTH = 80;
@@ -81,7 +89,12 @@ public abstract class TezAnalyzerBase extends Configured 
implements Tool, Analyz
         (FROM_SIMPLE_HISTORY)
         .withDescription("Event data from Simple History logging. Must also 
specify event file")
         .isRequired(false).create();
-    
+
+    Option fromProtoHistoryOption =
+        
OptionBuilder.withArgName(FROM_PROTO_HISTORY).withLongOpt(FROM_PROTO_HISTORY)
+            .withDescription("Event data from Proto History logging. Must also 
specify event file")
+            .isRequired(false).create();
+
     Option help = OptionBuilder.withArgName(HELP).withLongOpt
         (HELP)
         .withDescription("print help")
@@ -93,6 +106,7 @@ public abstract class TezAnalyzerBase extends Configured 
implements Tool, Analyz
     opts.addOption(saveResults);
     opts.addOption(eventFileNameOption);
     opts.addOption(fromSimpleHistoryOption);
+    opts.addOption(fromProtoHistoryOption);
     opts.addOption(help);
     return opts;
   }
@@ -133,21 +147,36 @@ public abstract class TezAnalyzerBase extends Configured 
implements Tool, Analyz
       outputDir = System.getProperty("user.dir");
     }
 
-    File file = null;
+    String dagId = cmdLine.getOptionValue(DAG_ID);
+
+    List<File> files = new ArrayList<File>();
     if (cmdLine.hasOption(EVENT_FILE_NAME)) {
-      file = new File(cmdLine.getOptionValue(EVENT_FILE_NAME));
+      for (String file : cmdLine.getOptionValue(EVENT_FILE_NAME).split(",")) {
+        File fileOrDir = new File(file);
+        if (fileOrDir.exists()) {
+          if (fileOrDir.isFile()) {
+            files.add(fileOrDir);
+          } else {
+            files.addAll(collectFilesForDagId(fileOrDir, dagId));
+          }
+        }
+      }
     }
-    
-    String dagId = cmdLine.getOptionValue(DAG_ID);
-    
+
     DagInfo dagInfo = null;
     
-    if (file == null) {
+    if (files.isEmpty()) {
       if (cmdLine.hasOption(FROM_SIMPLE_HISTORY)) {
         System.err.println("Event file name must be specified when using 
simple history");
         printUsage();
         return -2;
       }
+      if (cmdLine.hasOption(FROM_PROTO_HISTORY)) {
+        System.err.println("Proto file name must be specified when using proto 
history");
+        printUsage();
+        return -2;
+      }
+
       // using ATS - try to download directly
       String[] importArgs = { "--dagId=" + dagId, "--downloadDir=" + outputDir 
};
 
@@ -159,30 +188,60 @@ public abstract class TezAnalyzerBase extends Configured 
implements Tool, Analyz
 
       //Parse ATS data and verify results
       //Parse downloaded contents
-      file = new File(outputDir
-          + Path.SEPARATOR + dagId + ".zip");
+      files.add(new File(outputDir
+          + Path.SEPARATOR + dagId + ".zip"));
     }
     
-    Preconditions.checkState(file != null);
-    if (!cmdLine.hasOption(FROM_SIMPLE_HISTORY)) {
-      ATSFileParser parser = new ATSFileParser(file);
+    Preconditions.checkState(!files.isEmpty());
+    if (cmdLine.hasOption(FROM_SIMPLE_HISTORY)) {
+      SimpleHistoryParser parser = new SimpleHistoryParser(files);
+      dagInfo = parser.getDAGData(dagId);
+    } else if (cmdLine.hasOption(FROM_PROTO_HISTORY)) {
+      ProtoHistoryParser parser = new ProtoHistoryParser(files);
       dagInfo = parser.getDAGData(dagId);
     } else {
-      SimpleHistoryParser parser = new SimpleHistoryParser(file);
+      ATSFileParser parser = new ATSFileParser(files);
       dagInfo = parser.getDAGData(dagId);
     }
     Preconditions.checkState(dagInfo.getDagId().equals(dagId));
     analyze(dagInfo);
     Result result = getResult();
-    if (saveResults && (result instanceof CSVResult)) {
-      String fileName = outputDir + File.separator
-          + this.getClass().getName() + "_" + dagInfo.getDagId() + ".csv";
-      ((CSVResult) result).dumpToFile(fileName);
-      LOG.info("Saved results in " + fileName);
+
+    if (saveResults) {
+      String dagInfoFileName = outputDir + File.separator + 
this.getClass().getName() + "_"
+          + dagInfo.getDagId() + ".dag";
+      FileUtils.writeStringToFile(new File(dagInfoFileName), 
dagInfo.toExtendedString());
+      LOG.info("Saved dag info in " + dagInfoFileName);
+
+      if (result instanceof CSVResult) {
+        String fileName = outputDir + File.separator + 
this.getClass().getName() + "_"
+            + dagInfo.getDagId() + ".csv";
+        ((CSVResult) result).dumpToFile(fileName);
+        LOG.info("Saved results in " + fileName);
+      }
     }
+
     return 0;
   }
 
+  private List<File> collectFilesForDagId(File parentDir, String dagId) {
+    File[] arrFiles = parentDir.listFiles(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        return name.contains(dagId);
+      }
+    });
+    if (arrFiles == null || arrFiles.length == 0) {
+      throw new RuntimeException(
+          String.format("cannot find relevant files for dag: '%s' in dir: %s", 
dagId, parentDir));
+    }
+
+    List<File> files = Arrays.asList(arrFiles);
+    LOG.info("collected files for dag: \n"
+        + files.stream().map(f -> "\n" + 
f.getAbsolutePath()).collect(Collectors.toList()));
+    return files;
+  }
+
   public void printResults() throws TezException {
     Result result = getResult();
     if (result instanceof CSVResult) {
diff --git 
a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
 
b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
index d34f9c5..606b7e3 100644
--- 
a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
+++ 
b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
@@ -21,6 +21,7 @@ package org.apache.tez.analyzer;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
@@ -267,7 +268,7 @@ public class TestAnalyzer {
       //Parse downloaded contents
       File downloadedFile = new File(DOWNLOAD_DIR
           + Path.SEPARATOR + dagId + ".zip");
-      ATSFileParser parser = new ATSFileParser(downloadedFile);
+      ATSFileParser parser = new ATSFileParser(Arrays.asList(downloadedFile));
       dagInfo = parser.getDAGData(dagId);
       assertTrue(dagInfo.getDagId().equals(dagId));
     } else {
@@ -286,7 +287,7 @@ public class TestAnalyzer {
       }
       //Now parse via SimpleHistory
       File localFile = new File(DOWNLOAD_DIR, HISTORY_TXT);
-      SimpleHistoryParser parser = new SimpleHistoryParser(localFile);
+      SimpleHistoryParser parser = new 
SimpleHistoryParser(Arrays.asList(localFile));
       dagInfo = parser.getDAGData(dagId);
       assertTrue(dagInfo.getDagId().equals(dagId));
     }

Reply via email to