TEZ-2303. ConcurrentModificationException while processing recovery (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/73bdbb21 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/73bdbb21 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/73bdbb21 Branch: refs/heads/TEZ-2003 Commit: 73bdbb219ded569ad88caf39a56abad3b0503d08 Parents: 9e9cf99 Author: Jeff Zhang <[email protected]> Authored: Tue Apr 28 10:40:12 2015 +0800 Committer: Jeff Zhang <[email protected]> Committed: Tue Apr 28 10:40:12 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/app/dag/impl/DAGImpl.java | 83 ++++---- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 67 ++++--- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 187 ++++++++++--------- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 162 ++++++++-------- 5 files changed, 260 insertions(+), 240 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/73bdbb21/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b582b85..d6a0adf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -318,6 +318,7 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: + TEZ-2303. ConcurrentModificationException while processing recovery. TEZ-2334. ContainerManagementProtocolProxy modifies IPC timeout conf without making a copy. TEZ-2317. Event processing backlog can result in task failures for short tasks http://git-wip-us.apache.org/repos/asf/tez/blob/73bdbb21/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index c47a0d7..f562451 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -618,45 +618,50 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, @Override public DAGState restoreFromEvent(HistoryEvent historyEvent) { - switch (historyEvent.getEventType()) { - case DAG_INITIALIZED: - recoveredState = initializeDAG((DAGInitializedEvent) historyEvent); - recoveryInitEventSeen = true; - return recoveredState; - case DAG_STARTED: - if (!recoveryInitEventSeen) { - throw new RuntimeException("Started Event seen but" - + " no Init Event was encountered earlier"); - } - recoveryStartEventSeen = true; - this.startTime = ((DAGStartedEvent) historyEvent).getStartTime(); - recoveredState = DAGState.RUNNING; - return recoveredState; - case DAG_COMMIT_STARTED: - recoveryCommitInProgress = true; - return recoveredState; - case VERTEX_GROUP_COMMIT_STARTED: - VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent = - (VertexGroupCommitStartedEvent) historyEvent; - recoveredGroupCommits.put( - vertexGroupCommitStartedEvent.getVertexGroupName(), false); - return recoveredState; - case VERTEX_GROUP_COMMIT_FINISHED: - VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent = - (VertexGroupCommitFinishedEvent) historyEvent; - recoveredGroupCommits.put( - vertexGroupCommitFinishedEvent.getVertexGroupName(), true); - return recoveredState; - case DAG_FINISHED: - recoveryCommitInProgress = false; - DAGFinishedEvent finishedEvent = (DAGFinishedEvent) historyEvent; - setFinishTime(finishedEvent.getFinishTime()); - recoveredState = finishedEvent.getState(); - this.fullCounters = finishedEvent.getTezCounters(); - return recoveredState; - default: - throw new RuntimeException("Unexpected event received for restoring" - + " state, eventType=" + historyEvent.getEventType()); + writeLock.lock(); + try { + switch (historyEvent.getEventType()) { + case DAG_INITIALIZED: + recoveredState = initializeDAG((DAGInitializedEvent) historyEvent); + recoveryInitEventSeen = true; + return recoveredState; + case DAG_STARTED: + if (!recoveryInitEventSeen) { + throw new RuntimeException("Started Event seen but" + + " no Init Event was encountered earlier"); + } + recoveryStartEventSeen = true; + this.startTime = ((DAGStartedEvent) historyEvent).getStartTime(); + recoveredState = DAGState.RUNNING; + return recoveredState; + case DAG_COMMIT_STARTED: + recoveryCommitInProgress = true; + return recoveredState; + case VERTEX_GROUP_COMMIT_STARTED: + VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent = + (VertexGroupCommitStartedEvent) historyEvent; + recoveredGroupCommits.put( + vertexGroupCommitStartedEvent.getVertexGroupName(), false); + return recoveredState; + case VERTEX_GROUP_COMMIT_FINISHED: + VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent = + (VertexGroupCommitFinishedEvent) historyEvent; + recoveredGroupCommits.put( + vertexGroupCommitFinishedEvent.getVertexGroupName(), true); + return recoveredState; + case DAG_FINISHED: + recoveryCommitInProgress = false; + DAGFinishedEvent finishedEvent = (DAGFinishedEvent) historyEvent; + setFinishTime(finishedEvent.getFinishTime()); + recoveredState = finishedEvent.getState(); + this.fullCounters = finishedEvent.getTezCounters(); + return recoveredState; + default: + throw new RuntimeException("Unexpected event received for restoring" + + " state, eventType=" + historyEvent.getEventType()); + } + } finally { + writeLock.unlock(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/73bdbb21/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 1af4274..1f3e1cf 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -734,39 +734,44 @@ public class TaskAttemptImpl implements TaskAttempt, @Override public TaskAttemptState restoreFromEvent(HistoryEvent historyEvent) { - switch (historyEvent.getEventType()) { - case TASK_ATTEMPT_STARTED: - { - TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) historyEvent; - this.launchTime = tEvent.getStartTime(); - recoveryStartEventSeen = true; - recoveredState = TaskAttemptState.RUNNING; - this.containerId = tEvent.getContainerId(); - sendEvent(createDAGCounterUpdateEventTALaunched(this)); - return recoveredState; - } - case TASK_ATTEMPT_FINISHED: - { - if (!recoveryStartEventSeen) { - throw new RuntimeException("Finished Event seen but" - + " no Started Event was encountered earlier"); + writeLock.lock(); + try { + switch (historyEvent.getEventType()) { + case TASK_ATTEMPT_STARTED: + { + TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) historyEvent; + this.launchTime = tEvent.getStartTime(); + recoveryStartEventSeen = true; + recoveredState = TaskAttemptState.RUNNING; + this.containerId = tEvent.getContainerId(); + sendEvent(createDAGCounterUpdateEventTALaunched(this)); + return recoveredState; + } + case TASK_ATTEMPT_FINISHED: + { + if (!recoveryStartEventSeen) { + throw new RuntimeException("Finished Event seen but" + + " no Started Event was encountered earlier"); + } + TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) historyEvent; + this.finishTime = tEvent.getFinishTime(); + this.reportedStatus.counters = tEvent.getCounters(); + this.reportedStatus.progress = 1f; + this.reportedStatus.state = tEvent.getState(); + this.terminationCause = tEvent.getTaskAttemptError() != null ? tEvent.getTaskAttemptError() + : TaskAttemptTerminationCause.UNKNOWN_ERROR; + this.diagnostics.add(tEvent.getDiagnostics()); + this.recoveredState = tEvent.getState(); + sendEvent(createDAGCounterUpdateEventTAFinished(this, tEvent.getState())); + return recoveredState; } - TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) historyEvent; - this.finishTime = tEvent.getFinishTime(); - this.reportedStatus.counters = tEvent.getCounters(); - this.reportedStatus.progress = 1f; - this.reportedStatus.state = tEvent.getState(); - this.terminationCause = tEvent.getTaskAttemptError() != null ? tEvent.getTaskAttemptError() - : TaskAttemptTerminationCause.UNKNOWN_ERROR; - this.diagnostics.add(tEvent.getDiagnostics()); - this.recoveredState = tEvent.getState(); - sendEvent(createDAGCounterUpdateEventTAFinished(this, tEvent.getState())); - return recoveredState; + default: + throw new RuntimeException("Unexpected event received for restoring" + + " state, eventType=" + historyEvent.getEventType()); + } - default: - throw new RuntimeException("Unexpected event received for restoring" - + " state, eventType=" + historyEvent.getEventType()); - + } finally { + writeLock.unlock(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/73bdbb21/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index a1eed07..91413a5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -517,106 +517,111 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { @Override public TaskState restoreFromEvent(HistoryEvent historyEvent) { - switch (historyEvent.getEventType()) { - case TASK_STARTED: - { - TaskStartedEvent tEvent = (TaskStartedEvent) historyEvent; - recoveryStartEventSeen = true; - this.scheduledTime = tEvent.getScheduledTime(); - if (this.attempts == null - || this.attempts.isEmpty()) { - this.attempts = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>(); - } - recoveredState = TaskState.SCHEDULED; - historyTaskStartGenerated = true; - taskAttemptStatus.clear(); - return recoveredState; - } - case TASK_FINISHED: - { - TaskFinishedEvent tEvent = (TaskFinishedEvent) historyEvent; - if (!recoveryStartEventSeen - && !tEvent.getState().equals(TaskState.KILLED)) { - throw new TezUncheckedException("Finished Event seen but" - + " no Started Event was encountered earlier" - + ", taskId=" + taskId - + ", finishState=" + tEvent.getState()); - } - recoveredState = tEvent.getState(); - if (tEvent.getState() == TaskState.SUCCEEDED - && tEvent.getSuccessfulAttemptID() != null) { - successfulAttempt = tEvent.getSuccessfulAttemptID(); + writeLock.lock(); + try { + switch (historyEvent.getEventType()) { + case TASK_STARTED: + { + TaskStartedEvent tEvent = (TaskStartedEvent) historyEvent; + recoveryStartEventSeen = true; + this.scheduledTime = tEvent.getScheduledTime(); + if (this.attempts == null + || this.attempts.isEmpty()) { + this.attempts = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>(); + } + recoveredState = TaskState.SCHEDULED; + historyTaskStartGenerated = true; + taskAttemptStatus.clear(); + return recoveredState; } - return recoveredState; - } - case TASK_ATTEMPT_STARTED: - { - TaskAttemptStartedEvent taskAttemptStartedEvent = - (TaskAttemptStartedEvent) historyEvent; - TaskAttempt recoveredAttempt = createRecoveredTaskAttempt( - taskAttemptStartedEvent.getTaskAttemptID()); - recoveredAttempt.restoreFromEvent(taskAttemptStartedEvent); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding restored attempt into known attempts map" - + ", taskAttemptId=" + taskAttemptStartedEvent.getTaskAttemptID()); + case TASK_FINISHED: + { + TaskFinishedEvent tEvent = (TaskFinishedEvent) historyEvent; + if (!recoveryStartEventSeen + && !tEvent.getState().equals(TaskState.KILLED)) { + throw new TezUncheckedException("Finished Event seen but" + + " no Started Event was encountered earlier" + + ", taskId=" + taskId + + ", finishState=" + tEvent.getState()); + } + recoveredState = tEvent.getState(); + if (tEvent.getState() == TaskState.SUCCEEDED + && tEvent.getSuccessfulAttemptID() != null) { + successfulAttempt = tEvent.getSuccessfulAttemptID(); + } + return recoveredState; } - this.attempts.put(taskAttemptStartedEvent.getTaskAttemptID(), - recoveredAttempt); - this.taskAttemptStatus.put(taskAttemptStartedEvent.getTaskAttemptID().getId(), false); - this.recoveredState = TaskState.RUNNING; - return recoveredState; - } - case TASK_ATTEMPT_FINISHED: - { - TaskAttemptFinishedEvent taskAttemptFinishedEvent = - (TaskAttemptFinishedEvent) historyEvent; - TaskAttempt taskAttempt = this.attempts.get( - taskAttemptFinishedEvent.getTaskAttemptID()); - this.taskAttemptStatus.put(taskAttemptFinishedEvent.getTaskAttemptID().getId(), true); - if (taskAttempt == null) { - LOG.warn("Received an attempt finished event for an attempt that " - + " never started or does not exist" - + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID() - + ", taskAttemptFinishState=" + taskAttemptFinishedEvent.getState()); + case TASK_ATTEMPT_STARTED: + { + TaskAttemptStartedEvent taskAttemptStartedEvent = + (TaskAttemptStartedEvent) historyEvent; TaskAttempt recoveredAttempt = createRecoveredTaskAttempt( - taskAttemptFinishedEvent.getTaskAttemptID()); - this.attempts.put(taskAttemptFinishedEvent.getTaskAttemptID(), + taskAttemptStartedEvent.getTaskAttemptID()); + recoveredAttempt.restoreFromEvent(taskAttemptStartedEvent); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding restored attempt into known attempts map" + + ", taskAttemptId=" + taskAttemptStartedEvent.getTaskAttemptID()); + } + this.attempts.put(taskAttemptStartedEvent.getTaskAttemptID(), recoveredAttempt); - if (!taskAttemptFinishedEvent.getState().equals(TaskAttemptState.KILLED)) { - throw new TezUncheckedException("Could not find task attempt" - + " when trying to recover" + this.taskAttemptStatus.put(taskAttemptStartedEvent.getTaskAttemptID().getId(), false); + this.recoveredState = TaskState.RUNNING; + return recoveredState; + } + case TASK_ATTEMPT_FINISHED: + { + TaskAttemptFinishedEvent taskAttemptFinishedEvent = + (TaskAttemptFinishedEvent) historyEvent; + TaskAttempt taskAttempt = this.attempts.get( + taskAttemptFinishedEvent.getTaskAttemptID()); + this.taskAttemptStatus.put(taskAttemptFinishedEvent.getTaskAttemptID().getId(), true); + if (taskAttempt == null) { + LOG.warn("Received an attempt finished event for an attempt that " + + " never started or does not exist" + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID() - + ", taskAttemptFinishState" + taskAttemptFinishedEvent.getState()); + + ", taskAttemptFinishState=" + taskAttemptFinishedEvent.getState()); + TaskAttempt recoveredAttempt = createRecoveredTaskAttempt( + taskAttemptFinishedEvent.getTaskAttemptID()); + this.attempts.put(taskAttemptFinishedEvent.getTaskAttemptID(), + recoveredAttempt); + if (!taskAttemptFinishedEvent.getState().equals(TaskAttemptState.KILLED)) { + throw new TezUncheckedException("Could not find task attempt" + + " when trying to recover" + + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID() + + ", taskAttemptFinishState" + taskAttemptFinishedEvent.getState()); + } + return recoveredState; + } + if (getUncompletedAttemptsCount() < 0) { + throw new TezUncheckedException("Invalid recovery event for attempt finished" + + ", more completions than starts encountered" + + ", taskId=" + taskId + + ", finishedAttempts=" + getFinishedAttemptsCount() + + ", incompleteAttempts=" + getUncompletedAttemptsCount()); + } + TaskAttemptState taskAttemptState = taskAttempt.restoreFromEvent( + taskAttemptFinishedEvent); + if (taskAttemptState.equals(TaskAttemptState.SUCCEEDED)) { + recoveredState = TaskState.SUCCEEDED; + successfulAttempt = taskAttempt.getID(); + } else if (taskAttemptState.equals(TaskAttemptState.FAILED)){ + failedAttempts++; + getVertex().incrementFailedTaskAttemptCount(); + successfulAttempt = null; + recoveredState = TaskState.RUNNING; // reset to RUNNING, may fail after SUCCEEDED + } else if (taskAttemptState.equals(TaskAttemptState.KILLED)) { + successfulAttempt = null; + getVertex().incrementKilledTaskAttemptCount(); + recoveredState = TaskState.RUNNING; // reset to RUNNING, may been killed after SUCCEEDED } return recoveredState; } - if (getUncompletedAttemptsCount() < 0) { - throw new TezUncheckedException("Invalid recovery event for attempt finished" - + ", more completions than starts encountered" - + ", taskId=" + taskId - + ", finishedAttempts=" + getFinishedAttemptsCount() - + ", incompleteAttempts=" + getUncompletedAttemptsCount()); - } - TaskAttemptState taskAttemptState = taskAttempt.restoreFromEvent( - taskAttemptFinishedEvent); - if (taskAttemptState.equals(TaskAttemptState.SUCCEEDED)) { - recoveredState = TaskState.SUCCEEDED; - successfulAttempt = taskAttempt.getID(); - } else if (taskAttemptState.equals(TaskAttemptState.FAILED)){ - failedAttempts++; - getVertex().incrementFailedTaskAttemptCount(); - successfulAttempt = null; - recoveredState = TaskState.RUNNING; // reset to RUNNING, may fail after SUCCEEDED - } else if (taskAttemptState.equals(TaskAttemptState.KILLED)) { - successfulAttempt = null; - getVertex().incrementKilledTaskAttemptCount(); - recoveredState = TaskState.RUNNING; // reset to RUNNING, may been killed after SUCCEEDED - } - return recoveredState; + default: + throw new RuntimeException("Unexpected event received for restoring" + + " state, eventType=" + historyEvent.getEventType()); } - default: - throw new RuntimeException("Unexpected event received for restoring" - + " state, eventType=" + historyEvent.getEventType()); + } finally { + writeLock.unlock(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/73bdbb21/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index c4619a0..987e9d3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -1257,85 +1257,89 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, @Override public VertexState restoreFromEvent(HistoryEvent historyEvent) { - switch (historyEvent.getEventType()) { - case VERTEX_INITIALIZED: - recoveryInitEventSeen = true; - recoveredState = setupVertex((VertexInitializedEvent) historyEvent); - createTasks(); - if (LOG.isDebugEnabled()) { - LOG.debug("Recovered state for vertex after Init event" - + ", vertex=" + logIdentifier - + ", recoveredState=" + recoveredState); - } - return recoveredState; - case VERTEX_STARTED: - if (!recoveryInitEventSeen) { - throw new RuntimeException("Started Event seen but" - + " no Init Event was encountered earlier"); - } - recoveryStartEventSeen = true; - VertexStartedEvent startedEvent = (VertexStartedEvent) historyEvent; - startTimeRequested = startedEvent.getStartRequestedTime(); - startedTime = startedEvent.getStartTime(); - recoveredState = VertexState.RUNNING; - if (LOG.isDebugEnabled()) { - LOG.debug("Recovered state for vertex after Started event" - + ", vertex=" + logIdentifier - + ", recoveredState=" + recoveredState); - } - return recoveredState; - case VERTEX_PARALLELISM_UPDATED: - // TODO TEZ-1019 this should flow through setParallelism method - VertexParallelismUpdatedEvent updatedEvent = - (VertexParallelismUpdatedEvent) historyEvent; - int oldNumTasks = numTasks; - int newNumTasks = updatedEvent.getNumTasks(); - handleParallelismUpdate(newNumTasks, updatedEvent.getSourceEdgeProperties(), - updatedEvent.getRootInputSpecUpdates(), oldNumTasks); - Preconditions.checkState(this.numTasks == newNumTasks, getLogIdentifier()); - if (updatedEvent.getVertexLocationHint() != null) { - setVertexLocationHint(updatedEvent.getVertexLocationHint()); - } - stateChangeNotifier.stateChanged(vertexId, - new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks)); - if (LOG.isDebugEnabled()) { - LOG.debug("Recovered state for vertex after parallelism updated event" - + ", vertex=" + logIdentifier - + ", recoveredState=" + recoveredState); - } - return recoveredState; - case VERTEX_COMMIT_STARTED: - recoveryCommitInProgress = true; - hasCommitter = true; - return recoveredState; - case VERTEX_FINISHED: - VertexFinishedEvent finishedEvent = (VertexFinishedEvent) historyEvent; - if (finishedEvent.isFromSummary()) { - summaryCompleteSeen = true; - } else { - vertexCompleteSeen = true; - } - numTasks = finishedEvent.getNumTasks(); - recoveryCommitInProgress = false; - recoveredState = finishedEvent.getState(); - diagnostics.add(finishedEvent.getDiagnostics()); - finishTime = finishedEvent.getFinishTime(); - // TODO counters ?? - if (LOG.isDebugEnabled()) { - LOG.debug("Recovered state for vertex after finished event" - + ", vertex=" + logIdentifier - + ", recoveredState=" + recoveredState); - } - return recoveredState; - case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: - VertexRecoverableEventsGeneratedEvent vEvent = - (VertexRecoverableEventsGeneratedEvent) historyEvent; - this.recoveredEvents.addAll(vEvent.getTezEvents()); - return recoveredState; - default: - throw new RuntimeException("Unexpected event received for restoring" - + " state, eventType=" + historyEvent.getEventType()); - + writeLock.lock(); + try { + switch (historyEvent.getEventType()) { + case VERTEX_INITIALIZED: + recoveryInitEventSeen = true; + recoveredState = setupVertex((VertexInitializedEvent) historyEvent); + createTasks(); + if (LOG.isDebugEnabled()) { + LOG.debug("Recovered state for vertex after Init event" + + ", vertex=" + logIdentifier + + ", recoveredState=" + recoveredState); + } + return recoveredState; + case VERTEX_STARTED: + if (!recoveryInitEventSeen) { + throw new RuntimeException("Started Event seen but" + + " no Init Event was encountered earlier"); + } + recoveryStartEventSeen = true; + VertexStartedEvent startedEvent = (VertexStartedEvent) historyEvent; + startTimeRequested = startedEvent.getStartRequestedTime(); + startedTime = startedEvent.getStartTime(); + recoveredState = VertexState.RUNNING; + if (LOG.isDebugEnabled()) { + LOG.debug("Recovered state for vertex after Started event" + + ", vertex=" + logIdentifier + + ", recoveredState=" + recoveredState); + } + return recoveredState; + case VERTEX_PARALLELISM_UPDATED: + // TODO TEZ-1019 this should flow through setParallelism method + VertexParallelismUpdatedEvent updatedEvent = + (VertexParallelismUpdatedEvent) historyEvent; + int oldNumTasks = numTasks; + int newNumTasks = updatedEvent.getNumTasks(); + handleParallelismUpdate(newNumTasks, updatedEvent.getSourceEdgeProperties(), + updatedEvent.getRootInputSpecUpdates(), oldNumTasks); + Preconditions.checkState(this.numTasks == newNumTasks, getLogIdentifier()); + if (updatedEvent.getVertexLocationHint() != null) { + setVertexLocationHint(updatedEvent.getVertexLocationHint()); + } + stateChangeNotifier.stateChanged(vertexId, + new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks)); + if (LOG.isDebugEnabled()) { + LOG.debug("Recovered state for vertex after parallelism updated event" + + ", vertex=" + logIdentifier + + ", recoveredState=" + recoveredState); + } + return recoveredState; + case VERTEX_COMMIT_STARTED: + recoveryCommitInProgress = true; + hasCommitter = true; + return recoveredState; + case VERTEX_FINISHED: + VertexFinishedEvent finishedEvent = (VertexFinishedEvent) historyEvent; + if (finishedEvent.isFromSummary()) { + summaryCompleteSeen = true; + } else { + vertexCompleteSeen = true; + } + numTasks = finishedEvent.getNumTasks(); + recoveryCommitInProgress = false; + recoveredState = finishedEvent.getState(); + diagnostics.add(finishedEvent.getDiagnostics()); + finishTime = finishedEvent.getFinishTime(); + // TODO counters ?? + if (LOG.isDebugEnabled()) { + LOG.debug("Recovered state for vertex after finished event" + + ", vertex=" + logIdentifier + + ", recoveredState=" + recoveredState); + } + return recoveredState; + case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: + VertexRecoverableEventsGeneratedEvent vEvent = + (VertexRecoverableEventsGeneratedEvent) historyEvent; + this.recoveredEvents.addAll(vEvent.getTezEvents()); + return recoveredState; + default: + throw new RuntimeException("Unexpected event received for restoring" + + " state, eventType=" + historyEvent.getEventType()); + } + } finally { + writeLock.unlock(); } }
