http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java new file mode 100644 index 0000000..5fb760c --- /dev/null +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java @@ -0,0 +1,586 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +

package org.apache.tez.history.parser.datamodel;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Ordering;
import org.apache.commons.collections.BidiMap;
import org.apache.commons.collections.bidimap.DualHashBidiMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringInterner;
import org.apache.tez.client.CallerContext;
import org.apache.tez.dag.api.event.VertexState;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

import static org.apache.hadoop.classification.InterfaceAudience.Public;
import static org.apache.hadoop.classification.InterfaceStability.Evolving;

/**
 * Data model for a single Tez DAG, built from the DAG's history JSON entity.
 *
 * <p>The constructor reads the DAG-level fields found under {@code Constants.OTHER_INFO}
 * (start/finish/submit times, status, diagnostics, failed task count). When a DAG plan is
 * present, it also parses the plan's edges, the basic vertex topology and the additional
 * inputs/outputs. For plan versions above 1 it additionally parses the caller context.
 * {@link VertexInfo} objects and container mappings are attached afterwards through the
 * package-private {@link #addVertexInfo(VertexInfo)} and
 * {@link #addContainerMapping(Container, TaskAttemptInfo)} methods.
 */
@Public
@Evolving
public class DagInfo extends BaseInfo {

  private static final Log LOG = LogFactory.getLog(DagInfo.class);

  //Fields populated via JSON
  private final String name;
  private final long startTime;
  private final long endTime;
  private final long submitTime;
  private final int failedTasks;
  private final String dagId;
  private final int numVertices;
  private final String status;
  private final String diagnostics;
  private VersionInfo versionInfo;
  private CallerContext callerContext;

  //VertexName --> VertexID (and, through the BidiMap, VertexID --> VertexName)
  private final BidiMap vertexNameIDMapping;

  //edgeId to EdgeInfo mapping
  private final Map<Integer, EdgeInfo> edgeInfoMap;

  //Only for internal parsing (vertexname mapping)
  private final Map<String, BasicVertexInfo> basicVertexInfoMap;

  //VertexName --> VertexInfo
  private final Map<String, VertexInfo> vertexNameMap;

  private final Multimap<Container, TaskAttemptInfo> containerMapping;

  /**
   * Parses a DAG history entity.
   *
   * @param jsonObject entity whose {@code Constants.ENTITY_TYPE} must equal
   *                   {@code Constants.TEZ_DAG_ID} (case-insensitive)
   * @throws JSONException if a mandatory key (entity type, entity id, other-info) is missing
   * @throws IllegalArgumentException if the entity is not a DAG entity
   */
  DagInfo(JSONObject jsonObject) throws JSONException {
    super(jsonObject);

    vertexNameMap = Maps.newHashMap();
    vertexNameIDMapping = new DualHashBidiMap();
    edgeInfoMap = Maps.newHashMap();
    basicVertexInfoMap = Maps.newHashMap();
    containerMapping = LinkedHashMultimap.create();

    Preconditions.checkArgument(
        jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase(Constants.TEZ_DAG_ID),
        "Expected entity type %s", Constants.TEZ_DAG_ID);

    dagId = StringInterner.weakIntern(jsonObject.getString(Constants.ENTITY));

    //Parse additional Info
    JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
    startTime = otherInfoNode.optLong(Constants.START_TIME);
    endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
    //TODO: Not getting populated correctly for lots of jobs.  Verify
    submitTime = otherInfoNode.optLong(Constants.START_REQUESTED_TIME);
    diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
    failedTasks = otherInfoNode.optInt(Constants.NUM_FAILED_TASKS);

    JSONObject dagPlan = otherInfoNode.optJSONObject(Constants.DAG_PLAN);
    if (dagPlan != null) {
      name = StringInterner.weakIntern(dagPlan.optString(Constants.DAG_NAME));
      JSONArray vertices = dagPlan.optJSONArray(Constants.VERTICES);
      numVertices = (vertices != null) ? vertices.length() : 0;
      parseDAGPlan(dagPlan);
    } else {
      name = null;
      numVertices = 0;
    }
    status = StringInterner.weakIntern(otherInfoNode.optString(Constants.STATUS));

    //parse name --> id mapping for every vertex known from the DAG plan
    JSONObject vertexIDMappingJson = otherInfoNode.optJSONObject(Constants.VERTEX_NAME_ID_MAPPING);
    if (vertexIDMappingJson != null) {
      for (String vertexName : basicVertexInfoMap.keySet()) {
        String vertexId = vertexIDMappingJson.optString(vertexName);
        // optString yields "" for vertices absent from the JSON. A BidiMap cannot map two keys
        // to the same value, so storing "" repeatedly would silently evict earlier vertices.
        if (vertexId != null && !vertexId.isEmpty()) {
          vertexNameIDMapping.put(vertexName, vertexId);
        }
      }
    }
  }

  /**
   * Factory for {@link DagInfo}.
   *
   * @param jsonObject DAG history entity
   * @return parsed DAG
   * @throws JSONException if mandatory keys are missing
   */
  public static DagInfo create(JSONObject jsonObject) throws JSONException {
    return new DagInfo(jsonObject);
  }

  /**
   * Parses edges and basic vertex info from the DAG plan; the caller context is only read
   * when the plan version (default 1) is greater than 1.
   */
  private void parseDAGPlan(JSONObject dagPlan) throws JSONException {
    int version = dagPlan.optInt(Constants.VERSION, 1);
    parseEdges(dagPlan.optJSONArray(Constants.EDGES));
    parseBasicVertexInfo(dagPlan.optJSONArray(Constants.VERTICES));

    if (version > 1) {
      parseDAGContext(dagPlan.optJSONObject(Constants.DAG_CONTEXT));
    }
  }

  /**
   * Builds {@link #callerContext} from the plan's DAG context. The caller id and type are
   * attached only when both are non-empty.
   */
  private void parseDAGContext(JSONObject callerContextInfo) {
    if (callerContextInfo == null) {
      LOG.info("No DAG Caller Context available");
      return;
    }
    String context = callerContextInfo.optString(Constants.CONTEXT);
    String callerId = callerContextInfo.optString(Constants.CALLER_ID);
    String callerType = callerContextInfo.optString(Constants.CALLER_TYPE);
    String description = callerContextInfo.optString(Constants.DESCRIPTION);

    this.callerContext = CallerContext.create(context, description);
    if (callerId != null && !callerId.isEmpty() && callerType != null && !callerType.isEmpty()) {
      this.callerContext.setCallerIdAndType(callerId, callerType);
    } else {
      LOG.info("No DAG Caller Context Id and Type available");
    }
  }

  /** Records name, edge ids and additional inputs/outputs for every vertex in the plan. */
  private void parseBasicVertexInfo(JSONArray verticesInfo) throws JSONException {
    if (verticesInfo == null) {
      LOG.info("No vertices available.");
      return;
    }

    for (int i = 0; i < verticesInfo.length(); i++) {
      JSONObject vJson = verticesInfo.getJSONObject(i);

      BasicVertexInfo basicVertexInfo = new BasicVertexInfo();
      basicVertexInfo.vertexName = vJson.optString(Constants.VERTEX_NAME);
      basicVertexInfo.inEdgeIds = toStringArray(vJson.optJSONArray(Constants.IN_EDGE_IDS));
      basicVertexInfo.outEdgeIds = toStringArray(vJson.optJSONArray(Constants.OUT_EDGE_IDS));
      basicVertexInfo.additionalInputs =
          parseAdditionalDetailsForVertex(vJson.optJSONArray(Constants.ADDITIONAL_INPUTS));
      basicVertexInfo.additionalOutputs =
          parseAdditionalDetailsForVertex(vJson.optJSONArray(Constants.ADDITIONAL_OUTPUTS));

      basicVertexInfoMap.put(basicVertexInfo.vertexName, basicVertexInfo);
    }
  }

  /** Converts every element of a JSON array to its string form; null in, null out. */
  private static String[] toStringArray(JSONArray jsonArray) throws JSONException {
    if (jsonArray == null) {
      return null;
    }
    String[] values = new String[jsonArray.length()];
    for (int j = 0; j < values.length; j++) {
      values[j] = jsonArray.get(j).toString();
    }
    return values;
  }

  /**
   * get additional details available for every vertex in the dag
   *
   * @param jsonArray array of additional input/output descriptors, may be null
   * @return one entry per array element, or null when {@code jsonArray} is null
   * @throws JSONException if an element is not a JSON object
   */
  private AdditionalInputOutputDetails[] parseAdditionalDetailsForVertex(JSONArray jsonArray)
      throws JSONException {
    if (jsonArray == null) {
      return null;
    }
    AdditionalInputOutputDetails[] details = new AdditionalInputOutputDetails[jsonArray.length()];
    for (int j = 0; j < jsonArray.length(); j++) {
      JSONObject detailJson = jsonArray.getJSONObject(j);
      details[j] = new AdditionalInputOutputDetails(
          detailJson.optString(Constants.NAME),
          detailJson.optString(Constants.CLASS),
          detailJson.optString(Constants.INITIALIZER),
          detailJson.optString(Constants.USER_PAYLOAD_TEXT));
    }
    return details;
  }

