Repository: tez Updated Branches: refs/heads/master 9aa3ae61e -> 3b669f895
TEZ-2783. Refactor analyzers to extend TezAnalyzerBase (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3b669f89 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3b669f89 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3b669f89 Branch: refs/heads/master Commit: 3b669f895f8254f78c1001724c0e57ad2732b36b Parents: 9aa3ae6 Author: Rajesh Balamohan <[email protected]> Authored: Wed Sep 9 12:24:05 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Wed Sep 9 12:24:05 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/analyzer/CSVResult.java | 17 ++++--- .../tez/analyzer/plugins/AnalyzerDriver.java | 20 ++++++++ .../plugins/ContainerReuseAnalyzer.java | 12 ++++- .../tez/analyzer/plugins/LocalityAnalyzer.java | 12 ++++- .../analyzer/plugins/ShuffleTimeAnalyzer.java | 12 ++++- .../tez/analyzer/plugins/SkewAnalyzer.java | 12 ++++- .../tez/analyzer/plugins/SlowNodeAnalyzer.java | 11 ++++- .../analyzer/plugins/SlowTaskIdentifier.java | 12 ++++- .../analyzer/plugins/SlowestVertexAnalyzer.java | 11 ++++- .../tez/analyzer/plugins/SpillAnalyzerImpl.java | 12 ++++- .../plugins/TaskConcurrencyAnalyzer.java | 12 ++++- .../tez/analyzer/plugins/TezAnalyzerBase.java | 51 ++++++++++++++++++++ .../VertexLevelCriticalPathAnalyzer.java | 12 ++++- 14 files changed, 190 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/3b669f89/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ee11006..bd04fb4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.8.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2783. Refactor analyzers to extend TezAnalyzerBase TEZ-2784. optimize TaskImpl.isFinished() TEZ-2788. Allow TezAnalyzerBase to parse SimpleHistory logs TEZ-2782. VertexInfo.getAvgExecutionTimeInterval throws NPE when task does not have any valid attempts info http://git-wip-us.apache.org/repos/asf/tez/blob/3b669f89/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java index 27ad95e..5246c68 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/CSVResult.java @@ -18,6 +18,7 @@ package org.apache.tez.analyzer; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Iterators; @@ -40,23 +41,23 @@ import java.util.List; */ public class CSVResult implements Result { - private final String[] header; + private final String[] headers; private final List<String[]> recordsList; private String comments; public CSVResult(String[] header) { - this.header = header; + this.headers = header; recordsList = Lists.newLinkedList(); } public String[] getHeaders() { - return header; + return headers; } public void addRecord(String[] record) { Preconditions.checkArgument(record != null, "Record can't be null"); - Preconditions.checkArgument(record.length == header.length, "Record length" + record.length + - " does not match header length " + header.length); + Preconditions.checkArgument(record.length == headers.length, "Record length" + record.length + + " does not match headers length " + headers.length); recordsList.add(record); } @@ -79,7 +80,7 @@ public class CSVResult implements Result { @Override public String toString() { return "CSVResult{" + - "header=" + Arrays.toString(header) + + "headers=" + Arrays.toString(headers) + ", recordsList=" + recordsList + '}'; } @@ -90,9 +91,11 @@ public class CSVResult implements Result { new FileOutputStream(new File(fileName)), Charset.forName("UTF-8").newEncoder()); BufferedWriter bw = new BufferedWriter(writer); + bw.write(Joiner.on(",").join(headers)); + bw.newLine(); for (String[] record : recordsList) { - if (record.length != header.length) { + if (record.length != headers.length) { continue; //LOG error msg? } http://git-wip-us.apache.org/repos/asf/tez/blob/3b669f89/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java index 33dbead..57b21cb 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java @@ -28,6 +28,26 @@ public class AnalyzerDriver { try { pgd.addClass("CriticalPath", CriticalPathAnalyzer.class, "Find the critical path of a DAG"); + pgd.addClass("ContainerReuseAnalyzer", ContainerReuseAnalyzer.class, + "Print container reuse details in a DAG"); + pgd.addClass("LocalityAnalyzer", LocalityAnalyzer.class, + "Print locality details in a DAG"); + pgd.addClass("ShuffleTimeAnalyzer", ShuffleTimeAnalyzer.class, + "Analyze the shuffle time details in a DAG"); + pgd.addClass("SkewAnalyzer", SkewAnalyzer.class, + "Analyze the skew details in a DAG"); + pgd.addClass("SlowestVertexAnalyzer", SlowestVertexAnalyzer.class, + "Print slowest vertex details in a DAG"); + pgd.addClass("SlowNodeAnalyzer", SlowNodeAnalyzer.class, + "Print node details in a DAG"); + pgd.addClass("SlowTaskIdentifier", SlowTaskIdentifier.class, + "Print slow task details in a DAG"); + pgd.addClass("SpillAnalyzer", SpillAnalyzerImpl.class, + "Print spill details in a DAG"); + pgd.addClass("TaskConcurrencyAnalyzer", TaskConcurrencyAnalyzer.class, + "Print the task concurrency details in a DAG"); + pgd.addClass("VertexLevelCriticalPathAnalyzer", VertexLevelCriticalPathAnalyzer.class, + "Find critical path at vertex level in a DAG"); exitCode = pgd.run(argv); } catch(Throwable e){ e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/tez/blob/3b669f89/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java index 905f966..5b862f8 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java @@ -20,7 +20,9 @@ package org.apache.tez.analyzer.plugins; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; import org.apache.tez.analyzer.Analyzer; import org.apache.tez.analyzer.CSVResult; import org.apache.tez.dag.api.TezException; @@ -35,7 +37,7 @@ import java.util.List; /** * Get container reuse information at a per vertex level basis. */ -public class ContainerReuseAnalyzer implements Analyzer { +public class ContainerReuseAnalyzer extends TezAnalyzerBase implements Analyzer { private final Configuration config; @@ -84,4 +86,12 @@ public class ContainerReuseAnalyzer implements Analyzer { public Configuration getConfiguration() { return config; } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + ContainerReuseAnalyzer analyzer = new ContainerReuseAnalyzer(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/3b669f89/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java index 7ed52da..ec72df1 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java @@ -19,7 +19,9 @@ package org.apache.tez.analyzer.plugins; import com.google.common.collect.Lists; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; import org.apache.tez.analyzer.Analyzer; import org.apache.tez.analyzer.CSVResult; import org.apache.tez.common.counters.DAGCounter; @@ -40,7 +42,7 @@ import java.util.Map; * This would be helpeful to co-relate if the vertex runtime is anyways related to the data * locality. */ -public class LocalityAnalyzer implements Analyzer { +public class LocalityAnalyzer extends TezAnalyzerBase implements Analyzer { private final String[] headers = { "vertexName", "numTasks", "dataLocalRatio", "rackLocalRatio", "otherRatio", "avgDataLocalTaskRuntime", "avgRackLocalTaskRuntime", @@ -191,4 +193,12 @@ public class LocalityAnalyzer implements Analyzer { float avgHDFSBytesRead; float avgRuntime; } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + LocalityAnalyzer analyzer = new LocalityAnalyzer(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/3b669f89/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java index a570493..57e91c6 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java @@ -20,7 +20,9 @@ package org.apache.tez.analyzer.plugins; import com.google.common.base.Strings; import com.google.common.collect.Lists; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; import org.apache.tez.analyzer.Analyzer; import org.apache.tez.analyzer.CSVResult; import org.apache.tez.common.counters.TaskCounter; @@ -42,7 +44,7 @@ import java.util.Map; * grouped by vertices. Provide time taken as well. Just render it as a table for now. * */ -public class ShuffleTimeAnalyzer implements Analyzer { +public class ShuffleTimeAnalyzer extends TezAnalyzerBase implements Analyzer { /** * ratio of (total time taken by task - shuffle time) / (total time taken by task) @@ -210,4 +212,12 @@ public class ShuffleTimeAnalyzer implements Analyzer { public Configuration getConfiguration() { return config; } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + ShuffleTimeAnalyzer analyzer = new ShuffleTimeAnalyzer(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/3b669f89/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java index f09380d..067d871 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java @@ -20,7 +20,9 @@ package org.apache.tez.analyzer.plugins; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; import org.apache.tez.analyzer.Analyzer; import org.apache.tez.analyzer.CSVResult; import org.apache.tez.common.counters.TaskCounter; @@ -55,7 +57,7 @@ import java.util.Map; * source. This means, may be consider increasing parallelism based on the task attempt runtime. * <p/> */ -public class SkewAnalyzer implements Analyzer { +public class SkewAnalyzer extends TezAnalyzerBase implements Analyzer { /** * Amount of bytes that was sent as shuffle bytes from source. If it is below this threshold, @@ -310,4 +312,12 @@ public class SkewAnalyzer implements Analyzer { public Configuration getConfiguration() { return null; } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + SkewAnalyzer analyzer = new SkewAnalyzer(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/3b669f89/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java index 407cf47..a810a8a 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java @@ -22,9 +22,11 @@ import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; import org.apache.tez.analyzer.Analyzer; import org.apache.tez.analyzer.CSVResult; import org.apache.tez.common.counters.FileSystemCounter; @@ -46,7 +48,7 @@ import java.util.List; * <p/> * Combine it with other counters to understand slow nodes better. */ -public class SlowNodeAnalyzer implements Analyzer { +public class SlowNodeAnalyzer extends TezAnalyzerBase implements Analyzer { private static final Log LOG = LogFactory.getLog(SlowNodeAnalyzer.class); @@ -185,4 +187,11 @@ public class SlowNodeAnalyzer implements Analyzer { return config; } + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + SlowNodeAnalyzer analyzer = new SlowNodeAnalyzer(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/3b669f89/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java index 1a8d9d3..d2474ad 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java @@ -19,7 +19,9 @@ package org.apache.tez.analyzer.plugins; import com.google.common.collect.Lists; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; import org.apache.tez.analyzer.Analyzer; import org.apache.tez.analyzer.CSVResult; import org.apache.tez.dag.api.TezException; @@ -38,7 +40,7 @@ import java.util.List; * <p/> * //TODO: We do not get counters for killed task attempts yet. */ -public class SlowTaskIdentifier implements Analyzer { +public class SlowTaskIdentifier extends TezAnalyzerBase implements Analyzer { private static final String[] headers = { "vertexName", "taskAttemptId", "Node", "taskDuration", "Status", "diagnostics", @@ -113,4 +115,12 @@ public class SlowTaskIdentifier implements Analyzer { public Configuration getConfiguration() { return config; } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + SlowTaskIdentifier analyzer = new SlowTaskIdentifier(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/3b669f89/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java index c8d9695..33f2421 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java @@ -21,7 +21,9 @@ package org.apache.tez.analyzer.plugins; import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.google.common.collect.Lists; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; import org.apache.tez.analyzer.Analyzer; import org.apache.tez.analyzer.CSVResult; import org.apache.tez.common.counters.TaskCounter; @@ -38,7 +40,7 @@ import java.util.Map; /** * Identify the slowest vertex in the DAG. */ -public class SlowestVertexAnalyzer implements Analyzer { +public class SlowestVertexAnalyzer extends TezAnalyzerBase implements Analyzer { private static final String[] headers = { "vertexName", "taskAttempts", "totalTime", "shuffleTime", "shuffleTime_Max", "LastEventReceived", "LastEventReceivedFrom", @@ -207,4 +209,11 @@ public class SlowestVertexAnalyzer implements Analyzer { return config; } + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + SlowestVertexAnalyzer analyzer = new SlowestVertexAnalyzer(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/3b669f89/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java index 83b1bb0..d69ca23 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java @@ -19,7 +19,9 @@ package org.apache.tez.analyzer.plugins; import com.google.common.collect.Lists; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; import org.apache.tez.analyzer.Analyzer; import org.apache.tez.analyzer.CSVResult; import org.apache.tez.common.counters.TaskCounter; @@ -39,7 +41,7 @@ import java.util.Map; * <p/> * Accompany this with OUTPUT_BYTES (> 1 GB data written) */ -public class SpillAnalyzerImpl implements Analyzer { +public class SpillAnalyzerImpl extends TezAnalyzerBase implements Analyzer { private static final String[] headers = { "vertexName", "taskAttemptId", "Node", "counterGroupName", @@ -132,4 +134,12 @@ public class SpillAnalyzerImpl implements Analyzer { public Configuration getConfiguration() { return config; } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + SpillAnalyzerImpl analyzer = new SpillAnalyzerImpl(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/3b669f89/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java index c07ff83..070294f 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java @@ -20,7 +20,9 @@ package org.apache.tez.analyzer.plugins; import com.google.common.collect.Lists; import com.google.common.collect.TreeMultiset; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; import org.apache.tez.analyzer.Analyzer; import org.apache.tez.analyzer.CSVResult; import org.apache.tez.dag.api.TezException; @@ -34,7 +36,7 @@ import java.util.List; /** * Analyze concurrent tasks running in every vertex at regular intervals. */ -public class TaskConcurrencyAnalyzer implements Analyzer { +public class TaskConcurrencyAnalyzer extends TezAnalyzerBase implements Analyzer { private static final String[] headers = { "time", "vertexName", "concurrentTasksRunning" }; @@ -135,4 +137,12 @@ public class TaskConcurrencyAnalyzer implements Analyzer { public Configuration getConfiguration() { return config; } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + TaskConcurrencyAnalyzer analyzer = new TaskConcurrencyAnalyzer(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/3b669f89/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java index fa3bd97..e63e97b 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java @@ -19,6 +19,7 @@ package org.apache.tez.analyzer.plugins; import java.io.File; +import java.util.Iterator; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; @@ -30,6 +31,9 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.Tool; import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.analyzer.Result; +import org.apache.tez.dag.api.TezException; import org.apache.tez.history.ATSImportTool; import org.apache.tez.history.parser.ATSFileParser; import org.apache.tez.history.parser.SimpleHistoryParser; @@ -42,11 +46,16 @@ public abstract class TezAnalyzerBase extends Configured implements Tool, Analyz private static final String EVENT_FILE_NAME = "eventFileName"; private static final String OUTPUT_DIR = "outputDir"; + private static final String SAVE_RESULTS = "saveResults"; private static final String DAG_ID = "dagId"; private static final String FROM_SIMPLE_HISTORY = "fromSimpleHistory"; private static final String HELP = "help"; + private static final int SEPARATOR_WIDTH = 80; + private static final int MIN_COL_WIDTH = 12; + private String outputDir; + private boolean saveResults = false; @SuppressWarnings("static-access") private static Options buildOptions() { @@ -56,6 +65,10 @@ public abstract class TezAnalyzerBase extends Configured implements Tool, Analyz Option outputDirOption = OptionBuilder.withArgName(OUTPUT_DIR).withLongOpt(OUTPUT_DIR) .withDescription("Directory to write outputs to.").hasArg().isRequired(false).create(); + Option saveResults = OptionBuilder.withArgName(SAVE_RESULTS).withLongOpt(SAVE_RESULTS) + .withDescription("Saves results to output directory (optional)") + .hasArg(false).isRequired(false).create(); + Option eventFileNameOption = OptionBuilder.withArgName(EVENT_FILE_NAME).withLongOpt (EVENT_FILE_NAME) .withDescription("File with event data for the DAG").hasArg() @@ -74,6 +87,7 @@ public abstract class TezAnalyzerBase extends Configured implements Tool, Analyz Options opts = new Options(); opts.addOption(dagIdOption); opts.addOption(outputDirOption); + opts.addOption(saveResults); opts.addOption(eventFileNameOption); opts.addOption(fromSimpleHistoryOption); opts.addOption(help); @@ -104,6 +118,7 @@ public abstract class TezAnalyzerBase extends Configured implements Tool, Analyz printUsage(); return -1; } + saveResults = cmdLine.hasOption(SAVE_RESULTS); if(cmdLine.hasOption(HELP)) { printUsage(); @@ -156,7 +171,43 @@ public abstract class TezAnalyzerBase extends Configured implements Tool, Analyz } Preconditions.checkState(dagInfo.getDagId().equals(dagId)); analyze(dagInfo); + Result result = getResult(); + if (saveResults && (result instanceof CSVResult)) { + String fileName = outputDir + File.separator + + this.getClass().getName() + "_" + dagInfo.getDagId() + ".csv"; + ((CSVResult) result).dumpToFile(fileName); + System.out.println("Saved results in " + fileName); + } return 0; } + public void printResults() throws TezException { + Result result = getResult(); + if (result instanceof CSVResult) { + String[] headers = ((CSVResult) result).getHeaders(); + + StringBuilder formatBuilder = new StringBuilder(); + int size = Math.max(MIN_COL_WIDTH, SEPARATOR_WIDTH / headers.length); + for (int i = 0; i < headers.length; i++) { + formatBuilder.append("%-").append(size).append("s "); + } + String format = formatBuilder.toString(); + + StringBuilder separator = new StringBuilder(); + for (int i = 0; i < SEPARATOR_WIDTH; i++) { + separator.append("-"); + } + + System.out.println(separator); + System.out.println(String.format(format.toString(), headers)); + System.out.println(separator); + + Iterator<String[]> recordsIterator = ((CSVResult) result).getRecordsIterator(); + while (recordsIterator.hasNext()) { + String line = String.format(format, recordsIterator.next()); + System.out.println(line); + } + System.out.println(separator); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/3b669f89/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java index 9661ea3..06b8983 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java @@ -25,7 +25,9 @@ import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; import org.apache.tez.analyzer.Analyzer; import org.apache.tez.analyzer.CSVResult; import org.apache.tez.analyzer.utils.Utils; @@ -41,7 +43,7 @@ import java.util.Map; /** * Identify a set of vertices which fall in the critical path in a DAG. */ -public class VertexLevelCriticalPathAnalyzer implements Analyzer { +public class VertexLevelCriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { private final Configuration config; private static final String[] headers = { "CriticalPath", "Score" }; @@ -139,4 +141,12 @@ public class VertexLevelCriticalPathAnalyzer implements Analyzer { return Lists.newLinkedList(Splitter.on(CONNECTOR).trimResults().omitEmptyStrings().split (criticalPath)); } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + VertexLevelCriticalPathAnalyzer analyzer = new VertexLevelCriticalPathAnalyzer(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } }
