Repository: tez
Updated Branches:
  refs/heads/branch-0.7 86b71c6a0 -> 6852f79aa
TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime) (rbalamohan) (cherry picked from commit 7e3d5461c3b948ca1c27f386e3e9e3665b8a649e) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6852f79a Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6852f79a Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6852f79a Branch: refs/heads/branch-0.7 Commit: 6852f79aaa885aee183d168831c9d405adc133c6 Parents: 86b71c6 Author: Rajesh Balamohan <[email protected]> Authored: Fri Feb 12 09:59:25 2016 -0800 Committer: Rajesh Balamohan <[email protected]> Committed: Fri Feb 12 10:28:44 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/app/dag/impl/DAGImpl.java | 13 +++++- .../tez/history/parser/datamodel/DagInfo.java | 33 ++++++++++++++- .../parser/datamodel/TaskAttemptInfo.java | 37 +++++++++++++++- .../tez/history/parser/datamodel/TaskInfo.java | 37 +++++++++++++++- .../history/parser/datamodel/VertexInfo.java | 44 ++++++++++++++++++-- .../apache/tez/history/TestHistoryParser.java | 6 +++ 7 files changed, 161 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/6852f79a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a386c8b..e991566 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-2972. Avoid task rescheduling when a node turns unhealthy ALL CHANGES + TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime). TEZ-3104. Tez fails on Bzip2 intermediate output format on hadoop 2.7.1 and earlier TEZ-3093. CriticalPathAnalyzer should be accessible via zeppelin TEZ-3089. 
TaskConcurrencyAnalyzer can return negative task count with very large jobs http://git-wip-us.apache.org/repos/asf/tez/blob/6852f79a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index bb6f474..6b98a7b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -1255,6 +1255,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, void logJobHistoryFinishedEvent(TezCounters counters) throws IOException { Map<String, Integer> taskStats = constructTaskStats(getDAGProgress()); + if (finishTime < startTime) { + LOG.warn("DAG finish time is smaller than start time. " + + "startTime=" + startTime + + ", finishTime=" + finishTime + ); + } DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime, finishTime, DAGState.SUCCEEDED, "", counters, this.userName, this.dagName, taskStats, this.appContext.getApplicationAttemptId()); @@ -1264,7 +1270,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, void logJobHistoryUnsuccesfulEvent(DAGState state, TezCounters counters) throws IOException { Map<String, Integer> taskStats = constructTaskStats(getDAGProgress()); - + if (finishTime < startTime) { + LOG.warn("DAG finish time is smaller than start time. 
" + + "startTime=" + startTime + + ", finishTime=" + finishTime + ); + } DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime, clock.getTime(), state, StringUtils.join(getDiagnostics(), LINE_SEPARATOR), http://git-wip-us.apache.org/repos/asf/tez/blob/6852f79a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java index 5fb760c..8057be7 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java @@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.StringInterner; import org.apache.tez.client.CallerContext; import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.history.HistoryEventType; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -98,8 +99,36 @@ public class DagInfo extends BaseInfo { //Parse additional Info JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO); - startTime = otherInfoNode.optLong(Constants.START_TIME); - endTime = otherInfoNode.optLong(Constants.FINISH_TIME); + + long sTime = otherInfoNode.optLong(Constants.START_TIME); + long eTime= otherInfoNode.optLong(Constants.FINISH_TIME); + if (eTime < sTime) { + LOG.warn("DAG has got wrong start/end values. " + + "startTime=" + sTime + ", endTime=" + eTime + ". 
Will check " + + "timestamps in DAG started/finished events"); + + // Check if events DAG_STARTED, DAG_FINISHED can be made use of + for(Event event : eventList) { + switch (HistoryEventType.valueOf(event.getType())) { + case DAG_STARTED: + sTime = event.getAbsoluteTime(); + break; + case DAG_FINISHED: + eTime = event.getAbsoluteTime(); + break; + default: + break; + } + } + + if (eTime < sTime) { + LOG.warn("DAG has got wrong start/end values in events as well. " + + "startTime=" + sTime + ", endTime=" + eTime); + } + } + startTime = sTime; + endTime = eTime; + //TODO: Not getting populated correctly for lots of jobs. Verify submitTime = otherInfoNode.optLong(Constants.START_REQUESTED_TIME); diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS); http://git-wip-us.apache.org/repos/asf/tez/blob/6852f79a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java index d373513..885d743 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java @@ -24,12 +24,15 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.StringInterner; import org.apache.tez.common.ATSConstants; import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; import 
org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.history.parser.utils.Utils; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -46,6 +49,8 @@ import static org.apache.hadoop.classification.InterfaceAudience.Public; @Evolving public class TaskAttemptInfo extends BaseInfo { + private static final Log LOG = LogFactory.getLog(TaskAttemptInfo.class); + private static final String SUCCEEDED = StringInterner.weakIntern(TaskAttemptState.SUCCEEDED.name()); private final String taskAttemptId; @@ -95,8 +100,36 @@ public class TaskAttemptInfo extends BaseInfo { //Parse additional Info final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO); - startTime = otherInfoNode.optLong(Constants.START_TIME); - endTime = otherInfoNode.optLong(Constants.FINISH_TIME); + + long sTime = otherInfoNode.optLong(Constants.START_TIME); + long eTime = otherInfoNode.optLong(Constants.FINISH_TIME); + if (eTime < sTime) { + LOG.warn("TaskAttemptInfo has got wrong start/end values. " + + "startTime=" + sTime + ", endTime=" + eTime + ". Will check " + + "timestamps in DAG started/finished events"); + + // Check if events TASK_STARTED, TASK_FINISHED can be made use of + for(Event event : eventList) { + switch (HistoryEventType.valueOf(event.getType())) { + case TASK_ATTEMPT_STARTED: + sTime = event.getAbsoluteTime(); + break; + case TASK_ATTEMPT_FINISHED: + eTime = event.getAbsoluteTime(); + break; + default: + break; + } + } + + if (eTime < sTime) { + LOG.warn("TaskAttemptInfo has got wrong start/end values in events as well. 
" + + "startTime=" + sTime + ", endTime=" + eTime); + } + } + startTime = sTime; + endTime = eTime; + diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS); creationTime = otherInfoNode.optLong(Constants.CREATION_TIME); creationCausalTA = StringInterner.weakIntern( http://git-wip-us.apache.org/repos/asf/tez/blob/6852f79a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java index c6f89d6..fb3f232 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java @@ -29,8 +29,11 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import com.google.common.collect.Ordering; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.StringInterner; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.history.HistoryEventType; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -46,6 +49,8 @@ import static org.apache.hadoop.classification.InterfaceStability.Evolving; @Evolving public class TaskInfo extends BaseInfo { + private static final Log LOG = LogFactory.getLog(TaskInfo.class); + private final long startTime; private final long endTime; private final String diagnostics; @@ -70,8 +75,36 @@ public class TaskInfo extends BaseInfo { //Parse additional Info final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO); - startTime = otherInfoNode.optLong(Constants.START_TIME); - endTime = 
otherInfoNode.optLong(Constants.FINISH_TIME); + + long sTime = otherInfoNode.optLong(Constants.START_TIME); + long eTime = otherInfoNode.optLong(Constants.FINISH_TIME); + if (eTime < sTime) { + LOG.warn("Task has got wrong start/end values. " + + "startTime=" + sTime + ", endTime=" + eTime + ". Will check " + + "timestamps in DAG started/finished events"); + + // Check if events TASK_STARTED, TASK_FINISHED can be made use of + for(Event event : eventList) { + switch (HistoryEventType.valueOf(event.getType())) { + case TASK_STARTED: + sTime = event.getAbsoluteTime(); + break; + case TASK_FINISHED: + eTime = event.getAbsoluteTime(); + break; + default: + break; + } + } + + if (eTime < sTime) { + LOG.warn("Task has got wrong start/end values in events as well. " + + "startTime=" + sTime + ", endTime=" + eTime); + } + } + startTime = sTime; + endTime = eTime; + diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS); successfulAttemptId = StringInterner.weakIntern( otherInfoNode.optString(Constants.SUCCESSFUL_ATTEMPT_ID)); http://git-wip-us.apache.org/repos/asf/tez/blob/6852f79a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java index 50647fe..0f6831b 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java @@ -28,8 +28,11 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import com.google.common.collect.Ordering; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.util.StringInterner; import org.apache.tez.dag.api.oldrecords.TaskState; +import org.apache.tez.dag.history.HistoryEventType; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -46,6 +49,8 @@ import static org.apache.hadoop.classification.InterfaceStability.Evolving; @Evolving public class VertexInfo extends BaseInfo { + private static final Log LOG = LogFactory.getLog(VertexInfo.class); + private final String vertexId; private final String vertexName; private final long finishTime; @@ -98,9 +103,42 @@ public class VertexInfo extends BaseInfo { JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO); initRequestedTime = otherInfoNode.optLong(Constants.INIT_REQUESTED_TIME); startRequestedTime = otherInfoNode.optLong(Constants.START_REQUESTED_TIME); - startTime = otherInfoNode.optLong(Constants.START_TIME); - initTime = otherInfoNode.optLong(Constants.INIT_TIME); - finishTime = otherInfoNode.optLong(Constants.FINISH_TIME); + + long sTime = otherInfoNode.optLong(Constants.START_TIME); + long iTime = otherInfoNode.optLong(Constants.INIT_TIME); + long eTime = otherInfoNode.optLong(Constants.FINISH_TIME); + if (eTime < sTime) { + LOG.warn("Vertex has got wrong start/end values. " + + "startTime=" + sTime + ", endTime=" + eTime + ". Will check " + + "timestamps in DAG started/finished events"); + + // Check if events VERTEX_STARTED, VERTEX_FINISHED can be made use of + for(Event event : eventList) { + switch (HistoryEventType.valueOf(event.getType())) { + case VERTEX_INITIALIZED: + iTime = event.getAbsoluteTime(); + break; + case VERTEX_STARTED: + sTime = event.getAbsoluteTime(); + break; + case VERTEX_FINISHED: + eTime = event.getAbsoluteTime(); + break; + default: + break; + } + } + + if (eTime < sTime) { + LOG.warn("Vertex has got wrong start/end values in events as well. 
" + + "startTime=" + sTime + ", endTime=" + eTime); + } + } + startTime = sTime; + finishTime = eTime; + initTime = iTime; + + diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS); numTasks = otherInfoNode.optInt(Constants.NUM_TASKS); failedTasks = otherInfoNode.optInt(Constants.NUM_FAILED_TASKS); http://git-wip-us.apache.org/repos/asf/tez/blob/6852f79a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java index b373f6e..372585b 100644 --- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java +++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java @@ -249,6 +249,7 @@ public class TestHistoryParser { WordCount.TokenProcessor.class.getName())); assertTrue(dagInfo.getVertex(SUMMATION).getProcessorClassName() .equals(WordCount.SumProcessor.class.getName())); + assertTrue(dagInfo.getFinishTime() > dagInfo.getStartTime()); assertTrue(dagInfo.getEdges().size() == 1); EdgeInfo edgeInfo = dagInfo.getEdges().iterator().next(); assertTrue(edgeInfo.getDataMovementType(). 
@@ -269,6 +270,7 @@ public class TestHistoryParser { assertTrue(vertexInfo.getStartRequestedTime() > 0); assertTrue(vertexInfo.getStartTime() > 0); assertTrue(vertexInfo.getFinishTime() > 0); + assertTrue(vertexInfo.getFinishTime() > vertexInfo.getStartTime()); long finishTime = 0; for (TaskInfo taskInfo : vertexInfo.getTasks()) { assertTrue(taskInfo.getNumberOfTaskAttempts() == 1); @@ -280,6 +282,7 @@ public class TestHistoryParser { assertTrue(taskInfo.getSuccessfulTaskAttempts().size() > 0); assertTrue(taskInfo.getFailedTaskAttempts().size() == 0); assertTrue(taskInfo.getKilledTaskAttempts().size() == 0); + assertTrue(taskInfo.getFinishTime() > taskInfo.getStartTime()); List<TaskAttemptInfo> attempts = taskInfo.getTaskAttempts(); if (vertexInfo.getVertexName().equals(TOKENIZER)) { // get the last task to finish and track its successful attempt @@ -304,6 +307,7 @@ public class TestHistoryParser { assertTrue(attemptInfo.getCreationTime() > 0); assertTrue(attemptInfo.getAllocationTime() > 0); assertTrue(attemptInfo.getStartTime() > 0); + assertTrue(attemptInfo.getFinishTime() > attemptInfo.getStartTime()); } } assertTrue(vertexInfo.getLastTaskToFinish() != null); @@ -748,6 +752,7 @@ public class TestHistoryParser { assertTrue(vertexInfo.getFirstTaskToStart() != null); assertTrue(vertexInfo.getSucceededTasksCount() > 0); assertTrue(vertexInfo.getTasks().size() > 0); + assertTrue(vertexInfo.getFinishTime() > vertexInfo.getStartTime()); } for (TaskInfo taskInfo : vertexInfo.getTasks()) { @@ -781,6 +786,7 @@ public class TestHistoryParser { taskInfo.getStartTimeInterval() > 0 && taskInfo.getStartTime() > taskInfo.getStartTimeInterval()); assertTrue(taskInfo.getSuccessfulAttemptId() != null); assertTrue(taskInfo.getSuccessfulTaskAttempt() != null); + assertTrue(taskInfo.getFinishTime() > taskInfo.getStartTime()); } assertTrue(taskInfo.getTaskId() != null);
