TEZ-2692. bugfixes & enhancements related to job parser and analyzer (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ecd90dc1 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ecd90dc1 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ecd90dc1 Branch: refs/heads/master Commit: ecd90dc1f79a4b1614174b75030b85feb8842793 Parents: eadbfec Author: Rajesh Balamohan <[email protected]> Authored: Tue Aug 11 18:19:55 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Tue Aug 11 18:19:55 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/dag/utils/Graph.java | 15 +- .../tez/history/parser/SimpleHistoryParser.java | 239 ++++++ .../history/parser/datamodel/VertexInfo.java | 65 +- .../apache/tez/history/TestATSFileParser.java | 587 -------------- .../apache/tez/history/TestHistoryParser.java | 785 +++++++++++++++++++ tez-tools/analyzers/job-analyzer/pom.xml | 9 + .../java/org/apache/tez/analyzer/CSVResult.java | 5 +- .../analyzer/plugins/CriticalPathAnalyzer.java | 44 +- .../tez/analyzer/plugins/LocalityAnalyzer.java | 3 +- .../analyzer/plugins/ShuffleTimeAnalyzer.java | 60 +- .../tez/analyzer/plugins/SkewAnalyzer.java | 4 + .../analyzer/plugins/SlowTaskIdentifier.java | 13 +- .../analyzer/plugins/SlowestVertexAnalyzer.java | 52 +- .../tez/analyzer/plugins/SpillAnalyzerImpl.java | 15 +- .../plugins/TaskConcurrencyAnalyzer.java | 138 ++++ .../org/apache/tez/analyzer/utils/SVGUtils.java | 264 +++++++ .../org/apache/tez/analyzer/utils/Utils.java | 100 +++ 18 files changed, 1751 insertions(+), 648 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b37eb9e..3de9fb7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -16,6 +16,7 @@ INCOMPATIBLE CHANGES TEZ-2699. 
Internalize strings in ATF parser ALL CHANGES: + TEZ-2692. bugfixes & enhancements related to job parser and analyzer. TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM. TEZ-2630. TezChild receives IP address instead of FQDN. TEZ-2684. ShuffleVertexManager.parsePartitionStats throws IllegalStateException: Stats should be initialized. http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java index 6de9c59..eb2bd41 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java @@ -62,6 +62,7 @@ public class Graph { List<Edge> outs; String label; String shape; + String color; public Node(String id) { this(id, null); @@ -104,6 +105,10 @@ public class Graph { public void setShape(String shape) { this.shape = shape; } + + public void setColor(String color) { + this.color = color; + } } private String name; @@ -196,17 +201,19 @@ public class Graph { for (Node n : nodes) { if (n.shape != null && !n.shape.isEmpty()) { sb.append(String.format( - "%s%s [ label = %s, shape = %s ];", + "%s%s [ label = %s, shape = %s , color= %s];", indent, wrapSafeString(n.getUniqueId()), wrapSafeString(n.getLabel()), - wrapSafeString(n.shape))); + wrapSafeString(n.shape), + wrapSafeString(n.color == null ? "black" : n.color))); } else { sb.append(String.format( - "%s%s [ label = %s ];", + "%s%s [ label = %s , color= %s ];", indent, wrapSafeString(n.getUniqueId()), - wrapSafeString(n.getLabel()))); + wrapSafeString(n.getLabel()), + wrapSafeString(n.color == null ? 
"black" : n.color))); } sb.append(System.getProperty("line.separator")); List<Edge> combinedOuts = combineEdges(n.outs); http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java new file mode 100644 index 0000000..09c010a --- /dev/null +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tez.history.parser; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Maps; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.history.parser.datamodel.BaseParser; +import org.apache.tez.history.parser.datamodel.Constants; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.TaskInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.Scanner; + +/** + * Parser utility to parse data generated by SimpleHistoryLogging to in-memory datamodel provided + * in org.apache.tez.history.parser.datamodel + * <p/> + * <p/> + * Most of the information should be available. Minor info like VersionInfo may not be available, + * as it is not captured in SimpleHistoryLogging. 
+ */ +public class SimpleHistoryParser extends BaseParser { + private static final Logger LOG = LoggerFactory.getLogger(SimpleHistoryParser.class); + private static final String UTF8 = "UTF-8"; + private final File historyFile; + + + public SimpleHistoryParser(File historyFile) { + super(); + Preconditions.checkArgument(historyFile.exists(), historyFile + " does not exist"); + this.historyFile = historyFile; + } + + /** + * Get in-memory representation of DagInfo + * + * @return DagInfo + * @throws TezException + */ + public DagInfo getDAGData(String dagId) throws TezException { + try { + Preconditions.checkArgument(!Strings.isNullOrEmpty(dagId), "Please provide valid dagId"); + dagId = dagId.trim(); + parseContents(historyFile, dagId); + linkParsedContents(); + return dagInfo; + } catch (IOException e) { + LOG.error("Error in reading DAG ", e); + throw new TezException(e); + } catch (JSONException e) { + LOG.error("Error in parsing DAG ", e); + throw new TezException(e); + } + } + + private void populateOtherInfo(JSONObject source, JSONObject destination) throws JSONException { + if (source == null || destination == null) { + return; + } + for (Iterator it = source.keys(); it.hasNext(); ) { + String key = (String) it.next(); + Object val = source.get(key); + destination.put(key, val); + } + } + + private void populateOtherInfo(JSONObject source, String entityName, + Map<String, JSONObject> destMap) throws JSONException { + JSONObject destinationJson = destMap.get(entityName); + JSONObject destOtherInfo = destinationJson.getJSONObject(Constants.OTHER_INFO); + populateOtherInfo(source, destOtherInfo); + } + + private void parseContents(File historyFile, String dagId) + throws JSONException, FileNotFoundException, TezException { + Scanner scanner = new Scanner(historyFile, UTF8); + scanner.useDelimiter(SimpleHistoryLoggingService.RECORD_SEPARATOR); + JSONObject dagJson = null; + Map<String, JSONObject> vertexJsonMap = Maps.newHashMap(); + Map<String, JSONObject> 
taskJsonMap = Maps.newHashMap(); + Map<String, JSONObject> attemptJsonMap = Maps.newHashMap(); + TezDAGID tezDAGID = TezDAGID.fromString(dagId); + while (scanner.hasNext()) { + String line = scanner.next(); + JSONObject jsonObject = new JSONObject(line); + String entity = jsonObject.getString(Constants.ENTITY); + String entityType = jsonObject.getString(Constants.ENTITY_TYPE); + switch (entityType) { + case Constants.TEZ_DAG_ID: + if (!dagId.equals(entity)) { + LOG.warn(dagId + " is not matching with " + entity); + continue; + } + // Club all DAG related information together (DAG_INIT, DAG_FINISH etc). Each of them + // would have a set of entities in otherinfo (e.g vertex mapping, dagPlan, start/finish + // time etc). + if (dagJson == null) { + dagJson = jsonObject; + } + JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO); + JSONObject dagOtherInfo = dagJson.getJSONObject(Constants.OTHER_INFO); + populateOtherInfo(otherInfo, dagOtherInfo); + break; + case Constants.TEZ_VERTEX_ID: + String vertexName = entity; + TezVertexID tezVertexID = TezVertexID.fromString(vertexName); + if (!tezDAGID.equals(tezVertexID.getDAGId())) { + LOG.warn(vertexName + " does not belong to " + tezDAGID); + continue; + } + if (!vertexJsonMap.containsKey(vertexName)) { + vertexJsonMap.put(vertexName, jsonObject); + } + otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO); + populateOtherInfo(otherInfo, vertexName, vertexJsonMap); + break; + case Constants.TEZ_TASK_ID: + String taskName = entity; + TezTaskID tezTaskID = TezTaskID.fromString(taskName); + if (!tezDAGID.equals(tezTaskID.getVertexID().getDAGId())) { + LOG.warn(taskName + " does not belong to " + tezDAGID); + continue; + } + if (!taskJsonMap.containsKey(taskName)) { + taskJsonMap.put(taskName, jsonObject); + } + otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO); + populateOtherInfo(otherInfo, taskName, taskJsonMap); + break; + case Constants.TEZ_TASK_ATTEMPT_ID: + String taskAttemptName = entity; 
+ TezTaskAttemptID tezAttemptId = TezTaskAttemptID.fromString(taskAttemptName); + if (!tezDAGID.equals(tezAttemptId.getTaskID().getVertexID().getDAGId())) { + LOG.warn(taskAttemptName + " does not belong to " + tezDAGID); + continue; + } + if (!attemptJsonMap.containsKey(taskAttemptName)) { + attemptJsonMap.put(taskAttemptName, jsonObject); + } + otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO); + populateOtherInfo(otherInfo, taskAttemptName, attemptJsonMap); + break; + default: + break; + } + } + scanner.close(); + if (dagJson != null) { + this.dagInfo = DagInfo.create(dagJson); + } else { + LOG.error("Dag is not yet parsed. Looks like partial file."); + throw new TezException( + "Please provide a valid/complete history log file containing " + dagId); + } + for (JSONObject jsonObject : vertexJsonMap.values()) { + VertexInfo vertexInfo = VertexInfo.create(jsonObject); + this.vertexList.add(vertexInfo); + LOG.debug("Parsed vertex {}", vertexInfo.getVertexName()); + } + for (JSONObject jsonObject : taskJsonMap.values()) { + TaskInfo taskInfo = TaskInfo.create(jsonObject); + this.taskList.add(taskInfo); + LOG.debug("Parsed task {}", taskInfo.getTaskId()); + } + for (JSONObject jsonObject : attemptJsonMap.values()) { + /** + * For converting SimpleHistoryLogging to in-memory representation + * + * We need to get "relatedEntities":[{"entity":"cn055-10.l42scl.hortonworks.com:58690", + * "entitytype":"nodeId"},{"entity":"container_1438652049951_0008_01_000152", + * "entitytype":"containerId"} and populate it in otherInfo object so that in-memory + * representation can parse it correctly + */ + JSONObject subJsonObject = jsonObject.optJSONArray(Constants.RELATED_ENTITIES) + .optJSONObject(0); + if (subJsonObject != null) { + String nodeId = subJsonObject.optString(Constants.ENTITY_TYPE); + if (!Strings.isNullOrEmpty(nodeId) && nodeId.equalsIgnoreCase(Constants.NODE_ID)) { + //populate it in otherInfo + JSONObject otherInfo = 
jsonObject.optJSONObject(Constants.OTHER_INFO); + String nodeIdVal = subJsonObject.optString(Constants.ENTITY); + if (otherInfo != null && nodeIdVal != null) { + otherInfo.put(Constants.NODE_ID, nodeIdVal); + } + } + } + + subJsonObject = jsonObject.optJSONArray(Constants.RELATED_ENTITIES) + .optJSONObject(1); + if (subJsonObject != null) { + String containerId = subJsonObject.optString(Constants.ENTITY_TYPE); + if (!Strings.isNullOrEmpty(containerId) && containerId.equalsIgnoreCase(Constants.CONTAINER_ID)) { + //populate it in otherInfo + JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO); + String containerIdVal = subJsonObject.optString(Constants.ENTITY); + if (otherInfo != null && containerIdVal != null) { + otherInfo.put(Constants.CONTAINER_ID, containerIdVal); + } + } + } + TaskAttemptInfo attemptInfo = TaskAttemptInfo.create(jsonObject); + this.attemptList.add(attemptInfo); + LOG.debug("Parsed task attempt {}", attemptInfo.getTaskAttemptId()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java index d2dac7d..6e227a5 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java @@ -33,6 +33,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; +import javax.annotation.Nullable; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ 
-164,13 +165,25 @@ public class VertexInfo extends BaseInfo { updateEdgeInfo(); } + public List<AdditionalInputOutputDetails> getAdditionalInputInfoList() { + return Collections.unmodifiableList(additionalInputInfoList); + } + + public List<AdditionalInputOutputDetails> getAdditionalOutputInfoList() { + return Collections.unmodifiableList(additionalOutputInfoList); + } + @Override public final long getStartTimeInterval() { return startTime - (dagInfo.getStartTime()); } public final long getFirstTaskStartTimeInterval() { - return getFirstTaskToStart().getStartTimeInterval(); + TaskInfo firstTask = getFirstTaskToStart(); + if (firstTask == null) { + return 0; + } + return firstTask.getStartTimeInterval(); } public final long getLastTaskFinishTimeInterval() { @@ -270,14 +283,32 @@ public class VertexInfo extends BaseInfo { } + + private List<TaskInfo> getTasksInternal() { + return Lists.newLinkedList(taskInfoMap.values()); + } + /** * Get all tasks * * @return list of taskInfo */ public final List<TaskInfo> getTasks() { - List<TaskInfo> taskInfoList = Lists.newLinkedList(taskInfoMap.values()); - Collections.sort(taskInfoList, orderingOnStartTime()); + return Collections.unmodifiableList(getTasksInternal()); + } + + /** + * Get all tasks in sorted order + * + * @param sorted + * @param ordering + * @return list of TaskInfo + */ + public final List<TaskInfo> getTasks(boolean sorted, @Nullable Ordering<TaskInfo> ordering) { + List<TaskInfo> taskInfoList = getTasksInternal(); + if (sorted) { + Collections.sort(taskInfoList, ((ordering == null) ? 
orderingOnStartTime() : ordering)); + } return Collections.unmodifiableList(taskInfoList); } @@ -352,12 +383,36 @@ public class VertexInfo extends BaseInfo { return Collections.unmodifiableList(outputVertices); } - public List<TaskAttemptInfo> getTaskAttempts() { + private List<TaskAttemptInfo> getTaskAttemptsInternal() { List<TaskAttemptInfo> taskAttemptInfos = Lists.newLinkedList(); for (TaskInfo taskInfo : getTasks()) { taskAttemptInfos.addAll(taskInfo.getTaskAttempts()); } - Collections.sort(taskAttemptInfos, orderingOnAttemptStartTime()); + return taskAttemptInfos; + } + + /** + * Get all task attempts + * + * @return List<TaskAttemptInfo> list of attempts + */ + public List<TaskAttemptInfo> getTaskAttempts() { + return Collections.unmodifiableList(getTaskAttemptsInternal()); + } + + /** + * Get all task attempts in sorted order + * + * @param sorted + * @param ordering + * @return list of TaskAttemptInfo + */ + public final List<TaskAttemptInfo> getTaskAttempts(boolean sorted, + @Nullable Ordering<TaskAttemptInfo> ordering) { + List<TaskAttemptInfo> taskAttemptInfos = getTaskAttemptsInternal(); + if (sorted) { + Collections.sort(taskAttemptInfos, ((ordering == null) ? 
orderingOnAttemptStartTime() : ordering)); + } return Collections.unmodifiableList(taskAttemptInfos); } http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java deleted file mode 100644 index 0d76e03..0000000 --- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java +++ /dev/null @@ -1,587 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tez.history; - -import com.google.common.collect.Sets; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.RandomStringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.tez.client.TezClient; -import org.apache.tez.common.counters.DAGCounter; -import org.apache.tez.common.counters.TaskCounter; -import org.apache.tez.common.counters.TezCounter; -import org.apache.tez.dag.api.DAG; -import org.apache.tez.dag.api.DataSinkDescriptor; -import org.apache.tez.dag.api.DataSourceDescriptor; -import org.apache.tez.dag.api.Edge; -import org.apache.tez.dag.api.EdgeProperty; -import org.apache.tez.dag.api.ProcessorDescriptor; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.TezException; -import org.apache.tez.dag.api.Vertex; -import org.apache.tez.dag.api.client.DAGClient; -import org.apache.tez.dag.api.client.StatusGetOpts; -import org.apache.tez.dag.api.event.VertexState; -import org.apache.tez.dag.api.oldrecords.TaskAttemptState; -import org.apache.tez.dag.api.oldrecords.TaskState; -import org.apache.tez.dag.app.dag.DAGState; -import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService; -import org.apache.tez.dag.records.TezDAGID; -import org.apache.tez.examples.WordCount; -import org.apache.tez.history.parser.ATSFileParser; -import org.apache.tez.history.parser.datamodel.DagInfo; -import 
org.apache.tez.history.parser.datamodel.EdgeInfo; -import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; -import org.apache.tez.history.parser.datamodel.TaskInfo; -import org.apache.tez.history.parser.datamodel.VersionInfo; -import org.apache.tez.history.parser.datamodel.VertexInfo; -import org.apache.tez.mapreduce.input.MRInput; -import org.apache.tez.mapreduce.output.MROutput; -import org.apache.tez.mapreduce.processor.SimpleMRProcessor; -import org.apache.tez.runtime.api.ProcessorContext; -import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; -import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig; -import org.apache.tez.runtime.library.input.OrderedGroupedKVInput; -import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput; -import org.apache.tez.runtime.library.partitioner.HashPartitioner; -import org.apache.tez.tests.MiniTezClusterWithTimeline; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertTrue; - -public class TestATSFileParser { - - private static MiniDFSCluster miniDFSCluster; - private static MiniTezClusterWithTimeline miniTezCluster; - - //location within miniHDFS cluster's hdfs - private static Path inputLoc = new Path("/tmp/sample.txt"); - - private final static String INPUT = "Input"; - private final static String OUTPUT = "Output"; - private final static String TOKENIZER = "Tokenizer"; - private final static String SUMMATION = "Summation"; - - private static Configuration conf = new Configuration(); - private static FileSystem fs; - private static String TEST_ROOT_DIR = - "target" + Path.SEPARATOR + TestATSFileParser.class.getName() + "-tmpDir"; - private static String TEZ_BASE_DIR = - "target" + Path.SEPARATOR + TestATSFileParser.class.getName() + 
"-tez"; - private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + "download"; - - private static TezClient tezClient; - - private static int dagNumber; - - @BeforeClass - public static void setupCluster() throws Exception { - conf = new Configuration(); - conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH, false); - EditLogFileOutputStream.setShouldSkipFsyncForTesting(true); - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); - miniDFSCluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); - fs = miniDFSCluster.getFileSystem(); - conf.set("fs.defaultFS", fs.getUri().toString()); - - setupTezCluster(); - } - - @AfterClass - public static void shutdownCluster() { - try { - if (tezClient != null) { - try { - tezClient.stop(); - } catch (TezException e) { - //ignore - } catch (IOException e) { - //ignore - } - } - if (miniDFSCluster != null) { - miniDFSCluster.shutdown(); - } - if (miniTezCluster != null) { - miniTezCluster.stop(); - } - } finally { - try { - FileUtils.deleteDirectory(new File(TEST_ROOT_DIR)); - FileUtils.deleteDirectory(new File(TEZ_BASE_DIR)); - } catch (IOException e) { - //safe to ignore - } - } - } - - // @Before - public static void setupTezCluster() throws Exception { - conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3 * 1000); - conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3 * 1000); - conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 2); - - //Enable per edge counters - conf.setBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, true); - conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); - conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); - conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, ATSHistoryLoggingService - .class.getName()); - - miniTezCluster = - new MiniTezClusterWithTimeline(TEZ_BASE_DIR, 1, 1, 1, true); - - 
miniTezCluster.init(conf); - miniTezCluster.start(); - - createSampleFile(inputLoc); - - TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig()); - tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); - tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "0.0.0.0:8188"); - tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); - tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, - ATSHistoryLoggingService.class.getName()); - - tezClient = TezClient.create("WordCount", tezConf, true); - tezClient.start(); - tezClient.waitTillReady(); - } - - - /** - * Run a word count example in mini cluster and check if it is possible to download - * data from ATS and parse it. - * - * @throws Exception - */ - @Test - public void testParserWithSuccessfulJob() throws Exception { - //Run basic word count example. - String dagId = runWordCount(WordCount.TokenProcessor.class.getName(), - WordCount.SumProcessor.class.getName(), "WordCount"); - - //Export the data from ATS - String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR }; - - int result = ATSImportTool.process(args); - assertTrue(result == 0); - - //Parse ATS data - DagInfo dagInfo = getDagInfo(dagId); - - //Verify DAGInfo. Verifies vertex, task, taskAttempts in recursive manner - verifyDagInfo(dagInfo); - - //Job specific - assertTrue(dagInfo.getNumVertices() == 2); - assertTrue(dagInfo.getName().equals("WordCount")); - assertTrue(dagInfo.getVertex(TOKENIZER).getProcessorClassName().equals( - WordCount.TokenProcessor.class.getName())); - assertTrue(dagInfo.getVertex(SUMMATION).getProcessorClassName() - .equals(WordCount.SumProcessor.class.getName())); - assertTrue(dagInfo.getEdges().size() == 1); - EdgeInfo edgeInfo = dagInfo.getEdges().iterator().next(); - assertTrue(edgeInfo.getDataMovementType(). 
- equals(EdgeProperty.DataMovementType.SCATTER_GATHER.toString())); - assertTrue(edgeInfo.getSourceVertex().getVertexName().equals(TOKENIZER)); - assertTrue(edgeInfo.getDestinationVertex().getVertexName().equals(SUMMATION)); - assertTrue(edgeInfo.getInputVertexName().equals(TOKENIZER)); - assertTrue(edgeInfo.getOutputVertexName().equals(SUMMATION)); - assertTrue(edgeInfo.getEdgeSourceClass().equals(OrderedPartitionedKVOutput.class.getName())); - assertTrue(edgeInfo.getEdgeDestinationClass().equals(OrderedGroupedKVInput.class.getName())); - assertTrue(dagInfo.getVertices().size() == 2); - String lastSourceTA = null; - String lastDataEventSourceTA = null; - for (VertexInfo vertexInfo : dagInfo.getVertices()) { - assertTrue(vertexInfo.getKilledTasksCount() == 0); - assertTrue(vertexInfo.getInitRequestedTime() > 0); - assertTrue(vertexInfo.getInitTime() > 0); - assertTrue(vertexInfo.getStartRequestedTime() > 0); - assertTrue(vertexInfo.getStartTime() > 0); - assertTrue(vertexInfo.getFinishTime() > 0); - long finishTime = 0; - for (TaskInfo taskInfo : vertexInfo.getTasks()) { - assertTrue(taskInfo.getNumberOfTaskAttempts() == 1); - assertTrue(taskInfo.getMaxTaskAttemptDuration() >= 0); - assertTrue(taskInfo.getMinTaskAttemptDuration() >= 0); - assertTrue(taskInfo.getAvgTaskAttemptDuration() >= 0); - assertTrue(taskInfo.getLastTaskAttemptToFinish() != null); - assertTrue(taskInfo.getContainersMapping().size() > 0); - assertTrue(taskInfo.getSuccessfulTaskAttempts().size() > 0); - assertTrue(taskInfo.getFailedTaskAttempts().size() == 0); - assertTrue(taskInfo.getKilledTaskAttempts().size() == 0); - List<TaskAttemptInfo> attempts = taskInfo.getTaskAttempts(); - if (vertexInfo.getVertexName().equals(TOKENIZER)) { - // get the last task to finish and track its successful attempt - if (finishTime < taskInfo.getFinishTime()) { - finishTime = taskInfo.getFinishTime(); - lastSourceTA = taskInfo.getSuccessfulAttemptId(); - } - } else { - for (TaskAttemptInfo attempt : attempts) { 
- assertTrue(attempt.getLastDataEventTime() > 0); - if (lastDataEventSourceTA == null) { - lastDataEventSourceTA = attempt.getLastDataEventSourceTA(); - } else { - // all attempts should have the same last data event source TA - assertTrue(lastDataEventSourceTA.equals(attempt.getLastDataEventSourceTA())); - } - } - } - for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) { - assertTrue(attemptInfo.getStartTimeInterval() > 0); - assertTrue(attemptInfo.getScheduledTimeInterval() > 0); - } - } - assertTrue(vertexInfo.getLastTaskToFinish() != null); - if (vertexInfo.getVertexName().equals(TOKENIZER)) { - assertTrue(vertexInfo.getInputEdges().size() == 0); - assertTrue(vertexInfo.getOutputEdges().size() == 1); - assertTrue(vertexInfo.getOutputVertices().size() == 1); - assertTrue(vertexInfo.getInputVertices().size() == 0); - } else { - assertTrue(vertexInfo.getInputEdges().size() == 1); - assertTrue(vertexInfo.getOutputEdges().size() == 0); - assertTrue(vertexInfo.getOutputVertices().size() == 0); - assertTrue(vertexInfo.getInputVertices().size() == 1); - } - } - assertTrue(lastSourceTA.equals(lastDataEventSourceTA)); - } - - /** - * Run a word count example in mini cluster. - * Provide invalid URL for ATS. - * - * @throws Exception - */ - @Test - public void testParserWithSuccessfulJob_InvalidATS() throws Exception { - //Run basic word count example. 
- String dagId = runWordCount(WordCount.TokenProcessor.class.getName(), - WordCount.SumProcessor.class.getName(), "WordCount-With-WrongATS-URL"); - - //Export the data from ATS - String atsAddress = "--atsAddress=http://atsHost:8188"; - String[] args = { "--dagId=" + dagId, - "--downloadDir=" + DOWNLOAD_DIR, - atsAddress - }; - - int result = ATSImportTool.process(args); - assertTrue(result == -1); - } - - /** - * Run a failed job and parse the data from ATS - */ - @Test - public void testParserWithFailedJob() throws Exception { - //Run a job which would fail - String dagId = runWordCount(WordCount.TokenProcessor.class.getName(), FailProcessor.class - .getName(), "WordCount-With-Exception"); - - //Export the data from ATS - String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR }; - - int result = ATSImportTool.process(args); - assertTrue(result == 0); - - //Parse ATS data - DagInfo dagInfo = getDagInfo(dagId); - - //Verify DAGInfo. Verifies vertex, task, taskAttempts in recursive manner - verifyDagInfo(dagInfo); - - //Dag specific - VertexInfo summationVertex = dagInfo.getVertex(SUMMATION); - assertTrue(summationVertex.getFailedTasks().size() == 1); //1 task, 4 attempts failed - assertTrue(summationVertex.getFailedTasks().get(0).getFailedTaskAttempts().size() == 4); - assertTrue(summationVertex.getStatus().equals(VertexState.FAILED.toString())); - - assertTrue(dagInfo.getFailedVertices().size() == 1); - assertTrue(dagInfo.getFailedVertices().get(0).getVertexName().equals(SUMMATION)); - assertTrue(dagInfo.getSuccessfullVertices().size() == 1); - assertTrue(dagInfo.getSuccessfullVertices().get(0).getVertexName().equals(TOKENIZER)); - - assertTrue(dagInfo.getStatus().equals(DAGState.FAILED.toString())); - - verifyCounter(dagInfo.getCounter(DAGCounter.NUM_FAILED_TASKS.toString()), null, 4); - verifyCounter(dagInfo.getCounter(DAGCounter.NUM_SUCCEEDED_TASKS.toString()), null, 1); - 
verifyCounter(dagInfo.getCounter(DAGCounter.TOTAL_LAUNCHED_TASKS.toString()), null, 5); - - verifyCounter(dagInfo.getCounter(TaskCounter.INPUT_RECORDS_PROCESSED.toString()), - "TaskCounter_Tokenizer_INPUT_Input", 10); - verifyCounter(dagInfo.getCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ.toString()), - "TaskCounter_Tokenizer_OUTPUT_Summation", 0); - verifyCounter(dagInfo.getCounter(TaskCounter.OUTPUT_RECORDS.toString()), - "TaskCounter_Tokenizer_OUTPUT_Summation", - 20); //Every line has 2 words. 10 lines x 2 words = 20 - verifyCounter(dagInfo.getCounter(TaskCounter.SPILLED_RECORDS.toString()), - "TaskCounter_Tokenizer_OUTPUT_Summation", 20); //Same as above - - for (TaskInfo taskInfo : summationVertex.getTasks()) { - String lastAttemptId = null; - for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) { - if (lastAttemptId != null) { - // failed attempt should be causal TA of next attempt - assertTrue(lastAttemptId.equals(attemptInfo.getSchedulingCausalTA())); - } - lastAttemptId = attemptInfo.getTaskAttemptId(); - } - } - - //TODO: Need to check for SUMMATION vertex counters. Since all attempts are failed, counters are not getting populated. 
- //TaskCounter.REDUCE_INPUT_RECORDS - - //Verify if the processor exception is given in diagnostics - assertTrue(dagInfo.getDiagnostics().contains("Failing this processor for some reason")); - - } - - /** - * Create sample file for wordcount program - * - * @param inputLoc - * @throws IOException - */ - private static void createSampleFile(Path inputLoc) throws IOException { - fs.deleteOnExit(inputLoc); - FSDataOutputStream out = fs.create(inputLoc); - BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out)); - for (int i = 0; i < 10; i++) { - writer.write("Sample " + RandomStringUtils.randomAlphanumeric(5)); - writer.newLine(); - } - writer.close(); - } - - private DagInfo getDagInfo(String dagId) throws TezException { - //Parse downloaded contents - File downloadedFile = new File(DOWNLOAD_DIR - + Path.SEPARATOR + dagId - + Path.SEPARATOR + dagId + ".zip"); - ATSFileParser parser = new ATSFileParser(downloadedFile); - DagInfo dagInfo = parser.getDAGData(dagId); - assertTrue(dagInfo.getDagId().equals(dagId)); - return dagInfo; - } - - private void verifyCounter(Map<String, TezCounter> counterMap, - String counterGroupName, long expectedVal) { - //Iterate through group-->tezCounter - for (Map.Entry<String, TezCounter> entry : counterMap.entrySet()) { - if (counterGroupName != null) { - if (entry.getKey().equals(counterGroupName)) { - assertTrue(entry.getValue().getValue() == expectedVal); - } - } else { - assertTrue(entry.getValue().getValue() == expectedVal); - } - } - } - - private String runWordCount(String tokenizerProcessor, String summationProcessor, - String dagName) - throws Exception { - dagNumber++; - - //HDFS path - Path outputLoc = new Path("/tmp/outPath_" + System.currentTimeMillis()); - - DataSourceDescriptor dataSource = MRInput.createConfigBuilder(conf, - TextInputFormat.class, inputLoc.toString()).build(); - - DataSinkDescriptor dataSink = - MROutput.createConfigBuilder(conf, TextOutputFormat.class, outputLoc.toString()).build(); - - 
Vertex tokenizerVertex = Vertex.create(TOKENIZER, ProcessorDescriptor.create( - tokenizerProcessor)).addDataSource(INPUT, dataSource); - - OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig - .newBuilder(Text.class.getName(), IntWritable.class.getName(), - HashPartitioner.class.getName()).build(); - - Vertex summationVertex = Vertex.create(SUMMATION, - ProcessorDescriptor.create(summationProcessor), 1).addDataSink(OUTPUT, dataSink); - - // Create DAG and add the vertices. Connect the producer and consumer vertices via the edge - DAG dag = DAG.create(dagName); - dag.addVertex(tokenizerVertex).addVertex(summationVertex).addEdge( - Edge.create(tokenizerVertex, summationVertex, edgeConf.createDefaultEdgeProperty())); - - DAGClient client = tezClient.submitDAG(dag); - client.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS)); - TezDAGID tezDAGID = TezDAGID.getInstance(tezClient.getAppMasterApplicationId(), dagNumber); - - return tezDAGID.toString(); - } - - /** - * Processor which would just throw exception. 
- */ - public static class FailProcessor extends SimpleMRProcessor { - public FailProcessor(ProcessorContext context) { - super(context); - } - - @Override - public void run() throws Exception { - throw new Exception("Failing this processor for some reason"); - } - } - - private void verifyDagInfo(DagInfo dagInfo) { - VersionInfo versionInfo = dagInfo.getVersionInfo(); - assertTrue(versionInfo != null); //should be present post 0.5.4 - assertTrue(versionInfo.getVersion() != null); - assertTrue(versionInfo.getRevision() != null); - assertTrue(versionInfo.getBuildTime() != null); - - assertTrue(dagInfo.getStartTime() > 0); - assertTrue(dagInfo.getFinishTimeInterval() > 0); - assertTrue(dagInfo.getStartTimeInterval() == 0); - assertTrue(dagInfo.getStartTime() > 0); - if (dagInfo.getStatus().equalsIgnoreCase(DAGState.SUCCEEDED.toString())) { - assertTrue(dagInfo.getFinishTime() >= dagInfo.getStartTime()); - } - assertTrue(dagInfo.getFinishTimeInterval() > dagInfo.getStartTimeInterval()); - - assertTrue(dagInfo.getStartTime() > dagInfo.getSubmitTime()); - assertTrue(dagInfo.getTimeTaken() > 0); - - //Verify all vertices - for (VertexInfo vertexInfo : dagInfo.getVertices()) { - verifyVertex(vertexInfo, vertexInfo.getFailedTasksCount() > 0); - } - - VertexInfo fastestVertex = dagInfo.getFastestVertex(); - assertTrue(fastestVertex != null); - - if (dagInfo.getStatus().equals(DAGState.SUCCEEDED)) { - assertTrue(dagInfo.getSlowestVertex() != null); - } - } - - private void verifyVertex(VertexInfo vertexInfo, boolean hasFailedTasks) { - assertTrue(vertexInfo != null); - if (hasFailedTasks) { - assertTrue(vertexInfo.getFailedTasksCount() > 0); - } - assertTrue(vertexInfo.getStartTimeInterval() > 0); - assertTrue(vertexInfo.getStartTime() > 0); - assertTrue(vertexInfo.getFinishTimeInterval() > 0); - assertTrue(vertexInfo.getStartTimeInterval() < vertexInfo.getFinishTimeInterval()); - assertTrue(vertexInfo.getVertexName() != null); - if (!hasFailedTasks) { - 
assertTrue(vertexInfo.getFinishTime() > 0); - assertTrue(vertexInfo.getFailedTasks().size() == 0); - assertTrue(vertexInfo.getSucceededTasksCount() == vertexInfo.getSuccessfulTasks().size()); - assertTrue(vertexInfo.getFailedTasksCount() == 0); - assertTrue(vertexInfo.getAvgTaskDuration() > 0); - assertTrue(vertexInfo.getMaxTaskDuration() > 0); - assertTrue(vertexInfo.getMinTaskDuration() > 0); - assertTrue(vertexInfo.getTimeTaken() > 0); - assertTrue(vertexInfo.getStatus().equalsIgnoreCase(VertexState.SUCCEEDED.toString())); - assertTrue(vertexInfo.getCompletedTasksCount() > 0); - assertTrue(vertexInfo.getFirstTaskToStart() != null); - assertTrue(vertexInfo.getSucceededTasksCount() > 0); - assertTrue(vertexInfo.getTasks().size() > 0); - } - - for (TaskInfo taskInfo : vertexInfo.getTasks()) { - if (taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString())) { - verifyTask(taskInfo, false); - } - } - - for (TaskInfo taskInfo : vertexInfo.getFailedTasks()) { - verifyTask(taskInfo, true); - } - - assertTrue(vertexInfo.getProcessorClassName() != null); - assertTrue(vertexInfo.getStatus() != null); - assertTrue(vertexInfo.getDagInfo() != null); - assertTrue(vertexInfo.getInitTimeInterval() > 0); - assertTrue(vertexInfo.getNumTasks() > 0); - } - - private void verifyTask(TaskInfo taskInfo, boolean hasFailedAttempts) { - assertTrue(taskInfo != null); - assertTrue(taskInfo.getStatus() != null); - assertTrue(taskInfo.getStartTimeInterval() > 0); - - //Not testing for killed attempts. 
So if there are no failures, it should succeed - if (!hasFailedAttempts) { - assertTrue(taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString())); - assertTrue(taskInfo.getFinishTimeInterval() > 0 && taskInfo.getFinishTime() > taskInfo - .getFinishTimeInterval()); - assertTrue( - taskInfo.getStartTimeInterval() > 0 && taskInfo.getStartTime() > taskInfo.getStartTimeInterval()); - assertTrue(taskInfo.getSuccessfulAttemptId() != null); - assertTrue(taskInfo.getSuccessfulTaskAttempt() != null); - } - assertTrue(taskInfo.getTaskId() != null); - - for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) { - verifyTaskAttemptInfo(attemptInfo); - } - } - - private void verifyTaskAttemptInfo(TaskAttemptInfo attemptInfo) { - if (attemptInfo.getStatus() != null && attemptInfo.getStatus() - .equals(TaskAttemptState.SUCCEEDED)) { - assertTrue(attemptInfo.getStartTimeInterval() > 0); - assertTrue(attemptInfo.getFinishTimeInterval() > 0); - assertTrue(attemptInfo.getStartTime() > 0); - assertTrue(attemptInfo.getFinishTime() > 0); - assertTrue(attemptInfo.getFinishTime() > attemptInfo.getStartTime()); - assertTrue(attemptInfo.getFinishTime() > attemptInfo.getFinishTimeInterval()); - assertTrue(attemptInfo.getStartTime() > attemptInfo.getStartTimeInterval()); - assertTrue(attemptInfo.getNodeId() != null); - assertTrue(attemptInfo.getTimeTaken() != -1); - assertTrue(attemptInfo.getEvents() != null); - assertTrue(attemptInfo.getTezCounters() != null); - assertTrue(attemptInfo.getContainer() != null); - } - assertTrue(attemptInfo.getTaskInfo() != null); - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java new 
file mode 100644 index 0000000..c89acb2 --- /dev/null +++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java @@ -0,0 +1,785 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.history; + +import com.google.common.collect.Sets; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.tez.client.TezClient; +import org.apache.tez.common.counters.DAGCounter; +import org.apache.tez.common.counters.TaskCounter; +import 
org.apache.tez.common.counters.TezCounter; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.DataSinkDescriptor; +import org.apache.tez.dag.api.DataSourceDescriptor; +import org.apache.tez.dag.api.Edge; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.StatusGetOpts; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.api.oldrecords.TaskState; +import org.apache.tez.dag.app.dag.DAGState; +import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService; +import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.examples.WordCount; +import org.apache.tez.history.parser.ATSFileParser; +import org.apache.tez.history.parser.SimpleHistoryParser; +import org.apache.tez.history.parser.datamodel.BaseInfo; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.EdgeInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.TaskInfo; +import org.apache.tez.history.parser.datamodel.VersionInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; +import org.apache.tez.mapreduce.input.MRInput; +import org.apache.tez.mapreduce.output.MROutput; +import org.apache.tez.mapreduce.processor.SimpleMRProcessor; +import org.apache.tez.runtime.api.ProcessorContext; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig; +import org.apache.tez.runtime.library.input.OrderedGroupedKVInput; +import 
org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput; +import org.apache.tez.runtime.library.partitioner.HashPartitioner; +import org.apache.tez.tests.MiniTezClusterWithTimeline; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestHistoryParser { + + private static MiniDFSCluster miniDFSCluster; + private static MiniTezClusterWithTimeline miniTezCluster; + + //location within miniHDFS cluster's hdfs + private static Path inputLoc = new Path("/tmp/sample.txt"); + + private final static String INPUT = "Input"; + private final static String OUTPUT = "Output"; + private final static String TOKENIZER = "Tokenizer"; + private final static String SUMMATION = "Summation"; + private final static String SIMPLE_HISTORY_DIR = "/tmp/simplehistory/"; + private final static String HISTORY_TXT = "history.txt"; + + private static Configuration conf = new Configuration(); + private static FileSystem fs; + private static String TEST_ROOT_DIR = + "target" + Path.SEPARATOR + TestHistoryParser.class.getName() + "-tmpDir"; + private static String TEZ_BASE_DIR = + "target" + Path.SEPARATOR + TestHistoryParser.class.getName() + "-tez"; + private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + "download"; + + @BeforeClass + public static void setupCluster() throws Exception { + conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH, false); + EditLogFileOutputStream.setShouldSkipFsyncForTesting(true); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); + miniDFSCluster = + new 
MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    fs = miniDFSCluster.getFileSystem();
+    conf.set("fs.defaultFS", fs.getUri().toString());
+
+    setupTezCluster();
+  }
+
+  /**
+   * Shuts down the mini DFS cluster and the mini Tez cluster and removes the local
+   * test directories. Every step runs even when an earlier one throws, so a failed
+   * shutdown neither leaves the other cluster running nor leaves directories behind.
+   */
+  @AfterClass
+  public static void shutdownCluster() {
+    try {
+      if (miniDFSCluster != null) {
+        miniDFSCluster.shutdown();
+      }
+    } finally {
+      try {
+        if (miniTezCluster != null) {
+          miniTezCluster.stop();
+        }
+      } finally {
+        //Best-effort cleanup: deleteQuietly never throws, so a failure on the first
+        //directory no longer prevents the second one from being removed.
+        FileUtils.deleteQuietly(new File(TEST_ROOT_DIR));
+        FileUtils.deleteQuietly(new File(TEZ_BASE_DIR));
+      }
+    }
+  }
+
+  /**
+   * Applies the shuffle/history settings to the shared {@code conf}, starts the mini
+   * Tez cluster with it and creates the sample input file. Called from
+   * {@code setupCluster()}; per-run client configuration is built in
+   * {@code getTezClient(boolean)}, so no client-side configuration is created here.
+   *
+   * @throws Exception on any failure while starting the cluster or writing the sample file
+   */
+  public static void setupTezCluster() throws Exception {
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3 * 1000);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3 * 1000);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 2);
+
+    //Enable per edge counters
+    conf.setBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, true);
+    conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+        ATSHistoryLoggingService.class.getName());
+
+    conf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, SIMPLE_HISTORY_DIR);
+
+    miniTezCluster =
+        new MiniTezClusterWithTimeline(TEZ_BASE_DIR, 1, 1, 1, true);
+
+    miniTezCluster.init(conf);
+    miniTezCluster.start();
+
+    createSampleFile(inputLoc);
+  }
+
+
+  /**
+   * Run a word count example in mini
cluster and check if it is possible to download
+   * data from ATS and parse it. Also, run with SimpleHistoryLogging option and verify
+   * if it matches with ATS data.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testParserWithSuccessfulJob() throws Exception {
+    //Run basic word count example.
+    String dagId = runWordCount(WordCount.TokenProcessor.class.getName(),
+        WordCount.SumProcessor.class.getName(), "WordCount", true);
+
+    //Export the data from ATS
+    String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
+
+    //0 is the value this test expects from a working import; the invalid-ATS test expects -1
+    int result = ATSImportTool.process(args);
+    assertTrue(result == 0);
+
+    //Parse ATS data and verify results
+    DagInfo dagInfoFromATS = getDagInfo(dagId);
+    verifyDagInfo(dagInfoFromATS, true);
+    verifyJobSpecificInfo(dagInfoFromATS);
+
+    //Now run with SimpleHistoryLogging
+    dagId = runWordCount(WordCount.TokenProcessor.class.getName(),
+        WordCount.SumProcessor.class.getName(), "WordCount", false);
+    Thread.sleep(10000); //For all flushes to happen and to avoid half-cooked download.
+
+    DagInfo shDagInfo = getDagInfoFromSimpleHistory(dagId);
+    verifyDagInfo(shDagInfo, false);
+    verifyJobSpecificInfo(shDagInfo);
+
+    //Compare dagInfo by parsing ATS data with DagInfo obtained by parsing SimpleHistoryLog
+    isDAGEqual(dagInfoFromATS, shDagInfo);
+  }
+
+  /**
+   * Copies the simple-history log of the DAG's application (attempt 1) from the
+   * configured default file system into DOWNLOAD_DIR and parses it.
+   *
+   * @param dagId DAG id in string form; also used to derive the application id
+   * @return the parsed DagInfo, after asserting that its DAG id equals {@code dagId}
+   * @throws TezException declared by this method; presumably raised by the parser
+   * @throws IOException declared by this method; presumably raised by the file copy
+   */
+  private DagInfo getDagInfoFromSimpleHistory(String dagId) throws TezException, IOException {
+    TezDAGID tezDAGID = TezDAGID.fromString(dagId);
+    //The history file name is built with attempt number 1 hard-coded
+    ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(tezDAGID
+        .getApplicationId(), 1);
+    Path historyPath = new Path(conf.get("fs.defaultFS")
+        + SIMPLE_HISTORY_DIR + HISTORY_TXT + "."
+        + applicationAttemptId);
+    //NOTE(review): this local shadows the static "fs" field of the test class
+    FileSystem fs = historyPath.getFileSystem(conf);
+
+    Path localPath = new Path(DOWNLOAD_DIR, HISTORY_TXT);
+    fs.copyToLocalFile(historyPath, localPath);
+    File localFile = new File(DOWNLOAD_DIR, HISTORY_TXT);
+
+    //Now parse via SimpleHistory
+    SimpleHistoryParser parser = new SimpleHistoryParser(localFile);
+    DagInfo dagInfo = parser.getDAGData(dagId);
+    assertTrue(dagInfo.getDagId().equals(dagId));
+    return dagInfo;
+  }
+
+  /**
+   * Asserts the shape expected of the successful "WordCount" DAG: two vertices joined by
+   * a single SCATTER_GATHER edge from Tokenizer to Summation, exactly one attempt per
+   * task with no failed or killed attempts, and that the successful attempt of the
+   * Tokenizer task with the latest finish time is the last data event source recorded
+   * on the Summation attempts.
+   *
+   * @param dagInfo parsed DAG to check
+   */
+  private void verifyJobSpecificInfo(DagInfo dagInfo) {
+    //Job specific
+    assertTrue(dagInfo.getNumVertices() == 2);
+    assertTrue(dagInfo.getName().equals("WordCount"));
+    assertTrue(dagInfo.getVertex(TOKENIZER).getProcessorClassName().equals(
+        WordCount.TokenProcessor.class.getName()));
+    assertTrue(dagInfo.getVertex(SUMMATION).getProcessorClassName()
+        .equals(WordCount.SumProcessor.class.getName()));
+    assertTrue(dagInfo.getEdges().size() == 1);
+    EdgeInfo edgeInfo = dagInfo.getEdges().iterator().next();
+    assertTrue(edgeInfo.getDataMovementType().
+        equals(EdgeProperty.DataMovementType.SCATTER_GATHER.toString()));
+    assertTrue(edgeInfo.getSourceVertex().getVertexName().equals(TOKENIZER));
+    assertTrue(edgeInfo.getDestinationVertex().getVertexName().equals(SUMMATION));
+    assertTrue(edgeInfo.getInputVertexName().equals(TOKENIZER));
+    assertTrue(edgeInfo.getOutputVertexName().equals(SUMMATION));
+    assertTrue(edgeInfo.getEdgeSourceClass().equals(OrderedPartitionedKVOutput.class.getName()));
+    assertTrue(edgeInfo.getEdgeDestinationClass().equals(OrderedGroupedKVInput.class.getName()));
+    assertTrue(dagInfo.getVertices().size() == 2);
+    //Both values are accumulated across all vertices and compared after the loop
+    String lastSourceTA = null;
+    String lastDataEventSourceTA = null;
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      assertTrue(vertexInfo.getKilledTasksCount() == 0);
+      assertTrue(vertexInfo.getInitRequestedTime() > 0);
+      assertTrue(vertexInfo.getInitTime() > 0);
+      assertTrue(vertexInfo.getStartRequestedTime() > 0);
+      assertTrue(vertexInfo.getStartTime() > 0);
+      assertTrue(vertexInfo.getFinishTime() > 0);
+      //Largest task finish time seen so far within this vertex
+      long finishTime = 0;
+      for (TaskInfo taskInfo : vertexInfo.getTasks()) {
+        assertTrue(taskInfo.getNumberOfTaskAttempts() == 1);
+        assertTrue(taskInfo.getMaxTaskAttemptDuration() >= 0);
+        assertTrue(taskInfo.getMinTaskAttemptDuration() >= 0);
+        assertTrue(taskInfo.getAvgTaskAttemptDuration() >= 0);
+        assertTrue(taskInfo.getLastTaskAttemptToFinish() != null);
+        assertTrue(taskInfo.getContainersMapping().size() > 0);
+        assertTrue(taskInfo.getSuccessfulTaskAttempts().size() > 0);
+        assertTrue(taskInfo.getFailedTaskAttempts().size() == 0);
+        assertTrue(taskInfo.getKilledTaskAttempts().size() == 0);
+        List<TaskAttemptInfo> attempts = taskInfo.getTaskAttempts();
+        if (vertexInfo.getVertexName().equals(TOKENIZER)) {
+          // get the last task to finish and track its successful attempt
+          if (finishTime < taskInfo.getFinishTime()) {
+            finishTime = taskInfo.getFinishTime();
+            lastSourceTA = taskInfo.getSuccessfulAttemptId();
+          }
+        } else {
+          for (TaskAttemptInfo attempt : attempts) {
+            assertTrue(attempt.getLastDataEventTime() > 0);
+            if (lastDataEventSourceTA == null) {
+              lastDataEventSourceTA = attempt.getLastDataEventSourceTA();
+            } else {
+              // all attempts should have the same last data event source TA
+              assertTrue(lastDataEventSourceTA.equals(attempt.getLastDataEventSourceTA()));
+            }
+          }
+        }
+        for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
+          assertTrue(attemptInfo.getStartTimeInterval() > 0);
+          assertTrue(attemptInfo.getScheduledTimeInterval() > 0);
+        }
+      }
+      assertTrue(vertexInfo.getLastTaskToFinish() != null);
+      //Tokenizer is the source vertex (no inputs, one output); the other vertex is the sink
+      if (vertexInfo.getVertexName().equals(TOKENIZER)) {
+        assertTrue(vertexInfo.getInputEdges().size() == 0);
+        assertTrue(vertexInfo.getOutputEdges().size() == 1);
+        assertTrue(vertexInfo.getOutputVertices().size() == 1);
+        assertTrue(vertexInfo.getInputVertices().size() == 0);
+      } else {
+        assertTrue(vertexInfo.getInputEdges().size() == 1);
+        assertTrue(vertexInfo.getOutputEdges().size() == 0);
+        assertTrue(vertexInfo.getOutputVertices().size() == 0);
+        assertTrue(vertexInfo.getInputVertices().size() == 1);
+      }
+    }
+    //NOTE(review): if no Tokenizer task was visited, lastSourceTA is still null and this
+    //line throws NullPointerException instead of reporting an assertion failure
+    assertTrue(lastSourceTA.equals(lastDataEventSourceTA));
+  }
+
+  /**
+   * Run a word count example in mini cluster.
+   * Provide invalid URL for ATS.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testParserWithSuccessfulJob_InvalidATS() throws Exception {
+    //Run basic word count example.
+    String dagId = runWordCount(WordCount.TokenProcessor.class.getName(),
+        WordCount.SumProcessor.class.getName(), "WordCount-With-WrongATS-URL", true);
+
+    //Export the data from ATS
+    String atsAddress = "--atsAddress=http://atsHost:8188";
+    String[] args = { "--dagId=" + dagId,
+        "--downloadDir=" + DOWNLOAD_DIR,
+        atsAddress
+    };
+
+    //With the bogus ATS address the import is expected to report failure as -1
+    int result = ATSImportTool.process(args);
+    assertTrue(result == -1);
+  }
+
+  /**
+   * Run a failed job and parse the data from ATS
+   */
+  @Test
+  public void testParserWithFailedJob() throws Exception {
+    //Run a job which would fail
+    String dagId = runWordCount(WordCount.TokenProcessor.class.getName(), FailProcessor.class
+        .getName(), "WordCount-With-Exception", true);
+
+    //Export the data from ATS
+    String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
+
+    int result = ATSImportTool.process(args);
+    assertTrue(result == 0);
+
+    //Parse ATS data
+    DagInfo dagInfo = getDagInfo(dagId);
+
+    //Verify DAGInfo. Verifies vertex, task, taskAttempts in recursive manner
+    verifyDagInfo(dagInfo, true);
+
+    //Dag specific
+    VertexInfo summationVertex = dagInfo.getVertex(SUMMATION);
+    assertTrue(summationVertex.getFailedTasks().size() == 1); //1 task, 4 attempts failed
+    assertTrue(summationVertex.getFailedTasks().get(0).getFailedTaskAttempts().size() == 4);
+    assertTrue(summationVertex.getStatus().equals(VertexState.FAILED.toString()));
+
+    assertTrue(dagInfo.getFailedVertices().size() == 1);
+    assertTrue(dagInfo.getFailedVertices().get(0).getVertexName().equals(SUMMATION));
+    assertTrue(dagInfo.getSuccessfullVertices().size() == 1);
+    assertTrue(dagInfo.getSuccessfullVertices().get(0).getVertexName().equals(TOKENIZER));
+
+    assertTrue(dagInfo.getStatus().equals(DAGState.FAILED.toString()));
+
+    //A null group name makes verifyCounter check every group of the counter
+    verifyCounter(dagInfo.getCounter(DAGCounter.NUM_FAILED_TASKS.toString()), null, 4);
+    verifyCounter(dagInfo.getCounter(DAGCounter.NUM_SUCCEEDED_TASKS.toString()), null, 1);
+    verifyCounter(dagInfo.getCounter(DAGCounter.TOTAL_LAUNCHED_TASKS.toString()), null, 5);
+
+    verifyCounter(dagInfo.getCounter(TaskCounter.INPUT_RECORDS_PROCESSED.toString()),
+        "TaskCounter_Tokenizer_INPUT_Input", 10);
+    verifyCounter(dagInfo.getCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ.toString()),
+        "TaskCounter_Tokenizer_OUTPUT_Summation", 0);
+    verifyCounter(dagInfo.getCounter(TaskCounter.OUTPUT_RECORDS.toString()),
+        "TaskCounter_Tokenizer_OUTPUT_Summation",
+        20); //Every line has 2 words. 10 lines x 2 words = 20
+    verifyCounter(dagInfo.getCounter(TaskCounter.SPILLED_RECORDS.toString()),
+        "TaskCounter_Tokenizer_OUTPUT_Summation", 20); //Same as above
+
+    //Walk the attempts of each Summation task in list order and check the causal chain
+    for (TaskInfo taskInfo : summationVertex.getTasks()) {
+      String lastAttemptId = null;
+      for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
+        if (lastAttemptId != null) {
+          // failed attempt should be causal TA of next attempt
+          assertTrue(lastAttemptId.equals(attemptInfo.getSchedulingCausalTA()));
+        }
+        lastAttemptId = attemptInfo.getTaskAttemptId();
+      }
+    }
+
+    //TODO: Need to check for SUMMATION vertex counters. Since all attempts are failed, counters are not getting populated.
+    //TaskCounter.REDUCE_INPUT_RECORDS
+
+    //Verify if the processor exception is given in diagnostics
+    assertTrue(dagInfo.getDiagnostics().contains("Failing this processor for some reason"));
+
+  }
+
+  /**
+   * Adding explicit equals here instead of in DAG/Vertex/Edge where hashCode also needs to
+   * change. Also, some custom comparisons are done here for unit testing.
+   */
+  private void isDAGEqual(DagInfo dagInfo1, DagInfo dagInfo2) {
+    assertNotNull(dagInfo1);
+    assertNotNull(dagInfo2);
+    assertEquals(dagInfo1.getStatus(), dagInfo2.getStatus());
+    isEdgeEqual(dagInfo1.getEdges(), dagInfo2.getEdges());
+    isVertexEqual(dagInfo1.getVertices(), dagInfo2.getVertices());
+  }
+
+  /**
+   * Asserts that two vertices agree on name, processor class, task counts, status,
+   * input/output edges, number of input/output vertices and, pairwise, on their tasks.
+   */
+  private void isVertexEqual(VertexInfo vertexInfo1, VertexInfo vertexInfo2) {
+    assertNotNull(vertexInfo1);
+    assertNotNull(vertexInfo2);
+    assertTrue(vertexInfo1.getVertexName().equals(vertexInfo2.getVertexName()));
+    assertTrue(vertexInfo1.getProcessorClassName().equals(vertexInfo2.getProcessorClassName()));
+    assertTrue(vertexInfo1.getNumTasks() == vertexInfo2.getNumTasks());
+    assertTrue(vertexInfo1.getCompletedTasksCount() == vertexInfo2.getCompletedTasksCount());
+    assertTrue(vertexInfo1.getStatus().equals(vertexInfo2.getStatus()));
+
+    isEdgeEqual(vertexInfo1.getInputEdges(), vertexInfo2.getInputEdges());
+    isEdgeEqual(vertexInfo1.getOutputEdges(), vertexInfo2.getOutputEdges());
+
+    assertTrue(vertexInfo1.getInputVertices().size() == vertexInfo2.getInputVertices().size());
+    assertTrue(vertexInfo1.getOutputVertices().size() == vertexInfo2.getOutputVertices().size());
+
+    isTaskEqual(vertexInfo1.getTasks(), vertexInfo2.getTasks());
+  }
+
+  /**
+   * Asserts that both lists have the same size and that vertices at the same
+   * position are equal according to {@link #isVertexEqual(VertexInfo, VertexInfo)}.
+   */
+  private void isVertexEqual(List<VertexInfo> vertexList1, List<VertexInfo> vertexList2) {
+    assertEquals("Vertices sizes should be the same", vertexList1.size(), vertexList2.size());
+    Iterator<VertexInfo> it1 = vertexList1.iterator();
+    Iterator<VertexInfo> it2 = vertexList2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      isVertexEqual(it1.next(), it2.next());
+    }
+  }
+
+  /**
+   * Asserts that two edges have the same string representation.
+   */
+  private void isEdgeEqual(EdgeInfo edgeInfo1, EdgeInfo edgeInfo2) {
+    assertNotNull(edgeInfo1);
+    assertNotNull(edgeInfo2);
+    //Compare the two different edges; the second string was previously taken from
+    //edgeInfo1 as well, which made this check always pass.
+    assertEquals(edgeInfo1.toString(), edgeInfo2.toString());
+  }
+
+  /**
+   * Asserts that both collections have the same size and that edges at the same
+   * iteration position are equal.
+   */
+  private void isEdgeEqual(Collection<EdgeInfo> info1, Collection<EdgeInfo> info2) {
+    //Sizes of the two collections (was info1 against itself, i.e. always true)
+    assertEquals("sizes should be the same", info1.size(), info2.size());
+    Iterator<EdgeInfo> it1 = info1.iterator();
+    Iterator<EdgeInfo> it2 = info2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      isEdgeEqual(it1.next(), it2.next());
+    }
+  }
+
+  /**
+   * Asserts that both collections have the same size and that tasks at the same
+   * iteration position are equal.
+   */
+  private void isTaskEqual(Collection<TaskInfo> info1, Collection<TaskInfo> info2) {
+    //Sizes of the two collections (was info1 against itself, i.e. always true)
+    assertEquals("sizes should be the same", info1.size(), info2.size());
+    Iterator<TaskInfo> it1 = info1.iterator();
+    Iterator<TaskInfo> it2 = info2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      isTaskEqual(it1.next(), it2.next());
+    }
+  }
+
+  /**
+   * Asserts that two tasks agree on status, owning vertex name, task attempts and
+   * the counters checked by {@link #isCountersSame(BaseInfo, BaseInfo)}.
+   */
+  private void isTaskEqual(TaskInfo taskInfo1, TaskInfo taskInfo2) {
+    assertNotNull(taskInfo1);
+    assertNotNull(taskInfo2);
+    assertNotNull(taskInfo1.getVertexInfo());
+    assertNotNull(taskInfo2.getVertexInfo());
+    assertTrue(taskInfo1.getStatus().equals(taskInfo2.getStatus()));
+    assertTrue(
+        taskInfo1.getVertexInfo().getVertexName()
+            .equals(taskInfo2.getVertexInfo().getVertexName()));
+    isTaskAttemptEqual(taskInfo1.getTaskAttempts(), taskInfo2.getTaskAttempts());
+
+    //Verify counters
+    isCountersSame(taskInfo1, taskInfo2);
+  }
+
+  /**
+   * Compares a fixed set of task counters between two entities. Each counter is
+   * checked once (OUTPUT_RECORDS used to be listed twice).
+   */
+  private void isCountersSame(BaseInfo info1, BaseInfo info2) {
+    isCounterSame(info1.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name()),
+        info2.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.SPILLED_RECORDS.name()),
+        info2.getCounter(TaskCounter.SPILLED_RECORDS.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.OUTPUT_RECORDS.name()),
+        info2.getCounter(TaskCounter.OUTPUT_RECORDS.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.OUTPUT_BYTES.name()),
+        info2.getCounter(TaskCounter.OUTPUT_BYTES.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.REDUCE_INPUT_GROUPS.name()),
+        info2.getCounter(TaskCounter.REDUCE_INPUT_GROUPS.name()));
+
+    isCounterSame(info1.getCounter(TaskCounter.REDUCE_INPUT_RECORDS.name()),
+        info2.getCounter(TaskCounter.REDUCE_INPUT_RECORDS.name()));
+  }
+
+  /**
+   * Asserts that every entry of {@code counter1} is present in {@code counter2} with the
+   * same value. Entries that exist only in {@code counter2} are not checked.
+   */
+  private void isCounterSame(Map<String, TezCounter> counter1, Map<String, TezCounter> counter2) {
+    for (Map.Entry<String, TezCounter> entry : counter1.entrySet()) {
+      String key = entry.getKey();
+      long val = entry.getValue().getValue();
+
+      //check if other counter has the same value
+      assertTrue("Missing counter entry " + key, counter2.containsKey(key));
+      assertEquals("Value mismatch for counter entry " + key, val,
+          counter2.get(key).getValue());
+    }
+  }
+
+  /**
+   * Asserts that both collections have the same size and that attempts at the same
+   * iteration position are equal.
+   */
+  private void isTaskAttemptEqual(Collection<TaskAttemptInfo> info1,
+      Collection<TaskAttemptInfo> info2) {
+    //Sizes of the two collections (was info1 against itself, i.e. always true)
+    assertEquals("sizes should be the same", info1.size(), info2.size());
+    Iterator<TaskAttemptInfo> it1 = info1.iterator();
+    Iterator<TaskAttemptInfo> it2 = info2.iterator();
+    while (it1.hasNext()) {
+      assertTrue(it2.hasNext());
+      isTaskAttemptEqual(it1.next(), it2.next());
+    }
+  }
+
+  /**
+   * Asserts that two task attempts agree on status, owning vertex name and the
+   * counters checked by {@link #isCountersSame(BaseInfo, BaseInfo)}.
+   */
+  private void isTaskAttemptEqual(TaskAttemptInfo info1, TaskAttemptInfo info2) {
+    assertNotNull(info1);
+    assertNotNull(info2);
+    assertNotNull(info1.getTaskInfo());
+    assertNotNull(info2.getTaskInfo());
+    assertTrue(info1.getStatus().equals(info2.getStatus()));
+    assertTrue(info1.getTaskInfo().getVertexInfo().getVertexName().equals(info2.getTaskInfo()
+        .getVertexInfo().getVertexName()));
+
+    //Verify counters
+    isCountersSame(info1, info2);
+  }
+
+
+  /**
+   * Create sample file for wordcount program
+   *
+   * @param inputLoc
+   * @throws IOException
+   */
+  private static void createSampleFile(Path inputLoc) throws IOException {
+    fs.deleteOnExit(inputLoc);
+    FSDataOutputStream out = fs.create(inputLoc);
+    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+    for (int i = 0; i < 10; i++) {
+
writer.write("Sample " + RandomStringUtils.randomAlphanumeric(5)); + writer.newLine(); + } + writer.close(); + } + + private DagInfo getDagInfo(String dagId) throws TezException { + //Parse downloaded contents + File downloadedFile = new File(DOWNLOAD_DIR + + Path.SEPARATOR + dagId + + Path.SEPARATOR + dagId + ".zip"); + ATSFileParser parser = new ATSFileParser(downloadedFile); + DagInfo dagInfo = parser.getDAGData(dagId); + assertTrue(dagInfo.getDagId().equals(dagId)); + return dagInfo; + } + + private void verifyCounter(Map<String, TezCounter> counterMap, + String counterGroupName, long expectedVal) { + //Iterate through group-->tezCounter + for (Map.Entry<String, TezCounter> entry : counterMap.entrySet()) { + if (counterGroupName != null) { + if (entry.getKey().equals(counterGroupName)) { + assertTrue(entry.getValue().getValue() == expectedVal); + } + } else { + assertTrue(entry.getValue().getValue() == expectedVal); + } + } + } + + TezClient getTezClient(boolean withTimeline) throws Exception { + TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig()); + if (withTimeline) { + tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, withTimeline); + tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "0.0.0.0:8188"); + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + ATSHistoryLoggingService.class.getName()); + } else { + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + SimpleHistoryLoggingService.class.getName()); + } + tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); + + TezClient tezClient = TezClient.create("WordCount", tezConf, false); + tezClient.start(); + tezClient.waitTillReady(); + return tezClient; + } + + private String runWordCount(String tokenizerProcessor, String summationProcessor, + String dagName, boolean withTimeline) + throws Exception { + //HDFS path + Path outputLoc = new Path("/tmp/outPath_" + System.currentTimeMillis()); + + 
DataSourceDescriptor dataSource = MRInput.createConfigBuilder(conf, + TextInputFormat.class, inputLoc.toString()).build(); + + DataSinkDescriptor dataSink = + MROutput.createConfigBuilder(conf, TextOutputFormat.class, outputLoc.toString()).build(); + + Vertex tokenizerVertex = Vertex.create(TOKENIZER, ProcessorDescriptor.create( + tokenizerProcessor)).addDataSource(INPUT, dataSource); + + OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig + .newBuilder(Text.class.getName(), IntWritable.class.getName(), + HashPartitioner.class.getName()).build(); + + Vertex summationVertex = Vertex.create(SUMMATION, + ProcessorDescriptor.create(summationProcessor), 1).addDataSink(OUTPUT, dataSink); + + // Create DAG and add the vertices. Connect the producer and consumer vertices via the edge + DAG dag = DAG.create(dagName); + dag.addVertex(tokenizerVertex).addVertex(summationVertex).addEdge( + Edge.create(tokenizerVertex, summationVertex, edgeConf.createDefaultEdgeProperty())); + + TezClient tezClient = getTezClient(withTimeline); + DAGClient client = tezClient.submitDAG(dag); + client.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS)); + TezDAGID tezDAGID = TezDAGID.getInstance(tezClient.getAppMasterApplicationId(), 1); + + if (tezClient != null) { + tezClient.stop(); + } + return tezDAGID.toString(); + } + + /** + * Processor which would just throw exception. 
+   */
+  public static class FailProcessor extends SimpleMRProcessor {
+    public FailProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      throw new Exception("Failing this processor for some reason");
+    }
+  }
+
+  /**
+   * Verifies the timing and version data of a parsed DAG and then every vertex in it.
+   *
+   * @param dagInfo parsed DAG to verify
+   * @param ats     true when version information is expected to be present and is checked
+   */
+  private void verifyDagInfo(DagInfo dagInfo, boolean ats) {
+    if (ats) {
+      VersionInfo versionInfo = dagInfo.getVersionInfo();
+      assertTrue(versionInfo != null); //should be present post 0.5.4
+      assertTrue(versionInfo.getVersion() != null);
+      assertTrue(versionInfo.getRevision() != null);
+      assertTrue(versionInfo.getBuildTime() != null);
+    }
+
+    // The status is a String, so it has to be compared with the state's string form.
+    // Comparing it with the enum constant itself is always false.
+    boolean succeeded = dagInfo.getStatus().equalsIgnoreCase(DAGState.SUCCEEDED.toString());
+
+    assertTrue(dagInfo.getStartTime() > 0);
+    assertTrue(dagInfo.getFinishTimeInterval() > 0);
+    assertTrue(dagInfo.getStartTimeInterval() == 0);
+    if (succeeded) {
+      assertTrue(dagInfo.getFinishTime() >= dagInfo.getStartTime());
+    }
+    assertTrue(dagInfo.getFinishTimeInterval() > dagInfo.getStartTimeInterval());
+
+    assertTrue(dagInfo.getStartTime() > dagInfo.getSubmitTime());
+    assertTrue(dagInfo.getTimeTaken() > 0);
+
+    //Verify all vertices
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      verifyVertex(vertexInfo, vertexInfo.getFailedTasksCount() > 0);
+    }
+
+    VertexInfo fastestVertex = dagInfo.getFastestVertex();
+    assertTrue(fastestVertex != null);
+
+    if (succeeded) {
+      assertTrue(dagInfo.getSlowestVertex() != null);
+    }
+  }
+
+  private void verifyVertex(VertexInfo vertexInfo, boolean hasFailedTasks) {
+    assertTrue(vertexInfo != null);
+    if (hasFailedTasks) {
+      assertTrue(vertexInfo.getFailedTasksCount() > 0);
+    }
+    assertTrue(vertexInfo.getStartTimeInterval() > 0);
+    assertTrue(vertexInfo.getStartTime() > 0);
+    assertTrue(vertexInfo.getFinishTimeInterval() > 0);
+    assertTrue(vertexInfo.getStartTimeInterval() < vertexInfo.getFinishTimeInterval());
+    assertTrue(vertexInfo.getVertexName() != null);
+    if
(!hasFailedTasks) {
+      // Without failed tasks the vertex is expected to have finished successfully, with
+      // consistent task counts and positive task durations.
+      assertTrue(vertexInfo.getFinishTime() > 0);
+      assertTrue(vertexInfo.getFailedTasks().size() == 0);
+      assertTrue(vertexInfo.getSucceededTasksCount() == vertexInfo.getSuccessfulTasks().size());
+      assertTrue(vertexInfo.getFailedTasksCount() == 0);
+      assertTrue(vertexInfo.getAvgTaskDuration() > 0);
+      assertTrue(vertexInfo.getMaxTaskDuration() > 0);
+      assertTrue(vertexInfo.getMinTaskDuration() > 0);
+      assertTrue(vertexInfo.getTimeTaken() > 0);
+      assertTrue(vertexInfo.getStatus().equalsIgnoreCase(VertexState.SUCCEEDED.toString()));
+      assertTrue(vertexInfo.getCompletedTasksCount() > 0);
+      assertTrue(vertexInfo.getFirstTaskToStart() != null);
+      assertTrue(vertexInfo.getSucceededTasksCount() > 0);
+      assertTrue(vertexInfo.getTasks().size() > 0);
+    }
+
+    // Only tasks reporting SUCCEEDED are verified as tasks without failed attempts.
+    for (TaskInfo taskInfo : vertexInfo.getTasks()) {
+      if (taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString())) {
+        verifyTask(taskInfo, false);
+      }
+    }
+
+    // Failed tasks are verified with the failed-attempts flag set.
+    for (TaskInfo taskInfo : vertexInfo.getFailedTasks()) {
+      verifyTask(taskInfo, true);
+    }
+
+    assertTrue(vertexInfo.getProcessorClassName() != null);
+    assertTrue(vertexInfo.getStatus() != null);
+    assertTrue(vertexInfo.getDagInfo() != null);
+    assertTrue(vertexInfo.getInitTimeInterval() > 0);
+    assertTrue(vertexInfo.getNumTasks() > 0);
+  }
+
+  private void verifyTask(TaskInfo taskInfo, boolean hasFailedAttempts) {
+    assertTrue(taskInfo != null);
+    assertTrue(taskInfo.getStatus() != null);
+    assertTrue(taskInfo.getStartTimeInterval() > 0);
+
+    //Not testing for killed attempts.
So if there are no failures, it should succeed
+    if (!hasFailedAttempts) {
+      assertTrue(taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString()));
+      assertTrue(taskInfo.getFinishTimeInterval() > 0 && taskInfo.getFinishTime() > taskInfo
+          .getFinishTimeInterval());
+      assertTrue(
+          taskInfo.getStartTimeInterval() > 0 && taskInfo.getStartTime() > taskInfo.getStartTimeInterval());
+      assertTrue(taskInfo.getSuccessfulAttemptId() != null);
+      assertTrue(taskInfo.getSuccessfulTaskAttempt() != null);
+    }
+    assertTrue(taskInfo.getTaskId() != null);
+
+    for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
+      verifyTaskAttemptInfo(attemptInfo);
+    }
+  }
+
+  /**
+   * Verifies a task attempt. The timing, node, counter and container checks apply only to
+   * attempts whose status is SUCCEEDED; every attempt must reference its task.
+   *
+   * @param attemptInfo task attempt to verify
+   */
+  private void verifyTaskAttemptInfo(TaskAttemptInfo attemptInfo) {
+    // Compare string forms. equals() against the enum constant is always false when the
+    // status is a String, as it is for TaskInfo above, which silently skipped every check
+    // below. String.valueOf keeps the comparison null-safe.
+    // NOTE(review): assumes TaskAttemptInfo#getStatus mirrors TaskInfo#getStatus - confirm.
+    boolean succeeded = TaskAttemptState.SUCCEEDED.toString()
+        .equals(String.valueOf(attemptInfo.getStatus()));
+    if (succeeded) {
+      assertTrue(attemptInfo.getStartTimeInterval() > 0);
+      assertTrue(attemptInfo.getFinishTimeInterval() > 0);
+      assertTrue(attemptInfo.getStartTime() > 0);
+      assertTrue(attemptInfo.getFinishTime() > 0);
+      assertTrue(attemptInfo.getFinishTime() > attemptInfo.getStartTime());
+      assertTrue(attemptInfo.getFinishTime() > attemptInfo.getFinishTimeInterval());
+      assertTrue(attemptInfo.getStartTime() > attemptInfo.getStartTimeInterval());
+      assertTrue(attemptInfo.getNodeId() != null);
+      assertTrue(attemptInfo.getTimeTaken() != -1);
+      assertTrue(attemptInfo.getEvents() != null);
+      assertTrue(attemptInfo.getTezCounters() != null);
+      assertTrue(attemptInfo.getContainer() != null);
+    }
+    assertTrue(attemptInfo.getTaskInfo() != null);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/pom.xml ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/pom.xml b/tez-tools/analyzers/job-analyzer/pom.xml index fe28b14..36b12fe 100644 --- a/tez-tools/analyzers/job-analyzer/pom.xml +++ b/tez-tools/analyzers/job-analyzer/pom.xml @@ -38,6 +38,15 @@
<artifactId>tez-history-parser</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>org.plutext</groupId> + <artifactId>jaxb-svg11</artifactId> + <version>1.0.2</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java index 4151a90..27ad95e 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java @@ -19,15 +19,14 @@ package org.apache.tez.analyzer; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; -import com.google.common.base.Strings; import org.apache.tez.dag.api.TezException; import java.io.BufferedWriter; import java.io.File; import java.io.FileOutputStream; -import java.io.FileWriter; import java.io.IOException; import java.io.OutputStreamWriter; import java.nio.charset.Charset; @@ -99,7 +98,7 @@ public class CSVResult implements Result { StringBuilder sb = new StringBuilder(); for(int i=0;i<record.length;i++) { - sb.append(Strings.isNullOrEmpty(record[i]) ? record[i] : " "); + sb.append(!Strings.isNullOrEmpty(record[i]) ? 
record[i] : " "); if (i < record.length - 1) { sb.append(","); } http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java index 6748f3f..88d45f3 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java @@ -19,6 +19,8 @@ package org.apache.tez.analyzer.plugins; import com.google.common.base.Functions; +import com.google.common.base.Splitter; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -26,10 +28,13 @@ import com.google.common.collect.Ordering; import org.apache.hadoop.conf.Configuration; import org.apache.tez.analyzer.Analyzer; import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.analyzer.utils.Utils; import org.apache.tez.dag.api.TezException; import org.apache.tez.history.parser.datamodel.DagInfo; import org.apache.tez.history.parser.datamodel.VertexInfo; +import java.io.File; +import java.io.IOException; import java.util.List; import java.util.Map; @@ -43,24 +48,43 @@ public class CriticalPathAnalyzer implements Analyzer { private final CSVResult csvResult; + private static final String DOT_FILE_DIR = "tez.critical-path.analyzer.dot.output.loc"; + private static final String DOT_FILE_DIR_DEFAULT = "."; //current directory + + private final String dotFileLocation; + + private static final String CONNECTOR = "-->"; + public CriticalPathAnalyzer(Configuration 
config) { this.config = config; this.csvResult = new CSVResult(headers); + this.dotFileLocation = config.get(DOT_FILE_DIR, DOT_FILE_DIR_DEFAULT); } @Override public void analyze(DagInfo dagInfo) throws TezException { Map<String, Long> result = Maps.newLinkedHashMap(); getCriticalPath("", dagInfo.getVertices().get(dagInfo.getVertices().size() - 1), 0, result); - System.out.println(); - System.out.println(); - - for (Map.Entry<String, Long> entry : sortByValues(result).entrySet()) { + Map<String, Long> sortedByValues = sortByValues(result); + for (Map.Entry<String, Long> entry : sortedByValues.entrySet()) { List<String> record = Lists.newLinkedList(); record.add(entry.getKey()); record.add(entry.getValue() + ""); csvResult.addRecord(record.toArray(new String[record.size()])); - System.out.println(entry.getKey() + ", " + entry.getValue()); + } + + String dotFile = dotFileLocation + File.separator + dagInfo.getDagId() + ".dot"; + try { + List<String> criticalVertices = null; + if (!sortedByValues.isEmpty()) { + String criticalPath = sortedByValues.keySet().iterator().next(); + criticalVertices = getVertexNames(criticalPath); + } else { + criticalVertices = Lists.newLinkedList(); + } + Utils.generateDAGVizFile(dagInfo, dotFile, criticalVertices); + } catch (IOException e) { + throw new TezException(e); } } @@ -98,7 +122,7 @@ public class CriticalPathAnalyzer implements Analyzer { if (dest != null) { time += dest.getTimeTaken(); - predecessor += destVertexName + "-->"; + predecessor += destVertexName + CONNECTOR; for (VertexInfo incomingVertex : dest.getInputVertices()) { getCriticalPath(predecessor, incomingVertex, time, result); @@ -107,4 +131,12 @@ public class CriticalPathAnalyzer implements Analyzer { result.put(predecessor, time); } } + + private static List<String> getVertexNames(String criticalPath) { + if (Strings.isNullOrEmpty(criticalPath)) { + return Lists.newLinkedList(); + } + return 
Lists.newLinkedList(Splitter.on(CONNECTOR).trimResults().omitEmptyStrings().split + (criticalPath)); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java index 67b4c51..7ed52da 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java @@ -108,7 +108,8 @@ public class LocalityAnalyzer implements Analyzer { record.add(otherTaskResult.avgRuntime + ""); //Get the number of inputs to this vertex - record.add(vertexInfo.getInputEdges().size() + ""); + record.add(vertexInfo.getInputEdges().size() + + vertexInfo.getAdditionalInputInfoList().size() + ""); //Get the avg HDFS bytes read in this vertex for different type of locality record.add(dataLocalResult.avgHDFSBytesRead + "");
