http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java new file mode 100644 index 0000000..2b23294 --- /dev/null +++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java @@ -0,0 +1,813 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.history; + +import com.google.common.collect.Sets; +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.tez.client.CallerContext; +import org.apache.tez.client.TezClient; +import org.apache.tez.common.counters.DAGCounter; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.DataSinkDescriptor; +import org.apache.tez.dag.api.DataSourceDescriptor; +import org.apache.tez.dag.api.Edge; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.StatusGetOpts; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.api.oldrecords.TaskState; +import org.apache.tez.dag.app.dag.DAGState; +import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService; +import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService; +import
org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.examples.WordCount; +import org.apache.tez.history.parser.ATSFileParser; +import org.apache.tez.history.parser.SimpleHistoryParser; +import org.apache.tez.history.parser.datamodel.BaseInfo; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.EdgeInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo.DataDependencyEvent; +import org.apache.tez.history.parser.datamodel.TaskInfo; +import org.apache.tez.history.parser.datamodel.VersionInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; +import org.apache.tez.mapreduce.input.MRInput; +import org.apache.tez.mapreduce.output.MROutput; +import org.apache.tez.mapreduce.processor.SimpleMRProcessor; +import org.apache.tez.runtime.api.ProcessorContext; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig; +import org.apache.tez.runtime.library.input.OrderedGroupedKVInput; +import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput; +import org.apache.tez.runtime.library.partitioner.HashPartitioner; +import org.apache.tez.tests.MiniTezClusterWithTimeline; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.*; + +public class TestHistoryParser { + + private static MiniDFSCluster miniDFSCluster; + private static MiniTezClusterWithTimeline miniTezCluster; + + //location within miniHDFS cluster's hdfs + private static Path inputLoc = new Path("/tmp/sample.txt"); + + private final static String INPUT = "Input"; + private final static String OUTPUT = "Output"; + private final static String TOKENIZER = "Tokenizer"; + private final static String SUMMATION = "Summation"; + private final static String SIMPLE_HISTORY_DIR = "/tmp/simplehistory/"; + private final static String HISTORY_TXT = "history.txt"; + + private static Configuration conf = new Configuration(); + private static FileSystem fs; + private static String TEST_ROOT_DIR = + "target" + Path.SEPARATOR + TestHistoryParser.class.getName() + "-tmpDir"; + private static String TEZ_BASE_DIR = + "target" + Path.SEPARATOR + TestHistoryParser.class.getName() + "-tez"; + private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + "download"; + + @BeforeClass + public static void setupCluster() throws Exception { + conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH, false); + EditLogFileOutputStream.setShouldSkipFsyncForTesting(true); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); + miniDFSCluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); + fs = miniDFSCluster.getFileSystem(); + conf.set("fs.defaultFS", fs.getUri().toString()); + + setupTezCluster(); + } + + @AfterClass + public static void shutdownCluster() { + try { + if (miniDFSCluster != null) { + miniDFSCluster.shutdown(); + } + if (miniTezCluster != null) { + miniTezCluster.stop(); + } + } finally { + try { + FileUtils.deleteDirectory(new File(TEST_ROOT_DIR)); + FileUtils.deleteDirectory(new File(TEZ_BASE_DIR)); + } catch (IOException e) { + //safe to ignore 
+ } + } + + public static void setupTezCluster() throws Exception { + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3 * 1000); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3 * 1000); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 2); + + //Enable per edge counters + conf.setBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, true); + conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, ATSHistoryLoggingService + .class.getName()); + + conf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, SIMPLE_HISTORY_DIR); + + miniTezCluster = + new MiniTezClusterWithTimeline(TEZ_BASE_DIR, 1, 1, 1, true); + + miniTezCluster.init(conf); + miniTezCluster.start(); + + createSampleFile(inputLoc); + + TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig()); + tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "0.0.0.0:8188"); + tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + ATSHistoryLoggingService.class.getName()); + + } + + + /** + * Run a word count example in the mini cluster and check that it is possible to download + * data from ATS and parse it. Also, run with the SimpleHistoryLogging option and verify + * that the result matches the ATS data. + * + * @throws Exception + */ + @Test + public void testParserWithSuccessfulJob() throws Exception { + //Run basic word count example. + String dagId = runWordCount(WordCount.TokenProcessor.class.getName(), + WordCount.SumProcessor.class.getName(), "WordCount", true); + + //Export the data from ATS + String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR }; + + int result = ATSImportTool.process(args); + assertTrue(result == 0); + + //Parse ATS data and verify results + DagInfo dagInfoFromATS = getDagInfo(dagId); + verifyDagInfo(dagInfoFromATS, true); + verifyJobSpecificInfo(dagInfoFromATS); + + //Now run with SimpleHistoryLogging + dagId = runWordCount(WordCount.TokenProcessor.class.getName(), + WordCount.SumProcessor.class.getName(), "WordCount", false); + Thread.sleep(10000); //Wait for all flushes to happen and avoid a partial download. + + DagInfo shDagInfo = getDagInfoFromSimpleHistory(dagId); + verifyDagInfo(shDagInfo, false); + verifyJobSpecificInfo(shDagInfo); + + //Compare the DagInfo parsed from ATS data with the DagInfo parsed from the SimpleHistory log + isDAGEqual(dagInfoFromATS, shDagInfo); + } + + private DagInfo getDagInfoFromSimpleHistory(String dagId) throws TezException, IOException { + TezDAGID tezDAGID = TezDAGID.fromString(dagId); + ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(tezDAGID + .getApplicationId(), 1); + Path historyPath = new Path(conf.get("fs.defaultFS") + + SIMPLE_HISTORY_DIR + HISTORY_TXT + "."
+ + applicationAttemptId); + FileSystem fs = historyPath.getFileSystem(conf); + + Path localPath = new Path(DOWNLOAD_DIR, HISTORY_TXT); + fs.copyToLocalFile(historyPath, localPath); + File localFile = new File(DOWNLOAD_DIR, HISTORY_TXT); + + //Now parse via SimpleHistory + SimpleHistoryParser parser = new SimpleHistoryParser(localFile); + DagInfo dagInfo = parser.getDAGData(dagId); + assertTrue(dagInfo.getDagId().equals(dagId)); + return dagInfo; + } + + private void verifyJobSpecificInfo(DagInfo dagInfo) { + //Job specific + assertTrue(dagInfo.getNumVertices() == 2); + assertTrue(dagInfo.getName().equals("WordCount")); + assertTrue(dagInfo.getVertex(TOKENIZER).getProcessorClassName().equals( + WordCount.TokenProcessor.class.getName())); + assertTrue(dagInfo.getVertex(SUMMATION).getProcessorClassName() + .equals(WordCount.SumProcessor.class.getName())); + assertTrue(dagInfo.getEdges().size() == 1); + EdgeInfo edgeInfo = dagInfo.getEdges().iterator().next(); + assertTrue(edgeInfo.getDataMovementType(). + equals(EdgeProperty.DataMovementType.SCATTER_GATHER.toString())); + assertTrue(edgeInfo.getSourceVertex().getVertexName().equals(TOKENIZER)); + assertTrue(edgeInfo.getDestinationVertex().getVertexName().equals(SUMMATION)); + assertTrue(edgeInfo.getInputVertexName().equals(TOKENIZER)); + assertTrue(edgeInfo.getOutputVertexName().equals(SUMMATION)); + assertTrue(edgeInfo.getEdgeSourceClass().equals(OrderedPartitionedKVOutput.class.getName())); + assertTrue(edgeInfo.getEdgeDestinationClass().equals(OrderedGroupedKVInput.class.getName())); + assertTrue(dagInfo.getVertices().size() == 2); + String lastSourceTA = null; + String lastDataEventSourceTA = null; + for (VertexInfo vertexInfo : dagInfo.getVertices()) { + assertTrue(vertexInfo.getKilledTasksCount() == 0); + assertTrue(vertexInfo.getInitRequestedTime() > 0); + assertTrue(vertexInfo.getInitTime() > 0); + assertTrue(vertexInfo.getStartRequestedTime() > 0); + assertTrue(vertexInfo.getStartTime() > 0); + assertTrue(vertexInfo.getFinishTime() > 0); + long finishTime = 0; + for (TaskInfo taskInfo : vertexInfo.getTasks()) { + assertTrue(taskInfo.getNumberOfTaskAttempts() == 1); + assertTrue(taskInfo.getMaxTaskAttemptDuration() >= 0); + assertTrue(taskInfo.getMinTaskAttemptDuration() >= 0); + assertTrue(taskInfo.getAvgTaskAttemptDuration() >= 0); + assertTrue(taskInfo.getLastTaskAttemptToFinish() != null); + assertTrue(taskInfo.getContainersMapping().size() > 0); + assertTrue(taskInfo.getSuccessfulTaskAttempts().size() > 0); + assertTrue(taskInfo.getFailedTaskAttempts().size() == 0); + assertTrue(taskInfo.getKilledTaskAttempts().size() == 0); + List<TaskAttemptInfo> attempts = taskInfo.getTaskAttempts(); + if (vertexInfo.getVertexName().equals(TOKENIZER)) { + // get the last task to finish and track its successful attempt + if (finishTime < taskInfo.getFinishTime()) { + finishTime = taskInfo.getFinishTime(); + lastSourceTA = taskInfo.getSuccessfulAttemptId(); + } + } else { + for (TaskAttemptInfo attempt : attempts) { + DataDependencyEvent item = attempt.getLastDataEvents().get(0); + assertTrue(item.getTimestamp() > 0); + + if (lastDataEventSourceTA == null) { + lastDataEventSourceTA = item.getTaskAttemptId(); + } else { + // all attempts should have the same last data event source TA + assertTrue(lastDataEventSourceTA.equals(item.getTaskAttemptId())); + } + } + } + for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) { + assertTrue(attemptInfo.getCreationTime() > 0); + assertTrue(attemptInfo.getAllocationTime() > 0); + 
assertTrue(attemptInfo.getStartTime() > 0); + } + } + assertTrue(vertexInfo.getLastTaskToFinish() != null); + if (vertexInfo.getVertexName().equals(TOKENIZER)) { + assertTrue(vertexInfo.getInputEdges().size() == 0); + assertTrue(vertexInfo.getOutputEdges().size() == 1); + assertTrue(vertexInfo.getOutputVertices().size() == 1); + assertTrue(vertexInfo.getInputVertices().size() == 0); + } else { + assertTrue(vertexInfo.getInputEdges().size() == 1); + assertTrue(vertexInfo.getOutputEdges().size() == 0); + assertTrue(vertexInfo.getOutputVertices().size() == 0); + assertTrue(vertexInfo.getInputVertices().size() == 1); + } + } + assertTrue(lastSourceTA.equals(lastDataEventSourceTA)); + } + + /** + * Run a word count example in the mini cluster, providing an invalid URL for ATS. + * + * @throws Exception + */ + @Test + public void testParserWithSuccessfulJob_InvalidATS() throws Exception { + //Run basic word count example. + String dagId = runWordCount(WordCount.TokenProcessor.class.getName(), + WordCount.SumProcessor.class.getName(), "WordCount-With-WrongATS-URL", true); + + //Export the data from ATS + String atsAddress = "--atsAddress=http://atsHost:8188"; + String[] args = { "--dagId=" + dagId, + "--downloadDir=" + DOWNLOAD_DIR, + atsAddress + }; + + try { + ATSImportTool.process(args); + fail("Should have failed with ParseException"); + } catch(ParseException e) { + //expected exception + } + } + + /** + * Run a failed job and parse the data from ATS + */ + @Test + public void testParserWithFailedJob() throws Exception { + //Run a job which would fail + String dagId = runWordCount(WordCount.TokenProcessor.class.getName(), FailProcessor.class + .getName(), "WordCount-With-Exception", true); + + //Export the data from ATS + String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR }; + + int result = ATSImportTool.process(args); + assertTrue(result == 0); + + //Parse ATS data + DagInfo dagInfo = getDagInfo(dagId); + + //Verify DAGInfo. Verifies vertex, task, taskAttempts in a recursive manner + verifyDagInfo(dagInfo, true); + + //Dag specific + VertexInfo summationVertex = dagInfo.getVertex(SUMMATION); + assertTrue(summationVertex.getFailedTasks().size() == 1); //1 task, 4 attempts failed + assertTrue(summationVertex.getFailedTasks().get(0).getFailedTaskAttempts().size() == 4); + assertTrue(summationVertex.getStatus().equals(VertexState.FAILED.toString())); + + assertTrue(dagInfo.getFailedVertices().size() == 1); + assertTrue(dagInfo.getFailedVertices().get(0).getVertexName().equals(SUMMATION)); + assertTrue(dagInfo.getSuccessfullVertices().size() == 1); + assertTrue(dagInfo.getSuccessfullVertices().get(0).getVertexName().equals(TOKENIZER)); + + assertTrue(dagInfo.getStatus().equals(DAGState.FAILED.toString())); + + verifyCounter(dagInfo.getCounter(DAGCounter.NUM_FAILED_TASKS.toString()), null, 4); + verifyCounter(dagInfo.getCounter(DAGCounter.NUM_SUCCEEDED_TASKS.toString()), null, 1); + verifyCounter(dagInfo.getCounter(DAGCounter.TOTAL_LAUNCHED_TASKS.toString()), null, 5); + + verifyCounter(dagInfo.getCounter(TaskCounter.INPUT_RECORDS_PROCESSED.toString()), + "TaskCounter_Tokenizer_INPUT_Input", 10); + verifyCounter(dagInfo.getCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ.toString()), + "TaskCounter_Tokenizer_OUTPUT_Summation", 0); + verifyCounter(dagInfo.getCounter(TaskCounter.OUTPUT_RECORDS.toString()), + "TaskCounter_Tokenizer_OUTPUT_Summation", + 20); //Every line has 2 words.
10 lines x 2 words = 20 + verifyCounter(dagInfo.getCounter(TaskCounter.SPILLED_RECORDS.toString()), + "TaskCounter_Tokenizer_OUTPUT_Summation", 20); //Same as above + + for (TaskInfo taskInfo : summationVertex.getTasks()) { + TaskAttemptInfo lastAttempt = null; + for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) { + if (lastAttempt != null) { + // failed attempt should be causal TA of next attempt + assertTrue(lastAttempt.getTaskAttemptId().equals(attemptInfo.getCreationCausalTA())); + assertTrue(lastAttempt.getTerminationCause() != null); + } + lastAttempt = attemptInfo; + } + } + + //TODO: Need to check for SUMMATION vertex counters. Since all attempts are failed, counters are not getting populated. + //TaskCounter.REDUCE_INPUT_RECORDS + + //Verify if the processor exception is given in diagnostics + assertTrue(dagInfo.getDiagnostics().contains("Failing this processor for some reason")); + + } + + /** + * Equality is checked explicitly here instead of overriding equals() in DAG/Vertex/Edge, + * where hashCode would also need to change. Also, some custom comparisons are done here + * for unit testing. + */ + private void isDAGEqual(DagInfo dagInfo1, DagInfo dagInfo2) { + assertNotNull(dagInfo1); + assertNotNull(dagInfo2); + assertEquals(dagInfo1.getStatus(), dagInfo2.getStatus()); + isEdgeEqual(dagInfo1.getEdges(), dagInfo2.getEdges()); + isVertexEqual(dagInfo1.getVertices(), dagInfo2.getVertices()); + } + + private void isVertexEqual(VertexInfo vertexInfo1, VertexInfo vertexInfo2) { + assertTrue(vertexInfo1 != null); + assertTrue(vertexInfo2 != null); + assertTrue(vertexInfo1.getVertexName().equals(vertexInfo2.getVertexName())); + assertTrue(vertexInfo1.getProcessorClassName().equals(vertexInfo2.getProcessorClassName())); + assertTrue(vertexInfo1.getNumTasks() == vertexInfo2.getNumTasks()); + assertTrue(vertexInfo1.getCompletedTasksCount() == vertexInfo2.getCompletedTasksCount()); + assertTrue(vertexInfo1.getStatus().equals(vertexInfo2.getStatus())); + + isEdgeEqual(vertexInfo1.getInputEdges(), vertexInfo2.getInputEdges()); + isEdgeEqual(vertexInfo1.getOutputEdges(), vertexInfo2.getOutputEdges()); + + assertTrue(vertexInfo1.getInputVertices().size() == vertexInfo2.getInputVertices().size()); + assertTrue(vertexInfo1.getOutputVertices().size() == vertexInfo2.getOutputVertices().size()); + + isTaskEqual(vertexInfo1.getTasks(), vertexInfo2.getTasks()); + } + + private void isVertexEqual(List<VertexInfo> vertexList1, List<VertexInfo> vertexList2) { + assertTrue("Vertices sizes should be the same", vertexList1.size() == vertexList2.size()); + Iterator<VertexInfo> it1 = vertexList1.iterator(); + Iterator<VertexInfo> it2 = vertexList2.iterator(); + while (it1.hasNext()) { + assertTrue(it2.hasNext()); + VertexInfo info1 = it1.next(); + VertexInfo info2 = it2.next(); + isVertexEqual(info1, info2); + } + } + + private void isEdgeEqual(EdgeInfo edgeInfo1, EdgeInfo edgeInfo2) { + assertTrue(edgeInfo1 != null); + assertTrue(edgeInfo2 != null); + String info1 = edgeInfo1.toString(); + String info2 = edgeInfo2.toString(); + assertTrue(info1.equals(info2)); + } + + private void isEdgeEqual(Collection<EdgeInfo> info1, Collection<EdgeInfo> info2) { + assertTrue("sizes should be the same", info1.size() == info2.size()); + Iterator<EdgeInfo> it1 = info1.iterator(); + Iterator<EdgeInfo> it2 = info2.iterator(); + while (it1.hasNext()) { + assertTrue(it2.hasNext()); + isEdgeEqual(it1.next(), it2.next()); + } + } + + private void isTaskEqual(Collection<TaskInfo> info1, Collection<TaskInfo> info2) { + assertTrue("sizes should be the same", info1.size() == info2.size()); + Iterator<TaskInfo> it1 = info1.iterator(); + Iterator<TaskInfo> it2 = info2.iterator(); + while (it1.hasNext()) { + assertTrue(it2.hasNext()); + isTaskEqual(it1.next(), it2.next()); + } + } + + private void isTaskEqual(TaskInfo taskInfo1, TaskInfo taskInfo2) { + assertTrue(taskInfo1 != null); + assertTrue(taskInfo2 != null); + assertTrue(taskInfo1.getVertexInfo() != null); + assertTrue(taskInfo2.getVertexInfo() != null); + assertTrue(taskInfo1.getStatus().equals(taskInfo2.getStatus())); + assertTrue( + taskInfo1.getVertexInfo().getVertexName() + .equals(taskInfo2.getVertexInfo().getVertexName())); + isTaskAttemptEqual(taskInfo1.getTaskAttempts(), taskInfo2.getTaskAttempts()); + + //Verify counters + isCountersSame(taskInfo1, taskInfo2); + } + + private void isCountersSame(BaseInfo info1, BaseInfo info2) { + isCounterSame(info1.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name()), + info2.getCounter(TaskCounter.ADDITIONAL_SPILL_COUNT.name())); + + isCounterSame(info1.getCounter(TaskCounter.SPILLED_RECORDS.name()), + info2.getCounter(TaskCounter.SPILLED_RECORDS.name())); + + isCounterSame(info1.getCounter(TaskCounter.OUTPUT_RECORDS.name()), + info2.getCounter(TaskCounter.OUTPUT_RECORDS.name())); + + isCounterSame(info1.getCounter(TaskCounter.OUTPUT_BYTES.name()), + info2.getCounter(TaskCounter.OUTPUT_BYTES.name())); + + isCounterSame(info1.getCounter(TaskCounter.REDUCE_INPUT_GROUPS.name()), + info2.getCounter(TaskCounter.REDUCE_INPUT_GROUPS.name())); + + isCounterSame(info1.getCounter(TaskCounter.REDUCE_INPUT_RECORDS.name()), + info2.getCounter(TaskCounter.REDUCE_INPUT_RECORDS.name())); + } + + private void isCounterSame(Map<String, TezCounter> counter1, Map<String, TezCounter> counter2) { + for (Map.Entry<String, TezCounter> entry : counter1.entrySet()) { + String source = entry.getKey(); + long val = entry.getValue().getValue(); + + //check if the other counter map has the same value for this source + assertTrue(counter2.containsKey(source)); + assertTrue(counter2.get(source).getValue() == val); + } + } + + private void isTaskAttemptEqual(Collection<TaskAttemptInfo> info1, + Collection<TaskAttemptInfo> info2) { + assertTrue("sizes should be the same", info1.size() == info2.size()); + Iterator<TaskAttemptInfo> it1 = info1.iterator(); + Iterator<TaskAttemptInfo> it2 = info2.iterator(); + while (it1.hasNext()) { + assertTrue(it2.hasNext()); + isTaskAttemptEqual(it1.next(), it2.next()); + } + } + + private void isTaskAttemptEqual(TaskAttemptInfo info1, TaskAttemptInfo info2) { + assertTrue(info1 != null); + assertTrue(info2 != null); + assertTrue(info1.getTaskInfo() != null); + assertTrue(info2.getTaskInfo() != null); + assertTrue(info1.getStatus().equals(info2.getStatus())); + assertTrue(info1.getTaskInfo().getVertexInfo().getVertexName().equals(info2.getTaskInfo() + .getVertexInfo().getVertexName())); + + //Verify counters + isCountersSame(info1, info2); + } + + + /** + * Create a sample input file for the wordcount program + * + * @param inputLoc + * @throws IOException + */ + private static void createSampleFile(Path inputLoc) throws IOException { + fs.deleteOnExit(inputLoc); + FSDataOutputStream out = fs.create(inputLoc); + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out)); + for (int i = 0; i < 10; i++) { + writer.write("Sample " +
RandomStringUtils.randomAlphanumeric(5)); + writer.newLine(); + } + writer.close(); + } + + private DagInfo getDagInfo(String dagId) throws TezException { + //Parse downloaded contents + File downloadedFile = new File(DOWNLOAD_DIR + + Path.SEPARATOR + dagId + ".zip"); + ATSFileParser parser = new ATSFileParser(downloadedFile); + DagInfo dagInfo = parser.getDAGData(dagId); + assertTrue(dagInfo.getDagId().equals(dagId)); + return dagInfo; + } + + private void verifyCounter(Map<String, TezCounter> counterMap, + String counterGroupName, long expectedVal) { + //Iterate through group-->tezCounter + for (Map.Entry<String, TezCounter> entry : counterMap.entrySet()) { + if (counterGroupName != null) { + if (entry.getKey().equals(counterGroupName)) { + assertTrue(entry.getValue().getValue() == expectedVal); + } + } else { + assertTrue(entry.getValue().getValue() == expectedVal); + } + } + } + + TezClient getTezClient(boolean withTimeline) throws Exception { + TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig()); + if (withTimeline) { + tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, withTimeline); + tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "0.0.0.0:8188"); + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + ATSHistoryLoggingService.class.getName()); + } else { + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + SimpleHistoryLoggingService.class.getName()); + } + tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); + + TezClient tezClient = TezClient.create("WordCount", tezConf, false); + tezClient.start(); + tezClient.waitTillReady(); + return tezClient; + } + + private String runWordCount(String tokenizerProcessor, String summationProcessor, + String dagName, boolean withTimeline) + throws Exception { + //HDFS path + Path outputLoc = new Path("/tmp/outPath_" + System.currentTimeMillis()); + + DataSourceDescriptor dataSource = MRInput.createConfigBuilder(conf, + TextInputFormat.class, inputLoc.toString()).build(); + + DataSinkDescriptor dataSink = + MROutput.createConfigBuilder(conf, TextOutputFormat.class, outputLoc.toString()).build(); + + Vertex tokenizerVertex = Vertex.create(TOKENIZER, ProcessorDescriptor.create( + tokenizerProcessor)).addDataSource(INPUT, dataSource); + + OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig + .newBuilder(Text.class.getName(), IntWritable.class.getName(), + HashPartitioner.class.getName()).build(); + + Vertex summationVertex = Vertex.create(SUMMATION, + ProcessorDescriptor.create(summationProcessor), 1).addDataSink(OUTPUT, dataSink); + + // Create DAG and add the vertices. 
Connect the producer and consumer vertices via the edge + DAG dag = DAG.create(dagName); + dag.addVertex(tokenizerVertex).addVertex(summationVertex).addEdge( + Edge.create(tokenizerVertex, summationVertex, edgeConf.createDefaultEdgeProperty())); + + TezClient tezClient = getTezClient(withTimeline); + + // Update Caller Context + CallerContext callerContext = CallerContext.create("TezExamples", "Tez WordCount Example Job"); + ApplicationId appId = tezClient.getAppMasterApplicationId(); + if (appId == null) { + appId = ApplicationId.newInstance(1001L, 1); + } + callerContext.setCallerIdAndType(appId.toString(), "TezApplication"); + dag.setCallerContext(callerContext); + + DAGClient client = tezClient.submitDAG(dag); + client.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS)); + TezDAGID tezDAGID = TezDAGID.getInstance(tezClient.getAppMasterApplicationId(), 1); + + tezClient.stop(); + return tezDAGID.toString(); + } + + /** + * Processor which just throws an exception. + */ + public static class FailProcessor extends SimpleMRProcessor { + public FailProcessor(ProcessorContext context) { + super(context); + } + + @Override + public void run() throws Exception { + throw new Exception("Failing this processor for some reason"); + } + } + + private void verifyDagInfo(DagInfo dagInfo, boolean ats) { + if (ats) { + VersionInfo versionInfo = dagInfo.getVersionInfo(); + assertTrue(versionInfo != null); //should be present post 0.5.4 + assertTrue(versionInfo.getVersion() != null); + assertTrue(versionInfo.getRevision() != null); + assertTrue(versionInfo.getBuildTime() != null); + } + + assertTrue(dagInfo.getStartTime() > 0); + assertTrue(dagInfo.getFinishTimeInterval() > 0); + assertTrue(dagInfo.getStartTimeInterval() == 0); + if (dagInfo.getStatus().equalsIgnoreCase(DAGState.SUCCEEDED.toString())) { + assertTrue(dagInfo.getFinishTime() >= dagInfo.getStartTime()); + } + assertTrue(dagInfo.getFinishTimeInterval() > dagInfo.getStartTimeInterval()); + + assertTrue(dagInfo.getStartTime() > dagInfo.getSubmitTime()); + assertTrue(dagInfo.getTimeTaken() > 0); + + assertNotNull(dagInfo.getCallerContext()); + assertEquals("TezExamples", dagInfo.getCallerContext().getContext()); + assertEquals("Tez WordCount Example Job", dagInfo.getCallerContext().getBlob()); + assertNotNull(dagInfo.getCallerContext().getCallerId()); + assertEquals("TezApplication", dagInfo.getCallerContext().getCallerType()); + + //Verify all vertices + for (VertexInfo vertexInfo : dagInfo.getVertices()) { + verifyVertex(vertexInfo, vertexInfo.getFailedTasksCount() > 0); + } + + VertexInfo fastestVertex = dagInfo.getFastestVertex(); + assertTrue(fastestVertex != null); + + if (dagInfo.getStatus().equals(DAGState.SUCCEEDED.toString())) { + assertTrue(dagInfo.getSlowestVertex() != null); + } + } + + private void verifyVertex(VertexInfo vertexInfo, boolean hasFailedTasks) { + assertTrue(vertexInfo != null); + if (hasFailedTasks) { + assertTrue(vertexInfo.getFailedTasksCount() > 0); + } + assertTrue(vertexInfo.getStartTimeInterval() > 0); + assertTrue(vertexInfo.getStartTime() > 0); + assertTrue(vertexInfo.getFinishTimeInterval() > 0); + assertTrue(vertexInfo.getStartTimeInterval() < vertexInfo.getFinishTimeInterval()); + assertTrue(vertexInfo.getVertexName() != null); + if (!hasFailedTasks) { + assertTrue(vertexInfo.getFinishTime() > 0); + assertTrue(vertexInfo.getFailedTasks().size() == 0); + assertTrue(vertexInfo.getSucceededTasksCount() == vertexInfo.getSuccessfulTasks().size()); + assertTrue(vertexInfo.getFailedTasksCount() == 0); + assertTrue(vertexInfo.getAvgTaskDuration() > 0); + assertTrue(vertexInfo.getMaxTaskDuration() > 0); + assertTrue(vertexInfo.getMinTaskDuration() > 0); + assertTrue(vertexInfo.getTimeTaken() > 0); + assertTrue(vertexInfo.getStatus().equalsIgnoreCase(VertexState.SUCCEEDED.toString())); + assertTrue(vertexInfo.getCompletedTasksCount() > 0); + assertTrue(vertexInfo.getFirstTaskToStart() != null); + assertTrue(vertexInfo.getSucceededTasksCount() > 0); + assertTrue(vertexInfo.getTasks().size() > 0); + } + + for (TaskInfo taskInfo : vertexInfo.getTasks()) { + if (taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString())) { + verifyTask(taskInfo, false); + } + } + + for (TaskInfo taskInfo : vertexInfo.getFailedTasks()) { + verifyTask(taskInfo, true); + } + + assertTrue(vertexInfo.getProcessorClassName() != null); + assertTrue(vertexInfo.getStatus() != null); + assertTrue(vertexInfo.getDagInfo() != null); + assertTrue(vertexInfo.getInitTimeInterval() > 0); + assertTrue(vertexInfo.getNumTasks() > 0); + } + + private void verifyTask(TaskInfo taskInfo, boolean hasFailedAttempts) { + assertTrue(taskInfo != null); + assertTrue(taskInfo.getStatus() != null); + assertTrue(taskInfo.getStartTimeInterval() > 0); + + //Not testing for killed attempts. So if there are no failures, it should succeed + if (!hasFailedAttempts) { + assertTrue(taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString())); + assertTrue(taskInfo.getFinishTimeInterval() > 0 && taskInfo.getFinishTime() > taskInfo + .getFinishTimeInterval()); + assertTrue( + taskInfo.getStartTimeInterval() > 0 && taskInfo.getStartTime() > taskInfo.getStartTimeInterval()); + assertTrue(taskInfo.getSuccessfulAttemptId() != null); + assertTrue(taskInfo.getSuccessfulTaskAttempt() != null); + } + assertTrue(taskInfo.getTaskId() != null); + + for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) { + verifyTaskAttemptInfo(attemptInfo); + } + } + + private void verifyTaskAttemptInfo(TaskAttemptInfo attemptInfo) { + if (attemptInfo.getStatus() != null && attemptInfo.getStatus() + .equals(TaskAttemptState.SUCCEEDED.toString())) { + assertTrue(attemptInfo.getStartTimeInterval() > 0); + assertTrue(attemptInfo.getFinishTimeInterval() > 0); + assertTrue(attemptInfo.getCreationTime() > 0); + assertTrue(attemptInfo.getAllocationTime() > 0); + assertTrue(attemptInfo.getStartTime() > 0); + assertTrue(attemptInfo.getFinishTime() > 0); + assertTrue(attemptInfo.getFinishTime() > attemptInfo.getStartTime()); + assertTrue(attemptInfo.getFinishTime() > attemptInfo.getFinishTimeInterval()); + assertTrue(attemptInfo.getStartTime() > attemptInfo.getStartTimeInterval()); + assertTrue(attemptInfo.getNodeId() != null); + assertTrue(attemptInfo.getTimeTaken() != -1); + assertTrue(attemptInfo.getEvents() != null); + assertTrue(attemptInfo.getTezCounters() != null); + assertTrue(attemptInfo.getContainer() != null); + } + assertTrue(attemptInfo.getTaskInfo() != null); + } +}
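
For reference, the download-and-parse flow exercised by the test above can also be driven as a small standalone program. The sketch below uses only calls that appear in the test (ATSImportTool.process, the ATSFileParser constructor, getDAGData, and the DagInfo/VertexInfo accessors); the class name DumpDagSummary, the argument handling, and the example DAG id are illustrative assumptions, not part of this patch.

import java.io.File;

import org.apache.tez.history.ATSImportTool;
import org.apache.tez.history.parser.ATSFileParser;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.VertexInfo;

/** Illustrative driver, not part of the patch. */
public class DumpDagSummary {
  public static void main(String[] args) throws Exception {
    String dagId = args[0];       // e.g. dag_1436855786406_0001_1
    String downloadDir = args[1]; // local directory to hold the fetched zip

    // Download the DAG's ATS data; the tests above assert a 0 exit code on success.
    int exitCode = ATSImportTool.process(
        new String[] { "--dagId=" + dagId, "--downloadDir=" + downloadDir });
    if (exitCode != 0) {
      System.err.println("ATS import failed with exit code " + exitCode);
      return;
    }

    // The tool writes <downloadDir>/<dagId>.zip, which ATSFileParser consumes.
    ATSFileParser parser = new ATSFileParser(new File(downloadDir, dagId + ".zip"));
    DagInfo dagInfo = parser.getDAGData(dagId);

    System.out.println(dagInfo.getName() + ": " + dagInfo.getStatus());
    for (VertexInfo vertex : dagInfo.getVertices()) {
      System.out.println("  " + vertex.getVertexName() + " - "
          + vertex.getNumTasks() + " tasks, " + vertex.getStatus());
    }
  }
}

When the DAG was logged with SimpleHistoryLoggingService instead of ATS, SimpleHistoryParser can be substituted for ATSFileParser, as getDagInfoFromSimpleHistory() in the test demonstrates.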
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/findbugs-exclude.xml b/tez-tools/analyzers/job-analyzer/findbugs-exclude.xml new file mode 100644 index 0000000..5bebb05 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/findbugs-exclude.xml @@ -0,0 +1,28 @@ +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> +<FindBugsFilter> + + + <Match> + <Class name="org.apache.tez.analyzer.CSVResult"/> + <Bug pattern="EI_EXPOSE_REP2"/> + </Match> + + <Match> + <Class name="org.apache.tez.analyzer.CSVResult"/> + <Bug pattern="EI_EXPOSE_REP"/> + </Match> + + +</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/pom.xml ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/pom.xml b/tez-tools/analyzers/job-analyzer/pom.xml new file mode 100644 index 0000000..627c444 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/pom.xml @@ -0,0 +1,168 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.tez</groupId> + <artifactId>tez-perf-analyzer</artifactId> + <version>0.7.1-SNAPSHOT</version> + </parent> + <artifactId>tez-job-analyzer</artifactId> + + <dependencies> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-history-parser</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-dag</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-tests</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-tests</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-yarn-timeline-history</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-yarn-timeline-history</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-tests</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-common</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.codehaus.jettison</groupId> + <artifactId>jettison</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> 
+ <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <archive> + <manifest> + <mainClass>org.apache.tez.analyzer.plugins.AnalyzerDriver</mainClass> + </manifest> + </archive> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java new file mode 100644 index 0000000..6021c58 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.DagInfo; + + +public interface Analyzer { + + /** + * Analyze Dag + * + * @param dagInfo + * @throws TezException + */ + public void analyze(DagInfo dagInfo) throws TezException; + + /** + * Get the result of analysis + * + * @return analysis result + * @throws TezException + */ + public Result getResult() throws TezException; + + /** + * Get name of the analyzer + * + * @return name of the analyzer + */ + public String getName(); + + /** + * Get description of the analyzer + * + * @return description of the analyzer + */ + public String getDescription(); + + /** + * Get config properties related to this analyzer + * + * @return config related to the analyzer + */ + public Configuration getConfiguration(); +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java new file mode 100644 index 0000000..5246c68 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import org.apache.tez.dag.api.TezException; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +/** + * Simple placeholder for storing CSV results. + * Contains headers and records in string format. + */ +public class CSVResult implements Result { + + private final String[] headers; + private final List<String[]> recordsList; + private String comments; + + public CSVResult(String[] header) { + this.headers = header; + recordsList = Lists.newLinkedList(); + } + + public String[] getHeaders() { + return headers; + } + + public void addRecord(String[] record) { + Preconditions.checkArgument(record != null, "Record can't be null"); + Preconditions.checkArgument(record.length == headers.length, "Record length " + record.length + + " does not match headers length " + headers.length); + recordsList.add(record); + } + + public Iterator<String[]> getRecordsIterator() { + return Iterators.unmodifiableIterator(recordsList.iterator()); + } + + + public void setComments(String comments) { + this.comments = comments; + } + + @Override public String toJson() throws TezException { + return ""; + } + + @Override public String getComments() { + return comments; + } + + @Override public String toString() { + return "CSVResult{" + + "headers=" + Arrays.toString(headers) + + ", recordsList=" + recordsList + + '}'; + } + + //For testing + public void dumpToFile(String fileName) throws IOException { + OutputStreamWriter writer = new OutputStreamWriter( + new FileOutputStream(new File(fileName)), + Charset.forName("UTF-8").newEncoder()); + BufferedWriter bw = new BufferedWriter(writer); + bw.write(Joiner.on(",").join(headers)); + bw.newLine(); + for (String[] record : recordsList) { + + if (record.length != headers.length) { + continue; //LOG error msg? + } + + StringBuilder sb = new StringBuilder(); + for(int i=0;i<record.length;i++) { + sb.append(!Strings.isNullOrEmpty(record[i]) ?
record[i] : " "); + if (i < record.length - 1) { + sb.append(","); + } + } + bw.write(sb.toString()); + bw.newLine(); + } + bw.flush(); + bw.close(); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Result.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Result.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Result.java new file mode 100644 index 0000000..d1881eb --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Result.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer; + +import org.apache.tez.dag.api.TezException; + +public interface Result { + + /** + * Convert result to json format + * + * @return json + * @throws TezException + */ + public String toJson() throws TezException; + + /** + * Recommendation / comments about the analysis if any. + * + * @return comments + */ + public String getComments(); +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java new file mode 100644 index 0000000..57b21cb --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.analyzer.plugins; + +import org.apache.hadoop.util.ProgramDriver; + +public class AnalyzerDriver { + + public static void main(String argv[]){ + int exitCode = -1; + ProgramDriver pgd = new ProgramDriver(); + try { + pgd.addClass("CriticalPath", CriticalPathAnalyzer.class, + "Find the critical path of a DAG"); + pgd.addClass("ContainerReuseAnalyzer", ContainerReuseAnalyzer.class, + "Print container reuse details in a DAG"); + pgd.addClass("LocalityAnalyzer", LocalityAnalyzer.class, + "Print locality details in a DAG"); + pgd.addClass("ShuffleTimeAnalyzer", ShuffleTimeAnalyzer.class, + "Analyze the shuffle time details in a DAG"); + pgd.addClass("SkewAnalyzer", SkewAnalyzer.class, + "Analyze the skew details in a DAG"); + pgd.addClass("SlowestVertexAnalyzer", SlowestVertexAnalyzer.class, + "Print slowest vertex details in a DAG"); + pgd.addClass("SlowNodeAnalyzer", SlowNodeAnalyzer.class, + "Print node details in a DAG"); + pgd.addClass("SlowTaskIdentifier", SlowTaskIdentifier.class, + "Print slow task details in a DAG"); + pgd.addClass("SpillAnalyzer", SpillAnalyzerImpl.class, + "Print spill details in a DAG"); + pgd.addClass("TaskConcurrencyAnalyzer", TaskConcurrencyAnalyzer.class, + "Print the task concurrency details in a DAG"); + pgd.addClass("VertexLevelCriticalPathAnalyzer", VertexLevelCriticalPathAnalyzer.class, + "Find critical path at vertex level in a DAG"); + exitCode = pgd.run(argv); + } catch(Throwable e){ + e.printStackTrace(); + } + + System.exit(exitCode); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java new file mode 100644 index 0000000..5b862f8 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.analyzer.plugins; + +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.Container; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; + +import java.util.List; + + +/** + * Get container reuse information at a per vertex level basis. + */ +public class ContainerReuseAnalyzer extends TezAnalyzerBase implements Analyzer { + + private final Configuration config; + + private static final String[] headers = + { "vertexName", "taskAttempts", "node", "containerId", "reuseCount" }; + + private final CSVResult csvResult; + + public ContainerReuseAnalyzer(Configuration config) { + this.config = config; + this.csvResult = new CSVResult(headers); + } + + @Override + public void analyze(DagInfo dagInfo) throws TezException { + for (VertexInfo vertexInfo : dagInfo.getVertices()) { + Multimap<Container, TaskAttemptInfo> containers = vertexInfo.getContainersMapping(); + for (Container container : containers.keySet()) { + List<String> record = Lists.newLinkedList(); + record.add(vertexInfo.getVertexName()); + record.add(vertexInfo.getTaskAttempts().size() + ""); + record.add(container.getHost()); + record.add(container.getId()); + record.add(Integer.toString(containers.get(container).size())); + csvResult.addRecord(record.toArray(new String[record.size()])); + } + } + } + + @Override + public CSVResult getResult() throws TezException { + return csvResult; + } + + @Override + public String getName() { + return "Container Reuse Analyzer"; + } + + @Override + public String getDescription() { + return "Get details on container reuse analysis"; + } + + @Override + public Configuration getConfiguration() { + return config; + } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + ContainerReuseAnalyzer analyzer = new ContainerReuseAnalyzer(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java new file mode 100644 index 0000000..d4efdf9 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java @@ -0,0 +1,646 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer.plugins; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringInterner; +import org.apache.hadoop.util.ToolRunner; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.EntityType; +import org.apache.tez.analyzer.utils.SVGUtils; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; +import org.apache.tez.history.parser.datamodel.Container; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo.DataDependencyEvent; +import org.apache.tez.history.parser.datamodel.TaskInfo; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { + + String succeededState = StringInterner.weakIntern(TaskAttemptState.SUCCEEDED.name()); + String failedState = StringInterner.weakIntern(TaskAttemptState.FAILED.name()); + + public enum CriticalPathDependency { + DATA_DEPENDENCY, + INIT_DEPENDENCY, + COMMIT_DEPENDENCY, + RETRY_DEPENDENCY, + OUTPUT_RECREATE_DEPENDENCY + } + + public static final String DRAW_SVG = "tez.critical-path-analyzer.draw-svg"; + + public static class CriticalPathStep { + public enum EntityType { + ATTEMPT, + VERTEX_INIT, + DAG_COMMIT + } + + EntityType type; + TaskAttemptInfo attempt; + CriticalPathDependency reason; // reason linking this to the previous step on the critical path + long startCriticalPathTime; // time at which attempt is on critical path + long stopCriticalPathTime; // time at which attempt is off critical path + List<String> notes = Lists.newLinkedList(); + + public CriticalPathStep(TaskAttemptInfo attempt, EntityType type) { + this.type = type; + this.attempt = attempt; + } + public EntityType getType() { + return type; + } + public TaskAttemptInfo getAttempt() { + return attempt; + } + public long getStartCriticalTime() { + return startCriticalPathTime; + } + public long getStopCriticalTime() { + return stopCriticalPathTime; + } + public CriticalPathDependency getReason() { + return reason; + } + public List<String> getNotes() { + return notes; + } + } + + List<CriticalPathStep> criticalPath = Lists.newLinkedList(); + + Map<String, TaskAttemptInfo> attempts = Maps.newHashMap(); + + int maxConcurrency = 0; + ArrayList<TimeInfo> concurrencyByTime = Lists.newArrayList(); + + public CriticalPathAnalyzer() { + } + + @Override + public void analyze(DagInfo dagInfo) throws TezException { + 
+ // get all attempts in the dag and find the last failed/succeeded attempt. + // ignore killed attempts to handle kills that happen upon dag completion + TaskAttemptInfo lastAttempt = null; + long lastAttemptFinishTime = 0; + for (VertexInfo vertex : dagInfo.getVertices()) { + for (TaskInfo task : vertex.getTasks()) { + for (TaskAttemptInfo attempt : task.getTaskAttempts()) { + attempts.put(attempt.getTaskAttemptId(), attempt); + if (attempt.getStatus().equals(succeededState) || + attempt.getStatus().equals(failedState)) { + if (lastAttemptFinishTime < attempt.getFinishTime()) { + lastAttempt = attempt; + lastAttemptFinishTime = attempt.getFinishTime(); + } + } + } + } + } + + if (lastAttempt == null) { + System.out.println("Cannot find last attempt to finish in DAG " + dagInfo.getDagId()); + return; + } + + createCriticalPath(dagInfo, lastAttempt, lastAttemptFinishTime, attempts); + + analyzeCriticalPath(dagInfo); + + if (getConf().getBoolean(DRAW_SVG, true)) { + saveCriticalPathAsSVG(dagInfo); + } + } + + public List<CriticalPathStep> getCriticalPath() { + return criticalPath; + } + + private void saveCriticalPathAsSVG(DagInfo dagInfo) { + SVGUtils svg = new SVGUtils(); + String outputFileName = getOutputDir() + File.separator + dagInfo.getDagId() + ".svg"; + System.out.println("Writing output to: " + outputFileName); + svg.saveCriticalPathAsSVG(dagInfo, outputFileName, criticalPath); + } + + static class TimeInfo implements Comparable<TimeInfo> { + long timestamp; + int count; + boolean start; + TimeInfo(long timestamp, boolean start) { + this.timestamp = timestamp; + this.start = start; + } + + @Override + public int compareTo(TimeInfo o) { + return Long.compare(this.timestamp, o.timestamp); + } + + @Override + public int hashCode() { + // hash on the timestamp only, consistent with equals(), which compares via compareTo() + return (int)((timestamp >>> 32) ^ timestamp); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null) { + return false; + } + if (o.getClass() == this.getClass()) { + TimeInfo other = (TimeInfo) o; + return (this.compareTo(other) == 0); + } + else { + return false; + } + } + } + + private void determineConcurrency(DagInfo dag) { + ArrayList<TimeInfo> timeInfo = Lists.newArrayList(); + for (VertexInfo v : dag.getVertices()) { + for (TaskInfo t : v.getTasks()) { + for (TaskAttemptInfo a : t.getTaskAttempts()) { + if (a.getStartTime() > 0) { + timeInfo.add(new TimeInfo(a.getStartTime(), true)); + timeInfo.add(new TimeInfo(a.getFinishTime(), false)); + } + } + } + } + Collections.sort(timeInfo); + + int concurrency = 0; + TimeInfo lastTimeInfo = null; + for (TimeInfo t : timeInfo) { + concurrency += (t.start) ? 1 : -1; + maxConcurrency = (concurrency > maxConcurrency) ? concurrency : maxConcurrency; + if (lastTimeInfo == null || lastTimeInfo.timestamp < t.timestamp) { + lastTimeInfo = t; + lastTimeInfo.count = concurrency; + concurrencyByTime.add(lastTimeInfo); + } else { + // lastTimeInfo.timestamp == t.timestamp + lastTimeInfo.count = concurrency; + } + } +// for (TimeInfo t : concurrencyByTime) { +// System.out.println(t.timestamp + " " + t.count); +// } + } + + private int getIntervalMaxConcurrency(long begin, long end) { + int concurrency = 0; + for (TimeInfo timeInfo : concurrencyByTime) { + if (timeInfo.timestamp < begin) { + continue; + } + if (timeInfo.timestamp > end) { + break; + } + if (timeInfo.count > concurrency) { + concurrency = timeInfo.count; + } + } + return concurrency; + } + + private void analyzeAllocationOverhead(DagInfo dag) { + List<TaskAttemptInfo> preemptedAttempts = Lists.newArrayList(); + for (VertexInfo v : dag.getVertices()) { + for (TaskInfo t : v.getTasks()) { + for (TaskAttemptInfo a : t.getTaskAttempts()) { + if (a.getTerminationCause().equals( + TaskAttemptTerminationCause.INTERNAL_PREEMPTION.name())) { + System.out.println("Found preempted attempt " + a.getTaskAttemptId()); + preemptedAttempts.add(a); + } + } + } + } + for (int i = 0; i < criticalPath.size(); ++i) { + CriticalPathStep step = criticalPath.get(i); + TaskAttemptInfo attempt = step.attempt; + if (step.getType() != EntityType.ATTEMPT) { + continue; + } + + long creationTime = attempt.getCreationTime(); + long allocationTime = attempt.getAllocationTime(); + long finishTime = attempt.getFinishTime(); + if (allocationTime < step.startCriticalPathTime) { + // allocated before it became critical + continue; + } + + // the attempt is critical before allocation. So allocation overhead needs analysis + Container container = attempt.getContainer(); + if (container != null) { + Collection<TaskAttemptInfo> attempts = dag.getContainerMapping().get(container); + if (attempts != null && !attempts.isEmpty()) { + // arrange attempts by allocation time + List<TaskAttemptInfo> attemptsList = Lists.newArrayList(attempts); + Collections.sort(attemptsList, TaskAttemptInfo.orderingOnAllocationTime()); + // walk the list to record allocation time before the current attempt + long containerPreviousAllocatedTime = 0; + int reUsesForVertex = 1; + for (TaskAttemptInfo containerAttempt : attemptsList) { + if (containerAttempt.getTaskAttemptId().equals(attempt.getTaskAttemptId())) { + break; + } + if (containerAttempt.getTaskInfo().getVertexInfo().getVertexId().equals( + attempt.getTaskInfo().getVertexInfo().getVertexId())) { + // another task from the same vertex ran in this container. So there are multiple + // waves for this vertex on this container. + reUsesForVertex++; + } + long cAllocTime = containerAttempt.getAllocationTime(); + long cFinishTime = containerAttempt.getFinishTime(); + if (cFinishTime > creationTime) { + // for containerAttempts that used the container while this attempt was waiting + // add up time container was allocated to containerAttempt. Account for allocations + // that started before this attempt was created. + containerPreviousAllocatedTime += + (cFinishTime - (cAllocTime > creationTime ? cAllocTime : creationTime)); + } + } + int numVertexTasks = attempt.getTaskInfo().getVertexInfo().getNumTasks(); + int intervalMaxConcurrency = getIntervalMaxConcurrency(creationTime, finishTime); + double numWaves = getWaves(numVertexTasks, intervalMaxConcurrency); + + if (reUsesForVertex > 1) {
"); + if (numWaves < 1) { + // less than 1 wave total but still ran more than 1 on this container + step.notes.add("Vertex potentially seeing contention from other branches in the DAG. "); + } + } + if (containerPreviousAllocatedTime == 0) { + step.notes.add("Container newly allocated."); + } else { + if (containerPreviousAllocatedTime >= attempt.getCreationToAllocationTimeInterval()) { + step.notes.add("Container was fully allocated"); + } else { + step.notes.add("Container in use for " + + SVGUtils.getTimeStr(containerPreviousAllocatedTime) + " out of " + + SVGUtils.getTimeStr(attempt.getCreationToAllocationTimeInterval()) + + " of allocation wait time"); + } + } + } + // look for internal preemptions while attempt was waiting for allocation + for (TaskAttemptInfo a : preemptedAttempts) { + if (a.getTaskInfo().getVertexInfo().getVertexId() + .equals(attempt.getTaskInfo().getVertexInfo().getVertexId())) { + // dont preempt same vertex task. ideally this should look at priority but we dont have it + continue; + } + if (a.getFinishTime() > creationTime && a.getFinishTime() < allocationTime) { + // found an attempt that was preempted within this time interval + step.notes.add("Potentially waited for preemption of " + a.getShortName()); + } + } + } + } + } + + private double getWaves(int numTasks, int concurrency) { + double numWaves = (numTasks*1.0) / concurrency; + numWaves = (double)Math.round(numWaves * 10d) / 10d; // convert to 1 decimal place + return numWaves; + } + + private void analyzeWaves(DagInfo dag) { + for (int i = 0; i < criticalPath.size(); ++i) { + CriticalPathStep step = criticalPath.get(i); + TaskAttemptInfo attempt = step.attempt; + if (step.getType() != EntityType.ATTEMPT) { + continue; + } + long creationTime = attempt.getCreationTime(); + long finishTime = attempt.getFinishTime(); + + int numVertexTasks = attempt.getTaskInfo().getVertexInfo().getNumTasks(); + if (numVertexTasks <= 1) { + continue; + } + int intervalMaxConcurrency = getIntervalMaxConcurrency(creationTime, finishTime); + double numWaves = getWaves(numVertexTasks, intervalMaxConcurrency); + + step.notes.add("Vertex ran " + numVertexTasks + + " tasks in " + numWaves + + " waves with available concurrency of " + intervalMaxConcurrency); + if (numWaves > 1) { + if (numWaves%1 < 0.5) { + // more than 1 wave needed and last wave is small + step.notes.add("Last partial wave did not use full concurrency. "); + } + } + } + } + + private void analyzeStragglers(DagInfo dag) { + long dagStartTime = dag.getStartTime(); + long dagTime = dag.getFinishTime() - dagStartTime; + long totalAttemptCriticalTime = 0; + for (int i = 0; i < criticalPath.size(); ++i) { + CriticalPathStep step = criticalPath.get(i); + totalAttemptCriticalTime += (step.stopCriticalPathTime - step.startCriticalPathTime); + TaskAttemptInfo attempt = step.attempt; + if (step.getType() == EntityType.ATTEMPT) { + // analyze execution overhead + if (attempt.getLastDataEvents().size() > 1) { + // there were read errors. that could have delayed the attempt. ignore this + continue; + } + long avgPostDataExecutionTime = attempt.getTaskInfo().getVertexInfo() + .getAvgPostDataExecutionTimeInterval(); + if (avgPostDataExecutionTime <= 0) { + continue; + } + long attemptExecTime = attempt.getPostDataExecutionTimeInterval(); + if (avgPostDataExecutionTime * 1.25 < attemptExecTime) { + step.notes + .add("Potential straggler. 
Post Data Execution time " + + SVGUtils.getTimeStr(attemptExecTime) + + " compared to vertex average of " + + SVGUtils.getTimeStr(avgPostDataExecutionTime)); + } + } + } + System.out + .println("DAG time taken: " + dagTime + " TotalAttemptTime: " + totalAttemptCriticalTime + + " DAG finish time: " + dag.getFinishTime() + " DAG start time: " + dagStartTime); + } + + private void analyzeCriticalPath(DagInfo dag) { + if (!criticalPath.isEmpty()) { + determineConcurrency(dag); + analyzeStragglers(dag); + analyzeWaves(dag); + analyzeAllocationOverhead(dag); + } + } + + private void createCriticalPath(DagInfo dagInfo, TaskAttemptInfo lastAttempt, + long lastAttemptFinishTime, Map<String, TaskAttemptInfo> attempts) { + List<CriticalPathStep> tempCP = Lists.newLinkedList(); + if (lastAttempt != null) { + TaskAttemptInfo currentAttempt = lastAttempt; + CriticalPathStep currentStep = new CriticalPathStep(currentAttempt, EntityType.DAG_COMMIT); + long currentAttemptStopCriticalPathTime = lastAttemptFinishTime; + + // add the commit step + if (dagInfo.getFinishTime() > 0) { + currentStep.stopCriticalPathTime = dagInfo.getFinishTime(); + } else { + // AM crashed and no dag finished written + currentStep.stopCriticalPathTime = currentAttemptStopCriticalPathTime; + } + currentStep.startCriticalPathTime = currentAttemptStopCriticalPathTime; + currentStep.reason = CriticalPathDependency.COMMIT_DEPENDENCY; + tempCP.add(currentStep); + + while (true) { + Preconditions.checkState(currentAttempt != null); + Preconditions.checkState(currentAttemptStopCriticalPathTime > 0); + System.out.println( + "Step: " + tempCP.size() + " Attempt: " + currentAttempt.getTaskAttemptId()); + + currentStep = new CriticalPathStep(currentAttempt, EntityType.ATTEMPT); + currentStep.stopCriticalPathTime = currentAttemptStopCriticalPathTime; + + // consider the last data event seen immediately preceding the current critical path + // stop time for this attempt + long currentStepLastDataEventTime = 0; + String currentStepLastDataTA = null; + DataDependencyEvent item = currentAttempt.getLastDataEventInfo(currentStep.stopCriticalPathTime); + if (item!=null) { + currentStepLastDataEventTime = item.getTimestamp(); + currentStepLastDataTA = item.getTaskAttemptId(); + } + + // sanity check + for (CriticalPathStep previousStep : tempCP) { + if (previousStep.type == EntityType.ATTEMPT) { + if (previousStep.attempt.getTaskAttemptId().equals(currentAttempt.getTaskAttemptId())) { + // found loop. + // this should only happen for read errors in currentAttempt + List<DataDependencyEvent> dataEvents = currentAttempt.getLastDataEvents(); + Preconditions.checkState(dataEvents.size() > 1); // received + // original and + // retry data events + Preconditions.checkState(currentStepLastDataEventTime < dataEvents + .get(dataEvents.size() - 1).getTimestamp()); // new event is + // earlier than + // last + } + } + } + + tempCP.add(currentStep); + + // find the next attempt on the critical path + boolean dataDependency = false; + // find out predecessor dependency + if (currentStepLastDataEventTime > currentAttempt.getCreationTime()) { + dataDependency = true; + } + + long startCriticalPathTime = 0; + String nextAttemptId = null; + CriticalPathDependency reason = null; + if (dataDependency) { + // last data event was produced after the attempt was scheduled. 
+ // last data event was produced after the attempt was scheduled. Use data dependency. + // typically the case when scheduling ahead of time + System.out.println("Has data dependency"); + if (!Strings.isNullOrEmpty(currentStepLastDataTA)) { + // there is a valid data causal TA. Use it. + nextAttemptId = currentStepLastDataTA; + reason = CriticalPathDependency.DATA_DEPENDENCY; + startCriticalPathTime = currentStepLastDataEventTime; + System.out.println("Using data dependency " + nextAttemptId); + } else { + // there is no valid data causal TA. This means the data event came from an external input + VertexInfo vertex = currentAttempt.getTaskInfo().getVertexInfo(); + Preconditions.checkState(!vertex.getAdditionalInputInfoList().isEmpty(), + "Vertex: " + vertex.getVertexId() + " has no external inputs but the last data event " + + "TA is null for " + currentAttempt.getTaskAttemptId()); + nextAttemptId = null; + reason = CriticalPathDependency.INIT_DEPENDENCY; + System.out.println("Using init dependency"); + } + } else { + // attempt was scheduled after last data event. use scheduling dependency + // typically happens for retries + System.out.println("Has scheduling dependency"); + if (!Strings.isNullOrEmpty(currentAttempt.getCreationCausalTA())) { + // there is a scheduling causal TA. Use it. + nextAttemptId = currentAttempt.getCreationCausalTA(); + reason = CriticalPathDependency.RETRY_DEPENDENCY; + TaskAttemptInfo nextAttempt = attempts.get(nextAttemptId); + if (nextAttempt != null) { + VertexInfo currentVertex = currentAttempt.getTaskInfo().getVertexInfo(); + VertexInfo nextVertex = nextAttempt.getTaskInfo().getVertexInfo(); + if (!nextVertex.getVertexName().equals(currentVertex.getVertexName())) { + // cause from different vertex. Might be rerun to re-generate outputs + for (VertexInfo outVertex : currentVertex.getOutputVertices()) { + if (nextVertex.getVertexName().equals(outVertex.getVertexName())) { + // next vertex is an output vertex + reason = CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY; + break; + } + } + } + } + if (reason == CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY) { + // rescheduled due to read error. start critical at read error report time. + // for now proxy own creation time for read error report time + startCriticalPathTime = currentAttempt.getCreationTime(); + } else { + // rescheduled due to own previous attempt failure + // we are critical when the previous attempt fails + Preconditions.checkState(nextAttempt != null); + Preconditions.checkState(nextAttempt.getTaskInfo().getTaskId().equals( + currentAttempt.getTaskInfo().getTaskId())); + startCriticalPathTime = nextAttempt.getFinishTime(); + } + System.out.println("Using scheduling dependency " + nextAttemptId); + } else { + // there is no scheduling causal TA. + if (!Strings.isNullOrEmpty(currentStepLastDataTA)) { + // there is a data event going to the vertex. Count the time between data event and + // creation time as Initializer/Manager overhead and follow data dependency + nextAttemptId = currentStepLastDataTA; + reason = CriticalPathDependency.DATA_DEPENDENCY; + startCriticalPathTime = currentStepLastDataEventTime; + long overhead = currentAttempt.getCreationTime() - currentStepLastDataEventTime; + currentStep.notes + .add("Initializer/VertexManager scheduling overhead " + SVGUtils.getTimeStr(overhead)); + System.out.println("Using data dependency " + nextAttemptId); + } else { + // there is no scheduling causal TA and no data event causal TA. + // the vertex has external input that sent the last data events + // or the vertex has external input but does not use events + // or the vertex has no external inputs or edges + nextAttemptId = null; + reason = CriticalPathDependency.INIT_DEPENDENCY; + System.out.println("Using init dependency"); + } + } + } + + currentStep.startCriticalPathTime = startCriticalPathTime; + currentStep.reason = reason; + + Preconditions.checkState(currentStep.stopCriticalPathTime >= currentStep.startCriticalPathTime); + + if (Strings.isNullOrEmpty(nextAttemptId)) { + Preconditions.checkState(reason.equals(CriticalPathDependency.INIT_DEPENDENCY)); + Preconditions.checkState(startCriticalPathTime == 0); + // no predecessor attempt found. this is the last step in the critical path + // assume the attempt's critical path start time is when it is scheduled. before that is + // vertex initialization time + currentStep.startCriticalPathTime = currentStep.attempt.getCreationTime(); + + // add vertex init step + long initStepStopCriticalTime = currentStep.startCriticalPathTime; + currentStep = new CriticalPathStep(currentAttempt, EntityType.VERTEX_INIT); + currentStep.stopCriticalPathTime = initStepStopCriticalTime; + currentStep.startCriticalPathTime = dagInfo.getStartTime(); + currentStep.reason = CriticalPathDependency.INIT_DEPENDENCY; + tempCP.add(currentStep); + + // the path was walked backwards from the last attempt; reverse it into + // start-to-finish order + for (int i = tempCP.size() - 1; i >= 0; --i) { + criticalPath.add(tempCP.get(i)); + } + return; + } + + currentAttempt = attempts.get(nextAttemptId); + currentAttemptStopCriticalPathTime = startCriticalPathTime; + } + } + } + + @Override + public CSVResult getResult() throws TezException { + String[] headers = { "Entity", "PathReason", "Status", "CriticalStartTime", + "CriticalStopTime", "Notes" }; + + CSVResult csvResult = new CSVResult(headers); + for (CriticalPathStep step : criticalPath) { + String entity = (step.getType() == EntityType.ATTEMPT ? step.getAttempt().getTaskAttemptId() + : (step.getType() == EntityType.VERTEX_INIT + ? step.attempt.getTaskInfo().getVertexInfo().getVertexName() : "DAG COMMIT")); + String[] record = {entity, step.getReason().name(), + step.getAttempt().getDetailedStatus(), String.valueOf(step.getStartCriticalTime()), + String.valueOf(step.getStopCriticalTime()), + Joiner.on(";").join(step.getNotes())}; + csvResult.addRecord(record); + } + return csvResult; + } + + @Override + public String getName() { + return "CriticalPathAnalyzer"; + } + + @Override + public String getDescription() { + return "Analyze critical path of the DAG"; + } + + @Override + public Configuration getConfiguration() { + return getConf(); + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new CriticalPathAnalyzer(), args); + System.exit(res); + } + +}
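determineConcurrency() above is a sweep-line pass: each attempt contributes a +1 event at its start time and a -1 event at its finish time; sorting the events by timestamp and keeping a running sum gives the number of attempts in flight at any moment. A self-contained sketch of the same idea (the sample intervals are invented):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ConcurrencySweepSketch {
  static class Event implements Comparable<Event> {
    final long timestamp;
    final int delta; // +1 when an attempt starts, -1 when it finishes
    Event(long timestamp, int delta) {
      this.timestamp = timestamp;
      this.delta = delta;
    }
    @Override
    public int compareTo(Event o) {
      return Long.compare(this.timestamp, o.timestamp);
    }
  }

  public static void main(String[] args) {
    // two overlapping attempts and one later attempt: expect a maximum of 2
    long[][] attempts = { {0, 10}, {5, 15}, {20, 30} }; // {startTime, finishTime}
    List<Event> events = new ArrayList<Event>();
    for (long[] a : attempts) {
      events.add(new Event(a[0], +1));
      events.add(new Event(a[1], -1));
    }
    Collections.sort(events);
    int concurrency = 0;
    int maxConcurrency = 0;
    for (Event e : events) {
      concurrency += e.delta; // running count of attempts in flight
      maxConcurrency = Math.max(maxConcurrency, concurrency);
    }
    System.out.println("maxConcurrency=" + maxConcurrency); // prints 2
  }
}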
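getWaves() divides a vertex's task count by the concurrency available to it during the attempt's lifetime and rounds to one decimal place, so 100 tasks with room for 40 concurrent attempts is 2.5 waves; analyzeWaves() then flags values whose fractional part is below 0.5, since such a last wave runs mostly idle. The same arithmetic, restated (the sample numbers are invented):

public class WavesSketch {
  // same rounding as getWaves(): one decimal place
  static double waves(int numTasks, int concurrency) {
    return Math.round((numTasks * 1.0 / concurrency) * 10d) / 10d;
  }
  public static void main(String[] args) {
    System.out.println(waves(100, 40)); // 2.5 : last wave uses half the slots
    System.out.println(waves(84, 40));  // 2.1 : last partial wave mostly idle
  }
}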
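analyzeStragglers() flags a critical path attempt whose post-data execution time exceeds 1.25 times its vertex's average. Restated as a standalone predicate (the method name is invented; the 1.25 threshold and the guard against a missing average are the analyzer's):

public class StragglerSketch {
  static boolean isPotentialStraggler(long attemptExecTimeMs, long vertexAvgExecTimeMs) {
    // skip vertices with no usable average, then compare against 1.25x the vertex average
    return vertexAvgExecTimeMs > 0 && vertexAvgExecTimeMs * 1.25 < attemptExecTimeMs;
  }
  public static void main(String[] args) {
    System.out.println(isPotentialStraggler(10000, 7000)); // true: 10s > 8.75s
    System.out.println(isPotentialStraggler(8000, 7000));  // false: 8s <= 8.75s
  }
}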
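For programmatic use, the DagInfo fed to analyze() can come from the tez-history-parser module, e.g. ATSFileParser over an ATS export or SimpleHistoryParser over a SimpleHistoryLoggingService log. A sketch of that wiring under stated assumptions: the parser constructor and getDAGData signature are assumed from that module, setConf() assumes TezAnalyzerBase extends Configured, and the file name and DAG id are placeholders.

import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.analyzer.CSVResult;
import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer;
import org.apache.tez.history.parser.ATSFileParser;
import org.apache.tez.history.parser.datamodel.DagInfo;

public class CriticalPathDriverSketch {
  public static void main(String[] args) throws Exception {
    File atsExport = new File("dag_data.zip"); // placeholder: ATS download for one DAG
    DagInfo dagInfo = new ATSFileParser(atsExport).getDAGData("dag_1_1"); // placeholder id
    CriticalPathAnalyzer analyzer = new CriticalPathAnalyzer();
    analyzer.setConf(new Configuration()); // assumed: TezAnalyzerBase extends Configured
    analyzer.analyze(dagInfo);
    CSVResult result = analyzer.getResult(); // one record per critical path step
  }
}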