  /**
   * Parse edge details in the DAG and index them by edge id.
   *
   * @param edgesArray array of edge descriptors, may be null
   * @throws JSONException if an element is not a JSON object
   */
  private void parseEdges(JSONArray edgesArray) throws JSONException {
    if (edgesArray == null) {
      return;
    }
    for (int i = 0; i < edgesArray.length(); i++) {
      JSONObject edge = edgesArray.getJSONObject(i);
      int edgeId = edge.optInt(Constants.EDGE_ID);
      EdgeInfo edgeInfo = new EdgeInfo(
          edge.optString(Constants.INPUT_VERTEX_NAME),
          edge.optString(Constants.OUTPUT_VERTEX_NAME),
          edge.optString(Constants.DATA_MOVEMENT_TYPE),
          edge.optString(Constants.EDGE_SOURCE_CLASS),
          edge.optString(Constants.EDGE_DESTINATION_CLASS),
          edge.optString(Constants.INPUT_PAYLOAD_TEXT),
          edge.optString(Constants.OUTPUT_PAYLOAD_TEXT));
      edgeInfoMap.put(edgeId, edgeInfo);
    }
  }

  /** Per-vertex details taken from the DAG plan, kept until the VertexInfo is attached. */
  static class BasicVertexInfo {
    String vertexName;
    String[] inEdgeIds;
    String[] outEdgeIds;
    AdditionalInputOutputDetails[] additionalInputs;
    AdditionalInputOutputDetails[] additionalOutputs;
  }

  /**
   * Attaches a parsed vertex to this DAG, enriching it with the additional inputs/outputs
   * and in/out edges recorded in the DAG plan.
   *
   * @throws IllegalArgumentException if the vertex name is not part of the DAG plan
   * @throws IllegalStateException if one of the vertex's edge ids is unknown
   */
  void addVertexInfo(VertexInfo vertexInfo) {
    BasicVertexInfo basicVertexInfo = basicVertexInfoMap.get(vertexInfo.getVertexName());

    Preconditions.checkArgument(basicVertexInfo != null,
        "VertexName %s not present in DAG's vertices %s",
        vertexInfo.getVertexName(), basicVertexInfoMap.entrySet());

    //populate additional information in VertexInfo
    if (basicVertexInfo.additionalInputs != null) {
      vertexInfo.setAdditionalInputInfoList(Arrays.asList(basicVertexInfo.additionalInputs));
    }
    if (basicVertexInfo.additionalOutputs != null) {
      vertexInfo.setAdditionalOutputInfoList(Arrays.asList(basicVertexInfo.additionalOutputs));
    }

    //Populate edge information in vertex
    if (basicVertexInfo.inEdgeIds != null) {
      for (String edgeId : basicVertexInfo.inEdgeIds) {
        vertexInfo.addInEdge(lookupEdge(edgeId));
      }
    }
    if (basicVertexInfo.outEdgeIds != null) {
      for (String edgeId : basicVertexInfo.outEdgeIds) {
        vertexInfo.addOutEdge(lookupEdge(edgeId));
      }
    }

    vertexNameMap.put(vertexInfo.getVertexName(), vertexInfo);
  }

  /** Resolves an edge id (as stored in the plan) to its EdgeInfo, failing if it is unknown. */
  private EdgeInfo lookupEdge(String edgeId) {
    EdgeInfo edgeInfo = edgeInfoMap.get(Integer.parseInt(edgeId));
    Preconditions.checkState(edgeInfo != null, "EdgeId %s not present in DAG", edgeId);
    return edgeInfo;
  }

  void setVersionInfo(VersionInfo versionInfo) {
    this.versionInfo = versionInfo;
  }

  void addContainerMapping(Container container, TaskAttemptInfo taskAttemptInfo) {
    this.containerMapping.put(container, taskAttemptInfo);
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append("[");
    sb.append("dagID=").append(getDagId()).append(", ");
    sb.append("dagName=").append(getName()).append(", ");
    sb.append("status=").append(getStatus()).append(", ");
    sb.append("startTime=").append(getStartTimeInterval()).append(", ");
    sb.append("submitTime=").append(getSubmitTime()).append(", ");
    sb.append("endTime=").append(getFinishTimeInterval()).append(", ");
    sb.append("timeTaken=").append(getTimeTaken()).append(", ");
    sb.append("diagnostics=").append(getDiagnostics()).append(", ");
    sb.append("vertexNameIDMapping=").append(getVertexNameIDMapping()).append(", ");
    sb.append("failedTasks=").append(getFailedTaskCount()).append(", ");
    sb.append("events=").append(getEvents());
    sb.append("]");
    return sb.toString();
  }

  /** @return read-only view of the container to task-attempt mapping added to this DAG */
  public Multimap<Container, TaskAttemptInfo> getContainerMapping() {
    return Multimaps.unmodifiableMultimap(containerMapping);
  }

  public final VersionInfo getVersionInfo() {
    return versionInfo;
  }

  /** @return caller context from the DAG plan, or null if none was parsed */
  public final CallerContext getCallerContext() {
    return callerContext;
  }

  /** @return DAG name from the plan, or null when no DAG plan was present */
  public final String getName() {
    return name;
  }

  /** @return read-only view of all edges parsed from the DAG plan */
  public final Collection<EdgeInfo> getEdges() {
    return Collections.unmodifiableCollection(edgeInfoMap.values());
  }

  public final long getSubmitTime() {
    return submitTime;
  }

  public final long getStartTime() {
    return startTime;
  }

  public final long getFinishTime() {
    return endTime;
  }

  /**
   * Reference start time for the DAG. Vertex, Task, TaskAttempt would map on to this.
   * If absolute start time is needed, call getStartTime().
   *
   * @return starting time w.r.t to dag (always 0)
   */
  @Override
  public final long getStartTimeInterval() {
    return 0;
  }

  /**
   * @return finish time relative to the DAG start. If that value is negative (e.g. the DAG
   *     did not complete), the largest vertex finish interval is used when it is greater.
   */
  @Override
  public final long getFinishTimeInterval() {
    long dagEndTime = (endTime - startTime);
    if (dagEndTime < 0) {
      //probably dag is not complete or failed in middle. use the latest vertex finish time
      for (VertexInfo vertexInfo : vertexNameMap.values()) {
        dagEndTime = Math.max(dagEndTime, vertexInfo.getFinishTimeInterval());
      }
    }
    return dagEndTime;
  }

  public final long getTimeTaken() {
    return getFinishTimeInterval();
  }

  public final String getStatus() {
    return status;
  }

  /**
   * Get vertexInfo for a given vertexid
   *
   * @param vertexId vertex id as found in the name/id mapping
   * @return VertexInfo, or null if the id or its vertex is unknown
   */
  public VertexInfo getVertexFromId(String vertexId) {
    return vertexNameMap.get(vertexNameIDMapping.getKey(vertexId));
  }

  /**
   * Get vertexInfo for a given vertex name
   *
   * @param vertexName vertex name
   * @return VertexInfo, or null if no such vertex was added
   */
  public final VertexInfo getVertex(String vertexName) {
    return vertexNameMap.get(vertexName);
  }

  @Override
  public final String getDiagnostics() {
    return diagnostics;
  }

  /**
   * Get all vertices, ordered by ascending start time interval.
   *
   * @return read-only List<VertexInfo>
   */
  public final List<VertexInfo> getVertices() {
    List<VertexInfo> vertices = Lists.newArrayList(vertexNameMap.values());
    Collections.sort(vertices, new Comparator<VertexInfo>() {
      @Override
      public int compare(VertexInfo o1, VertexInfo o2) {
        return Long.compare(o1.getStartTimeInterval(), o2.getStartTimeInterval());
      }
    });
    return Collections.unmodifiableList(vertices);
  }

  /**
   * Get list of failed vertices
   *
   * @return List<VertexInfo>
   */
  public final List<VertexInfo> getFailedVertices() {
    return getVertices(VertexState.FAILED);
  }

  /**
   * Get list of killed vertices
   *
   * @return List<VertexInfo>
   */
  public final List<VertexInfo> getKilledVertices() {
    return getVertices(VertexState.KILLED);
  }

  /**
   * Get list of successful vertices
   *
   * @return List<VertexInfo>
   */
  public final List<VertexInfo> getSuccessfullVertices() {
    return getVertices(VertexState.SUCCEEDED);
  }

  /**
   * Get list of vertices belonging to a specific state
   *
   * @param state vertex state; matched against {@code VertexInfo.getStatus()} by its string form
   * @return read-only List<VertexInfo>
   */
  public final List<VertexInfo> getVertices(final VertexState state) {
    final String expectedStatus = state.toString();
    return Collections.unmodifiableList(Lists.newArrayList(
        Iterables.filter(vertexNameMap.values(), new Predicate<VertexInfo>() {
          @Override
          public boolean apply(VertexInfo input) {
            return expectedStatus.equals(input.getStatus());
          }
        })));
  }

  /** @return read-only view of vertex name --> VertexInfo */
  public final Map<String, VertexInfo> getVertexMapping() {
    return Collections.unmodifiableMap(vertexNameMap);
  }

