TEZ-2248. VertexImpl/DAGImpl.checkForCompletion have too many termination cause checks (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/778c1f5a Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/778c1f5a Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/778c1f5a Branch: refs/heads/TEZ-2003 Commit: 778c1f5afcf6cc2552e180e3d48385a1fade673d Parents: 0a64ae7 Author: Jeff Zhang <[email protected]> Authored: Fri Apr 24 13:05:18 2015 +0800 Committer: Jeff Zhang <[email protected]> Committed: Fri Apr 24 13:05:18 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/dag/app/dag/DAGTerminationCause.java | 28 +++-- .../tez/dag/app/dag/VertexTerminationCause.java | 34 +++-- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 66 +--------- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 123 ++----------------- 5 files changed, 57 insertions(+), 195 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/778c1f5a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 47af758..afb458a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-2248. VertexImpl/DAGImpl.checkForCompletion have too many termination cause checks TEZ-2341. TestMockDAGAppMaster.testBasicCounters fails on windows TEZ-2352. Move getTaskStatistics into the RuntimeTask class. TEZ-2357. Tez UI: misc.js.orig is committed by accident http://git-wip-us.apache.org/repos/asf/tez/blob/778c1f5a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java index d8ba95d..b6be395 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java @@ -25,27 +25,37 @@ package org.apache.tez.dag.app.dag; public enum DAGTerminationCause { /** DAG was directly killed. */ - DAG_KILL, + DAG_KILL(DAGState.KILLED), /** A vertex failed. */ - VERTEX_FAILURE, + VERTEX_FAILURE(DAGState.FAILED), /** DAG failed due as it had zero vertices. */ - ZERO_VERTICES, + ZERO_VERTICES(DAGState.FAILED), /** DAG failed during init. */ - INIT_FAILURE, + INIT_FAILURE(DAGState.FAILED), /** DAG failed during output commit. */ - COMMIT_FAILURE, + COMMIT_FAILURE(DAGState.FAILED), /** In some cases, vertex could not rerun, e.g. its output been committed as a shared output of vertex group */ - VERTEX_RERUN_AFTER_COMMIT, + VERTEX_RERUN_AFTER_COMMIT(DAGState.FAILED), - VERTEX_RERUN_IN_COMMITTING, + VERTEX_RERUN_IN_COMMITTING(DAGState.FAILED), /** DAG failed while trying to write recovery events */ - RECOVERY_FAILURE, + RECOVERY_FAILURE(DAGState.FAILED), - INTERNAL_ERROR + INTERNAL_ERROR(DAGState.ERROR); + + private DAGState finishedState; + + DAGTerminationCause(DAGState finishedState) { + this.finishedState = finishedState; + } + + public DAGState getFinishedState() { + return finishedState; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/778c1f5a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java index ebece97..28712ad 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java @@ -24,39 +24,49 @@ package org.apache.tez.dag.app.dag; public enum VertexTerminationCause { /** DAG was killed */ - DAG_KILL, + DAG_KILL(VertexState.KILLED), /** Other vertex failed causing DAG to fail thus killing this vertex */ - OTHER_VERTEX_FAILURE, + OTHER_VERTEX_FAILURE(VertexState.KILLED), /** Initialization failed for one of the root Inputs */ - ROOT_INPUT_INIT_FAILURE, + ROOT_INPUT_INIT_FAILURE(VertexState.FAILED), /** This vertex failed as its AM usercode (VertexManager/EdgeManager/InputInitializer) * throw Exception */ - AM_USERCODE_FAILURE, + AM_USERCODE_FAILURE(VertexState.FAILED), /** One of the tasks for this vertex failed. */ - OWN_TASK_FAILURE, + OWN_TASK_FAILURE(VertexState.FAILED), /** This vertex failed during commit. */ - COMMIT_FAILURE, + COMMIT_FAILURE(VertexState.FAILED), /** In some cases, vertex could not rerun, e.g. its output been committed as a shared output of vertex group */ - VERTEX_RERUN_AFTER_COMMIT, + VERTEX_RERUN_AFTER_COMMIT(VertexState.FAILED), /** Rerun vertex while it is in committing, it would cause conflict. */ - VERTEX_RERUN_IN_COMMITTING, + VERTEX_RERUN_IN_COMMITTING(VertexState.FAILED), /** This vertex failed as it had invalid number tasks. */ - INVALID_NUM_OF_TASKS, + INVALID_NUM_OF_TASKS(VertexState.FAILED), /** This vertex failed during init. */ - INIT_FAILURE, + INIT_FAILURE(VertexState.FAILED), - INTERNAL_ERROR, + INTERNAL_ERROR(VertexState.ERROR), /** error when writing recovery log */ - RECOVERY_ERROR, + RECOVERY_ERROR(VertexState.FAILED); + + private VertexState finishedState; + + private VertexTerminationCause(VertexState finishedState) { + this.finishedState = finishedState; + } + + public VertexState getFinishedState() { + return finishedState; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/778c1f5a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 9e55088..f8cd10f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -1234,67 +1234,13 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } private static DAGState finishWithTerminationCause(DAGImpl dag) { - if(dag.terminationCause == DAGTerminationCause.DAG_KILL ){ - String diagnosticMsg = "DAG killed due to user-initiated kill." + - " failedVertices:" + dag.numFailedVertices + - " killedVertices:" + dag.numKilledVertices; - LOG.info(diagnosticMsg); - dag.addDiagnostic(diagnosticMsg); - return dag.finished(DAGState.KILLED); - } - if(dag.terminationCause == DAGTerminationCause.VERTEX_FAILURE ){ - String diagnosticMsg = "DAG failed due to vertex failure." + - " failedVertices:" + dag.numFailedVertices + - " killedVertices:" + dag.numKilledVertices; - LOG.info(diagnosticMsg); - dag.addDiagnostic(diagnosticMsg); - return dag.finished(DAGState.FAILED); - } - if(dag.terminationCause == DAGTerminationCause.COMMIT_FAILURE ){ - String diagnosticMsg = "DAG failed due to commit failure." + - " failedVertices:" + dag.numFailedVertices + - " killedVertices:" + dag.numKilledVertices; - LOG.info(diagnosticMsg); - dag.addDiagnostic(diagnosticMsg); - return dag.finished(DAGState.FAILED); - } - if(dag.terminationCause == DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT ){ - String diagnosticMsg = "DAG failed due to vertex rerun after commit." + - " failedVertices:" + dag.numFailedVertices + - " killedVertices:" + dag.numKilledVertices; - LOG.info(diagnosticMsg); - dag.addDiagnostic(diagnosticMsg); - return dag.finished(DAGState.FAILED); - } - if(dag.terminationCause == DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING ){ - String diagnosticMsg = "DAG failed due to vertex rerun in commit." + - " failedVertices:" + dag.numFailedVertices + - " killedVertices:" + dag.numKilledVertices; - LOG.info(diagnosticMsg); - dag.addDiagnostic(diagnosticMsg); - return dag.finished(DAGState.FAILED); - } - if(dag.terminationCause == DAGTerminationCause.RECOVERY_FAILURE ){ - String diagnosticMsg = "DAG failed due to failure in recovery handling." + - " failedVertices:" + dag.numFailedVertices + - " killedVertices:" + dag.numKilledVertices; - LOG.info(diagnosticMsg); - dag.addDiagnostic(diagnosticMsg); - return dag.finished(DAGState.FAILED); - } - - // catch all - String diagnosticMsg = "All vertices complete, but cannot determine final state of DAG" - + ", numCompletedVertices=" + dag.numCompletedVertices - + ", numSuccessfulVertices=" + dag.numSuccessfulVertices - + ", numFailedVertices=" + dag.numFailedVertices - + ", numKilledVertices=" + dag.numKilledVertices - + ", numVertices=" + dag.numVertices - + ", commitInProgress=" + dag.commitFutures.size() - + ", terminationCause=" + dag.terminationCause; - LOG.error(diagnosticMsg); + Preconditions.checkArgument(dag.getTerminationCause() != null, "TerminationCause is not set."); + String diagnosticMsg = "DAG did not succeed due to " + dag.terminationCause + + ". failedVertices:" + dag.numFailedVertices + + " killedVertices:" + dag.numKilledVertices; + LOG.info(diagnosticMsg); dag.addDiagnostic(diagnosticMsg); - return dag.finished(DAGState.ERROR); + return dag.finished(dag.getTerminationCause().getFinishedState()); } private void updateCpuCounters() { http://git-wip-us.apache.org/repos/asf/tez/blob/778c1f5a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index e22343b..dfa358d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -1948,119 +1948,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } } - // TODO TEZ-2248 private static VertexState finishWithTerminationCause(VertexImpl vertex) { - if(vertex.terminationCause == VertexTerminationCause.DAG_KILL ){ - vertex.setFinishTime(); - String diagnosticMsg = "Vertex killed due to user-initiated job kill. " - + "failedTasks:" - + vertex.failedTaskCount; - LOG.info(diagnosticMsg); - vertex.addDiagnostic(diagnosticMsg); - return vertex.finished(VertexState.KILLED); - } - else if(vertex.terminationCause == VertexTerminationCause.OTHER_VERTEX_FAILURE ){ - vertex.setFinishTime(); - String diagnosticMsg = "Vertex killed as other vertex failed. " - + "failedTasks:" - + vertex.failedTaskCount; - LOG.info(diagnosticMsg); - vertex.addDiagnostic(diagnosticMsg); - return vertex.finished(VertexState.KILLED); - } - else if(vertex.terminationCause == VertexTerminationCause.OWN_TASK_FAILURE ){ - if(vertex.failedTaskCount == 0){ - LOG.error("task failure accounting error. terminationCause=TASK_FAILURE but vertex.failedTaskCount == 0"); - } - vertex.setFinishTime(); - String diagnosticMsg = "Vertex failed as one or more tasks failed. " - + "failedTasks:" - + vertex.failedTaskCount; - LOG.info(diagnosticMsg); - vertex.addDiagnostic(diagnosticMsg); - return vertex.finished(VertexState.FAILED); - } - else if (vertex.terminationCause == VertexTerminationCause.INTERNAL_ERROR) { - vertex.setFinishTime(); - String diagnosticMsg = "Vertex failed/killed due to internal error. " - + "failedTasks:" - + vertex.failedTaskCount - + " killedTasks:" - + vertex.killedTaskCount; - LOG.info(diagnosticMsg); - return vertex.finished(VertexState.FAILED); - } - else if (vertex.terminationCause == VertexTerminationCause.AM_USERCODE_FAILURE) { - vertex.setFinishTime(); - String diagnosticMsg = "Vertex failed/killed due to VertexManagerPlugin/EdgeManagerPlugin failed. " - + "failedTasks:" - + vertex.failedTaskCount - + " killedTasks:" - + vertex.killedTaskCount; - LOG.info(diagnosticMsg); - return vertex.finished(VertexState.FAILED); - } - else if (vertex.terminationCause == VertexTerminationCause.ROOT_INPUT_INIT_FAILURE) { - vertex.setFinishTime(); - String diagnosticMsg = "Vertex failed/killed due to ROOT_INPUT_INIT_FAILURE failed. " - + "failedTasks:" - + vertex.failedTaskCount - + " killedTasks:" - + vertex.killedTaskCount; - LOG.info(diagnosticMsg); - return vertex.finished(VertexState.FAILED); - } - else if (vertex.terminationCause == VertexTerminationCause.COMMIT_FAILURE) { - vertex.setFinishTime(); - String diagnosticMsg = "Vertex failed/killed due to COMMIT_FAILURE failed. " - + "failedTasks:" - + vertex.failedTaskCount - + " killedTasks:" - + vertex.killedTaskCount; - LOG.info(diagnosticMsg); - return vertex.finished(VertexState.FAILED); - } - else if (vertex.terminationCause == VertexTerminationCause.VERTEX_RERUN_AFTER_COMMIT) { - vertex.setFinishTime(); - String diagnosticMsg = "Vertex failed/killed due to vertex-rerun after commit. " - + "failedTasks:" - + vertex.failedTaskCount - + " killedTasks:" - + vertex.killedTaskCount; - LOG.info(diagnosticMsg); - return vertex.finished(VertexState.FAILED); - } - else if (vertex.terminationCause == VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING) { - vertex.setFinishTime(); - String diagnosticMsg = "Vertex failed/killed due to vertex-rerun in commiting. " - + "failedTasks:" - + vertex.failedTaskCount - + " killedTasks:" - + vertex.killedTaskCount; - LOG.info(diagnosticMsg); - return vertex.finished(VertexState.FAILED); - } - else if (vertex.terminationCause == VertexTerminationCause.RECOVERY_ERROR) { - vertex.setFinishTime(); - String diagnosticMsg = "Vertex failed/killed due to recovery error. " - + "failedTasks:" - + vertex.failedTaskCount - + " killedTasks:" - + vertex.killedTaskCount; - LOG.info(diagnosticMsg); - return vertex.finished(VertexState.FAILED); - } - else { - //should never occur - throw new TezUncheckedException("All tasks & commits complete, but cannot determine final state of vertex:" - + vertex.logIdentifier - + ", failedTaskCount=" + vertex.failedTaskCount - + ", killedTaskCount=" + vertex.killedTaskCount - + ", successfulTaskCount=" + vertex.succeededTaskCount - + ", completedTaskCount=" + vertex.completedTaskCount - + ", commitInProgress=" + vertex.commitFutures.size() - + ", terminationCause=" + vertex.terminationCause); - } + Preconditions.checkArgument(vertex.getTerminationCause()!= null, "TerminationCause is not set"); + String diagnosticMsg = "Vertex did not succeed due to " + vertex.getTerminationCause() + + ", failedTasks:" + vertex.failedTaskCount + + " killedTasks:" + vertex.killedTaskCount; + LOG.info(diagnosticMsg); + vertex.addDiagnostic(diagnosticMsg); + return vertex.finished(vertex.getTerminationCause().getFinishedState()); } /** @@ -2133,7 +2028,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } catch (IOException e) { LOG.error("Failed to send vertex finished event to recovery", e); finalState = VertexState.FAILED; - this.terminationCause = VertexTerminationCause.INTERNAL_ERROR; + trySetTerminationCause(VertexTerminationCause.INTERNAL_ERROR); eventHandler.handle(new DAGEventVertexCompleted(getVertexId(), finalState)); } @@ -3750,7 +3645,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, LOG.info("Received a user code error during recovering, setting recovered" + " state to FAILED"); vertex.addDiagnostic(msg + "," + ExceptionUtils.getStackTrace(e.getCause())); - vertex.terminationCause = VertexTerminationCause.AM_USERCODE_FAILURE; + vertex.trySetTerminationCause(VertexTerminationCause.AM_USERCODE_FAILURE); vertex.recoveredState = VertexState.FAILED; return VertexState.RECOVERING; } else if (vertex.getState() == VertexState.RUNNING || vertex.getState() == VertexState.COMMITTING) {
