This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 6487834 TEZ-4105: Tez job-analyzer tool to support proto logging
history (László Bodor reviewed by Jonathan Turner Eagles)
6487834 is described below
commit 6487834281f8ef5f8afd9f5ba9fba68736d5b44a
Author: László Bodor <[email protected]>
AuthorDate: Wed Jun 3 13:30:01 2020 +0200
TEZ-4105: Tez job-analyzer tool to support proto logging history (László
Bodor reviewed by Jonathan Turner Eagles)
Signed-off-by: Laszlo Bodor <[email protected]>
---
pom.xml | 5 +
tez-plugins/tez-history-parser/pom.xml | 4 +
.../apache/tez/history/parser/ATSFileParser.java | 13 +-
.../tez/history/parser/ProtoHistoryParser.java | 128 ++++
.../tez/history/parser/SimpleHistoryParser.java | 244 ++++---
.../tez/history/parser/datamodel/BaseParser.java | 21 +
.../tez/history/parser/datamodel/DagInfo.java | 23 +-
.../history/parser/datamodel/TaskAttemptInfo.java | 2 +-
.../org/apache/tez/history/TestHistoryParser.java | 5 +-
.../logging/proto/HistoryEventProtoConverter.java | 4 +
.../proto/HistoryEventProtoJsonConversion.java | 766 +++++++++++++++++++++
.../history/logging/proto/ProtoMessageReader.java | 2 +-
.../proto/TestHistoryEventProtoConverter.java | 4 +-
.../java/org/apache/tez/analyzer/CSVResult.java | 6 +
.../tez/analyzer/plugins/AnalyzerDriver.java | 2 +
.../TaskAttemptResultStatisticsAnalyzer.java | 125 ++++
.../tez/analyzer/plugins/TezAnalyzerBase.java | 95 ++-
.../java/org/apache/tez/analyzer/TestAnalyzer.java | 5 +-
18 files changed, 1323 insertions(+), 131 deletions(-)
diff --git a/pom.xml b/pom.xml
index 7de5322..fe67b50 100644
--- a/pom.xml
+++ b/pom.xml
@@ -198,6 +198,11 @@
</dependency>
<dependency>
<groupId>org.apache.tez</groupId>
+ <artifactId>tez-protobuf-history-plugin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
<artifactId>tez-yarn-timeline-history</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
diff --git a/tez-plugins/tez-history-parser/pom.xml
b/tez-plugins/tez-history-parser/pom.xml
index 7937fa4..1b1d4e8 100644
--- a/tez-plugins/tez-history-parser/pom.xml
+++ b/tez-plugins/tez-history-parser/pom.xml
@@ -55,6 +55,10 @@
</dependency>
<dependency>
<groupId>org.apache.tez</groupId>
+ <artifactId>tez-protobuf-history-plugin</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
<artifactId>tez-yarn-timeline-history</artifactId>
<scope>test</scope>
<type>test-jar</type>
diff --git
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
index e4720d4..e64fb43 100644
---
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
+++
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
@@ -41,6 +41,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Enumeration;
import java.util.Iterator;
+import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
@@ -60,10 +61,10 @@ public class ATSFileParser extends BaseParser implements
ATSData {
private final File atsZipFile;
- public ATSFileParser(File atsZipFile) throws TezException {
+ public ATSFileParser(List<File> files) throws TezException {
super();
- Preconditions.checkArgument(atsZipFile.exists(), "Zipfile " + atsZipFile +
" does not exist");
- this.atsZipFile = atsZipFile;
+ Preconditions.checkArgument(checkFiles(files), "Zipfile " + files + " are
empty or they don't exist");
+ this.atsZipFile = files.get(0); //doesn't support multiple files at the
moment
}
@Override
@@ -72,14 +73,12 @@ public class ATSFileParser extends BaseParser implements
ATSData {
parseATSZipFile(atsZipFile);
linkParsedContents();
+ addRawDataToDagInfo();
return dagInfo;
- } catch (IOException e) {
+ } catch (IOException | JSONException e) {
LOG.error("Error in reading DAG ", e);
throw new TezException(e);
- } catch (JSONException e) {
- LOG.error("Error in parsing DAG ", e);
- throw new TezException(e);
} catch (InterruptedException e) {
throw new TezException(e);
}
diff --git
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ProtoHistoryParser.java
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ProtoHistoryParser.java
new file mode 100644
index 0000000..d28fd67
--- /dev/null
+++
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ProtoHistoryParser.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.history.parser;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.Preconditions;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.history.logging.proto.HistoryEventProtoJsonConversion;
+import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.HistoryEventProto;
+import org.apache.tez.dag.history.logging.proto.ProtoMessageReader;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+
+/**
+ * Parser utility to parse data generated by ProtoHistoryLoggingService.
+ * <p/>
+ * All files passed to the constructor are read in list order and their events are treated as one
+ * stream, so the events of a DAG may be spread over several files.
+ */
+public class ProtoHistoryParser extends SimpleHistoryParser {
+  private static final Logger LOG = LoggerFactory.getLogger(ProtoHistoryParser.class);
+  private final List<File> protoFiles;
+
+  /**
+   * @param files proto history files to read, in order; must be non-empty and every file must exist
+   */
+  public ProtoHistoryParser(List<File> files) {
+    super(files);
+    this.protoFiles = files;
+  }
+
+  /**
+   * Get in-memory representation of DagInfo.
+   *
+   * @param dagId id of the DAG to extract; surrounding whitespace is ignored
+   * @return DagInfo
+   * @throws TezException if the files cannot be read or parsed, or hold no data for dagId
+   */
+  @Override
+  public DagInfo getDAGData(String dagId) throws TezException {
+    try {
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(dagId), "Please provide valid dagId");
+      dagId = dagId.trim();
+      parseContents(dagId);
+      linkParsedContents();
+      addRawDataToDagInfo();
+      return dagInfo;
+    } catch (IOException | JSONException e) {
+      LOG.error("Error in reading DAG ", e);
+      throw new TezException(e);
+    }
+  }
+
+  private void parseContents(String dagId)
+      throws JSONException, FileNotFoundException, TezException, IOException {
+    JSONObjectSource source = getJsonSource();
+    parse(dagId, source);
+  }
+
+  /**
+   * Creates a source that chains all proto files in order. Files are opened lazily, one at a time.
+   * A file is exhausted once its reader returns null or throws EOFException; reading then goes on
+   * with the next file, so an empty file does not hide the files that follow it.
+   */
+  private JSONObjectSource getJsonSource() throws IOException {
+    final TezConfiguration conf = new TezConfiguration();
+    final Iterator<File> fileIt = protoFiles.iterator();
+
+    return new JSONObjectSource() {
+      private HistoryEventProto message = null;
+      private ProtoMessageReader<HistoryEventProto> reader = null;
+
+      @Override
+      public JSONObject next() throws JSONException {
+        return HistoryEventProtoJsonConversion.convertToJson(message);
+      }
+
+      // Reads ahead: every call consumes one event, so call it exactly once before each next().
+      @Override
+      public boolean hasNext() throws IOException {
+        while (true) {
+          if (reader == null) {
+            if (!fileIt.hasNext()) {
+              message = null;
+              return false;
+            }
+            reader = new ProtoMessageReader<>(conf, new Path(fileIt.next().getPath()),
+                HistoryEventProto.PARSER);
+          }
+          message = readNextEvent(reader);
+          if (message != null) {
+            return true;
+          }
+          // current file is exhausted: close it, then continue with the next one
+          ProtoMessageReader<HistoryEventProto> finished = reader;
+          reader = null;
+          finished.close();
+        }
+      }
+
+      @Override
+      public void close() {
+        if (reader == null) {
+          return;
+        }
+        try {
+          reader.close();
+        } catch (IOException e) {
+          LOG.warn("error while closing ProtoMessageReader", e);
+        } finally {
+          reader = null;
+        }
+      }
+    };
+  }
+
+  /**
+   * Reads the next event from the given reader, mapping the EOFException raised at the end of a
+   * proto file to null.
+   */
+  private static HistoryEventProto readNextEvent(ProtoMessageReader<HistoryEventProto> reader)
+      throws IOException {
+    try {
+      return (HistoryEventProto) reader.readEvent();
+    } catch (EOFException e) {
+      return null;
+    }
+  }
+}
\ No newline at end of file
diff --git
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
index 3516de7..db3f648 100644
---
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
+++
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java
@@ -17,9 +17,16 @@
*/
package org.apache.tez.history.parser;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+
+import org.apache.tez.common.ATSConstants;
import org.apache.tez.common.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Maps;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
import org.apache.tez.dag.records.TezDAGID;
@@ -38,33 +45,34 @@ import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Scanner;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
/**
- * Parser utility to parse data generated by SimpleHistoryLogging to in-memory
datamodel provided
- * in org.apache.tez.history.parser.datamodel
+ * Parser utility to parse data generated by SimpleHistoryLogging to in-memory
datamodel provided in
+ * org.apache.tez.history.parser.datamodel.
* <p/>
* <p/>
- * Most of the information should be available. Minor info like VersionInfo
may not be available,
- * as it is not captured in SimpleHistoryLogging.
+ * Most of the information should be available. Minor info like VersionInfo
may not be available, as
+ * it is not captured in SimpleHistoryLogging.
*/
public class SimpleHistoryParser extends BaseParser {
private static final Logger LOG =
LoggerFactory.getLogger(SimpleHistoryParser.class);
- private static final String UTF8 = "UTF-8";
+ protected static final String UTF8 = "UTF-8";
private final File historyFile;
-
- public SimpleHistoryParser(File historyFile) {
+ public SimpleHistoryParser(List<File> files) {
super();
- Preconditions.checkArgument(historyFile.exists(), historyFile + " does not
exist");
- this.historyFile = historyFile;
+ Preconditions.checkArgument(checkFiles(files), files + " are empty or they
don't exist");
+ this.historyFile = files.get(0); //doesn't support multiple files at the
moment
}
+  protected interface JSONObjectSource { // pull-style source of history records as JSONObjects
+    boolean hasNext() throws IOException; // may consume input (read-ahead): call once per next()
+    JSONObject next() throws JSONException; // next record; only valid after hasNext() returned true
+    void close(); // releases the underlying scanner/reader
+  }
+
/**
* Get in-memory representation of DagInfo
*
@@ -77,13 +85,11 @@ public class SimpleHistoryParser extends BaseParser {
dagId = dagId.trim();
parseContents(historyFile, dagId);
linkParsedContents();
+ addRawDataToDagInfo();
return dagInfo;
- } catch (IOException e) {
+ } catch (IOException | JSONException e) {
LOG.error("Error in reading DAG ", e);
throw new TezException(e);
- } catch (JSONException e) {
- LOG.error("Error in parsing DAG ", e);
- throw new TezException(e);
}
}
@@ -106,18 +112,117 @@ public class SimpleHistoryParser extends BaseParser {
}
private void parseContents(File historyFile, String dagId)
- throws JSONException, FileNotFoundException, TezException {
- Scanner scanner = new Scanner(historyFile, UTF8);
+ throws JSONException, FileNotFoundException, TezException, IOException {
+ JSONObjectSource source = getJsonSource();
+
+ parse(dagId, source);
+ }
+
+ private JSONObjectSource getJsonSource() throws FileNotFoundException {
+ final Scanner scanner = new Scanner(historyFile, UTF8);
scanner.useDelimiter(SimpleHistoryLoggingService.RECORD_SEPARATOR);
- JSONObject dagJson = null;
+
+ JSONObjectSource source = new JSONObjectSource() {
+ @Override
+ public JSONObject next() throws JSONException {
+ String line = scanner.next();
+ return new JSONObject(line);
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return scanner.hasNext();
+ }
+
+ @Override
+ public void close() {
+ scanner.close();
+ }
+ };
+ return source;
+ }
+
+ protected void parse(String dagId, JSONObjectSource source)
+ throws JSONException, TezException, IOException {
Map<String, JSONObject> vertexJsonMap = Maps.newHashMap();
Map<String, JSONObject> taskJsonMap = Maps.newHashMap();
Map<String, JSONObject> attemptJsonMap = Maps.newHashMap();
+
+ readEventsFromSource(dagId, source, vertexJsonMap, taskJsonMap,
attemptJsonMap);
+ postProcessMaps(vertexJsonMap, taskJsonMap, attemptJsonMap);
+ }
+
+  protected void postProcessMaps(Map<String, JSONObject> vertexJsonMap,
+      Map<String, JSONObject> taskJsonMap, Map<String, JSONObject> attemptJsonMap)
+      throws JSONException { // builds Vertex/Task/TaskAttemptInfo objects from the collected JSONs
+    for (JSONObject jsonObject : vertexJsonMap.values()) {
+      VertexInfo vertexInfo = VertexInfo.create(jsonObject);
+      this.vertexList.add(vertexInfo);
+      LOG.debug("Parsed vertex {}", vertexInfo.getVertexName());
+    }
+    for (JSONObject jsonObject : taskJsonMap.values()) {
+      TaskInfo taskInfo = TaskInfo.create(jsonObject);
+      this.taskList.add(taskInfo);
+      LOG.debug("Parsed task {}", taskInfo.getTaskId());
+    }
+    for (JSONObject jsonObject : attemptJsonMap.values()) {
+      /*
+       * For converting SimpleHistoryLogging to in-memory representation
+       *
+       * We need to get "relatedEntities":[{"entity":"cn055-10.l42scl.hortonworks.com:58690",
+       * "entitytype":"nodeId"},{"entity":"container_1438652049951_0008_01_000152",
+       * "entitytype":"containerId"} and populate it in otherInfo object so that in-memory
+       * representation can parse it correctly
+       */
+      JSONArray relatedEntities = jsonObject.optJSONArray(Constants.RELATED_ENTITIES);
+      if (relatedEntities == null) {
+        //This can happen when CONTAINER_EXITED abruptly. (e.g Container failed, exitCode=1)
+        LOG.debug("entity {} did not have related entities",
+            jsonObject.optString(Constants.ENTITY));
+      } else {
+        JSONObject subJsonObject = relatedEntities.optJSONObject(0);
+        if (subJsonObject != null) {
+          String nodeId = subJsonObject.optString(Constants.ENTITY_TYPE);
+          if (!Strings.isNullOrEmpty(nodeId) && nodeId.equalsIgnoreCase(Constants.NODE_ID)) {
+            //populate it in otherInfo
+            JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+            String nodeIdVal = subJsonObject.optString(Constants.ENTITY);
+            if (otherInfo != null && nodeIdVal != null) {
+              otherInfo.put(Constants.NODE_ID, nodeIdVal);
+            }
+          }
+        }
+
+        subJsonObject = relatedEntities.optJSONObject(1);
+        if (subJsonObject != null) {
+          String containerId = subJsonObject.optString(Constants.ENTITY_TYPE);
+          if (!Strings.isNullOrEmpty(containerId) && containerId
+              .equalsIgnoreCase(Constants.CONTAINER_ID)) {
+            //populate it in otherInfo
+            JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
+            String containerIdVal = subJsonObject.optString(Constants.ENTITY);
+            if (otherInfo != null && containerIdVal != null) {
+              otherInfo.put(Constants.CONTAINER_ID, containerIdVal);
+            }
+          }
+        }
+      }
+      TaskAttemptInfo attemptInfo = TaskAttemptInfo.create(jsonObject);
+      this.attemptList.add(attemptInfo);
+      LOG.debug("Parsed task attempt {}", attemptInfo.getTaskAttemptId());
+    }
+  }
+
+ protected void readEventsFromSource(String dagId, JSONObjectSource source,
+ Map<String, JSONObject> vertexJsonMap, Map<String, JSONObject>
taskJsonMap,
+ Map<String, JSONObject> attemptJsonMap) throws JSONException,
TezException, IOException{
+ JSONObject dagJson = null;
TezDAGID tezDAGID = TezDAGID.fromString(dagId);
String userName = null;
- while (scanner.hasNext()) {
- String line = scanner.next();
- JSONObject jsonObject = new JSONObject(line);
+
+ while (source.hasNext()) {
+ JSONObject jsonObject = source.next();
+
String entity = jsonObject.getString(Constants.ENTITY);
String entityType = jsonObject.getString(Constants.ENTITY_TYPE);
switch (entityType) {
@@ -131,9 +236,12 @@ public class SimpleHistoryParser extends BaseParser {
// time etc).
if (dagJson == null) {
dagJson = jsonObject;
+ } else if (dagJson.optJSONObject(ATSConstants.OTHER_INFO)
+ .optJSONObject(ATSConstants.DAG_PLAN) == null) {
+ // if DAG_PLAN is not filled already, let's try to fetch it from
other
+
dagJson.getJSONObject(ATSConstants.OTHER_INFO).put(ATSConstants.DAG_PLAN,
jsonObject
+
.getJSONObject(ATSConstants.OTHER_INFO).getJSONObject(ATSConstants.DAG_PLAN));
}
- JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
- JSONObject dagOtherInfo = dagJson.getJSONObject(Constants.OTHER_INFO);
JSONArray relatedEntities = dagJson.optJSONArray(Constants
.RELATED_ENTITIES);
//UserName is present in related entities
@@ -148,52 +256,52 @@ public class SimpleHistoryParser extends BaseParser {
}
}
}
- populateOtherInfo(otherInfo, dagOtherInfo);
+ populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO),
+ dagJson.getJSONObject(Constants.OTHER_INFO));
break;
case Constants.TEZ_VERTEX_ID:
String vertexName = entity;
TezVertexID tezVertexID = TezVertexID.fromString(vertexName);
if (!tezDAGID.equals(tezVertexID.getDAGId())) {
- LOG.warn(vertexName + " does not belong to " + tezDAGID);
+ LOG.warn("{} does not belong to {} ('{}' != '{}')", vertexName, tezDAGID, tezDAGID, tezVertexID.getDAGId());
continue;
}
if (!vertexJsonMap.containsKey(vertexName)) {
vertexJsonMap.put(vertexName, jsonObject);
}
- otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
- populateOtherInfo(otherInfo, vertexName, vertexJsonMap);
+ populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), vertexName, vertexJsonMap);
break;
case Constants.TEZ_TASK_ID:
String taskName = entity;
TezTaskID tezTaskID = TezTaskID.fromString(taskName);
if (!tezDAGID.equals(tezTaskID.getVertexID().getDAGId())) {
- LOG.warn(taskName + " does not belong to " + tezDAGID);
+ LOG.warn("{} does not belong to {} ('{}' != '{}')", taskName, tezDAGID, tezDAGID,
+ tezTaskID.getVertexID().getDAGId());
continue;
}
if (!taskJsonMap.containsKey(taskName)) {
taskJsonMap.put(taskName, jsonObject);
}
- otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
- populateOtherInfo(otherInfo, taskName, taskJsonMap);
+ populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), taskName, taskJsonMap);
break;
case Constants.TEZ_TASK_ATTEMPT_ID:
String taskAttemptName = entity;
TezTaskAttemptID tezAttemptId = TezTaskAttemptID.fromString(taskAttemptName);
if (!tezDAGID.equals(tezAttemptId.getTaskID().getVertexID().getDAGId())) {
- LOG.warn(taskAttemptName + " does not belong to " + tezDAGID);
+ LOG.warn("{} does not belong to {} ('{}' != '{}')", taskAttemptName, tezDAGID, tezDAGID,
+ tezAttemptId.getTaskID().getVertexID().getDAGId());
continue;
}
if (!attemptJsonMap.containsKey(taskAttemptName)) {
attemptJsonMap.put(taskAttemptName, jsonObject);
}
- otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO);
- populateOtherInfo(otherInfo, taskAttemptName, attemptJsonMap);
+ populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), taskAttemptName, attemptJsonMap);
break;
default:
break;
}
}
- scanner.close();
+ source.close();
if (dagJson != null) {
this.dagInfo = DagInfo.create(dagJson);
setUserName(userName);
@@ -202,61 +310,5 @@ public class SimpleHistoryParser extends BaseParser {
throw new TezException(
"Please provide a valid/complete history log file containing " +
dagId);
}
- for (JSONObject jsonObject : vertexJsonMap.values()) {
- VertexInfo vertexInfo = VertexInfo.create(jsonObject);
- this.vertexList.add(vertexInfo);
- LOG.debug("Parsed vertex {}", vertexInfo.getVertexName());
- }
- for (JSONObject jsonObject : taskJsonMap.values()) {
- TaskInfo taskInfo = TaskInfo.create(jsonObject);
- this.taskList.add(taskInfo);
- LOG.debug("Parsed task {}", taskInfo.getTaskId());
- }
- for (JSONObject jsonObject : attemptJsonMap.values()) {
- /**
- * For converting SimpleHistoryLogging to in-memory representation
- *
- * We need to get
"relatedEntities":[{"entity":"cn055-10.l42scl.hortonworks.com:58690",
- *
"entitytype":"nodeId"},{"entity":"container_1438652049951_0008_01_000152",
- * "entitytype":"containerId"} and populate it in otherInfo object so
that in-memory
- * representation can parse it correctly
- */
- JSONArray relatedEntities =
jsonObject.optJSONArray(Constants.RELATED_ENTITIES);
- if (relatedEntities == null) {
- //This can happen when CONTAINER_EXITED abruptly. (e.g Container
failed, exitCode=1)
- LOG.debug("entity {} did not have related entities",
- jsonObject.optJSONObject(Constants.ENTITY));
- } else {
- JSONObject subJsonObject = relatedEntities.optJSONObject(0);
- if (subJsonObject != null) {
- String nodeId = subJsonObject.optString(Constants.ENTITY_TYPE);
- if (!Strings.isNullOrEmpty(nodeId) &&
nodeId.equalsIgnoreCase(Constants.NODE_ID)) {
- //populate it in otherInfo
- JSONObject otherInfo =
jsonObject.optJSONObject(Constants.OTHER_INFO);
- String nodeIdVal = subJsonObject.optString(Constants.ENTITY);
- if (otherInfo != null && nodeIdVal != null) {
- otherInfo.put(Constants.NODE_ID, nodeIdVal);
- }
- }
- }
-
- subJsonObject = relatedEntities.optJSONObject(1);
- if (subJsonObject != null) {
- String containerId = subJsonObject.optString(Constants.ENTITY_TYPE);
- if (!Strings.isNullOrEmpty(containerId) && containerId
- .equalsIgnoreCase(Constants.CONTAINER_ID)) {
- //populate it in otherInfo
- JSONObject otherInfo =
jsonObject.optJSONObject(Constants.OTHER_INFO);
- String containerIdVal = subJsonObject.optString(Constants.ENTITY);
- if (otherInfo != null && containerIdVal != null) {
- otherInfo.put(Constants.CONTAINER_ID, containerIdVal);
- }
- }
- }
- }
- TaskAttemptInfo attemptInfo = TaskAttemptInfo.create(jsonObject);
- this.attemptList.add(attemptInfo);
- LOG.debug("Parsed task attempt {}", attemptInfo.getTaskAttemptId());
- }
}
}
\ No newline at end of file
diff --git
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java
index 59ec03d..af8e292 100644
---
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java
+++
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
+import java.io.File;
import java.util.List;
import java.util.Map;
@@ -44,6 +45,35 @@ public abstract class BaseParser {
attemptList = Lists.newLinkedList();
}
+
+  /**
+   * Validates the list of input files handed to a parser.
+   *
+   * @param files candidate input files
+   * @return true if files is non-null and non-empty and every element refers to an existing path
+   */
+  protected boolean checkFiles(List<File> files) {
+    if (files == null || files.isEmpty()) {
+      return false;
+    }
+    for (File file : files) {
+      if (file == null || !file.exists()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Stores the parsed vertex, task and task attempt lists in dagInfo's meta map under the keys
+   * "vertices", "tasks" and "taskAttempts". dagInfo must already have been created.
+   */
+  protected void addRawDataToDagInfo() {
+    dagInfo.addMeta("vertices", vertexList);
+    dagInfo.addMeta("tasks", taskList);
+    dagInfo.addMeta("taskAttempts", attemptList);
+  }
+
/**
* link the parsed contents, so that it becomes easier to iterate from
DAG-->Task and Task--DAG.
* e.g Link vertex to dag, task to vertex, attempt to task etc
diff --git
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
index 64ddcf8..85fcfcf 100644
---
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
+++
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
@@ -44,6 +44,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -85,6 +86,8 @@ public class DagInfo extends BaseInfo {
private Multimap<Container, TaskAttemptInfo> containerMapping;
private Map<String, String> config;
+  private final Map<String, Object> meta = new HashMap<>(); // extra entries added via addMeta()
+
DagInfo(JSONObject jsonObject) throws JSONException {
super(jsonObject);
@@ -168,6 +171,10 @@ public class DagInfo extends BaseInfo {
return dagInfo;
}
+ public void addMeta(String key, Object value) {
+ meta.put(key, value);
+ }
+
private void parseDAGPlan(JSONObject dagPlan) throws JSONException {
int version = dagPlan.optInt(Constants.VERSION, 1);
parseEdges(dagPlan.optJSONArray(Constants.EDGES));
@@ -320,7 +327,7 @@ public class DagInfo extends BaseInfo {
BasicVertexInfo basicVertexInfo =
basicVertexInfoMap.get(vertexInfo.getVertexName());
Preconditions.checkArgument(basicVertexInfo != null,
- "VerteName " + vertexInfo.getVertexName()
+ "VertexName " + vertexInfo.getVertexName()
+ " not present in DAG's vertices " +
basicVertexInfoMap.entrySet());
//populate additional information in VertexInfo
@@ -387,6 +394,22 @@ public class DagInfo extends BaseInfo {
return sb.toString();
}
+  /**
+   * Returns {@link #toString()} followed by a "meta=" section holding the entries added via
+   * {@link #addMeta(String, Object)}, rendered as JSON with an indent of 3.
+   *
+   * @throws IllegalStateException if the meta entries cannot be rendered as JSON
+   */
+  public String toExtendedString() {
+    StringBuilder sb = new StringBuilder(toString());
+    try {
+      sb.append("\nmeta=").append(new JSONObject(meta).toString(3));
+    } catch (JSONException e) {
+      throw new IllegalStateException("Cannot render DagInfo meta as JSON", e);
+    }
+    return sb.toString();
+  }
+
public Multimap<Container, TaskAttemptInfo> getContainerMapping() {
return Multimaps.unmodifiableMultimap(containerMapping);
}
@@ -630,5 +650,4 @@ public class DagInfo extends BaseInfo {
final void setUserName(String userName) {
this.userName = userName;
}
-
}
diff --git
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
index 3f39310..3ce39bd 100644
---
a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
+++
b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
@@ -405,7 +405,7 @@ public class TaskAttemptInfo extends BaseInfo {
sb.append("container=").append(getContainer()).append(", ");
sb.append("nodeId=").append(getNodeId()).append(", ");
sb.append("logURL=").append(getLogURL()).append(", ");
- sb.append("status=").append(getStatus());
+ sb.append("status=").append(getDetailedStatus());
sb.append("]");
return sb.toString();
}
diff --git
a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
index 92c4ad8..8a05465 100644
---
a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
+++
b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java
@@ -88,6 +88,7 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -239,7 +240,7 @@ public class TestHistoryParser {
File localFile = new File(DOWNLOAD_DIR, HISTORY_TXT);
//Now parse via SimpleHistory
- SimpleHistoryParser parser = new SimpleHistoryParser(localFile);
+ SimpleHistoryParser parser = new
SimpleHistoryParser(Arrays.asList(localFile));
DagInfo dagInfo = parser.getDAGData(dagId);
assertTrue(dagInfo.getDagId().equals(dagId));
return dagInfo;
@@ -603,7 +604,7 @@ public class TestHistoryParser {
//Parse downloaded contents
File downloadedFile = new File(DOWNLOAD_DIR
+ Path.SEPARATOR + dagId + ".zip");
- ATSFileParser parser = new ATSFileParser(downloadedFile);
+ ATSFileParser parser = new ATSFileParser(Arrays.asList(downloadedFile));
DagInfo dagInfo = parser.getDAGData(dagId);
assertTrue(dagInfo.getDagId().equals(dagId));
return dagInfo;
diff --git
a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoConverter.java
b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoConverter.java
index 44dccb6..09079bd 100644
---
a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoConverter.java
+++
b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoConverter.java
@@ -351,11 +351,14 @@ public class HistoryEventProtoConverter {
addEventData(builder, ATSConstants.CREATION_TIME, event.getCreationTime());
addEventData(builder, ATSConstants.ALLOCATION_TIME,
event.getAllocationTime());
addEventData(builder, ATSConstants.START_TIME, event.getStartTime());
+
if (event.getCreationCausalTA() != null) {
addEventData(builder, ATSConstants.CREATION_CAUSAL_ATTEMPT,
event.getCreationCausalTA().toString());
}
addEventData(builder, ATSConstants.TIME_TAKEN, (event.getFinishTime() -
event.getStartTime()));
+ addEventData(builder, ATSConstants.STATUS, event.getState().name());
+
if (event.getTaskAttemptError() != null) {
addEventData(builder, ATSConstants.TASK_ATTEMPT_ERROR_ENUM,
event.getTaskAttemptError().name());
@@ -447,6 +450,7 @@ public class HistoryEventProtoConverter {
null, null, null, event.getVertexID(), null, null, null);
addEventData(builder, ATSConstants.VERTEX_NAME, event.getVertexName());
addEventData(builder, ATSConstants.INIT_REQUESTED_TIME,
event.getInitRequestedTime());
+ addEventData(builder, ATSConstants.INIT_TIME, event.getInitedTime());
addEventData(builder, ATSConstants.NUM_TASKS, event.getNumTasks());
addEventData(builder, ATSConstants.PROCESSOR_CLASS_NAME,
event.getProcessorName());
if (event.getServicePluginInfo() != null) {
diff --git
a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoJsonConversion.java
b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoJsonConversion.java
new file mode 100644
index 0000000..26e20ab
--- /dev/null
+++
b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoJsonConversion.java
@@ -0,0 +1,766 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.proto;
+
+import java.util.Iterator;
+import java.util.Optional;
+
+import org.apache.tez.common.ATSConstants;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.logging.EntityTypes;
+import
org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.HistoryEventProto;
+import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.KVPair;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+/**
+ * Convert HistoryEventProto into JSONObject for analyzers, which can already consume the output of
+ * SimpleHistoryLoggingService's JSONs. This class is based on HistoryEventJsonConversion, and all
+ * of the specific HistoryEvent calls were transformed into HistoryEventProto calls by taking the
+ * corresponding HistoryEventProtoConverter methods into consideration.
+ */
+public final class HistoryEventProtoJsonConversion {
+
+  private HistoryEventProtoJsonConversion() {
+  }
+
+  /**
+   * Converts a proto history event into the JSON entity layout that the history parser already
+   * consumes from SimpleHistoryLoggingService output.
+   *
+   * @param historyEvent the proto event to convert
+   * @return the JSON entity describing the event
+   * @throws JSONException if building the JSON fails or a JSON-valued event data entry cannot be
+   *     parsed
+   * @throws UnsupportedOperationException if the event type has no JSON conversion
+   */
+  public static JSONObject convertToJson(HistoryEventProto historyEvent) throws JSONException {
+    switch (historyEvent.getEventType()) {
+      case "APP_LAUNCHED":
+        return convertAppLaunchedEvent(historyEvent);
+      case "AM_LAUNCHED":
+        return convertAMLaunchedEvent(historyEvent);
+      case "AM_STARTED":
+        return convertAMStartedEvent(historyEvent);
+      case "CONTAINER_LAUNCHED":
+        return convertContainerLaunchedEvent(historyEvent);
+      case "CONTAINER_STOPPED":
+        return convertContainerStoppedEvent(historyEvent);
+      case "DAG_SUBMITTED":
+        return convertDAGSubmittedEvent(historyEvent);
+      case "DAG_INITIALIZED":
+        return convertDAGInitializedEvent(historyEvent);
+      case "DAG_STARTED":
+        return convertDAGStartedEvent(historyEvent);
+      case "DAG_FINISHED":
+        return convertDAGFinishedEvent(historyEvent);
+      case "VERTEX_INITIALIZED":
+        return convertVertexInitializedEvent(historyEvent);
+      case "VERTEX_STARTED":
+        return convertVertexStartedEvent(historyEvent);
+      case "VERTEX_FINISHED":
+        return convertVertexFinishedEvent(historyEvent);
+      case "TASK_STARTED":
+        return convertTaskStartedEvent(historyEvent);
+      case "TASK_FINISHED":
+        return convertTaskFinishedEvent(historyEvent);
+      case "TASK_ATTEMPT_STARTED":
+        return convertTaskAttemptStartedEvent(historyEvent);
+      case "TASK_ATTEMPT_FINISHED":
+        return convertTaskAttemptFinishedEvent(historyEvent);
+      case "VERTEX_CONFIGURE_DONE":
+        return convertVertexReconfigureDoneEvent(historyEvent);
+      case "DAG_RECOVERED":
+        return convertDAGRecoveredEvent(historyEvent);
+      case "VERTEX_COMMIT_STARTED":
+      case "VERTEX_GROUP_COMMIT_STARTED":
+      case "VERTEX_GROUP_COMMIT_FINISHED":
+      case "DAG_COMMIT_STARTED":
+        throw new UnsupportedOperationException(
+            "Invalid Event, does not support history" + ", eventType="
+                + historyEvent.getEventType());
+      default:
+        throw new UnsupportedOperationException(
+            "Unhandled Event" + ", eventType=" + historyEvent.getEventType());
+    }
+  }
+
+  private static JSONObject convertDAGRecoveredEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject = newEntity(event.getDagId(), EntityTypes.TEZ_DAG_ID);
+
+    // Related Entities not needed as should have been done in
+    // dag submission event
+
+    JSONObject recoverEventInfo = new JSONObject();
+    recoverEventInfo.put(ATSConstants.APPLICATION_ATTEMPT_ID, event.getAppAttemptId());
+    recoverEventInfo.put(ATSConstants.DAG_STATE, getDataValueByKey(event, ATSConstants.DAG_STATE));
+    recoverEventInfo.put(ATSConstants.RECOVERY_FAILURE_REASON,
+        getDataValueByKey(event, ATSConstants.RECOVERY_FAILURE_REASON));
+
+    jsonObject.put(ATSConstants.EVENTS,
+        singleEvent(event, HistoryEventType.DAG_RECOVERED, recoverEventInfo));
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertAppLaunchedEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject = newEntity("tez_" + event.getAppId(), EntityTypes.TEZ_APPLICATION);
+
+    // Other info to tag with Tez App
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.USER, event.getUser());
+    otherInfo.put(ATSConstants.CONFIG, new JSONObject()); // TODO: config from proto?
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertAMLaunchedEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject =
+        newEntity("tez_" + event.getAppAttemptId(), EntityTypes.TEZ_APPLICATION_ATTEMPT);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, applicationRelatedEntities(event));
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    jsonObject.put(ATSConstants.EVENTS, singleEvent(event, HistoryEventType.AM_LAUNCHED, null));
+
+    // Other info to tag with Tez AM
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.APP_SUBMIT_TIME,
+        getDataValueByKey(event, ATSConstants.APP_SUBMIT_TIME));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertAMStartedEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject =
+        newEntity("tez_" + event.getAppAttemptId(), EntityTypes.TEZ_APPLICATION_ATTEMPT);
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, applicationRelatedEntities(event));
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    jsonObject.put(ATSConstants.EVENTS, singleEvent(event, HistoryEventType.AM_STARTED, null));
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertContainerLaunchedEvent(HistoryEventProto event)
+      throws JSONException {
+    return convertContainerEvent(event, HistoryEventType.CONTAINER_LAUNCHED);
+  }
+
+  private static JSONObject convertContainerStoppedEvent(HistoryEventProto event)
+      throws JSONException {
+    // structure is identical to ContainerLaunchedEvent, plus the exit status
+    JSONObject jsonObject = convertContainerEvent(event, HistoryEventType.CONTAINER_STOPPED);
+
+    // Other info
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.EXIT_STATUS, getDataValueByKey(event, ATSConstants.EXIT_STATUS));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  /**
+   * Builds the container entity shared by container launched/stopped events: the entity itself,
+   * its related app-attempt and container entities, and a single event of the given type.
+   */
+  private static JSONObject convertContainerEvent(HistoryEventProto event,
+      HistoryEventType eventType) throws JSONException {
+    String containerId = getDataValueByKey(event, ATSConstants.CONTAINER_ID);
+    JSONObject jsonObject = newEntity("tez_" + containerId, EntityTypes.TEZ_CONTAINER_ID);
+
+    JSONArray relatedEntities = new JSONArray();
+    relatedEntities.put(
+        entity(event.getAppAttemptId(), EntityTypes.TEZ_APPLICATION_ATTEMPT.name()));
+    relatedEntities.put(entity(containerId, ATSConstants.CONTAINER_ID));
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    jsonObject.put(ATSConstants.EVENTS, singleEvent(event, eventType, null));
+
+    // TODO add other container info here? or assume AHS will have this?
+    // TODO container logs?
+    return jsonObject;
+  }
+
+  private static JSONObject convertDAGFinishedEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject = newEntity(event.getDagId(), EntityTypes.TEZ_DAG_ID);
+
+    // Related Entities not needed as should have been done in
+    // dag submission event
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    jsonObject.put(ATSConstants.EVENTS, singleEvent(event, HistoryEventType.DAG_FINISHED, null));
+
+    long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME);
+
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.START_TIME, startTime);
+    otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
+    otherInfo.put(ATSConstants.TIME_TAKEN, event.getEventTime() - startTime);
+    otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, ATSConstants.STATUS));
+    otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, ATSConstants.DIAGNOSTICS));
+    otherInfo.put(ATSConstants.COUNTERS, getJSONDataValueByKey(event, ATSConstants.COUNTERS));
+    otherInfo.put(ATSConstants.COMPLETION_APPLICATION_ATTEMPT_ID, event.getAppAttemptId());
+
+    // Cover the remaining key/value pairs (e.g. those added from event.getDagTaskStats())
+    // without overwriting the typed values above: a raw string COUNTERS would otherwise replace
+    // the parsed JSONObject and be invisible to optJSONObject() readers.
+    putRemainingEventData(event, otherInfo);
+
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertDAGInitializedEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject = newEntity(event.getDagId(), EntityTypes.TEZ_DAG_ID);
+
+    // Related Entities not needed as should have been done in
+    // dag submission event
+
+    jsonObject.put(ATSConstants.EVENTS,
+        singleEvent(event, HistoryEventType.DAG_INITIALIZED, null));
+
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.VERTEX_NAME_ID_MAPPING,
+        getJSONDataValueByKey(event, ATSConstants.VERTEX_NAME_ID_MAPPING));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertDAGStartedEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject = newEntity(event.getDagId(), EntityTypes.TEZ_DAG_ID);
+
+    // Related Entities not needed as should have been done in
+    // dag submission event
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    jsonObject.put(ATSConstants.EVENTS, singleEvent(event, HistoryEventType.DAG_STARTED, null));
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertDAGSubmittedEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject = newEntity(event.getDagId(), EntityTypes.TEZ_DAG_ID);
+
+    // Related Entities
+    JSONArray relatedEntities = new JSONArray();
+    relatedEntities.put(entity("tez_" + event.getAppId(), EntityTypes.TEZ_APPLICATION.name()));
+    relatedEntities.put(
+        entity("tez_" + event.getAppAttemptId(), EntityTypes.TEZ_APPLICATION_ATTEMPT.name()));
+    relatedEntities.put(entity(event.getAppId(), ATSConstants.APPLICATION_ID));
+    relatedEntities.put(entity(event.getAppAttemptId(), ATSConstants.APPLICATION_ATTEMPT_ID));
+    relatedEntities.put(entity(event.getUser(), ATSConstants.USER));
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    String callerContextId = getDataValueByKey(event, ATSConstants.CALLER_CONTEXT_ID);
+    String callerContextType = getDataValueByKey(event, ATSConstants.CALLER_CONTEXT_TYPE);
+    String queueName = getDataValueByKey(event, ATSConstants.DAG_QUEUE_NAME);
+
+    // filters
+    JSONObject primaryFilters = new JSONObject();
+    primaryFilters.put(ATSConstants.DAG_NAME, getDataValueByKey(event, ATSConstants.DAG_NAME));
+    primaryFilters.put(ATSConstants.CALLER_CONTEXT_ID, callerContextId);
+    primaryFilters.put(ATSConstants.CALLER_CONTEXT_TYPE, callerContextType);
+    primaryFilters.put(ATSConstants.DAG_QUEUE_NAME, queueName);
+    jsonObject.put(ATSConstants.PRIMARY_FILTERS, primaryFilters);
+
+    // TODO decide whether this goes into different events,
+    // event info or other info.
+    jsonObject.put(ATSConstants.EVENTS, singleEvent(event, HistoryEventType.DAG_SUBMITTED, null));
+
+    // Other info such as dag plan
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.DAG_PLAN, getJSONDataValueByKey(event, ATSConstants.DAG_PLAN));
+    otherInfo.put(ATSConstants.CALLER_CONTEXT_ID, callerContextId);
+    otherInfo.put(ATSConstants.CALLER_CONTEXT_TYPE, callerContextType);
+    otherInfo.put(ATSConstants.DAG_QUEUE_NAME, queueName);
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertTaskAttemptFinishedEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject = newEntity(event.getTaskAttemptId(), EntityTypes.TEZ_TASK_ATTEMPT_ID);
+
+    // Events
+    jsonObject.put(ATSConstants.EVENTS,
+        singleEvent(event, HistoryEventType.TASK_ATTEMPT_FINISHED, null));
+
+    long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME);
+
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.CREATION_TIME,
+        getDataValueByKey(event, ATSConstants.CREATION_TIME));
+    otherInfo.put(ATSConstants.ALLOCATION_TIME,
+        getDataValueByKey(event, ATSConstants.ALLOCATION_TIME));
+    otherInfo.put(ATSConstants.START_TIME, startTime);
+    otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
+    otherInfo.put(ATSConstants.TIME_TAKEN, event.getEventTime() - startTime);
+
+    otherInfo.put(ATSConstants.CREATION_CAUSAL_ATTEMPT,
+        getDataValueByKey(event, ATSConstants.CREATION_CAUSAL_ATTEMPT));
+    otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, ATSConstants.STATUS));
+    otherInfo.put(ATSConstants.TASK_ATTEMPT_ERROR_ENUM,
+        getDataValueByKey(event, ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
+    otherInfo.put(ATSConstants.TASK_FAILURE_TYPE,
+        getDataValueByKey(event, ATSConstants.TASK_FAILURE_TYPE));
+    otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, ATSConstants.DIAGNOSTICS));
+    otherInfo.put(ATSConstants.COUNTERS, getJSONDataValueByKey(event, ATSConstants.COUNTERS));
+    otherInfo.put(ATSConstants.LAST_DATA_EVENTS,
+        getJSONObjectOrArrayDataValueByKey(event, ATSConstants.LAST_DATA_EVENTS));
+    otherInfo.put(ATSConstants.NODE_ID, getDataValueByKey(event, ATSConstants.NODE_ID));
+    otherInfo.put(ATSConstants.CONTAINER_ID, getDataValueByKey(event, ATSConstants.CONTAINER_ID));
+    otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL,
+        getDataValueByKey(event, ATSConstants.IN_PROGRESS_LOGS_URL));
+    otherInfo.put(ATSConstants.COMPLETED_LOGS_URL,
+        getDataValueByKey(event, ATSConstants.COMPLETED_LOGS_URL));
+    otherInfo.put(ATSConstants.NODE_HTTP_ADDRESS,
+        getDataValueByKey(event, ATSConstants.NODE_HTTP_ADDRESS));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertTaskAttemptStartedEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject = newEntity(event.getTaskAttemptId(), EntityTypes.TEZ_TASK_ATTEMPT_ID);
+
+    // Related entities
+    JSONArray relatedEntities = new JSONArray();
+    relatedEntities.put(
+        entity(getDataValueByKey(event, ATSConstants.NODE_ID), ATSConstants.NODE_ID));
+    relatedEntities.put(
+        entity(getDataValueByKey(event, ATSConstants.CONTAINER_ID), ATSConstants.CONTAINER_ID));
+    // the owning task is a TEZ_TASK_ID entity, so reference it by the task id (not the attempt id)
+    relatedEntities.put(entity(event.getTaskId(), EntityTypes.TEZ_TASK_ID.name()));
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // Events
+    jsonObject.put(ATSConstants.EVENTS,
+        singleEvent(event, HistoryEventType.TASK_ATTEMPT_STARTED, null));
+
+    // Other info
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL,
+        getDataValueByKey(event, ATSConstants.IN_PROGRESS_LOGS_URL));
+    otherInfo.put(ATSConstants.COMPLETED_LOGS_URL,
+        getDataValueByKey(event, ATSConstants.COMPLETED_LOGS_URL));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertTaskFinishedEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject = newEntity(event.getTaskId(), EntityTypes.TEZ_TASK_ID);
+
+    // Events
+    jsonObject.put(ATSConstants.EVENTS, singleEvent(event, HistoryEventType.TASK_FINISHED, null));
+
+    long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME);
+
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.START_TIME, startTime);
+    otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
+    otherInfo.put(ATSConstants.TIME_TAKEN, event.getEventTime() - startTime);
+
+    otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, ATSConstants.STATUS));
+    otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, ATSConstants.DIAGNOSTICS));
+    otherInfo.put(ATSConstants.COUNTERS, getJSONDataValueByKey(event, ATSConstants.COUNTERS));
+    otherInfo.put(ATSConstants.SUCCESSFUL_ATTEMPT_ID,
+        getDataValueByKey(event, ATSConstants.SUCCESSFUL_ATTEMPT_ID));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertTaskStartedEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject = newEntity(event.getTaskId(), EntityTypes.TEZ_TASK_ID);
+
+    // Related entities
+    JSONArray relatedEntities = new JSONArray();
+    relatedEntities.put(entity(event.getVertexId(), EntityTypes.TEZ_VERTEX_ID.name()));
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // Events
+    jsonObject.put(ATSConstants.EVENTS, singleEvent(event, HistoryEventType.TASK_STARTED, null));
+
+    // Other info
+    // TODO fix schedule/launch time to be events
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.START_TIME, event.getEventTime());
+    otherInfo.put(ATSConstants.SCHEDULED_TIME,
+        getDataValueByKey(event, ATSConstants.SCHEDULED_TIME));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertVertexFinishedEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject = newEntity(event.getVertexId(), EntityTypes.TEZ_VERTEX_ID);
+
+    // Events
+    jsonObject.put(ATSConstants.EVENTS,
+        singleEvent(event, HistoryEventType.VERTEX_FINISHED, null));
+
+    long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME);
+
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
+    otherInfo.put(ATSConstants.TIME_TAKEN, event.getEventTime() - startTime);
+    otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, ATSConstants.STATUS));
+    otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, ATSConstants.DIAGNOSTICS));
+    otherInfo.put(ATSConstants.COUNTERS, getJSONDataValueByKey(event, ATSConstants.COUNTERS));
+    otherInfo.put(ATSConstants.STATS, getJSONDataValueByKey(event, ATSConstants.STATS));
+    otherInfo.put(ATSConstants.SERVICE_PLUGIN,
+        getJSONDataValueByKey(event, ATSConstants.SERVICE_PLUGIN));
+
+    // Cover the remaining key/value pairs (e.g. those added from event.getVertexTaskStats())
+    // without overwriting the parsed COUNTERS/STATS objects above with their raw strings.
+    putRemainingEventData(event, otherInfo);
+
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertVertexReconfigureDoneEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject = newEntity(event.getVertexId(), EntityTypes.TEZ_VERTEX_ID);
+
+    // Events
+    JSONObject eventInfo = new JSONObject();
+    eventInfo.put(ATSConstants.NUM_TASKS, getDataValueByKey(event, ATSConstants.NUM_TASKS));
+    eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS,
+        getJSONDataValueByKey(event, ATSConstants.UPDATED_EDGE_MANAGERS));
+    jsonObject.put(ATSConstants.EVENTS,
+        singleEvent(event, HistoryEventType.VERTEX_CONFIGURE_DONE, eventInfo));
+
+    // Other info
+    // TODO add more on all other updated information
+    jsonObject.put(ATSConstants.OTHER_INFO, new JSONObject());
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertVertexInitializedEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject = newEntity(event.getVertexId(), EntityTypes.TEZ_VERTEX_ID);
+
+    // Related entities: the owning DAG
+    JSONArray relatedEntities = new JSONArray();
+    relatedEntities.put(entity(event.getDagId(), EntityTypes.TEZ_DAG_ID.name()));
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // Events
+    jsonObject.put(ATSConstants.EVENTS,
+        singleEvent(event, HistoryEventType.VERTEX_INITIALIZED, null));
+
+    // Other info
+    // TODO fix requested times to be events
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.VERTEX_NAME, getDataValueByKey(event, ATSConstants.VERTEX_NAME));
+    otherInfo.put(ATSConstants.INIT_REQUESTED_TIME,
+        getDataValueByKey(event, ATSConstants.INIT_REQUESTED_TIME));
+    otherInfo.put(ATSConstants.INIT_TIME, getDataValueByKey(event, ATSConstants.INIT_TIME));
+    otherInfo.put(ATSConstants.NUM_TASKS, getDataValueByKey(event, ATSConstants.NUM_TASKS));
+    otherInfo.put(ATSConstants.PROCESSOR_CLASS_NAME,
+        getDataValueByKey(event, ATSConstants.PROCESSOR_CLASS_NAME));
+    otherInfo.put(ATSConstants.SERVICE_PLUGIN,
+        getJSONDataValueByKey(event, ATSConstants.SERVICE_PLUGIN));
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  private static JSONObject convertVertexStartedEvent(HistoryEventProto event)
+      throws JSONException {
+    JSONObject jsonObject = newEntity(event.getVertexId(), EntityTypes.TEZ_VERTEX_ID);
+
+    // Related entities: the owning DAG
+    JSONArray relatedEntities = new JSONArray();
+    relatedEntities.put(entity(event.getDagId(), EntityTypes.TEZ_DAG_ID.name()));
+    jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
+
+    // Events
+    jsonObject.put(ATSConstants.EVENTS, singleEvent(event, HistoryEventType.VERTEX_STARTED, null));
+
+    // Other info
+    // TODO fix requested times to be events
+    JSONObject otherInfo = new JSONObject();
+    otherInfo.put(ATSConstants.START_REQUESTED_TIME,
+        getDataValueByKey(event, ATSConstants.START_REQUESTED_TIME));
+    otherInfo.put(ATSConstants.START_TIME, event.getEventTime());
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    return jsonObject;
+  }
+
+  /** Creates a top-level entity object with its id and {@link EntityTypes} type set. */
+  private static JSONObject newEntity(String entityId, EntityTypes entityType)
+      throws JSONException {
+    return entity(entityId, entityType.name());
+  }
+
+  /** Creates an object holding only the ENTITY and ENTITY_TYPE keys (used for related entities). */
+  private static JSONObject entity(String entityId, String entityType) throws JSONException {
+    JSONObject entity = new JSONObject();
+    entity.put(ATSConstants.ENTITY, entityId);
+    entity.put(ATSConstants.ENTITY_TYPE, entityType);
+    return entity;
+  }
+
+  /** Related entities linking an AM event to its YARN application and application attempt. */
+  private static JSONArray applicationRelatedEntities(HistoryEventProto event)
+      throws JSONException {
+    JSONArray relatedEntities = new JSONArray();
+    relatedEntities.put(entity(event.getAppId(), ATSConstants.APPLICATION_ID));
+    relatedEntities.put(entity(event.getAppAttemptId(), ATSConstants.APPLICATION_ATTEMPT_ID));
+    return relatedEntities;
+  }
+
+  /**
+   * Builds a one-element events array stamped with the proto's event time.
+   *
+   * @param eventInfo optional EVENT_INFO payload; omitted when {@code null}
+   */
+  private static JSONArray singleEvent(HistoryEventProto event, HistoryEventType eventType,
+      JSONObject eventInfo) throws JSONException {
+    JSONObject historyEvent = new JSONObject();
+    historyEvent.put(ATSConstants.TIMESTAMP, event.getEventTime());
+    historyEvent.put(ATSConstants.EVENT_TYPE, eventType.name());
+    if (eventInfo != null) {
+      historyEvent.put(ATSConstants.EVENT_INFO, eventInfo);
+    }
+    JSONArray events = new JSONArray();
+    events.put(historyEvent);
+    return events;
+  }
+
+  /**
+   * Copies every event data pair whose key is not yet present in {@code target} as a raw string.
+   * Keys that were already set explicitly keep their typed value.
+   */
+  private static void putRemainingEventData(HistoryEventProto event, JSONObject target)
+      throws JSONException {
+    Iterator<KVPair> it = event.getEventDataList().iterator();
+    while (it.hasNext()) {
+      KVPair pair = it.next();
+      if (!target.has(pair.getKey())) {
+        target.put(pair.getKey(), pair.getValue());
+      }
+    }
+  }
+
+  /** Returns the value of the first event data pair with the given key, or null if absent. */
+  private static String getDataValueByKey(HistoryEventProto event, String key) {
+    Optional<KVPair> pair = event.getEventDataList().stream()
+        .filter(p -> p.getKey().equals(key))
+        .findFirst();
+    return pair.map(KVPair::getValue).orElse(null);
+  }
+
+  /** Parses the value for {@code key} as a long; absent or empty values yield 0. */
+  private static long getLongDataValueByKey(HistoryEventProto event, String key) {
+    String value = getDataValueByKey(event, key);
+    return (value == null || value.isEmpty()) ? 0 : Long.parseLong(value);
+  }
+
+  /** Parses the value for {@code key} as a JSON object; absent or empty values yield null. */
+  private static JSONObject getJSONDataValueByKey(HistoryEventProto event, String key)
+      throws JSONException {
+    String value = getDataValueByKey(event, key);
+    return (value == null || value.isEmpty()) ? null : new JSONObject(value);
+  }
+
+  /**
+   * Parses the value for {@code key} as either a JSON array or a JSON object; absent or blank
+   * values yield null.
+   * NOTE(review): some values (e.g. LAST_DATA_EVENTS) may be serialized as a JSON array by
+   * HistoryEventProtoConverter, which {@link #getJSONDataValueByKey} would reject — confirm.
+   */
+  private static Object getJSONObjectOrArrayDataValueByKey(HistoryEventProto event, String key)
+      throws JSONException {
+    String value = getDataValueByKey(event, key);
+    if (value == null || value.trim().isEmpty()) {
+      return null;
+    }
+    return value.trim().startsWith("[") ? new JSONArray(value) : new JSONObject(value);
+  }
+}
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
index d736fea..2cac4d8 100644
--- a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
@@ -34,7 +34,7 @@ public class ProtoMessageReader<T extends MessageLite>
implements Closeable {
private final Reader reader;
private final ProtoMessageWritable<T> writable;
- ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser)
throws IOException {
+ public ProtoMessageReader(Configuration conf, Path filePath, Parser<T>
parser) throws IOException {
this.filePath = filePath;
// The writer does not flush the length during hflush. Using length
options lets us read
// past length in the FileStatus but it will throw EOFException during a
read instead
diff --git
a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestHistoryEventProtoConverter.java
b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestHistoryEventProtoConverter.java
index 92d3e30..64f66bc 100644
---
a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestHistoryEventProtoConverter.java
+++
b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestHistoryEventProtoConverter.java
@@ -409,7 +409,7 @@ public class TestHistoryEventProtoConverter {
"logsURL", "nodeHttpAddress");
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.TASK_ATTEMPT_FINISHED, finishTime,
- EntityTypes.TEZ_DAG_ID, null, null, 16);
+ EntityTypes.TEZ_DAG_ID, null, null, 17);
assertEventData(proto, ATSConstants.STATUS, state.name());
assertEventData(proto, ATSConstants.CREATION_CAUSAL_ATTEMPT,
tezTaskAttemptID.toString());
@@ -499,7 +499,7 @@ public class TestHistoryEventProtoConverter {
HistoryEventProto proto = converter.convert(event);
assertCommon(proto, HistoryEventType.VERTEX_INITIALIZED, initedTime,
- EntityTypes.TEZ_VERTEX_ID, null, null, 5);
+ EntityTypes.TEZ_VERTEX_ID, null, null, 6);
assertEventData(proto, ATSConstants.VERTEX_NAME, "v1");
assertEventData(proto, ATSConstants.PROCESSOR_CLASS_NAME, "proc");
diff --git
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
index 1da281c..0e167b2 100644
---
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
+++
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java
@@ -32,6 +32,8 @@ import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@@ -65,6 +67,10 @@ public class CSVResult implements Result {
return Iterators.unmodifiableIterator(recordsList.iterator());
}
+  /**
+   * Sorts the collected records in place.
+   *
+   * <p>Raw {@code Comparator} is accepted; callers pass a comparator over the
+   * record rows (presumably {@code String[]} — confirm against {@code recordsList}).
+   *
+   * @param recordOrder ordering to apply to the records
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void sort(Comparator recordOrder) {
+    Collections.sort(recordsList, recordOrder);
+  }
public void setComments(String comments) {
this.comments = comments;
diff --git
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
index cf600c5..cad0d98 100644
---
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
+++
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
@@ -46,6 +46,8 @@ public class AnalyzerDriver {
"Print spill details in a DAG");
pgd.addClass("TaskAssignmentAnalyzer", TaskAssignmentAnalyzer.class,
"Print task-to-node assignment details of a DAG");
+ pgd.addClass("TaskAttemptResultStatisticsAnalyzer",
TaskAttemptResultStatisticsAnalyzer.class,
+ "Print vertex:node:status level details of task attempt results");
pgd.addClass("TaskConcurrencyAnalyzer", TaskConcurrencyAnalyzer.class,
"Print the task concurrency details in a DAG");
pgd.addClass("VertexLevelCriticalPathAnalyzer",
VertexLevelCriticalPathAnalyzer.class,
diff --git
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAttemptResultStatisticsAnalyzer.java
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAttemptResultStatisticsAnalyzer.java
new file mode 100644
index 0000000..df2f95c
--- /dev/null
+++
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAttemptResultStatisticsAnalyzer.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.analyzer.Result;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+/**
+ * Get simple count of task attempt states on vertex:node:status level, like
below.
+ *
+ * vertex (+task stats: all/succeeded/failed/killed),node,status,numAttempts
+ * Map 1 (vertex_x_y_z) (216/153/0/63),node1,KILLED:INTERNAL_PREEMPTION,1185
+ * Map 1 (vertex_x_y_z) (216/153/0/63),node1,KILLED:TERMINATED_AT_SHUTDOWN,22
+ * Map 1 (vertex_x_y_z) (216/153/0/63),node1,KILLED:EXTERNAL_PREEMPTION,3349
+ * Map 1 (vertex_x_y_z) (216/153/0/63),node1,SUCCEEDED,1
+ */
+public class TaskAttemptResultStatisticsAnalyzer extends TezAnalyzerBase
implements Analyzer {
+ private final String[] headers =
+ { "vertex (+task stats: all/succeeded/failed/killed)", "node", "status",
"numAttempts" };
+ private final Configuration config;
+ private final CSVResult csvResult;
+
+ public TaskAttemptResultStatisticsAnalyzer(Configuration config) {
+ this.config = config;
+ csvResult = new CSVResult(headers);
+ }
+
+ @Override
+ public void analyze(DagInfo dagInfo) throws TezException {
+ Map<String, Integer> map = new HashMap<>();
+
+ for (VertexInfo vertex : dagInfo.getVertices()) {
+ String taskStatsInVertex =
+ String.format("%s/%s/%s/%s", vertex.getNumTasks(),
vertex.getSucceededTasksCount(),
+ vertex.getFailedTasksCount(), vertex.getKilledTasksCount());
+ for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) {
+ String key = String.format("%s#%s#%s",
+ String.format("%s (%s) (%s)", vertex.getVertexName(),
vertex.getVertexId(),
+ taskStatsInVertex),
+ attempt.getNodeId(), attempt.getDetailedStatus());
+ Integer previousValue = (Integer) map.get(key);
+ map.put(key, previousValue == null ? 1 : previousValue + 1);
+ }
+ }
+
+ map.forEach((key, value) -> {
+ addARecord(key.split("#")[0], key.split("#")[1], key.split("#")[2],
value);
+ });
+
+ csvResult.sort(new Comparator<String[]>() {
+ public int compare(String[] first, String[] second) {
+ int vertexOrder = first[0].compareTo(second[0]);
+ int nodeOrder = first[1].compareTo(second[1]);
+ int statusOrder = first[2].compareTo(second[2]);
+
+ return vertexOrder == 0 ? (nodeOrder == 0 ? statusOrder : nodeOrder) :
vertexOrder;
+ }
+ });
+ }
+
+ private void addARecord(String vertexData, String node, String status,
+ int numAttempts) {
+ String[] record = new String[4];
+ record[0] = vertexData;
+ record[1] = node;
+ record[2] = status;
+ record[3] = Integer.toString(numAttempts);
+ csvResult.addRecord(record);
+ }
+
+ @Override
+ public Result getResult() throws TezException {
+ return csvResult;
+ }
+
+ @Override
+ public String getName() {
+ return "Task Attempt Result Statistics Analyzer";
+ }
+
+ @Override
+ public String getDescription() {
+ return "Get statistics about task attempts states in vertex:node:status
level";
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return config;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration config = new Configuration();
+ TaskAttemptResultStatisticsAnalyzer analyzer = new
TaskAttemptResultStatisticsAnalyzer(config);
+ int res = ToolRunner.run(config, analyzer, args);
+ analyzer.printResults();
+ System.exit(res);
+ }
+}
diff --git
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
index 59ae4a3..75a55a7 100644
---
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
+++
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java
@@ -19,7 +19,12 @@
package org.apache.tez.analyzer.plugins;
import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
@@ -27,6 +32,7 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
@@ -36,6 +42,7 @@ import org.apache.tez.analyzer.Result;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.history.ATSImportTool;
import org.apache.tez.history.parser.ATSFileParser;
+import org.apache.tez.history.parser.ProtoHistoryParser;
import org.apache.tez.history.parser.SimpleHistoryParser;
import org.apache.tez.history.parser.datamodel.DagInfo;
@@ -52,6 +59,7 @@ public abstract class TezAnalyzerBase extends Configured
implements Tool, Analyz
private static final String SAVE_RESULTS = "saveResults";
private static final String DAG_ID = "dagId";
private static final String FROM_SIMPLE_HISTORY = "fromSimpleHistory";
+ private static final String FROM_PROTO_HISTORY = "fromProtoHistory";
private static final String HELP = "help";
private static final int SEPARATOR_WIDTH = 80;
@@ -81,7 +89,12 @@ public abstract class TezAnalyzerBase extends Configured
implements Tool, Analyz
(FROM_SIMPLE_HISTORY)
.withDescription("Event data from Simple History logging. Must also
specify event file")
.isRequired(false).create();
-
+
+ Option fromProtoHistoryOption =
+
OptionBuilder.withArgName(FROM_PROTO_HISTORY).withLongOpt(FROM_PROTO_HISTORY)
+ .withDescription("Event data from Proto History logging. Must also
specify event file")
+ .isRequired(false).create();
+
Option help = OptionBuilder.withArgName(HELP).withLongOpt
(HELP)
.withDescription("print help")
@@ -93,6 +106,7 @@ public abstract class TezAnalyzerBase extends Configured
implements Tool, Analyz
opts.addOption(saveResults);
opts.addOption(eventFileNameOption);
opts.addOption(fromSimpleHistoryOption);
+ opts.addOption(fromProtoHistoryOption);
opts.addOption(help);
return opts;
}
@@ -133,21 +147,36 @@ public abstract class TezAnalyzerBase extends Configured
implements Tool, Analyz
outputDir = System.getProperty("user.dir");
}
- File file = null;
+ String dagId = cmdLine.getOptionValue(DAG_ID);
+
+ List<File> files = new ArrayList<File>();
if (cmdLine.hasOption(EVENT_FILE_NAME)) {
- file = new File(cmdLine.getOptionValue(EVENT_FILE_NAME));
+ for (String file : cmdLine.getOptionValue(EVENT_FILE_NAME).split(",")) {
+ File fileOrDir = new File(file);
+ if (fileOrDir.exists()) {
+ if (fileOrDir.isFile()) {
+ files.add(fileOrDir);
+ } else {
+ files.addAll(collectFilesForDagId(fileOrDir, dagId));
+ }
+ }
+ }
}
-
- String dagId = cmdLine.getOptionValue(DAG_ID);
-
+
DagInfo dagInfo = null;
- if (file == null) {
+ if (files.isEmpty()) {
if (cmdLine.hasOption(FROM_SIMPLE_HISTORY)) {
System.err.println("Event file name must be specified when using
simple history");
printUsage();
return -2;
}
+ if (cmdLine.hasOption(FROM_PROTO_HISTORY)) {
+ System.err.println("Proto file name must be specified when using proto
history");
+ printUsage();
+ return -2;
+ }
+
// using ATS - try to download directly
String[] importArgs = { "--dagId=" + dagId, "--downloadDir=" + outputDir
};
@@ -159,30 +188,60 @@ public abstract class TezAnalyzerBase extends Configured
implements Tool, Analyz
//Parse ATS data and verify results
//Parse downloaded contents
- file = new File(outputDir
- + Path.SEPARATOR + dagId + ".zip");
+ files.add(new File(outputDir
+ + Path.SEPARATOR + dagId + ".zip"));
}
- Preconditions.checkState(file != null);
- if (!cmdLine.hasOption(FROM_SIMPLE_HISTORY)) {
- ATSFileParser parser = new ATSFileParser(file);
+ Preconditions.checkState(!files.isEmpty());
+ if (cmdLine.hasOption(FROM_SIMPLE_HISTORY)) {
+ SimpleHistoryParser parser = new SimpleHistoryParser(files);
+ dagInfo = parser.getDAGData(dagId);
+ } else if (cmdLine.hasOption(FROM_PROTO_HISTORY)) {
+ ProtoHistoryParser parser = new ProtoHistoryParser(files);
dagInfo = parser.getDAGData(dagId);
} else {
- SimpleHistoryParser parser = new SimpleHistoryParser(file);
+ ATSFileParser parser = new ATSFileParser(files);
dagInfo = parser.getDAGData(dagId);
}
Preconditions.checkState(dagInfo.getDagId().equals(dagId));
analyze(dagInfo);
Result result = getResult();
- if (saveResults && (result instanceof CSVResult)) {
- String fileName = outputDir + File.separator
- + this.getClass().getName() + "_" + dagInfo.getDagId() + ".csv";
- ((CSVResult) result).dumpToFile(fileName);
- LOG.info("Saved results in " + fileName);
+
+ if (saveResults) {
+ String dagInfoFileName = outputDir + File.separator +
this.getClass().getName() + "_"
+ + dagInfo.getDagId() + ".dag";
+ FileUtils.writeStringToFile(new File(dagInfoFileName),
dagInfo.toExtendedString());
+ LOG.info("Saved dag info in " + dagInfoFileName);
+
+ if (result instanceof CSVResult) {
+ String fileName = outputDir + File.separator +
this.getClass().getName() + "_"
+ + dagInfo.getDagId() + ".csv";
+ ((CSVResult) result).dumpToFile(fileName);
+ LOG.info("Saved results in " + fileName);
+ }
}
+
return 0;
}
+  /**
+   * Collects the regular files directly inside {@code parentDir} whose names refer to
+   * {@code dagId}.
+   *
+   * <p>A name matches when it contains {@code dagId} not immediately followed by another
+   * digit, so the files of {@code dag_x_y_1} are collected without also picking up those
+   * of {@code dag_x_y_10}, {@code dag_x_y_11}, ... Sub-directories are ignored.
+   *
+   * @param parentDir directory to scan (non-recursively)
+   * @param dagId id of the DAG whose files are wanted
+   * @return the matching files; never empty
+   * @throws IllegalArgumentException if {@code dagId} is null
+   * @throws RuntimeException if no matching file is found or the directory cannot be listed
+   */
+  private List<File> collectFilesForDagId(File parentDir, String dagId) {
+    if (dagId == null) {
+      throw new IllegalArgumentException(
+          "dagId must be specified to pick event files from directory: " + parentDir);
+    }
+    FilenameFilter dagFileFilter =
+        (dir, name) -> refersToDag(name, dagId) && new File(dir, name).isFile();
+    File[] arrFiles = parentDir.listFiles(dagFileFilter);
+    if (arrFiles == null || arrFiles.length == 0) {
+      throw new RuntimeException(
+          String.format("cannot find relevant files for dag: '%s' in dir: %s", dagId, parentDir));
+    }
+
+    List<File> files = Arrays.asList(arrFiles);
+    LOG.info("collected files for dag: \n" + files.stream()
+        .map(File::getAbsolutePath)
+        .collect(Collectors.joining("\n")));
+    return files;
+  }
+
+  /**
+   * Tells whether {@code fileName} contains {@code dagId} at a position where the id is
+   * not immediately followed by another digit (i.e. it is not a prefix of a longer id).
+   */
+  private static boolean refersToDag(String fileName, String dagId) {
+    for (int idx = fileName.indexOf(dagId); idx >= 0; idx = fileName.indexOf(dagId, idx + 1)) {
+      int end = idx + dagId.length();
+      if (end == fileName.length() || !Character.isDigit(fileName.charAt(end))) {
+        return true;
+      }
+    }
+    return false;
+  }
+
public void printResults() throws TezException {
Result result = getResult();
if (result instanceof CSVResult) {
diff --git
a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
index d34f9c5..606b7e3 100644
---
a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
+++
b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
@@ -21,6 +21,7 @@ package org.apache.tez.analyzer;
import static org.junit.Assert.assertTrue;
import java.io.File;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
@@ -267,7 +268,7 @@ public class TestAnalyzer {
//Parse downloaded contents
File downloadedFile = new File(DOWNLOAD_DIR
+ Path.SEPARATOR + dagId + ".zip");
- ATSFileParser parser = new ATSFileParser(downloadedFile);
+ ATSFileParser parser = new ATSFileParser(Arrays.asList(downloadedFile));
dagInfo = parser.getDAGData(dagId);
assertTrue(dagInfo.getDagId().equals(dagId));
} else {
@@ -286,7 +287,7 @@ public class TestAnalyzer {
}
//Now parse via SimpleHistory
File localFile = new File(DOWNLOAD_DIR, HISTORY_TXT);
- SimpleHistoryParser parser = new SimpleHistoryParser(localFile);
+ SimpleHistoryParser parser = new
SimpleHistoryParser(Arrays.asList(localFile));
dagInfo = parser.getDAGData(dagId);
assertTrue(dagInfo.getDagId().equals(dagId));
}