  /** Orders vertices by their time taken. */
  private Ordering<VertexInfo> getVertexOrdering() {
    return Ordering.from(new Comparator<VertexInfo>() {
      @Override
      public int compare(VertexInfo o1, VertexInfo o2) {
        return Long.compare(o1.getTimeTaken(), o2.getTimeTaken());
      }
    });
  }

  /**
   * Get the slowest vertex in the DAG
   *
   * @return VertexInfo, or null if the DAG has no vertices
   */
  public final VertexInfo getSlowestVertex() {
    List<VertexInfo> vertexInfoList = getVertices();
    if (vertexInfoList.isEmpty()) {
      return null;
    }
    return getVertexOrdering().max(vertexInfoList);
  }

  /**
   * Get the fastest vertex in the DAG
   *
   * @return VertexInfo, or null if the DAG has no vertices
   */
  public final VertexInfo getFastestVertex() {
    List<VertexInfo> vertexInfoList = getVertices();
    if (vertexInfoList.isEmpty()) {
      return null;
    }
    return getVertexOrdering().min(vertexInfoList);
  }

  /**
   * Get node details for this DAG. Would be useful for analyzing node to tasks.
   *
   * @return Multimap<String, TaskAttemptInfo> taskAttempt details at every node (keyed by host)
   */
  public final Multimap<String, TaskAttemptInfo> getNodeDetails() {
    Multimap<String, TaskAttemptInfo> nodeDetails = LinkedListMultimap.create();
    for (VertexInfo vertexInfo : getVertices()) {
      Multimap<Container, TaskAttemptInfo> vertexContainers = vertexInfo.getContainersMapping();
      for (Map.Entry<Container, TaskAttemptInfo> entry : vertexContainers.entries()) {
        nodeDetails.put(entry.getKey().getHost(), entry.getValue());
      }
    }
    return nodeDetails;
  }

  /**
   * Get containers used for this DAG
   *
   * @return Multimap<Container, TaskAttemptInfo> task attempt details at every container
   */
  public final Multimap<Container, TaskAttemptInfo> getContainersToTaskAttemptMapping() {
    Multimap<Container, TaskAttemptInfo> mapping = LinkedHashMultimap.create();
    for (VertexInfo vertexInfo : getVertices()) {
      mapping.putAll(vertexInfo.getContainersMapping());
    }
    return Multimaps.unmodifiableMultimap(mapping);
  }

  /**
   * @return read-only view of vertex name --> vertex id. Vertices without an id in the
   *     history JSON are omitted.
   */
  @SuppressWarnings("unchecked")
  public final Map getVertexNameIDMapping() {
    return Collections.unmodifiableMap(vertexNameIDMapping);
  }

  /** @return number of vertices listed in the DAG plan (0 without a plan) */
  public final int getNumVertices() {
    return numVertices;
  }

  public final String getDagId() {
    return dagId;
  }

  public final int getFailedTaskCount() {
    return failedTasks;
  }

}
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/EdgeInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/EdgeInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/EdgeInfo.java new file mode 100644 index 0000000..ab8e831 --- /dev/null +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/EdgeInfo.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +

package org.apache.tez.history.parser.datamodel;

import static org.apache.hadoop.classification.InterfaceAudience.Public;
import static org.apache.hadoop.classification.InterfaceStability.Evolving;

/**
 * One edge of a DAG plan, as parsed from the history JSON.
 *
 * <p>The textual attributes are fixed at construction. The source and destination
 * {@link VertexInfo} objects are not assigned by the constructor and remain {@code null}
 * until {@link #setSourceVertex(VertexInfo)} / {@link #setDestinationVertex(VertexInfo)}
 * are called.
 */
@Public
@Evolving
public class EdgeInfo {

  private final String inputVertexName;
  private final String outputVertexName;
  private final String dataMovementType;
  private final String edgeSourceClass;
  private final String edgeDestinationClass;
  private final String inputUserPayloadAsText;
  private final String outputUserPayloadAsText;

  private VertexInfo sourceVertex;
  private VertexInfo destinationVertex;

  public EdgeInfo(String inputVertexName, String outputVertexName, String dataMovementType,
      String edgeSourceClass, String edgeDestinationClass, String inputUserPayloadAsText, String
      outputUserPayloadAsText) {
    this.inputVertexName = inputVertexName;
    this.outputVertexName = outputVertexName;
    this.dataMovementType = dataMovementType;
    this.edgeSourceClass = edgeSourceClass;
    this.edgeDestinationClass = edgeDestinationClass;
    this.inputUserPayloadAsText = inputUserPayloadAsText;
    this.outputUserPayloadAsText = outputUserPayloadAsText;
  }

  public final String getInputVertexName() {
    return inputVertexName;
  }

  public final String getOutputVertexName() {
    return outputVertexName;
  }

  public final String getDataMovementType() {
    return dataMovementType;
  }

  public final String getEdgeSourceClass() {
    return edgeSourceClass;
  }

  public final String getEdgeDestinationClass() {
    return edgeDestinationClass;
  }

  public final String getInputUserPayloadAsText() {
    return inputUserPayloadAsText;
  }

  public final String getOutputUserPayloadAsText() {
    return outputUserPayloadAsText;
  }

  /** @return source vertex, or null if it has not been set yet */
  public final VertexInfo getSourceVertex() {
    return sourceVertex;
  }

  public final void setSourceVertex(VertexInfo sourceVertex) {
    this.sourceVertex = sourceVertex;
  }

  /** @return destination vertex, or null if it has not been set yet */
  public final VertexInfo getDestinationVertex() {
    return destinationVertex;
  }

  public final void setDestinationVertex(VertexInfo destinationVertex) {
    this.destinationVertex = destinationVertex;
  }

  /**
   * Renders every attribute once. The source/destination vertex names print as "null" when
   * the vertices have not been attached yet. Previously this case threw a
   * NullPointerException.
   */
  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append("[");
    sb.append("inputVertexName=").append(inputVertexName).append(", ");
    sb.append("outputVertexName=").append(outputVertexName).append(", ");
    sb.append("dataMovementType=").append(dataMovementType).append(", ");
    sb.append("edgeSourceClass=").append(edgeSourceClass).append(", ");
    sb.append("edgeDestinationClass=").append(edgeDestinationClass).append(", ");
    sb.append("inputUserPayloadAsText=").append(inputUserPayloadAsText).append(", ");
    sb.append("outputUserPayloadAsText=").append(outputUserPayloadAsText).append(", ");
    sb.append("sourceVertex=").append(vertexNameOrNull(sourceVertex)).append(", ");
    sb.append("destinationVertex=").append(vertexNameOrNull(destinationVertex));
    sb.append("]");
    return sb.toString();
  }

  /** Null-safe vertex name accessor used by {@link #toString()}. */
  private static String vertexNameOrNull(VertexInfo vertex) {
    return (vertex == null) ? null : vertex.getVertexName();
  }
}
 http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Event.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Event.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Event.java new file mode 100644 index 0000000..70310f3 --- /dev/null +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Event.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +

package org.apache.tez.history.parser.datamodel;

import static org.apache.hadoop.classification.InterfaceAudience.Public;
import static org.apache.hadoop.classification.InterfaceStability.Evolving;

/**
 * A single timestamped history event: an info string, an event type and an absolute time.
 *
 * <p>{@link #getTime()} reports the time as an offset from a reference time, which is set
 * through {@link #setReferenceTime(long)} and is typically the DAG start time. The reference
 * defaults to 0, so until it is set {@code getTime()} equals {@link #getAbsoluteTime()}.
 */
@Public
@Evolving
public class Event {
  private final String info;
  private final String type;
  private final long absoluteTime;

  // Subtracted from absoluteTime by getTime(); typically the DAG start time. Defaults to 0.
  private long referenceTime;

  public Event(String info, String type, long time) {
    this.info = info;
    this.type = type;
    this.absoluteTime = time;
  }

  /** Sets the reference point used by {@link #getTime()}. */
  void setReferenceTime(long referenceTime) {
    this.referenceTime = referenceTime;
  }

  public final String getInfo() {
    return info;
  }

  public final String getType() {
    return type;
  }

  /** @return the event time exactly as recorded */
  public final long getAbsoluteTime() {
    return absoluteTime;
  }

  /** @return the event time minus the reference time */
  public final long getTime() {
    return getAbsoluteTime() - referenceTime;
  }

  /** Renders info, type and the absolute (not relative) time. */
  @Override
  public String toString() {
    return new StringBuilder("[info=").append(info)
        .append(", type=").append(type)
        .append(", time=").append(absoluteTime)
        .append(']')
        .toString();
  }
}
 http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java new file
