TEZ-2973. Backport Analyzers to branch-0.7
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8c8db7c5 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8c8db7c5 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8c8db7c5 Branch: refs/heads/branch-0.7 Commit: 8c8db7c516d0cd1ca256cddbe76bbe306e23f9c2 Parents: 868ca53 Author: Jonathan Eagles <[email protected]> Authored: Mon Dec 7 16:41:00 2015 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Mon Dec 7 16:41:00 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + pom.xml | 5 + .../tez/common/counters/FileSystemCounter.java | 4 + .../java/org/apache/tez/dag/utils/Graph.java | 15 +- tez-plugins/pom.xml | 2 + .../tez-history-parser/findbugs-exclude.xml | 16 + tez-plugins/tez-history-parser/pom.xml | 190 +++++ .../org/apache/tez/history/ATSImportTool.java | 463 +++++++++++ .../org/apache/tez/history/parser/ATSData.java | 48 ++ .../tez/history/parser/ATSFileParser.java | 227 +++++ .../tez/history/parser/SimpleHistoryParser.java | 242 ++++++ .../datamodel/AdditionalInputOutputDetails.java | 71 ++ .../tez/history/parser/datamodel/BaseInfo.java | 142 ++++ .../history/parser/datamodel/BaseParser.java | 114 +++ .../tez/history/parser/datamodel/Constants.java | 64 ++ .../tez/history/parser/datamodel/Container.java | 70 ++ .../tez/history/parser/datamodel/DagInfo.java | 586 +++++++++++++ .../tez/history/parser/datamodel/EdgeInfo.java | 112 +++ .../tez/history/parser/datamodel/Event.java | 63 ++ .../parser/datamodel/TaskAttemptInfo.java | 379 +++++++++ .../tez/history/parser/datamodel/TaskInfo.java | 354 ++++++++ .../history/parser/datamodel/VersionInfo.java | 45 + .../history/parser/datamodel/VertexInfo.java | 636 ++++++++++++++ .../apache/tez/history/parser/utils/Utils.java | 139 ++++ .../apache/tez/history/TestHistoryParser.java | 813 ++++++++++++++++++ .../analyzers/job-analyzer/findbugs-exclude.xml | 28 + tez-tools/analyzers/job-analyzer/pom.xml | 168 ++++ .../java/org/apache/tez/analyzer/Analyzer.java | 64 ++ .../java/org/apache/tez/analyzer/CSVResult.java | 115 +++ .../java/org/apache/tez/analyzer/Result.java | 39 + .../tez/analyzer/plugins/AnalyzerDriver.java | 59 ++ .../plugins/ContainerReuseAnalyzer.java | 97 +++ .../analyzer/plugins/CriticalPathAnalyzer.java | 646 +++++++++++++++ .../tez/analyzer/plugins/LocalityAnalyzer.java | 204 +++++ .../analyzer/plugins/ShuffleTimeAnalyzer.java | 223 +++++ .../tez/analyzer/plugins/SkewAnalyzer.java | 323 ++++++++ .../tez/analyzer/plugins/SlowNodeAnalyzer.java | 197 +++++ .../analyzer/plugins/SlowTaskIdentifier.java | 126 +++ .../analyzer/plugins/SlowestVertexAnalyzer.java | 219 +++++ .../tez/analyzer/plugins/SpillAnalyzerImpl.java | 145 ++++ .../plugins/TaskConcurrencyAnalyzer.java | 148 ++++ .../tez/analyzer/plugins/TezAnalyzerBase.java | 213 +++++ .../VertexLevelCriticalPathAnalyzer.java | 152 ++++ .../org/apache/tez/analyzer/utils/SVGUtils.java | 334 ++++++++ .../org/apache/tez/analyzer/utils/Utils.java | 100 +++ .../org/apache/tez/analyzer/TestAnalyzer.java | 823 +++++++++++++++++++ tez-tools/analyzers/pom.xml | 51 ++ tez-tools/pom.xml | 4 + 48 files changed, 9275 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7fb7957..8c23aa6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES + TEZ-2973. Backport Analyzers to branch-0.7 TEZ-2975. Bump up apache commons dependency. TEZ-2970. Re-localization in TezChild does not use correct UGI. TEZ-2968. Counter limits exception causes AM to crash. http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 3b7aaf4..169353f 100644 --- a/pom.xml +++ b/pom.xml @@ -176,6 +176,11 @@ <version>${pig.version}</version> </dependency> <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>3.1.0</version> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.5</version> http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java index 57d1053..73e3581 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java @@ -27,4 +27,8 @@ public enum FileSystemCounter { READ_OPS, LARGE_READ_OPS, WRITE_OPS, + HDFS_BYTES_READ, + HDFS_BYTES_WRITTEN, + FILE_BYTES_READ, + FILE_BYTES_WRITTEN } http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java index cc9033d..1d8e395 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java @@ -62,6 +62,7 @@ public class Graph { List<Edge> outs; String label; String shape; + String color; public Node(String id) { this(id, null); @@ -104,6 +105,10 @@ public class Graph { public void setShape(String shape) { this.shape = shape; } + + public void setColor(String color) { + this.color = color; + } } private String name; @@ -196,17 +201,19 @@ public class Graph { for (Node n : nodes) { if (n.shape != null && !n.shape.isEmpty()) { sb.append(String.format( - "%s%s [ label = %s, shape = %s ];", + "%s%s [ label = %s, shape = %s , color= %s];", indent, wrapSafeString(n.getUniqueId()), wrapSafeString(n.getLabel()), - wrapSafeString(n.shape))); + wrapSafeString(n.shape), + wrapSafeString(n.color == null ? "black" : n.color))); } else { sb.append(String.format( - "%s%s [ label = %s ];", + "%s%s [ label = %s , color= %s ];", indent, wrapSafeString(n.getUniqueId()), - wrapSafeString(n.getLabel()))); + wrapSafeString(n.getLabel()), + wrapSafeString(n.color == null ? "black" : n.color))); } sb.append(System.getProperty("line.separator")); List<Edge> combinedOuts = combineEdges(n.outs); http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/pom.xml ---------------------------------------------------------------------- diff --git a/tez-plugins/pom.xml b/tez-plugins/pom.xml index 89b9d1e..fe7c61a 100644 --- a/tez-plugins/pom.xml +++ b/tez-plugins/pom.xml @@ -34,6 +34,7 @@ </activation> <modules> <module>tez-yarn-timeline-history</module> + <module>tez-history-parser</module> </modules> </profile> <profile> @@ -46,6 +47,7 @@ <modules> <module>tez-yarn-timeline-history</module> <module>tez-yarn-timeline-history-with-acls</module> + <module>tez-history-parser</module> </modules> </profile> </profiles> http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/findbugs-exclude.xml b/tez-plugins/tez-history-parser/findbugs-exclude.xml new file mode 100644 index 0000000..5b11308 --- /dev/null +++ b/tez-plugins/tez-history-parser/findbugs-exclude.xml @@ -0,0 +1,16 @@ +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> +<FindBugsFilter> + +</FindBugsFilter> http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/pom.xml ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/pom.xml b/tez-plugins/tez-history-parser/pom.xml new file mode 100644 index 0000000..7dce608 --- /dev/null +++ b/tez-plugins/tez-history-parser/pom.xml @@ -0,0 +1,190 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.tez</groupId> + <artifactId>tez-plugins</artifactId> + <version>0.7.1-SNAPSHOT</version> + </parent> + <artifactId>tez-history-parser</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-dag</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-tests</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-tests</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-yarn-timeline-history</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-yarn-timeline-history</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-tests</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-common</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.codehaus.jettison</groupId> + <artifactId>jettison</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-deploy-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <archive> + <manifest> + <mainClass>org.apache.tez.history.ATSImportTool</mainClass> + </manifest> + </archive> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + <archive> + <manifest> + <addClasspath>true</addClasspath> + <mainClass>org.apache.tez.history.ATSImportTool</mainClass> + </manifest> + </archive> + </configuration> + <executions> + <execution> + <id>assemble-all</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java new file mode 100644 index 0000000..3efeb5a --- /dev/null +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java @@ -0,0 +1,463 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.history; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.UniformInterfaceException; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; +import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; +import com.sun.jersey.json.impl.provider.entity.JSONRootElementProvider; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.MissingOptionException; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.LineIterator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.history.parser.datamodel.Constants; +import org.apache.tez.history.parser.utils.Utils; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.MediaType; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLEncoder; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + +import static org.apache.hadoop.classification.InterfaceStability.Evolving; + +/** + * <pre> + * Simple tool which imports ATS data pertaining to a DAG (Dag, Vertex, Task, Attempt) + * and creates a zip file out of it. + * + * usage: + * + * java -cp tez-history-parser-x.y.z-jar-with-dependencies.jar org.apache.tez.history.ATSImportTool + * + * OR + * + * HADOOP_CLASSPATH=$TEZ_HOME/*:$TEZ_HOME/lib/*:$HADOOP_CLASSPATH hadoop jar tez-history-parser-x.y.z.jar org.apache.tez.history.ATSImportTool + * + * + * --yarnTimelineAddress <yarnTimelineAddress> Optional. Yarn Timeline Address(e.g http://clusterATSNode:8188) + * --batchSize <batchSize> Optional. batch size for downloading data + * --dagId <dagId> DagId that needs to be downloaded + * --downloadDir <downloadDir> download directory where data needs to be downloaded + * --help print help + * + * </pre> + */ +@Evolving +public class ATSImportTool extends Configured implements Tool { + + private static final Logger LOG = LoggerFactory.getLogger(ATSImportTool.class); + + private static final String BATCH_SIZE = "batchSize"; + private static final int BATCH_SIZE_DEFAULT = 100; + + private static final String YARN_TIMELINE_SERVICE_ADDRESS = "yarnTimelineAddress"; + private static final String DAG_ID = "dagId"; + private static final String BASE_DOWNLOAD_DIR = "downloadDir"; + + private static final String HTTPS_SCHEME = "https://"; + private static final String HTTP_SCHEME = "http://"; + + private static final String VERTEX_QUERY_STRING = "%s/%s?limit=%s&primaryFilter=%s:%s"; + private static final String TASK_QUERY_STRING = "%s/%s?limit=%s&primaryFilter=%s:%s"; + private static final String TASK_ATTEMPT_QUERY_STRING = "%s/%s?limit=%s&primaryFilter=%s:%s"; + private static final String UTF8 = "UTF-8"; + + private static final String LINE_SEPARATOR = System.getProperty("line.separator"); + + private final int batchSize; + private final String baseUri; + private final String dagId; + + private final File zipFile; + private final Client httpClient; + private final TezDAGID tezDAGID; + + public ATSImportTool(String baseUri, String dagId, File downloadDir, int batchSize) { + Preconditions.checkArgument(!Strings.isNullOrEmpty(dagId), "dagId can not be null or empty"); + Preconditions.checkArgument(downloadDir != null, "downloadDir can not be null"); + tezDAGID = TezDAGID.fromString(dagId); + + this.baseUri = baseUri; + this.batchSize = batchSize; + this.dagId = dagId; + + this.httpClient = getHttpClient(); + + this.zipFile = new File(downloadDir, this.dagId + ".zip"); + + boolean result = downloadDir.mkdirs(); + LOG.trace("Result of creating dir {}={}", downloadDir, result); + if (!downloadDir.exists()) { + throw new IllegalArgumentException("dir=" + downloadDir + " does not exist"); + } + + LOG.info("Using baseURL={}, dagId={}, batchSize={}, downloadDir={}", baseUri, dagId, + batchSize, downloadDir); + } + + /** + * Download data from ATS for specific DAG + * + * @throws Exception + */ + private void download() throws Exception { + FileOutputStream fos = null; + try { + fos = new FileOutputStream(zipFile, false); + ZipOutputStream zos = new ZipOutputStream(fos); + downloadData(zos); + IOUtils.closeQuietly(zos); + } catch (Exception e) { + LOG.error("Exception in download", e); + throw e; + } finally { + if (httpClient != null) { + httpClient.destroy(); + } + IOUtils.closeQuietly(fos); + } + } + + /** + * Download DAG data (DAG, Vertex, Task, TaskAttempts) from ATS and write to zip file + * + * @param zos + * @throws TezException + * @throws JSONException + * @throws IOException + */ + private void downloadData(ZipOutputStream zos) throws TezException, JSONException, IOException { + JSONObject finalJson = new JSONObject(); + + //Download application details (TEZ_VERSION etc) + String tezAppId = "tez_" + tezDAGID.getApplicationId().toString(); + String tezAppUrl = String.format("%s/%s/%s", baseUri, Constants.TEZ_APPLICATION, tezAppId); + JSONObject tezAppJson = getJsonRootEntity(tezAppUrl); + finalJson.put(Constants.APPLICATION, tezAppJson); + + //Download dag + String dagUrl = String.format("%s/%s/%s", baseUri, Constants.TEZ_DAG_ID, dagId); + JSONObject dagRoot = getJsonRootEntity(dagUrl); + finalJson.put(Constants.DAG, dagRoot); + + //Create a zip entry with dagId as its name. + ZipEntry zipEntry = new ZipEntry(dagId); + zos.putNextEntry(zipEntry); + //Write in formatted way + IOUtils.write(finalJson.toString(4), zos, UTF8); + + //Download vertex + String vertexURL = + String.format(VERTEX_QUERY_STRING, baseUri, + Constants.TEZ_VERTEX_ID, batchSize, Constants.TEZ_DAG_ID, dagId); + downloadJSONArrayFromATS(vertexURL, zos, Constants.VERTICES); + + //Download task + String taskURL = String.format(TASK_QUERY_STRING, baseUri, + Constants.TEZ_TASK_ID, batchSize, Constants.TEZ_DAG_ID, dagId); + downloadJSONArrayFromATS(taskURL, zos, Constants.TASKS); + + //Download task attempts + String taskAttemptURL = String.format(TASK_ATTEMPT_QUERY_STRING, baseUri, + Constants.TEZ_TASK_ATTEMPT_ID, batchSize, Constants.TEZ_DAG_ID, dagId); + downloadJSONArrayFromATS(taskAttemptURL, zos, Constants.TASK_ATTEMPTS); + } + + /** + * Download data from ATS in batches + * + * @param url + * @param zos + * @param tag + * @throws IOException + * @throws TezException + * @throws JSONException + */ + private void downloadJSONArrayFromATS(String url, ZipOutputStream zos, String tag) + throws IOException, TezException, JSONException { + + Preconditions.checkArgument(zos != null, "ZipOutputStream can not be null"); + + String baseUrl = url; + JSONArray entities; + + long downloadedCount = 0; + while ((entities = getJsonRootEntity(url).optJSONArray(Constants.ENTITIES)) != null + && entities.length() > 0) { + + int limit = (entities.length() >= batchSize) ? (entities.length() - 1) : entities.length(); + LOG.debug("Limit={}, downloaded entities len={}", limit, entities.length()); + + //write downloaded part to zipfile. This is done to avoid any memory pressure when + // downloading and writing 1000s of tasks. + ZipEntry zipEntry = new ZipEntry("part-" + System.currentTimeMillis() + ".json"); + zos.putNextEntry(zipEntry); + JSONObject finalJson = new JSONObject(); + finalJson.put(tag, entities); + IOUtils.write(finalJson.toString(4), zos, "UTF-8"); + downloadedCount += entities.length(); + + if (entities.length() < batchSize) { + break; + } + + //Set the last item in entities as the fromId + url = baseUrl + "&fromId=" + + entities.getJSONObject(entities.length() - 1).getString(Constants.ENTITY); + + String firstItem = entities.getJSONObject(0).getString(Constants.ENTITY); + String lastItem = entities.getJSONObject(entities.length() - 1).getString(Constants.ENTITY); + LOG.info("Downloaded={}, First item={}, LastItem={}, new url={}", downloadedCount, + firstItem, lastItem, url); + } + } + + private void logErrorMessage(ClientResponse response) throws IOException { + LOG.error("Response status={}", response.getClientResponseStatus().toString()); + LineIterator it = null; + try { + it = IOUtils.lineIterator(response.getEntityInputStream(), UTF8); + while (it.hasNext()) { + String line = it.nextLine(); + LOG.error(line); + } + } finally { + if (it != null) { + it.close(); + } + } + } + + //For secure cluster, this should work as long as valid ticket is available in the node. + private JSONObject getJsonRootEntity(String url) throws TezException, IOException { + try { + WebResource wr = getHttpClient().resource(url); + ClientResponse response = wr.accept(MediaType.APPLICATION_JSON_TYPE) + .type(MediaType.APPLICATION_JSON_TYPE) + .get(ClientResponse.class); + + if (response.getClientResponseStatus() != ClientResponse.Status.OK) { + // In the case of secure cluster, if there is any auth exception it sends the data back as + // a html page and JSON parsing could throw exceptions. Instead, get the stream contents + // completely and log it in case of error. + logErrorMessage(response); + throw new TezException("Failed to get response from YARN Timeline: url: " + url); + } + return response.getEntity(JSONObject.class); + } catch (ClientHandlerException e) { + throw new TezException("Error processing response from YARN Timeline. URL=" + url, e); + } catch (UniformInterfaceException e) { + throw new TezException("Error accessing content from YARN Timeline - unexpected response. " + + "URL=" + url, e); + } catch (IllegalArgumentException e) { + throw new TezException("Error accessing content from YARN Timeline - invalid url. URL=" + url, + e); + } + } + + private Client getHttpClient() { + if (httpClient == null) { + ClientConfig config = new DefaultClientConfig(JSONRootElementProvider.App.class); + HttpURLConnectionFactory urlFactory = new PseudoAuthenticatedURLConnectionFactory(); + return new Client(new URLConnectionClientHandler(urlFactory), config); + } + return httpClient; + } + + static class PseudoAuthenticatedURLConnectionFactory implements HttpURLConnectionFactory { + @Override + public HttpURLConnection getHttpURLConnection(URL url) throws IOException { + String tokenString = (url.getQuery() == null ? "?" : "&") + "user.name=" + + URLEncoder.encode(UserGroupInformation.getCurrentUser().getShortUserName(), "UTF8"); + return (HttpURLConnection) (new URL(url.toString() + tokenString)).openConnection(); + } + } + + @Override + public int run(String[] args) throws Exception { + try { + download(); + return 0; + } catch (Exception e) { + e.printStackTrace(); + LOG.error("Error occurred when downloading data ", e); + return -1; + } + } + + private static Options buildOptions() { + Option dagIdOption = OptionBuilder.withArgName(DAG_ID).withLongOpt(DAG_ID) + .withDescription("DagId that needs to be downloaded").hasArg().isRequired(true).create(); + + Option downloadDirOption = OptionBuilder.withArgName(BASE_DOWNLOAD_DIR).withLongOpt + (BASE_DOWNLOAD_DIR) + .withDescription("Download directory where data needs to be downloaded").hasArg() + .isRequired(true).create(); + + Option atsAddressOption = OptionBuilder.withArgName(YARN_TIMELINE_SERVICE_ADDRESS).withLongOpt( + YARN_TIMELINE_SERVICE_ADDRESS) + .withDescription("Optional. ATS address (e.g http://clusterATSNode:8188)").hasArg() + .isRequired(false) + .create(); + + Option batchSizeOption = OptionBuilder.withArgName(BATCH_SIZE).withLongOpt(BATCH_SIZE) + .withDescription("Optional. batch size for downloading data").hasArg() + .isRequired(false) + .create(); + + Option help = OptionBuilder.withArgName("help").withLongOpt("help") + .withDescription("print help").isRequired(false).create(); + + Options opts = new Options(); + opts.addOption(dagIdOption); + opts.addOption(downloadDirOption); + opts.addOption(atsAddressOption); + opts.addOption(batchSizeOption); + opts.addOption(help); + return opts; + } + + static void printHelp(Options options) { + HelpFormatter formatter = new HelpFormatter(); + formatter.setWidth(240); + String help = LINE_SEPARATOR + + "java -cp tez-history-parser-x.y.z-jar-with-dependencies.jar org.apache.tez.history.ATSImportTool" + + LINE_SEPARATOR + + "OR" + + LINE_SEPARATOR + + "HADOOP_CLASSPATH=$TEZ_HOME/*:$TEZ_HOME/lib/*:$HADOOP_CLASSPATH hadoop jar " + + "tez-history-parser-x.y.z.jar " + ATSImportTool.class.getName() + + LINE_SEPARATOR; + formatter.printHelp(240, help, "Options", options, "", true); + } + + static boolean hasHttpsPolicy(Configuration conf) { + YarnConfiguration yarnConf = new YarnConfiguration(conf); + return (HttpConfig.Policy.HTTPS_ONLY == HttpConfig.Policy.fromString(yarnConf + .get(YarnConfiguration.YARN_HTTP_POLICY_KEY, YarnConfiguration.YARN_HTTP_POLICY_DEFAULT))); + } + + static String getBaseTimelineURL(String yarnTimelineAddress, Configuration conf) + throws TezException { + boolean isHttps = hasHttpsPolicy(conf); + + if (yarnTimelineAddress == null) { + if (isHttps) { + yarnTimelineAddress = conf.get(Constants.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS_CONF_NAME); + } else { + yarnTimelineAddress = conf.get(Constants.TIMELINE_SERVICE_WEBAPP_HTTP_ADDRESS_CONF_NAME); + } + Preconditions.checkArgument(!Strings.isNullOrEmpty(yarnTimelineAddress), "Yarn timeline address can" + + " not be empty. Please check configurations."); + } else { + yarnTimelineAddress = yarnTimelineAddress.trim(); + Preconditions.checkArgument(!Strings.isNullOrEmpty(yarnTimelineAddress), "Yarn timeline address can" + + " not be empty. Please provide valid url with --" + + YARN_TIMELINE_SERVICE_ADDRESS + " option"); + } + + yarnTimelineAddress = yarnTimelineAddress.toLowerCase(); + if (!yarnTimelineAddress.startsWith(HTTP_SCHEME) + && !yarnTimelineAddress.startsWith(HTTPS_SCHEME)) { + yarnTimelineAddress = ((isHttps) ? HTTPS_SCHEME : HTTP_SCHEME) + yarnTimelineAddress; + } + + try { + yarnTimelineAddress = new URI(yarnTimelineAddress).normalize().toString().trim(); + yarnTimelineAddress = (yarnTimelineAddress.endsWith("/")) ? + yarnTimelineAddress.substring(0, yarnTimelineAddress.length() - 1) : + yarnTimelineAddress; + } catch (URISyntaxException e) { + throw new TezException("Please provide a valid URL. url=" + yarnTimelineAddress, e); + } + + return Joiner.on("").join(yarnTimelineAddress, Constants.RESOURCE_URI_BASE); + } + + @VisibleForTesting + public static int process(String[] args) throws Exception { + Options options = buildOptions(); + try { + Configuration conf = new Configuration(); + CommandLine cmdLine = new GnuParser().parse(options, args); + String dagId = cmdLine.getOptionValue(DAG_ID); + + File downloadDir = new File(cmdLine.getOptionValue(BASE_DOWNLOAD_DIR)); + + String yarnTimelineAddress = cmdLine.getOptionValue(YARN_TIMELINE_SERVICE_ADDRESS); + String baseTimelineURL = getBaseTimelineURL(yarnTimelineAddress, conf); + + int batchSize = (cmdLine.hasOption(BATCH_SIZE)) ? + (Integer.parseInt(cmdLine.getOptionValue(BATCH_SIZE))) : BATCH_SIZE_DEFAULT; + + return ToolRunner.run(conf, new ATSImportTool(baseTimelineURL, dagId, + downloadDir, batchSize), args); + } catch (ParseException e) { + LOG.error("Error in parsing options ", e); + printHelp(options); + throw e; + } catch (Exception e) { + LOG.error("Error in processing ", e); + throw e; + } + } + + public static void main(String[] args) throws Exception { + Utils.setupRootLogger(); + int res = process(args); + System.exit(res); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSData.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSData.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSData.java new file mode 100644 index 0000000..f504007 --- /dev/null +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSData.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.history.parser; + +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.codehaus.jettison.json.JSONException; + +import static org.apache.hadoop.classification.InterfaceStability.Evolving; + +/** + * Main interface to pull data from ATS. + * <p/> + * It is possible that to have ATS data store in any DB (e.g LevelDB or HBase). Depending on + * store, there can be multiple implementations to pull data from these stores and create the + * DagInfo object for analysis. + * <p/> + */ +@Evolving +public interface ATSData { + + /** + * Get the DAG representation for processing + * + * @param dagId + * @return DagInfo + * @throws JSONException + * @throws TezException + */ + public DagInfo getDAGData(String dagId) throws TezException; + +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java new file mode 100644 index 0000000..aae20eb --- /dev/null +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.history.parser; + +import com.google.common.base.Preconditions; +import org.apache.commons.io.IOUtils; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.BaseParser; +import org.apache.tez.history.parser.datamodel.Constants; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.TaskInfo; +import org.apache.tez.history.parser.datamodel.VersionInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.Enumeration; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; + +import static org.apache.hadoop.classification.InterfaceAudience.Public; +import static org.apache.hadoop.classification.InterfaceStability.Evolving; + +/** + * Simple class to parse ATS zip file of a DAG and generate the relevant in-memory structure + * (DagInfo) necessary for processing later. + */ + +@Public +@Evolving +public class ATSFileParser extends BaseParser implements ATSData { + + private static final Logger LOG = LoggerFactory.getLogger(ATSFileParser.class); + + private final File atsZipFile; + + public ATSFileParser(File atsZipFile) throws TezException { + super(); + Preconditions.checkArgument(atsZipFile.exists(), "Zipfile " + atsZipFile + " does not exist"); + this.atsZipFile = atsZipFile; + } + + @Override + public DagInfo getDAGData(String dagId) throws TezException { + try { + parseATSZipFile(atsZipFile); + + linkParsedContents(); + + return dagInfo; + } catch (IOException e) { + LOG.error("Error in reading DAG ", e); + throw new TezException(e); + } catch (JSONException e) { + LOG.error("Error in parsing DAG ", e); + throw new TezException(e); + } catch (InterruptedException e) { + throw new TezException(e); + } + } + + /** + * Parse vertices json + * + * @param verticesJson + * @throws JSONException + */ + private void processVertices(JSONArray verticesJson) throws JSONException { + //Process vertex information + Preconditions.checkState(verticesJson != null, "Vertex json can not be null"); + if (verticesJson != null) { + LOG.info("Started parsing vertex"); + for (int i = 0; i < verticesJson.length(); i++) { + VertexInfo vertexInfo = VertexInfo.create(verticesJson.getJSONObject(i)); + vertexList.add(vertexInfo); + } + LOG.info("Finished parsing vertex"); + } + } + + /** + * Parse Tasks json + * + * @param tasksJson + * @throws JSONException + */ + private void processTasks(JSONArray tasksJson) throws JSONException { + //Process Task information + Preconditions.checkState(tasksJson != null, "Task json can not be null"); + if (tasksJson != null) { + LOG.debug("Started parsing task"); + for (int i = 0; i < tasksJson.length(); i++) { + TaskInfo taskInfo = TaskInfo.create(tasksJson.getJSONObject(i)); + taskList.add(taskInfo); + } + LOG.debug("Finished parsing task"); + } + } + + /** + * Parse TaskAttempt json + * + * @param taskAttemptsJson + * @throws JSONException + */ + private void processAttempts(JSONArray taskAttemptsJson) throws JSONException { + //Process TaskAttempt information + Preconditions.checkState(taskAttemptsJson != null, "Attempts json can not be null"); + if (taskAttemptsJson != null) { + LOG.debug("Started parsing task attempts"); + for (int i = 0; i < taskAttemptsJson.length(); i++) { + TaskAttemptInfo attemptInfo = TaskAttemptInfo.create(taskAttemptsJson.getJSONObject(i)); + attemptList.add(attemptInfo); + } + LOG.debug("Finished parsing task attempts"); + } + } + + /** + * Parse TezApplication json + * + * @param tezApplicationJson + * @throws JSONException + */ + private void processApplication(JSONObject tezApplicationJson) throws JSONException { + if (tezApplicationJson != null) { + LOG.debug("Started parsing tez application"); + JSONObject otherInfoNode = tezApplicationJson.optJSONObject(Constants.OTHER_INFO); + if (otherInfoNode != null) { + JSONObject tezVersion = otherInfoNode.optJSONObject(Constants.TEZ_VERSION); + if (tezVersion != null) { + String version = tezVersion.optString(Constants.VERSION); + String buildTime = tezVersion.optString(Constants.BUILD_TIME); + String revision = tezVersion.optString(Constants.REVISION); + this.versionInfo = new VersionInfo(version, buildTime, revision); + } + //TODO: might need to parse config info? (e.g, hive settings etc. could consume memory) + } + LOG.debug("Finished parsing tez application"); + } + } + + private JSONObject readJson(InputStream in) throws IOException, JSONException { + //Read entire content to memory + final ByteArrayOutputStream bout = new ByteArrayOutputStream(); + IOUtils.copy(in, bout); + return new JSONObject(new String(bout.toByteArray(), "UTF-8")); + } + + /** + * Read zip file contents. Every file can contain "dag", "vertices", "tasks", "task_attempts" + * + * @param atsFile + * @throws IOException + * @throws JSONException + */ + private void parseATSZipFile(File atsFile) + throws IOException, JSONException, TezException, InterruptedException { + final ZipFile atsZipFile = new ZipFile(atsFile); + try { + Enumeration<? extends ZipEntry> zipEntries = atsZipFile.entries(); + while (zipEntries.hasMoreElements()) { + ZipEntry zipEntry = zipEntries.nextElement(); + LOG.info("Processing " + zipEntry.getName()); + InputStream inputStream = atsZipFile.getInputStream(zipEntry); + JSONObject jsonObject = readJson(inputStream); + + //This json can contain dag, vertices, tasks, task_attempts + JSONObject dagJson = jsonObject.optJSONObject(Constants.DAG); + if (dagJson != null) { + //TODO: support for multiple dags per ATS file later. + dagInfo = DagInfo.create(dagJson); + } + + //Process vertex + JSONArray vertexJson = jsonObject.optJSONArray(Constants.VERTICES); + if (vertexJson != null) { + processVertices(vertexJson); + } + + //Process task + JSONArray taskJson = jsonObject.optJSONArray(Constants.TASKS); + if (taskJson != null) { + processTasks(taskJson); + } + + //Process task attempts + JSONArray attemptsJson = jsonObject.optJSONArray(Constants.TASK_ATTEMPTS); + if (attemptsJson != null) { + processAttempts(attemptsJson); + } + + //Process application (mainly versionInfo) + JSONObject tezAppJson = jsonObject.optJSONObject(Constants.APPLICATION); + if (tezAppJson != null) { + processApplication(tezAppJson); + } + } + } finally { + atsZipFile.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java new file mode 100644 index 0000000..4d3e96f --- /dev/null +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.history.parser; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Maps; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.history.parser.datamodel.BaseParser; +import org.apache.tez.history.parser.datamodel.Constants; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.TaskInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.Scanner; + +/** + * Parser utility to parse data generated by SimpleHistoryLogging to in-memory datamodel provided + * in org.apache.tez.history.parser.datamodel + * <p/> + * <p/> + * Most of the information should be available. Minor info like VersionInfo may not be available, + * as it is not captured in SimpleHistoryLogging. + */ +public class SimpleHistoryParser extends BaseParser { + private static final Logger LOG = LoggerFactory.getLogger(SimpleHistoryParser.class); + private static final String UTF8 = "UTF-8"; + private final File historyFile; + + + public SimpleHistoryParser(File historyFile) { + super(); + Preconditions.checkArgument(historyFile.exists(), historyFile + " does not exist"); + this.historyFile = historyFile; + } + + /** + * Get in-memory representation of DagInfo + * + * @return DagInfo + * @throws TezException + */ + public DagInfo getDAGData(String dagId) throws TezException { + try { + Preconditions.checkArgument(!Strings.isNullOrEmpty(dagId), "Please provide valid dagId"); + dagId = dagId.trim(); + parseContents(historyFile, dagId); + linkParsedContents(); + return dagInfo; + } catch (IOException e) { + LOG.error("Error in reading DAG ", e); + throw new TezException(e); + } catch (JSONException e) { + LOG.error("Error in parsing DAG ", e); + throw new TezException(e); + } + } + + private void populateOtherInfo(JSONObject source, JSONObject destination) throws JSONException { + if (source == null || destination == null) { + return; + } + for (Iterator it = source.keys(); it.hasNext(); ) { + String key = (String) it.next(); + Object val = source.get(key); + destination.put(key, val); + } + } + + private void populateOtherInfo(JSONObject source, String entityName, + Map<String, JSONObject> destMap) throws JSONException { + JSONObject destinationJson = destMap.get(entityName); + JSONObject destOtherInfo = destinationJson.getJSONObject(Constants.OTHER_INFO); + populateOtherInfo(source, destOtherInfo); + } + + private void parseContents(File historyFile, String dagId) + throws JSONException, FileNotFoundException, TezException { + Scanner scanner = new Scanner(historyFile, UTF8); + scanner.useDelimiter(SimpleHistoryLoggingService.RECORD_SEPARATOR); + JSONObject dagJson = null; + Map<String, JSONObject> vertexJsonMap = Maps.newHashMap(); + Map<String, JSONObject> taskJsonMap = Maps.newHashMap(); + Map<String, JSONObject> attemptJsonMap = Maps.newHashMap(); + TezDAGID tezDAGID = TezDAGID.fromString(dagId); + while (scanner.hasNext()) { + String line = scanner.next(); + JSONObject jsonObject = new JSONObject(line); + String entity = jsonObject.getString(Constants.ENTITY); + String entityType = jsonObject.getString(Constants.ENTITY_TYPE); + if (entityType.equals(Constants.TEZ_DAG_ID)) { + if (!dagId.equals(entity)) { + LOG.warn(dagId + " is not matching with " + entity); + continue; + } + // Club all DAG related information together (DAG_INIT, DAG_FINISH etc). Each of them + // would have a set of entities in otherinfo (e.g vertex mapping, dagPlan, start/finish + // time etc). + if (dagJson == null) { + dagJson = jsonObject; + } + JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO); + JSONObject dagOtherInfo = dagJson.getJSONObject(Constants.OTHER_INFO); + populateOtherInfo(otherInfo, dagOtherInfo); + } + else if (entityType.equals(Constants.TEZ_VERTEX_ID)) { + String vertexName = entity; + TezVertexID tezVertexID = TezVertexID.fromString(vertexName); + if (!tezDAGID.equals(tezVertexID.getDAGId())) { + LOG.warn(vertexName + " does not belong to " + tezDAGID); + continue; + } + if (!vertexJsonMap.containsKey(vertexName)) { + vertexJsonMap.put(vertexName, jsonObject); + } + JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO); + populateOtherInfo(otherInfo, vertexName, vertexJsonMap); + } + else if (entityType.equals(Constants.TEZ_TASK_ID)) { + String taskName = entity; + TezTaskID tezTaskID = TezTaskID.fromString(taskName); + if (!tezDAGID.equals(tezTaskID.getVertexID().getDAGId())) { + LOG.warn(taskName + " does not belong to " + tezDAGID); + continue; + } + if (!taskJsonMap.containsKey(taskName)) { + taskJsonMap.put(taskName, jsonObject); + } + JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO); + populateOtherInfo(otherInfo, taskName, taskJsonMap); + } + else if (entityType.equals(Constants.TEZ_TASK_ATTEMPT_ID)) { + String taskAttemptName = entity; + TezTaskAttemptID tezAttemptId = TezTaskAttemptID.fromString(taskAttemptName); + if (!tezDAGID.equals(tezAttemptId.getTaskID().getVertexID().getDAGId())) { + LOG.warn(taskAttemptName + " does not belong to " + tezDAGID); + continue; + } + if (!attemptJsonMap.containsKey(taskAttemptName)) { + attemptJsonMap.put(taskAttemptName, jsonObject); + } + JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO); + populateOtherInfo(otherInfo, taskAttemptName, attemptJsonMap); + } + } + scanner.close(); + if (dagJson != null) { + this.dagInfo = DagInfo.create(dagJson); + } else { + LOG.error("Dag is not yet parsed. Looks like partial file."); + throw new TezException( + "Please provide a valid/complete history log file containing " + dagId); + } + for (JSONObject jsonObject : vertexJsonMap.values()) { + VertexInfo vertexInfo = VertexInfo.create(jsonObject); + this.vertexList.add(vertexInfo); + LOG.debug("Parsed vertex {}", vertexInfo.getVertexName()); + } + for (JSONObject jsonObject : taskJsonMap.values()) { + TaskInfo taskInfo = TaskInfo.create(jsonObject); + this.taskList.add(taskInfo); + LOG.debug("Parsed task {}", taskInfo.getTaskId()); + } + for (JSONObject jsonObject : attemptJsonMap.values()) { + /** + * For converting SimpleHistoryLogging to in-memory representation + * + * We need to get "relatedEntities":[{"entity":"cn055-10.l42scl.hortonworks.com:58690", + * "entitytype":"nodeId"},{"entity":"container_1438652049951_0008_01_000152", + * "entitytype":"containerId"} and populate it in otherInfo object so that in-memory + * representation can parse it correctly + */ + JSONArray relatedEntities = jsonObject.optJSONArray(Constants.RELATED_ENTITIES); + if (relatedEntities == null) { + //This can happen when CONTAINER_EXITED abruptly. (e.g Container failed, exitCode=1) + LOG.debug("entity {} did not have related entities", + jsonObject.optJSONObject(Constants.ENTITY)); + } else { + JSONObject subJsonObject = relatedEntities.optJSONObject(0); + if (subJsonObject != null) { + String nodeId = subJsonObject.optString(Constants.ENTITY_TYPE); + if (!Strings.isNullOrEmpty(nodeId) && nodeId.equalsIgnoreCase(Constants.NODE_ID)) { + //populate it in otherInfo + JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO); + String nodeIdVal = subJsonObject.optString(Constants.ENTITY); + if (otherInfo != null && nodeIdVal != null) { + otherInfo.put(Constants.NODE_ID, nodeIdVal); + } + } + } + + subJsonObject = relatedEntities.optJSONObject(1); + if (subJsonObject != null) { + String containerId = subJsonObject.optString(Constants.ENTITY_TYPE); + if (!Strings.isNullOrEmpty(containerId) && containerId + .equalsIgnoreCase(Constants.CONTAINER_ID)) { + //populate it in otherInfo + JSONObject otherInfo = jsonObject.optJSONObject(Constants.OTHER_INFO); + String containerIdVal = subJsonObject.optString(Constants.ENTITY); + if (otherInfo != null && containerIdVal != null) { + otherInfo.put(Constants.CONTAINER_ID, containerIdVal); + } + } + } + } + TaskAttemptInfo attemptInfo = TaskAttemptInfo.create(jsonObject); + this.attemptList.add(attemptInfo); + LOG.debug("Parsed task attempt {}", attemptInfo.getTaskAttemptId()); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/AdditionalInputOutputDetails.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/AdditionalInputOutputDetails.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/AdditionalInputOutputDetails.java new file mode 100644 index 0000000..b853d5c --- /dev/null +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/AdditionalInputOutputDetails.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.history.parser.datamodel; + +import static org.apache.hadoop.classification.InterfaceAudience.Public; +import static org.apache.hadoop.classification.InterfaceStability.Evolving; + +/** + * Additional input/ouput information present in DAG. + */ + +@Public +@Evolving +public class AdditionalInputOutputDetails { + private final String name; + private final String clazz; + private final String initializer; + private final String userPayloadText; + + public AdditionalInputOutputDetails(String name, String clazz, String initializer, + String userPayloadText) { + this.name = name; + this.clazz = clazz; + this.initializer = initializer; + this.userPayloadText = userPayloadText; + } + + public final String getName() { + return name; + } + + public final String getClazz() { + return clazz; + } + + public final String getInitializer() { + return initializer; + } + + public final String getUserPayloadText() { + return userPayloadText; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("["); + sb.append("name=").append(name).append(", "); + sb.append("clazz=").append(clazz).append(", "); + sb.append("initializer=").append(initializer).append(", "); + sb.append("userPayloadText=").append(userPayloadText); + sb.append("]"); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java new file mode 100644 index 0000000..3f9666a --- /dev/null +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.history.parser.datamodel; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.tez.common.counters.CounterGroup; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.history.parser.utils.Utils; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.classification.InterfaceAudience.Public; +import static org.apache.hadoop.classification.InterfaceStability.Evolving; + +@Public +@Evolving +public abstract class BaseInfo { + + protected TezCounters tezCounters; + protected List<Event> eventList; + + BaseInfo(JSONObject jsonObject) throws JSONException { + final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO); + //parse tez counters + tezCounters = Utils.parseTezCountersFromJSON( + otherInfoNode.optJSONObject(Constants.COUNTERS)); + + //parse events + eventList = Lists.newArrayList(); + Utils.parseEvents(jsonObject.optJSONArray(Constants.EVENTS), eventList); + } + + public TezCounters getTezCounters() { + return tezCounters; + } + + /** + * Get start time w.r.t DAG + * + * @return long + */ + public abstract long getStartTimeInterval(); + + /** + * Get finish time w.r.t DAG + * + * @return long + */ + public abstract long getFinishTimeInterval(); + + /** + * Get absolute start time + * + * @return long + */ + public abstract long getStartTime(); + + /** + * Get absolute finish time + * + * @return long + */ + public abstract long getFinishTime(); + + public abstract String getDiagnostics(); + + public List<Event> getEvents() { + return eventList; + } + + /** + * Get counter for a specific counter group name. + * If counterGroupName is not mentioned, it would end up returning counter found in all + * groups + * + * @param counterGroupName + * @param counter + * @return Map<String, TezCounter> tez counter at every counter group level + */ + public Map<String, TezCounter> getCounter(String counterGroupName, String counter) { + //TODO: FS, TaskCounters are directly getting added as TezCounters always pass those. Need a + // way to get rid of these. + Map<String, TezCounter> result = Maps.newHashMap(); + Iterator<String> iterator = tezCounters.getGroupNames().iterator(); + boolean found = false; + while (iterator.hasNext()) { + CounterGroup counterGroup = tezCounters.getGroup(iterator.next()); + if (counterGroupName != null) { + String groupName = counterGroup.getName(); + if (groupName.equals(counterGroupName)) { + found = true; + } + } + + //Explicitly mention that no need to create the counter if not present + TezCounter tezCounter = counterGroup.getUnderlyingGroup().findCounter(counter, false); + if (tezCounter != null) { + result.put(counterGroup.getName(), tezCounter); + } + + if (found) { + //Retrieved counter specific to a counter group. Safe to exit. + break; + } + + } + return result; + } + + /** + * Find a counter in all counter groups + * + * @param counter + * @return Map of countergroup to TezCounter mapping + */ + public Map<String, TezCounter> getCounter(String counter) { + return getCounter(null, counter); + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java new file mode 100644 index 0000000..62ba474 --- /dev/null +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.history.parser.datamodel; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; + +import java.util.List; + +public abstract class BaseParser { + + protected DagInfo dagInfo; + protected VersionInfo versionInfo; + protected final List<VertexInfo> vertexList; + protected final List<TaskInfo> taskList; + protected final List<TaskAttemptInfo> attemptList; + + + public BaseParser() { + vertexList = Lists.newLinkedList(); + taskList = Lists.newLinkedList(); + attemptList = Lists.newLinkedList(); + } + + /** + * link the parsed contents, so that it becomes easier to iterate from DAG-->Task and Task--DAG. + * e.g Link vertex to dag, task to vertex, attempt to task etc + */ + protected void linkParsedContents() { + //Link vertex to DAG + for (VertexInfo vertexInfo : vertexList) { + vertexInfo.setDagInfo(dagInfo); + } + + //Link task to vertex + for (TaskInfo taskInfo : taskList) { + //Link vertex to task + String vertexId = TezTaskID.fromString(taskInfo.getTaskId()).getVertexID().toString(); + VertexInfo vertexInfo = dagInfo.getVertexFromId(vertexId); + Preconditions.checkState(vertexInfo != null, "VertexInfo for " + vertexId + " can't be " + + "null"); + taskInfo.setVertexInfo(vertexInfo); + } + + //Link task attempt to task + for (TaskAttemptInfo attemptInfo : attemptList) { + //Link task to task attempt + TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString(attemptInfo + .getTaskAttemptId()); + VertexInfo vertexInfo = dagInfo.getVertexFromId(taskAttemptId.getTaskID() + .getVertexID().toString()); + Preconditions.checkState(vertexInfo != null, "Vertex " + taskAttemptId.getTaskID() + .getVertexID().toString() + " is not present in DAG"); + TaskInfo taskInfo = vertexInfo.getTask(taskAttemptId.getTaskID().toString()); + attemptInfo.setTaskInfo(taskInfo); + } + + //Set container details + for (VertexInfo vertexInfo : dagInfo.getVertices()) { + for (TaskAttemptInfo taskAttemptInfo : vertexInfo.getTaskAttempts()) { + dagInfo.addContainerMapping(taskAttemptInfo.getContainer(), taskAttemptInfo); + } + } + + + //Set reference time for all events + for (VertexInfo vertexInfo : dagInfo.getVertices()) { + setReferenceTime(vertexInfo.getEvents(), dagInfo.getStartTimeInterval()); + for (TaskInfo taskInfo : vertexInfo.getTasks()) { + setReferenceTime(taskInfo.getEvents(), dagInfo.getStartTimeInterval()); + for (TaskAttemptInfo taskAttemptInfo : taskInfo.getTaskAttempts()) { + setReferenceTime(taskAttemptInfo.getEvents(), dagInfo.getStartTimeInterval()); + } + } + } + + dagInfo.setVersionInfo(versionInfo); + } + + /** + * Set reference time to all events + * + * @param eventList + * @param referenceTime + */ + private void setReferenceTime(List<Event> eventList, final long referenceTime) { + Iterables.all(eventList, new Predicate<Event>() { + @Override public boolean apply(Event input) { + input.setReferenceTime(referenceTime); + return false; + } + }); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java new file mode 100644 index 0000000..dce79e2 --- /dev/null +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.history.parser.datamodel; + +import org.apache.tez.common.ATSConstants; + +import static org.apache.hadoop.classification.InterfaceAudience.Private; + +@Private +public class Constants extends ATSConstants { + + public static final String EVENT_TIME_STAMP = "timestamp"; + public static final String TEZ_APPLICATION = "TEZ_APPLICATION"; + public static final String TEZ_TASK_ID = "TEZ_TASK_ID"; + public static final String TEZ_TASK_ATTEMPT_ID = "TEZ_TASK_ATTEMPT_ID"; + + public static final String EDGE_ID = "edgeId"; + public static final String INPUT_VERTEX_NAME = "inputVertexName"; + public static final String OUTPUT_VERTEX_NAME = "outputVertexName"; + public static final String DATA_MOVEMENT_TYPE = "dataMovementType"; + public static final String EDGE_SOURCE_CLASS = "edgeSourceClass"; + public static final String EDGE_DESTINATION_CLASS = "edgeDestinationClass"; + public static final String INPUT_PAYLOAD_TEXT = "inputUserPayloadAsText"; + public static final String OUTPUT_PAYLOAD_TEXT = "outputUserPayloadAsText"; + + public static final String EDGES = "edges"; + public static final String OUT_EDGE_IDS = "outEdgeIds"; + public static final String IN_EDGE_IDS = "inEdgeIds"; + public static final String ADDITIONAL_INPUTS = "additionalInputs"; + public static final String ADDITIONAL_OUTPUTS = "additionalOutputs"; + + public static final String NAME = "name"; + public static final String CLASS = "class"; + public static final String INITIALIZER = "initializer"; + public static final String USER_PAYLOAD_TEXT = "userPayloadAsText"; + + public static final String DAG_CONTEXT = "dagContext"; + + //constants for ATS data export + public static final String DAG = "dag"; + public static final String VERTICES = "vertices"; + public static final String TASKS = "tasks"; + public static final String TASK_ATTEMPTS = "task_attempts"; + public static final String APPLICATION = "application"; + + + +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Container.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Container.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Container.java new file mode 100644 index 0000000..4e01d1b --- /dev/null +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Container.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.history.parser.datamodel; + +import com.google.common.base.Objects; + +import static org.apache.hadoop.classification.InterfaceAudience.Public; +import static org.apache.hadoop.classification.InterfaceStability.Evolving; + +@Public +@Evolving +public class Container { + + private final String id; + private final String host; + + public Container(String id, String host) { + this.id = id; + this.host = host; + } + + public final String getId() { + return id; + } + + public final String getHost() { + return host; + } + + @Override public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("["); + sb.append("id=").append(id).append(", "); + sb.append("host=").append(host); + sb.append("]"); + return sb.toString(); + } + + @Override public int hashCode() { + return Objects.hashCode(id, host); + } + + @Override public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final Container other = (Container) obj; + return Objects.equal(this.id, other.id) + && Objects.equal(this.host, other.host); + } +}