mode 100644 index 0000000..d373513 --- /dev/null +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java @@ -0,0 +1,379 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.history.parser.datamodel; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; + +import org.apache.hadoop.util.StringInterner; +import org.apache.tez.common.ATSConstants; +import org.apache.tez.common.counters.DAGCounter; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.history.parser.utils.Utils; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.classification.InterfaceStability.Evolving; +import static org.apache.hadoop.classification.InterfaceAudience.Public; + +@Public +@Evolving 
+public class TaskAttemptInfo extends BaseInfo { + + private static final String SUCCEEDED = StringInterner.weakIntern(TaskAttemptState.SUCCEEDED.name()); + + private final String taskAttemptId; + private final long startTime; + private final long endTime; + private final String diagnostics; + + private final long creationTime; + private final long allocationTime; + private final String containerId; + private final String nodeId; + private final String status; + private final String logUrl; + private final String creationCausalTA; + private final String terminationCause; + private final long executionTimeInterval; + // this list is in time order - array list for easy walking + private final ArrayList<DataDependencyEvent> lastDataEvents = Lists.newArrayList(); + + private TaskInfo taskInfo; + + private Container container; + + public static class DataDependencyEvent { + String taId; + long timestamp; + public DataDependencyEvent(String id, long time) { + taId = id; + timestamp = time; + } + public long getTimestamp() { + return timestamp; + } + public String getTaskAttemptId() { + return taId; + } + } + + TaskAttemptInfo(JSONObject jsonObject) throws JSONException { + super(jsonObject); + + Preconditions.checkArgument( + jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase + (Constants.TEZ_TASK_ATTEMPT_ID)); + + taskAttemptId = StringInterner.weakIntern(jsonObject.optString(Constants.ENTITY)); + + //Parse additional Info + final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO); + startTime = otherInfoNode.optLong(Constants.START_TIME); + endTime = otherInfoNode.optLong(Constants.FINISH_TIME); + diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS); + creationTime = otherInfoNode.optLong(Constants.CREATION_TIME); + creationCausalTA = StringInterner.weakIntern( + otherInfoNode.optString(Constants.CREATION_CAUSAL_ATTEMPT)); + allocationTime = otherInfoNode.optLong(Constants.ALLOCATION_TIME); + containerId = 
StringInterner.weakIntern(otherInfoNode.optString(Constants.CONTAINER_ID)); + String id = otherInfoNode.optString(Constants.NODE_ID); + nodeId = StringInterner.weakIntern((id != null) ? (id.split(":")[0]) : ""); + logUrl = otherInfoNode.optString(Constants.COMPLETED_LOGS_URL); + + status = StringInterner.weakIntern(otherInfoNode.optString(Constants.STATUS)); + container = new Container(containerId, nodeId); + if (otherInfoNode.has(Constants.LAST_DATA_EVENTS)) { + List<DataDependencyEvent> eventInfo = Utils.parseDataEventDependencyFromJSON( + otherInfoNode.optJSONObject(Constants.LAST_DATA_EVENTS)); + long lastTime = 0; + for (DataDependencyEvent item : eventInfo) { + // check these are in time order + Preconditions.checkState(lastTime < item.getTimestamp()); + lastTime = item.getTimestamp(); + lastDataEvents.add(item); + } + } + terminationCause = StringInterner + .weakIntern(otherInfoNode.optString(ATSConstants.TASK_ATTEMPT_ERROR_ENUM)); + executionTimeInterval = (endTime > startTime) ? (endTime - startTime) : 0; + } + + public static Ordering<TaskAttemptInfo> orderingOnAllocationTime() { + return Ordering.from(new Comparator<TaskAttemptInfo>() { + @Override + public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) { + return (o1.getAllocationTime() < o2.getAllocationTime() ? -1 + : o1.getAllocationTime() > o2.getAllocationTime() ? 
1 : 0); + } + }); + } + + void setTaskInfo(TaskInfo taskInfo) { + Preconditions.checkArgument(taskInfo != null, "Provide valid taskInfo"); + this.taskInfo = taskInfo; + taskInfo.addTaskAttemptInfo(this); + } + + @Override + public final long getStartTimeInterval() { + return startTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime()); + } + + @Override + public final long getFinishTimeInterval() { + return endTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime()); + } + + public final boolean isSucceeded() { + return status.equals(SUCCEEDED); + } + + public final List<DataDependencyEvent> getLastDataEvents() { + return lastDataEvents; + } + + public final long getExecutionTimeInterval() { + return executionTimeInterval; + } + + public final long getPostDataExecutionTimeInterval() { + if (getStartTime() > 0 && getFinishTime() > 0) { + // start time defaults to the actual start time + long postDataStartTime = startTime; + if (getLastDataEvents() != null && !getLastDataEvents().isEmpty()) { + // if last data event is after the start time then use last data event time + long lastEventTime = getLastDataEvents().get(getLastDataEvents().size()-1).getTimestamp(); + postDataStartTime = startTime > lastEventTime ? 
startTime : lastEventTime; + } + return (getFinishTime() - postDataStartTime); + } + return -1; + } + + public final long getAllocationToEndTimeInterval() { + return (endTime - allocationTime); + } + + public final long getAllocationToStartTimeInterval() { + return (startTime - allocationTime); + } + + public final long getCreationToAllocationTimeInterval() { + return (allocationTime - creationTime); + } + + public final long getStartTime() { + return startTime; + } + + public final long getFinishTime() { + return endTime; + } + + public final long getCreationTime() { + return creationTime; + } + + public final DataDependencyEvent getLastDataEventInfo(long timeThreshold) { + for (int i=lastDataEvents.size()-1; i>=0; i--) { + // walk back in time until we get first event that happened before the threshold + DataDependencyEvent item = lastDataEvents.get(i); + if (item.getTimestamp() < timeThreshold) { + return item; + } + } + return null; + } + + public final long getTimeTaken() { + return getFinishTimeInterval() - getStartTimeInterval(); + } + + public final long getCreationTimeInterval() { + return creationTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime()); + } + + public final String getCreationCausalTA() { + return creationCausalTA; + } + + public final long getAllocationTime() { + return allocationTime; + } + + public final String getShortName() { + return getTaskInfo().getVertexInfo().getVertexName() + " : " + + taskAttemptId.substring(taskAttemptId.lastIndexOf('_', taskAttemptId.lastIndexOf('_') - 1) + 1); + } + + @Override + public final String getDiagnostics() { + return diagnostics; + } + + public final String getTerminationCause() { + return terminationCause; + } + + public static TaskAttemptInfo create(JSONObject taskInfoObject) throws JSONException { + return new TaskAttemptInfo(taskInfoObject); + } + + public final boolean isLocalityInfoAvailable() { + Map<String, TezCounter> dataLocalTask = getCounter(DAGCounter.class.getName(), + 
DAGCounter.DATA_LOCAL_TASKS.toString()); + Map<String, TezCounter> rackLocalTask = getCounter(DAGCounter.class.getName(), + DAGCounter.RACK_LOCAL_TASKS.toString()); + + Map<String, TezCounter> otherLocalTask = getCounter(DAGCounter.class.getName(), + DAGCounter.OTHER_LOCAL_TASKS.toString()); + + if (!dataLocalTask.isEmpty() || !rackLocalTask.isEmpty() || !otherLocalTask.isEmpty()) { + return true; + } + return false; + } + + public final String getDetailedStatus() { + if (!Strings.isNullOrEmpty(getTerminationCause())) { + return getStatus() + ":" + getTerminationCause(); + } + return getStatus(); + } + + public final TezCounter getLocalityInfo() { + Map<String, TezCounter> dataLocalTask = getCounter(DAGCounter.class.getName(), + DAGCounter.DATA_LOCAL_TASKS.toString()); + Map<String, TezCounter> rackLocalTask = getCounter(DAGCounter.class.getName(), + DAGCounter.RACK_LOCAL_TASKS.toString()); + Map<String, TezCounter> otherLocalTask = getCounter(DAGCounter.class.getName(), + DAGCounter.OTHER_LOCAL_TASKS.toString()); + + if (!dataLocalTask.isEmpty()) { + return dataLocalTask.get(DAGCounter.class.getName()); + } + + if (!rackLocalTask.isEmpty()) { + return rackLocalTask.get(DAGCounter.class.getName()); + } + + if (!otherLocalTask.isEmpty()) { + return otherLocalTask.get(DAGCounter.class.getName()); + } + return null; + } + + public final TaskInfo getTaskInfo() { + return taskInfo; + } + + public final String getTaskAttemptId() { + return taskAttemptId; + } + + public final String getNodeId() { + return nodeId; + } + + public final String getStatus() { + return status; + } + + public final Container getContainer() { + return container; + } + + public final String getLogURL() { + return logUrl; + } + + /** + * Get merge counter per source. 
Available in case of reducer task + * + * @return Map<String, TezCounter> merge phase time at every counter group level + */ + public final Map<String, TezCounter> getMergePhaseTime() { + return getCounter(null, TaskCounter.MERGE_PHASE_TIME.name()); + } + + /** + * Get shuffle counter per source. Available in case of shuffle + * + * @return Map<String, TezCounter> shuffle phase time at every counter group level + */ + public final Map<String, TezCounter> getShufflePhaseTime() { + return getCounter(null, TaskCounter.SHUFFLE_PHASE_TIME.name()); + } + + /** + * Get OUTPUT_BYTES counter per source. Available in case of map outputs + * + * @return Map<String, TezCounter> output bytes counter at every counter group + */ + public final Map<String, TezCounter> getTaskOutputBytes() { + return getCounter(null, TaskCounter.OUTPUT_BYTES.name()); + } + + /** + * Get number of spills per source. (SPILLED_RECORDS / OUTPUT_RECORDS) + * + * @return Map<String, Long> spill count details + */ + public final Map<String, Float> getSpillCount() { + Map<String, TezCounter> outputRecords = getCounter(null, "OUTPUT_RECORDS"); + Map<String, TezCounter> spilledRecords = getCounter(null, "SPILLED_RECORDS"); + Map<String, Float> result = Maps.newHashMap(); + for (Map.Entry<String, TezCounter> entry : spilledRecords.entrySet()) { + String source = entry.getKey(); + long spilledVal = entry.getValue().getValue(); + long outputVal = outputRecords.get(source).getValue(); + result.put(source, (spilledVal * 1.0f) / (outputVal * 1.0f)); + } + return result; + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("["); + sb.append("taskAttemptId=").append(getTaskAttemptId()).append(", "); + sb.append("creationTime=").append(getCreationTimeInterval()).append(", "); + sb.append("startTime=").append(getStartTimeInterval()).append(", "); + sb.append("finishTime=").append(getFinishTimeInterval()).append(", "); + sb.append("timeTaken=").append(getTimeTaken()).append(", "); + 
sb.append("events=").append(getEvents()).append(", "); + sb.append("diagnostics=").append(getDiagnostics()).append(", "); + sb.append("container=").append(getContainer()).append(", "); + sb.append("nodeId=").append(getNodeId()).append(", "); + sb.append("logURL=").append(getLogURL()).append(", "); + sb.append("status=").append(getStatus()); + sb.append("]"); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java new file mode 100644 index 0000000..c6f89d6 --- /dev/null +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+
+import org.apache.hadoop.util.StringInterner;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * History information for a single task, parsed from a JSON entity whose type is
+ * {@code Constants.TEZ_TASK_ID}. Holds the task-level fields read from the entity's
+ * {@code Constants.OTHER_INFO} section, a back-reference to the owning
+ * {@link VertexInfo} and the {@link TaskAttemptInfo}s registered for this task.
+ */
+@Public
+@Evolving
+public class TaskInfo extends BaseInfo {
+
+  private final long startTime;
+  private final long endTime;
+  private final String diagnostics;
+  private final String successfulAttemptId;
+  private final long scheduledTime;
+  private final String status;
+  private final String taskId;
+
+  private VertexInfo vertexInfo;
+
+  // taskAttemptId --> TaskAttemptInfo, populated through addTaskAttemptInfo()
+  private final Map<String, TaskAttemptInfo> attemptInfoMap = Maps.newHashMap();
+
+  TaskInfo(JSONObject jsonObject) throws JSONException {
+    super(jsonObject);
+
+    Preconditions.checkArgument(
+        jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase(Constants.TEZ_TASK_ID),
+        "Expected entity of type %s", Constants.TEZ_TASK_ID);
+
+    taskId = StringInterner.weakIntern(jsonObject.optString(Constants.ENTITY));
+
+    //Parse additional Info
+    final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
+    startTime = otherInfoNode.optLong(Constants.START_TIME);
+    endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+    diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
+    successfulAttemptId = StringInterner.weakIntern(
+        otherInfoNode.optString(Constants.SUCCESSFUL_ATTEMPT_ID));
+    scheduledTime = otherInfoNode.optLong(Constants.SCHEDULED_TIME);
+    status = StringInterner.weakIntern(otherInfoNode.optString(Constants.STATUS));
+  }
+
+  /**
+   * @return task start time minus the start time of the owning DAG
+   */
+  @Override
+  public final long getStartTimeInterval() {
+    return startTime - (vertexInfo.getDagInfo().getStartTime());
+  }
+
+  /** @return start time as read from the history entity */
+  public final long getStartTime() {
+    return startTime;
+  }
+
+  /** @return finish time as read from the history entity */
+  public final long getFinishTime() {
+    return endTime;
+  }
+
+  /**
+   * Task finish time relative to the DAG start time. When the recorded finish time
+   * gives a negative interval, the largest finish interval among this task's
+   * attempts is used instead.
+   *
+   * @return finish time interval
+   */
+  @Override
+  public final long getFinishTimeInterval() {
+    long taskFinishTime = endTime - (vertexInfo.getDagInfo().getStartTime());
+    if (taskFinishTime < 0) {
+      // probably the task is not complete or failed midway: use the last attempt finish time.
+      // Only the maximum matters, so the attempts need not be sorted first.
+      for (TaskAttemptInfo attemptInfo : attemptInfoMap.values()) {
+        taskFinishTime = Math.max(taskFinishTime, attemptInfo.getFinishTimeInterval());
+      }
+    }
+    return taskFinishTime;
+  }
+
+  @Override
+  public final String getDiagnostics() {
+    return diagnostics;
+  }
+
+  public static TaskInfo create(JSONObject taskInfoObject) throws
+      JSONException {
+    return new TaskInfo(taskInfoObject);
+  }
+
+  void addTaskAttemptInfo(TaskAttemptInfo taskAttemptInfo) {
+    attemptInfoMap.put(taskAttemptInfo.getTaskAttemptId(), taskAttemptInfo);
+  }
+
+  void setVertexInfo(VertexInfo vertexInfo) {
+    Preconditions.checkArgument(vertexInfo != null, "Provide valid vertexInfo");
+    this.vertexInfo = vertexInfo;
+    //link it to vertex
+    vertexInfo.addTaskInfo(this);
+  }
+
+  public final VertexInfo getVertexInfo() {
+    return vertexInfo;
+  }
+
+  /**
+   * Get all task attempts, ordered by start time interval.
+   *
+   * @return unmodifiable list of task attempt info
+   */
+  public final List<TaskAttemptInfo> getTaskAttempts() {
+    List<TaskAttemptInfo> attemptsList = Lists.newArrayList(attemptInfoMap.values());
+    Collections.sort(attemptsList, orderingOnAttemptStartTime());
+    return Collections.unmodifiableList(attemptsList);
+  }
+
+  /**
+   * Get list of failed task attempts
+   *
+   * @return unmodifiable list of attempts whose status equals FAILED
+   */
+  public final List<TaskAttemptInfo> getFailedTaskAttempts() {
+    return getTaskAttempts(TaskAttemptState.FAILED);
+  }
+
+  /**
+   * Get list of killed task attempts
+   *
+   * @return unmodifiable list of attempts whose status equals KILLED
+   */
+  public final List<TaskAttemptInfo> getKilledTaskAttempts() {
+    return getTaskAttempts(TaskAttemptState.KILLED);
+  }
+
+  /**
+   * Get list of successful task attempts
+   *
+   * @return unmodifiable list of attempts whose status equals SUCCEEDED
+   */
+  public final List<TaskAttemptInfo> getSuccessfulTaskAttempts() {
+    return getTaskAttempts(TaskAttemptState.SUCCEEDED);
+  }
+
+  /**
+   * Get list of task attempts belonging to a specific state
+   *
+   * @param state state whose {@code toString()} must exactly equal the attempt status
+   * @return unmodifiable list of matching attempts (unsorted); attempts with a null
+   *     status never match
+   */
+  public final List<TaskAttemptInfo> getTaskAttempts(final TaskAttemptState state) {
+    return Collections.unmodifiableList(Lists.newArrayList(
+        Iterables.filter(attemptInfoMap.values(), new Predicate<TaskAttemptInfo>() {
+          @Override
+          public boolean apply(TaskAttemptInfo input) {
+            return input.getStatus() != null && input.getStatus().equals(state.toString());
+          }
+        })));
+  }
+
+  /**
+   * Get the set of containers on which the task attempts ran for this task
+   *
+   * @return Multimap<Container, TaskAttemptInfo> task attempt details at container level
+   */
+  public final Multimap<Container, TaskAttemptInfo> getContainersMapping() {
+    Multimap<Container, TaskAttemptInfo> containerMapping = LinkedHashMultimap.create();
+    for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
+      containerMapping.put(attemptInfo.getContainer(), attemptInfo);
+    }
+    return Multimaps.unmodifiableMultimap(containerMapping);
+  }
+
+  /**
+   * Get the successful task attempt. The attempt named by the recorded successful
+   * attempt id is returned when it is known; otherwise the first attempt (in start
+   * time order) whose status is SUCCEEDED, ignoring case.
+   *
+   * @return TaskAttemptInfo, or null when no successful attempt can be found
+   */
+  public final TaskAttemptInfo getSuccessfulTaskAttempt() {
+    if (!Strings.isNullOrEmpty(getSuccessfulAttemptId())) {
+      // attempts are keyed by their attempt id, so a direct lookup suffices
+      TaskAttemptInfo attemptInfo = attemptInfoMap.get(getSuccessfulAttemptId());
+      if (attemptInfo != null) {
+        return attemptInfo;
+      }
+    }
+    // fall back to checking status if successful attempt id is not available
+    for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
+      // constant first so that an attempt without a status is skipped instead of NPE-ing
+      if (TaskAttemptState.SUCCEEDED.toString().equalsIgnoreCase(attemptInfo.getStatus())) {
+        return attemptInfo;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get last task attempt to finish
+   *
+   * @return attempt with the largest finish time interval (earliest-started one on ties),
+   *     or null when there are no attempts
+   */
+  public final TaskAttemptInfo getLastTaskAttemptToFinish() {
+    List<TaskAttemptInfo> attemptsList = getTaskAttempts();
+    if (attemptsList.isEmpty()) {
+      return null;
+    }
+
+    return Ordering.from(new Comparator<TaskAttemptInfo>() {
+      @Override
+      public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+        return Long.compare(o1.getFinishTimeInterval(), o2.getFinishTimeInterval());
+      }
+    }).max(attemptsList);
+  }
+
+  /**
+   * Get average task attempt duration. Includes successful and failed attempts.
+   *
+   * @return mean of {@link TaskAttemptInfo#getTimeTaken()}, or 0 when there are no attempts
+   */
+  public final float getAvgTaskAttemptDuration() {
+    if (attemptInfoMap.isEmpty()) {
+      return 0;
+    }
+    // accumulate in a long so that large sums keep full precision until the final division
+    long totalTaskDuration = 0;
+    for (TaskAttemptInfo attemptInfo : attemptInfoMap.values()) {
+      totalTaskDuration += attemptInfo.getTimeTaken();
+    }
+    return ((float) totalTaskDuration) / attemptInfoMap.size();
+  }
+
+  private Ordering<TaskAttemptInfo> orderingOnTimeTaken() {
+    return Ordering.from(new Comparator<TaskAttemptInfo>() {
+      @Override
+      public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+        return Long.compare(o1.getTimeTaken(), o2.getTimeTaken());
+      }
+    });
+  }
+
+  private Ordering<TaskAttemptInfo> orderingOnAttemptStartTime() {
+    return Ordering.from(new Comparator<TaskAttemptInfo>() {
+      @Override
+      public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+        return Long.compare(o1.getStartTimeInterval(), o2.getStartTimeInterval());
+      }
+    });
+  }
+
+  /**
+   * Get min task attempt duration. This includes successful/failed task attempts as well
+   *
+   * @return smallest attempt time taken, or 0 when there are no attempts
+   */
+  public final long getMinTaskAttemptDuration() {
+    if (attemptInfoMap.isEmpty()) {
+      return 0;
+    }
+    return orderingOnTimeTaken().min(attemptInfoMap.values()).getTimeTaken();
+  }
+
+  /**
+   * Get max task attempt duration. This includes successful/failed task attempts as well
+   *
+   * @return largest attempt time taken, or 0 when there are no attempts
+   */
+  public final long getMaxTaskAttemptDuration() {
+    if (attemptInfoMap.isEmpty()) {
+      return 0;
+    }
+    return orderingOnTimeTaken().max(attemptInfoMap.values()).getTimeTaken();
+  }
+
+  public final int getNumberOfTaskAttempts() {
+    return attemptInfoMap.size();
+  }
+
+  public final String getStatus() {
+    return status;
+  }
+
+  public final String getTaskId() {
+    return taskId;
+  }
+
+  public final long getTimeTaken() {
+    return getFinishTimeInterval() - getStartTimeInterval();
+  }
+
+  public final String getSuccessfulAttemptId() {
+    return successfulAttemptId;
+  }
+
+  /** @return scheduled time as read from the history entity */
+  public final long getAbsoluteScheduleTime() {
+    return scheduledTime;
+  }
+
+  /** @return scheduled time minus the start time of the owning DAG */
+  public final long getScheduledTime() {
+    return scheduledTime - this.getVertexInfo().getDagInfo().getStartTime();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    sb.append("taskId=").append(getTaskId()).append(", ");
+    sb.append("scheduledTime=").append(getAbsoluteScheduleTime()).append(", ");
+    sb.append("startTime=").append(getStartTimeInterval()).append(", ");
+    sb.append("finishTime=").append(getFinishTimeInterval()).append(", ");
+    sb.append("timeTaken=").append(getTimeTaken()).append(", ");
+    sb.append("events=").append(getEvents()).append(", ");
+    sb.append("diagnostics=").append(getDiagnostics()).append(", ");
+    sb.append("successfulAttemptId=").append(getSuccessfulAttemptId()).append(", ");
+    sb.append("status=").append(getStatus()).append(", ");
+    sb.append("vertexName=").append(getVertexInfo().getVertexName());
+    sb.append("]");
+    return sb.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VersionInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VersionInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VersionInfo.java
new file mode 100644
index 0000000..97d18cd
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VersionInfo.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+/**
+ * Immutable value holder for Tez version details: the version string, the source
+ * revision and the build time. Values are stored exactly as given (no validation,
+ * nulls allowed).
+ */
+public class VersionInfo {
+
+  private final String version;
+  private final String revision;
+  private final String buildTime;
+
+  /**
+   * @param buildTime build time string
+   * @param revision source revision string
+   * @param version version string
+   */
+  public VersionInfo(String buildTime, String revision, String version) {
+    this.version = version;
+    this.revision = revision;
+    this.buildTime = buildTime;
+  }
+
+  /** @return the version string given at construction */
+  public String getVersion() {
+    return this.version;
+  }
+
+  /** @return the source revision given at construction */
+  public String getRevision() {
+    return this.revision;
+  }
+
+  /** @return the build time given at construction */
+  public String getBuildTime() {
+    return this.buildTime;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
new file mode 100644
index 0000000..50647fe
--- /dev/null
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
@@ -0,0 +1,636 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.tez.history.parser.datamodel; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.LinkedHashMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; +import com.google.common.collect.Ordering; + +import org.apache.hadoop.util.StringInterner; +import org.apache.tez.dag.api.oldrecords.TaskState; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.classification.InterfaceAudience.Public; +import static org.apache.hadoop.classification.InterfaceStability.Evolving; + +@Public +@Evolving +public class VertexInfo extends BaseInfo { + + private final String vertexId; + private final String vertexName; + private final long finishTime; + private final long initTime; + private final long initRequestedTime; + private final long startTime; + private final long startRequestedTime; + + private final String diagnostics; + private final String processorClass; + + private final int numTasks; + private final int failedTasks; + private final int completedTasks; + private final int succeededTasks; + private final int killedTasks; + private final int numFailedTaskAttempts; + + private final String status; + + //TaskID --> TaskInfo for internal reference + private Map<String, TaskInfo> taskInfoMap; + + private final List<EdgeInfo> inEdgeList; + private final List<EdgeInfo> outEdgeList; + + private final List<AdditionalInputOutputDetails> additionalInputInfoList; + private final List<AdditionalInputOutputDetails> additionalOutputInfoList; + + private long avgPostDataExecutionTimeInterval = -1; + + private 
DagInfo dagInfo; + + VertexInfo(JSONObject jsonObject) throws JSONException { + super(jsonObject); + + Preconditions.checkArgument( + jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase + (Constants.TEZ_VERTEX_ID)); + + vertexId = StringInterner.weakIntern(jsonObject.optString(Constants.ENTITY)); + taskInfoMap = Maps.newHashMap(); + + inEdgeList = Lists.newLinkedList(); + outEdgeList = Lists.newLinkedList(); + additionalInputInfoList = Lists.newLinkedList(); + additionalOutputInfoList = Lists.newLinkedList(); + + //Parse additional Info + JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO); + initRequestedTime = otherInfoNode.optLong(Constants.INIT_REQUESTED_TIME); + startRequestedTime = otherInfoNode.optLong(Constants.START_REQUESTED_TIME); + startTime = otherInfoNode.optLong(Constants.START_TIME); + initTime = otherInfoNode.optLong(Constants.INIT_TIME); + finishTime = otherInfoNode.optLong(Constants.FINISH_TIME); + diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS); + numTasks = otherInfoNode.optInt(Constants.NUM_TASKS); + failedTasks = otherInfoNode.optInt(Constants.NUM_FAILED_TASKS); + succeededTasks = + otherInfoNode.optInt(Constants.NUM_SUCCEEDED_TASKS); + completedTasks = + otherInfoNode.optInt(Constants.NUM_COMPLETED_TASKS); + killedTasks = otherInfoNode.optInt(Constants.NUM_KILLED_TASKS); + numFailedTaskAttempts = + otherInfoNode.optInt(Constants.NUM_FAILED_TASKS_ATTEMPTS); + vertexName = StringInterner.weakIntern(otherInfoNode.optString(Constants.VERTEX_NAME)); + processorClass = StringInterner.weakIntern(otherInfoNode.optString(Constants.PROCESSOR_CLASS_NAME)); + status = StringInterner.weakIntern(otherInfoNode.optString(Constants.STATUS)); + } + + public static VertexInfo create(JSONObject vertexInfoObject) throws + JSONException { + return new VertexInfo(vertexInfoObject); + } + + /** + * Update edge details with source and destination vertex objects. 
+ */ + private void updateEdgeInfo() { + if (dagInfo.getNumVertices() == dagInfo.getVertices().size()) { + //We can update EdgeInfo when all vertices are parsed + Map<String, VertexInfo> vertexMapping = dagInfo.getVertexMapping(); + for (EdgeInfo edge : dagInfo.getEdges()) { + VertexInfo sourceVertex = vertexMapping.get(edge.getInputVertexName()); + VertexInfo destinationVertex = vertexMapping.get(edge.getOutputVertexName()); + edge.setSourceVertex(sourceVertex); + edge.setDestinationVertex(destinationVertex); + } + } + } + + void addTaskInfo(TaskInfo taskInfo) { + this.taskInfoMap.put(taskInfo.getTaskId(), taskInfo); + } + + void setAdditionalInputInfoList(List<AdditionalInputOutputDetails> additionalInputInfoList) { + this.additionalInputInfoList.clear(); + this.additionalInputInfoList.addAll(additionalInputInfoList); + } + + void setAdditionalOutputInfoList(List<AdditionalInputOutputDetails> additionalOutputInfoList) { + this.additionalOutputInfoList.clear(); + this.additionalOutputInfoList.addAll(additionalOutputInfoList); + } + + void addInEdge(EdgeInfo edgeInfo) { + this.inEdgeList.add(edgeInfo); + } + + void addOutEdge(EdgeInfo edgeInfo) { + this.outEdgeList.add(edgeInfo); + } + + void setDagInfo(DagInfo dagInfo) { + Preconditions.checkArgument(dagInfo != null, "Provide valid dagInfo"); + this.dagInfo = dagInfo; + //link vertex to dagInfo + dagInfo.addVertexInfo(this); + updateEdgeInfo(); + } + + public List<AdditionalInputOutputDetails> getAdditionalInputInfoList() { + return Collections.unmodifiableList(additionalInputInfoList); + } + + public List<AdditionalInputOutputDetails> getAdditionalOutputInfoList() { + return Collections.unmodifiableList(additionalOutputInfoList); + } + + @Override + public final long getStartTimeInterval() { + return startTime - (dagInfo.getStartTime()); + } + + public final long getFirstTaskStartTimeInterval() { + TaskInfo firstTask = getFirstTaskToStart(); + if (firstTask == null) { + return 0; + } + return 
firstTask.getStartTimeInterval(); + } + + public final long getLastTaskFinishTimeInterval() { + if (getLastTaskToFinish() == null || getLastTaskToFinish().getFinishTimeInterval() < 0) { + return dagInfo.getFinishTimeInterval(); + } + return getLastTaskToFinish().getFinishTimeInterval(); + } + + public final long getAvgPostDataExecutionTimeInterval() { + if (avgPostDataExecutionTimeInterval == -1) { + long totalExecutionTime = 0; + long totalAttempts = 0; + for (TaskInfo task : getTasks()) { + TaskAttemptInfo attempt = task.getSuccessfulTaskAttempt(); + if (attempt != null) { + // count only time after last data was received + long execTime = attempt.getPostDataExecutionTimeInterval(); + if (execTime >= 0) { + totalExecutionTime += execTime; + totalAttempts++; + } + } + } + if (totalAttempts > 0) { + avgPostDataExecutionTimeInterval = Math.round(totalExecutionTime*1.0/totalAttempts); + } + } + return avgPostDataExecutionTimeInterval; + } + + public final long getStartTime() { + return startTime; + } + + public final long getFinishTime() { + return finishTime; + } + + public final long getInitTime() { + return initTime; + } + + public final long getInitRequestedTime() { + return initRequestedTime; + } + + public final long getStartRequestedTime() { + return startRequestedTime; + } + + @Override + public final long getFinishTimeInterval() { + long vertexEndTime = finishTime - (dagInfo.getStartTime()); + if (vertexEndTime < 0) { + //probably vertex is not complete or failed in middle. get the last task attempt time + for (TaskInfo taskInfo : getTasks()) { + vertexEndTime = (taskInfo.getFinishTimeInterval() > vertexEndTime) + ? 
taskInfo.getFinishTimeInterval() : vertexEndTime; + } + } + return vertexEndTime; + } + + @Override + public final String getDiagnostics() { + return diagnostics; + } + + public final String getVertexName() { + return vertexName; + } + + public final String getVertexId() { + return vertexId; + } + + //Quite possible that getFinishTime is not yet recorded for failed vertices (or killed vertices) + //Start time of vertex infers that the dependencies are done and AM has inited it. + public final long getTimeTaken() { + return (getFinishTimeInterval() - getStartTimeInterval()); + } + + //Time taken for last task to finish - time taken for first task to start + public final long getTimeTakenForTasks() { + return (getLastTaskFinishTimeInterval() - getFirstTaskStartTimeInterval()); + } + + public final long getInitTimeInterval() { + return initTime - dagInfo.getStartTime(); + } + + public final int getNumTasks() { + return numTasks; + } + + public final int getFailedTasksCount() { + return failedTasks; + } + + public final int getKilledTasksCount() { + return killedTasks; + } + + public final int getCompletedTasksCount() { + return completedTasks; + } + + public final int getSucceededTasksCount() { + return succeededTasks; + } + + public final int getNumFailedTaskAttemptsCount() { + return numFailedTaskAttempts; + } + + public final String getProcessorClassName() { + return processorClass; + + } + + + private List<TaskInfo> getTasksInternal() { + return Lists.newLinkedList(taskInfoMap.values()); + } + + /** + * Get all tasks + * + * @return list of taskInfo + */ + public final List<TaskInfo> getTasks() { + return Collections.unmodifiableList(getTasksInternal()); + } + + /** + * Get all tasks in sorted order + * + * @param sorted + * @param ordering + * @return list of TaskInfo + */ + public final List<TaskInfo> getTasks(boolean sorted, @Nullable Ordering<TaskInfo> ordering) { + List<TaskInfo> taskInfoList = getTasksInternal(); + if (sorted) { + 
Collections.sort(taskInfoList, ((ordering == null) ? orderingOnStartTime() : ordering)); + } + return Collections.unmodifiableList(taskInfoList); + } + + /** + * Get list of failed tasks + * + * @return List<TaskAttemptInfo> + */ + public final List<TaskInfo> getFailedTasks() { + return getTasks(TaskState.FAILED); + } + + /** + * Get list of killed tasks + * + * @return List<TaskAttemptInfo> + */ + public final List<TaskInfo> getKilledTasks() { + return getTasks(TaskState.KILLED); + } + + /** + * Get list of failed tasks + * + * @return List<TaskAttemptInfo> + */ + public final List<TaskInfo> getSuccessfulTasks() { + return getTasks(TaskState.SUCCEEDED); + } + + /** + * Get list of tasks belonging to a specific state + * + * @param state + * @return List<TaskAttemptInfo> + */ + public final List<TaskInfo> getTasks(final TaskState state) { + return Collections.unmodifiableList(Lists.newLinkedList(Iterables.filter(Lists.newLinkedList + (taskInfoMap.values()), new Predicate<TaskInfo>() { + @Override public boolean apply(TaskInfo input) { + return input.getStatus() != null && input.getStatus().equals(state.toString()); + } + } + ) + ) + ); + } + + /** + * Get source vertices for this vertex + * + * @return List<VertexInfo> list of incoming vertices to this vertex + */ + public final List<VertexInfo> getInputVertices() { + List<VertexInfo> inputVertices = Lists.newLinkedList(); + for (EdgeInfo edge : inEdgeList) { + inputVertices.add(edge.getSourceVertex()); + } + return Collections.unmodifiableList(inputVertices); + } + + /** + * Get destination vertices for this vertex + * + * @return List<VertexInfo> list of output vertices + */ + public final List<VertexInfo> getOutputVertices() { + List<VertexInfo> outputVertices = Lists.newLinkedList(); + for (EdgeInfo edge : outEdgeList) { + outputVertices.add(edge.getDestinationVertex()); + } + return Collections.unmodifiableList(outputVertices); + } + + // expensive method to call for large DAGs as it creates big lists on every 
call + private List<TaskAttemptInfo> getTaskAttemptsInternal() { + List<TaskAttemptInfo> taskAttemptInfos = Lists.newLinkedList(); + for (TaskInfo taskInfo : getTasks()) { + taskAttemptInfos.addAll(taskInfo.getTaskAttempts()); + } + return taskAttemptInfos; + } + + /** + * Get all task attempts + * + * @return List<TaskAttemptInfo> list of attempts + */ + public List<TaskAttemptInfo> getTaskAttempts() { + return Collections.unmodifiableList(getTaskAttemptsInternal()); + } + + /** + * Get all task attempts in sorted order + * + * @param sorted + * @param ordering + * @return list of TaskAttemptInfo + */ + public final List<TaskAttemptInfo> getTaskAttempts(boolean sorted, + @Nullable Ordering<TaskAttemptInfo> ordering) { + List<TaskAttemptInfo> taskAttemptInfos = getTaskAttemptsInternal(); + if (sorted) { + Collections.sort(taskAttemptInfos, ((ordering == null) ? orderingOnAttemptStartTime() : ordering)); + } + return Collections.unmodifiableList(taskAttemptInfos); + } + + public final TaskInfo getTask(String taskId) { + return taskInfoMap.get(taskId); + } + + /** + * Get incoming edge information for a specific vertex + * + * @return List<EdgeInfo> list of input edges on this vertex + */ + public final List<EdgeInfo> getInputEdges() { + return Collections.unmodifiableList(inEdgeList); + } + + /** + * Get outgoing edge information for a specific vertex + * + * @return List<EdgeInfo> list of output edges on this vertex + */ + public final List<EdgeInfo> getOutputEdges() { + return Collections.unmodifiableList(outEdgeList); + } + + public final Multimap<Container, TaskAttemptInfo> getContainersMapping() { + Multimap<Container, TaskAttemptInfo> containerMapping = LinkedHashMultimap.create(); + for (TaskAttemptInfo attemptInfo : getTaskAttempts()) { + containerMapping.put(attemptInfo.getContainer(), attemptInfo); + } + return Multimaps.unmodifiableMultimap(containerMapping); + } + + /** + * Get first task to start + * + * @return TaskInfo + */ + public final TaskInfo 
getFirstTaskToStart() { + List<TaskInfo> taskInfoList = Lists.newLinkedList(taskInfoMap.values()); + if (taskInfoList.size() == 0) { + return null; + } + Collections.sort(taskInfoList, new Comparator<TaskInfo>() { + @Override public int compare(TaskInfo o1, TaskInfo o2) { + return (o1.getStartTimeInterval() < o2.getStartTimeInterval()) ? -1 : + ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ? + 0 : 1); + } + }); + return taskInfoList.get(0); + } + + /** + * Get last task to finish + * + * @return TaskInfo + */ + public final TaskInfo getLastTaskToFinish() { + List<TaskInfo> taskInfoList = Lists.newLinkedList(taskInfoMap.values()); + if (taskInfoList.size() == 0) { + return null; + } + Collections.sort(taskInfoList, new Comparator<TaskInfo>() { + @Override public int compare(TaskInfo o1, TaskInfo o2) { + return (o1.getFinishTimeInterval() > o2.getFinishTimeInterval()) ? -1 : + ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ? + 0 : 1); + } + }); + return taskInfoList.get(0); + } + + /** + * Get average task duration + * + * @return long + */ + public final float getAvgTaskDuration() { + float totalTaskDuration = 0; + List<TaskInfo> tasksList = getTasks(); + if (tasksList.size() == 0) { + return 0; + } + for (TaskInfo taskInfo : tasksList) { + totalTaskDuration += taskInfo.getTimeTaken(); + } + return ((totalTaskDuration * 1.0f) / tasksList.size()); + } + + /** + * Get min task duration in vertex + * + * @return long + */ + public final long getMinTaskDuration() { + TaskInfo taskInfo = getMinTaskDurationTask(); + return (taskInfo != null) ? taskInfo.getTimeTaken() : 0; + } + + /** + * Get max task duration in vertex + * + * @return long + */ + public final long getMaxTaskDuration() { + TaskInfo taskInfo = getMaxTaskDurationTask(); + return (taskInfo != null) ? 
taskInfo.getTimeTaken() : 0; + } + + private Ordering<TaskInfo> orderingOnTimeTaken() { + return Ordering.from(new Comparator<TaskInfo>() { + @Override public int compare(TaskInfo o1, TaskInfo o2) { + return (o1.getTimeTaken() < o2.getTimeTaken()) ? -1 : + ((o1.getTimeTaken() == o2.getTimeTaken()) ? 0 : 1); + } + }); + } + + private Ordering<TaskInfo> orderingOnStartTime() { + return Ordering.from(new Comparator<TaskInfo>() { + @Override public int compare(TaskInfo o1, TaskInfo o2) { + return (o1.getStartTimeInterval() < o2.getStartTimeInterval()) ? -1 : + ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ? 0 : 1); + } + }); + } + + private Ordering<TaskAttemptInfo> orderingOnAttemptStartTime() { + return Ordering.from(new Comparator<TaskAttemptInfo>() { + @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) { + return (o1.getStartTimeInterval() < o2.getStartTimeInterval()) ? -1 : + ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ? 0 : 1); + } + }); + } + + /** + * Get min task duration in vertex + * + * @return TaskInfo + */ + public final TaskInfo getMinTaskDurationTask() { + List<TaskInfo> taskInfoList = getTasks(); + if (taskInfoList.size() == 0) { + return null; + } + + return orderingOnTimeTaken().min(taskInfoList); + } + + /** + * Get max task duration in vertex + * + * @return TaskInfo + */ + public final TaskInfo getMaxTaskDurationTask() { + List<TaskInfo> taskInfoList = getTasks(); + if (taskInfoList.size() == 0) { + return null; + } + return orderingOnTimeTaken().max(taskInfoList); + } + + public final String getStatus() { + return status; + } + + public final DagInfo getDagInfo() { + return dagInfo; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("["); + sb.append("vertexName=").append(getVertexName()).append(", "); + sb.append("events=").append(getEvents()).append(", "); + sb.append("initTime=").append(getInitTimeInterval()).append(", "); + 
sb.append("startTime=").append(getStartTimeInterval()).append(", "); + sb.append("endTime=").append(getFinishTimeInterval()).append(", "); + sb.append("timeTaken=").append(getTimeTaken()).append(", "); + sb.append("diagnostics=").append(getDiagnostics()).append(", "); + sb.append("numTasks=").append(getNumTasks()).append(", "); + sb.append("processorClassName=").append(getProcessorClassName()).append(", "); + sb.append("numCompletedTasks=").append(getCompletedTasksCount()).append(", "); + sb.append("numFailedTaskAttempts=").append(getNumFailedTaskAttemptsCount()).append(", "); + sb.append("numSucceededTasks=").append(getSucceededTasksCount()).append(", "); + sb.append("numFailedTasks=").append(getFailedTasks()).append(", "); + sb.append("numKilledTasks=").append(getKilledTasks()).append(", "); + sb.append("tasksCount=").append(taskInfoMap.size()).append(", "); + sb.append("status=").append(getStatus()); + sb.append("]"); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/8c8db7c5/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java new file mode 100644 index 0000000..ffb854a --- /dev/null +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.history.parser.utils; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.util.StringInterner; +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; +import org.apache.tez.common.counters.CounterGroup; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.history.logging.EntityTypes; +import org.apache.tez.history.parser.datamodel.Constants; +import org.apache.tez.history.parser.datamodel.Event; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo.DataDependencyEvent; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; + +import java.util.List; + [email protected] +public class Utils { + + private static final String LOG4J_CONFIGURATION = "log4j.configuration"; + + /** + * Parse tez counters from json + * + * @param jsonObject + * @return TezCounters + * @throws JSONException + */ + public static TezCounters parseTezCountersFromJSON(JSONObject jsonObject) + throws JSONException { + TezCounters counters = new TezCounters(); + + if (jsonObject == null) { + return counters; //empty counters. 
+ } + + final JSONArray counterGroupNodes = jsonObject.optJSONArray(Constants.COUNTER_GROUPS); + if (counterGroupNodes != null) { + for (int i = 0; i < counterGroupNodes.length(); i++) { + JSONObject counterGroupNode = counterGroupNodes.optJSONObject(i); + final String groupName = counterGroupNode.optString(Constants.COUNTER_GROUP_NAME); + final String groupDisplayName = counterGroupNode.optString( + Constants.COUNTER_GROUP_DISPLAY_NAME, groupName); + + CounterGroup group = counters.addGroup(groupName, groupDisplayName); + + final JSONArray counterNodes = counterGroupNode.optJSONArray(Constants.COUNTERS); + + //Parse counter nodes + for (int j = 0; j < counterNodes.length(); j++) { + JSONObject counterNode = counterNodes.optJSONObject(j); + final String counterName = counterNode.getString(Constants.COUNTER_NAME); + final String counterDisplayName = + counterNode.optString(Constants.COUNTER_DISPLAY_NAME, counterName); + final long counterValue = counterNode.getLong(Constants.COUNTER_VALUE); + TezCounter counter = group.findCounter( + counterName, + counterDisplayName); + counter.setValue(counterValue); + } + } + } + return counters; + } + + public static List<DataDependencyEvent> parseDataEventDependencyFromJSON(JSONObject jsonObject) + throws JSONException { + List<DataDependencyEvent> events = Lists.newArrayList(); + JSONArray fields = jsonObject.optJSONArray(Constants.LAST_DATA_EVENTS); + for (int i=0; i<fields.length(); i++) { + JSONObject eventMap = fields.getJSONObject(i); + events.add(new DataDependencyEvent( + StringInterner.weakIntern(eventMap.optString(EntityTypes.TEZ_TASK_ATTEMPT_ID.name())), + eventMap.optLong(Constants.TIMESTAMP))); + } + return events; + } + + /** + * Parse events from json + * + * @param eventNodes + * @param eventList + * @throws JSONException + */ + public static void parseEvents(JSONArray eventNodes, List<Event> eventList) throws + JSONException { + if (eventNodes == null) { + return; + } + for (int i = 0; i < eventNodes.length(); 
i++) { + JSONObject eventNode = eventNodes.optJSONObject(i); + final String eventInfo = eventNode.optString(Constants.EVENT_INFO); + final String eventType = eventNode.optString(Constants.EVENT_TYPE); + final long time = eventNode.optLong(Constants.EVENT_TIME_STAMP); + + Event event = new Event(eventInfo, eventType, time); + + eventList.add(event); + + } + } + + public static void setupRootLogger() { + if (Strings.isNullOrEmpty(System.getProperty(LOG4J_CONFIGURATION))) { + //By default print to console with INFO level + Logger.getRootLogger(). + addAppender(new ConsoleAppender(new PatternLayout(PatternLayout.TTCC_CONVERSION_PATTERN))); + Logger.getRootLogger().setLevel(Level.INFO); + } + } + +}
