Repository: tez Updated Branches: refs/heads/branch-0.6 8cbaa40c5 -> 68f17cf75
TEZ-2311. AM can hang if kill received while recovering from previous attempt (zjffdu) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/68f17cf7 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/68f17cf7 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/68f17cf7 Branch: refs/heads/branch-0.6 Commit: 68f17cf755e3d6d6f20901689e8e5f0d1303be24 Parents: 8cbaa40 Author: Jeff Zhang <[email protected]> Authored: Fri Jul 31 16:36:25 2015 +0800 Committer: Jeff Zhang <[email protected]> Committed: Fri Jul 31 16:36:25 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/client/LocalClient.java | 6 +- .../tez/dag/api/client/DAGClientHandler.java | 2 +- ...DAGClientAMProtocolBlockingPBServerImpl.java | 8 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 38 ++++- .../org/apache/tez/dag/app/RecoveryParser.java | 22 +++ .../apache/tez/dag/app/dag/impl/DAGImpl.java | 10 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 74 +++++---- .../tez/dag/history/HistoryEventType.java | 1 + .../dag/history/events/DAGKillRequestEvent.java | 127 +++++++++++++++ tez-dag/src/main/proto/HistoryEvents.proto | 6 + .../tez/dag/app/dag/impl/TestDAGRecovery.java | 95 +++++++++++ .../dag/app/dag/impl/TestVertexRecovery.java | 158 ++++++++++++++++++- .../TestHistoryEventsProtoConversion.java | 15 ++ .../impl/TestHistoryEventJsonConversion.java | 4 + .../ats/TestHistoryEventTimelineConversion.java | 4 + 16 files changed, 527 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/68f17cf7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 025fd7d..20d31b2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.6.2: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2311. 
AM can hang if kill received while recovering from previous attempt. TEZ-2623. Fix module dependencies related to hadoop-auth. TEZ-2560. fix tex-ui build for maven 3.3+ TEZ-2600. When used with HDFS federation(viewfs) ,tez will throw a error http://git-wip-us.apache.org/repos/asf/tez/blob/68f17cf7/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java index c5d9bf1..3ebef46 100644 --- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java +++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java @@ -134,7 +134,11 @@ public class LocalClient extends FrameworkClient { @Override public void killApplication(ApplicationId appId) { - clientHandler.shutdownAM(); + try { + clientHandler.shutdownAM(); + } catch (TezException e) { + throw new RuntimeException(e); + } } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/68f17cf7/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java index d14ed2a..3e4b638 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java @@ -113,7 +113,7 @@ public class DAGClientHandler { return dagAppMaster.submitDAGToAppMaster(dagPlan, additionalAmResources); } - public synchronized void shutdownAM() { + public synchronized void shutdownAM() throws TezException { LOG.info("Received message to shutdown AM"); if (dagAppMaster != null) { dagAppMaster.shutdownTezAM(); 
http://git-wip-us.apache.org/repos/asf/tez/blob/68f17cf7/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java index c054305..7efbbe8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java @@ -177,8 +177,12 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto if (!real.getACLManager().checkAMModifyAccess(user)) { throw new AccessControlException("User " + user + " cannot perform AM modify operation"); } - real.shutdownAM(); - return ShutdownSessionResponseProto.newBuilder().build(); + try { + real.shutdownAM(); + return ShutdownSessionResponseProto.newBuilder().build(); + } catch(TezException e) { + throw wrapException(e); + } } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/68f17cf7/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index a221662..2da2a0d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -152,6 +152,7 @@ import org.apache.tez.dag.history.HistoryEventHandler; import org.apache.tez.dag.history.events.AMLaunchedEvent; import org.apache.tez.dag.history.events.AMStartedEvent; import org.apache.tez.dag.history.events.AppLaunchedEvent; +import org.apache.tez.dag.history.events.DAGKillRequestEvent; import 
org.apache.tez.dag.history.events.DAGRecoveredEvent; import org.apache.tez.dag.history.events.DAGSubmittedEvent; import org.apache.tez.dag.history.utils.DAGUtils; @@ -1068,7 +1069,7 @@ public class DAGAppMaster extends AbstractService { + oldState + " new state: " + state); } - public void shutdownTezAM() { + public void shutdownTezAM() throws TezException { sessionStopped.set(true); synchronized (this) { this.taskSchedulerEventHandler.setShouldUnregisterFlag(); @@ -1077,6 +1078,11 @@ public class DAGAppMaster extends AbstractService { //send a DAG_KILL message LOG.info("Sending a kill event to the current DAG" + ", dagId=" + currentDAG.getID()); + try { + logDAGKillRequestEvent(currentDAG.getID(), true); + } catch (IOException e) { + throw new TezException(e); + } sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.DAG_KILL)); } else { LOG.info("No current running DAG, shutting down the AM"); @@ -1088,6 +1094,12 @@ public class DAGAppMaster extends AbstractService { } } + void logDAGKillRequestEvent(TezDAGID dagId, boolean isSessionStopped) throws IOException { + DAGKillRequestEvent killRequestEvent = new DAGKillRequestEvent(dagId, clock.getTime(), isSessionStopped); + historyEventHandler.handleCriticalEvent( + new DAGHistoryEvent(dagId, killRequestEvent)); + } + public String submitDAGToAppMaster(DAGPlan dagPlan, Map<String, LocalResource> additionalResources) throws TezException { if (sessionStopped.get()) { @@ -1123,7 +1135,12 @@ public class DAGAppMaster extends AbstractService { } @SuppressWarnings("unchecked") - public void tryKillDAG(DAG dag){ + public void tryKillDAG(DAG dag) throws TezException { + try { + logDAGKillRequestEvent(dag.getID(), false); + } catch (IOException e) { + throw new TezException(e); + } dispatcher.getEventHandler().handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL)); } @@ -1635,7 +1652,14 @@ public class DAGAppMaster extends AbstractService { amResources.putAll(recoveredDAGData.cumulativeAdditionalResources); 
cumulativeAdditionalResources.putAll(recoveredDAGData.cumulativeAdditionalResources); } - + + if (recoveredDAGData.isSessionStopped) { + LOG.info("AM crashed when shutting down in the previous attempt" + + ", continue the shutdown and recover it to SUCCEEDED"); + this.sessionStopped.set(true); + return; + } + if (recoveredDAGData.isCompleted || recoveredDAGData.nonRecoverable) { LOG.info("Found previous DAG in completed or non-recoverable state" @@ -1701,7 +1725,11 @@ public class DAGAppMaster extends AbstractService { this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { - checkAndHandleSessionTimeout(); + try { + checkAndHandleSessionTimeout(); + } catch (TezException e) { + LOG.error("Error when check AM session timeout", e); + } } }, sessionTimeoutInterval, sessionTimeoutInterval / 10); } @@ -1854,7 +1882,7 @@ public class DAGAppMaster extends AbstractService { } } - private synchronized void checkAndHandleSessionTimeout() { + private synchronized void checkAndHandleSessionTimeout() throws TezException { if (EnumSet.of(DAGAppMasterState.RUNNING, DAGAppMasterState.RECOVERING).contains(this.state) || sessionStopped.get()) { http://git-wip-us.apache.org/repos/asf/tez/blob/68f17cf7/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java index 767f036..02b62ca 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java @@ -50,6 +50,7 @@ import org.apache.tez.dag.history.events.ContainerStoppedEvent; import org.apache.tez.dag.history.events.DAGCommitStartedEvent; import org.apache.tez.dag.history.events.DAGFinishedEvent; import org.apache.tez.dag.history.events.DAGInitializedEvent; +import 
org.apache.tez.dag.history.events.DAGKillRequestEvent; import org.apache.tez.dag.history.events.DAGStartedEvent; import org.apache.tez.dag.history.events.DAGSubmittedEvent; import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; @@ -105,6 +106,7 @@ public class RecoveryParser { public DAGState dagState = null; public boolean isCompleted = false; public boolean nonRecoverable = false; + public boolean isSessionStopped = false; public String reason = null; public Map<String, LocalResource> cumulativeAdditionalResources = null; } @@ -164,6 +166,9 @@ public class RecoveryParser { case DAG_FINISHED: event = new DAGFinishedEvent(); break; + case DAG_KILL_REQUEST: + event = new DAGKillRequestEvent(); + break; case CONTAINER_LAUNCHED: event = new ContainerLaunchedEvent(); break; @@ -350,6 +355,11 @@ public class RecoveryParser { dagFinishedEvent.fromSummaryProtoStream(proto); dagState = dagFinishedEvent.getState(); break; + case DAG_KILL_REQUEST: + DAGKillRequestEvent killRequestEvent = new DAGKillRequestEvent(); + killRequestEvent.fromSummaryProtoStream(proto); + bufferedSummaryEvents.add(killRequestEvent); + break; case DAG_COMMIT_STARTED: dagCommitCompleted = false; break; @@ -657,6 +667,13 @@ public class RecoveryParser { recoveredDAGData.recoveredDAG.restoreFromEvent(event); break; } + case DAG_KILL_REQUEST: + { + LOG.info("Recovering from event" + + ", eventType=" + eventType + + ", event=" + event.toString()); + break; + } case DAG_FINISHED: { LOG.info("Recovering from event" @@ -836,6 +853,11 @@ public class RecoveryParser { vertex.restoreFromEvent(vertexFinishedEvent); } break; + case DAG_KILL_REQUEST: + DAGKillRequestEvent killRequestEvent = (DAGKillRequestEvent)bufferedEvent; + recoveredDAGData.isSessionStopped = killRequestEvent.isSessionStopped(); + recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent); + break; default: throw new RuntimeException("Invalid data found in buffered summary events" + ", unknown event type " 
http://git-wip-us.apache.org/repos/asf/tez/blob/68f17cf7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 6073525..c477b5d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -74,6 +74,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo; import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.DAGAppMasterState; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAG; @@ -381,6 +382,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, Map<String, VertexGroupInfo> vertexGroups = Maps.newHashMap(); Map<String, List<VertexGroupInfo>> vertexGroupInfo = Maps.newHashMap(); private DAGState recoveredState = DAGState.NEW; + @VisibleForTesting boolean recoveryCommitInProgress = false; Map<String, Boolean> recoveredGroupCommits = new HashMap<String, Boolean>(); @@ -544,6 +546,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, recoveredGroupCommits.put( vertexGroupCommitFinishedEvent.getVertexGroupName(), true); return recoveredState; + case DAG_KILL_REQUEST: + trySetTerminationCause(DAGTerminationCause.DAG_KILL); + this.recoveredState = DAGState.KILLED; + return recoveredState; case DAG_FINISHED: recoveryCommitInProgress = false; DAGFinishedEvent finishedEvent = (DAGFinishedEvent) historyEvent; @@ -965,7 +971,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, //notify the eventhandler of state change if (oldState != 
getInternalState()) { LOG.info(dagId + " transitioned from " + oldState + " to " - + getInternalState()); + + getInternalState() + " due to event " + event.getType()); } } @@ -1525,7 +1531,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, case FAILED: case KILLED: // Completed - + // Recover all other data for all vertices // send recover event to all vertices with a final end state for (Vertex v : dag.vertices.values()) { http://git-wip-us.apache.org/repos/asf/tez/blob/68f17cf7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 27a6eed..463da8b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -2317,7 +2317,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } } // Update tasks with their input payloads as needed - vertex.eventHandler.handle(new VertexEvent(vertex.vertexId, VertexEventType.V_START)); if (vertex.getInputVertices().isEmpty()) { @@ -2375,10 +2374,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } break; case SUCCEEDED: - case FAILED: - case KILLED: - if (vertex.recoveredState == VertexState.SUCCEEDED - && vertex.hasCommitter + if (vertex.hasCommitter && vertex.summaryCompleteSeen && !vertex.vertexCompleteSeen) { String msg = "Cannot recover vertex as all recovery events not" + " found, vertex=" + vertex.logIdentifier @@ -2393,18 +2389,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, vertex.tasksNotYetScheduled = false; // recover tasks if (vertex.tasks != null && vertex.numTasks != 0) { - TaskState taskState = TaskState.KILLED; - switch (vertex.recoveredState) { - case SUCCEEDED: - taskState = 
TaskState.SUCCEEDED; - break; - case KILLED: - taskState = TaskState.KILLED; - break; - case FAILED: - taskState = TaskState.FAILED; - break; - } + TaskState taskState = TaskState.SUCCEEDED; for (Task task : vertex.tasks.values()) { vertex.eventHandler.handle( new TaskEventRecoverTask(task.getTaskId(), @@ -2426,6 +2411,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } } break; + case FAILED: + case KILLED: + // vertex may be killed/failed before its tasks are scheduled. so here just recover vertex + // to the recovered state without waiting for its tasks' feedback and recover tasks to + // the corresponding state without recover its data. + if (vertex.tasks != null && vertex.numTasks != 0) { + TaskState taskState = TaskState.FAILED; + if (vertex.recoveredState == VertexState.KILLED) { + taskState = TaskState.KILLED; + } + for (Task task : vertex.tasks.values()) { + vertex.eventHandler.handle( + new TaskEventRecoverTask(task.getTaskId(), + taskState, false)); + } + } + endState = vertex.recoveredState; + vertex.finished(endState); + break; default: LOG.warn("Invalid recoveredState found when trying to recover" + " vertex" @@ -2793,24 +2797,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } break; case SUCCEEDED: - case FAILED: - case KILLED: - vertex.tasksNotYetScheduled = false; + // recover tasks assert vertex.tasks.size() == vertex.numTasks; if (vertex.tasks != null && vertex.numTasks != 0) { - TaskState taskState = TaskState.KILLED; - switch (vertex.recoveredState) { - case SUCCEEDED: - taskState = TaskState.SUCCEEDED; - break; - case KILLED: - taskState = TaskState.KILLED; - break; - case FAILED: - taskState = TaskState.FAILED; - break; - } + TaskState taskState = TaskState.SUCCEEDED; for (Task task : vertex.tasks.values()) { vertex.eventHandler.handle( new TaskEventRecoverTask(task.getTaskId(), @@ -2832,6 +2823,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, 
vertex.finished(endState); } break; + case FAILED: + case KILLED: + // vertex may be killed/failed before its tasks are scheduled. so here just recover vertex + // to the recovered state without waiting for its tasks' feedback and recover tasks to + // the corresponding state without recover its data. + if (vertex.tasks != null && vertex.numTasks != 0) { + TaskState taskState = TaskState.FAILED; + if (vertex.recoveredState == VertexState.KILLED) { + taskState = TaskState.KILLED; + } + for (Task task : vertex.tasks.values()) { + vertex.eventHandler.handle( + new TaskEventRecoverTask(task.getTaskId(), + taskState, false)); + } + } + endState = vertex.recoveredState; + vertex.finished(endState); + break; default: LOG.warn("Invalid recoveredState found when trying to recover" + " vertex, recoveredState=" + vertex.recoveredState); http://git-wip-us.apache.org/repos/asf/tez/blob/68f17cf7/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java index 6949d21..d791d9e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java @@ -26,6 +26,7 @@ public enum HistoryEventType { DAG_INITIALIZED, DAG_STARTED, DAG_FINISHED, + DAG_KILL_REQUEST, VERTEX_INITIALIZED, VERTEX_STARTED, VERTEX_PARALLELISM_UPDATED, http://git-wip-us.apache.org/repos/asf/tez/blob/68f17cf7/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java new file mode 100644 index 0000000..525e361 --- /dev/null 
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.dag.history.events; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.tez.dag.history.HistoryEvent; +import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.SummaryEvent; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.recovery.records.RecoveryProtos; +import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto; +import org.apache.tez.dag.utils.ProtoUtils; + +public class DAGKillRequestEvent implements HistoryEvent, SummaryEvent { + + private TezDAGID dagID; + private long killRequestTime; + private boolean isSessionStopped; + + public DAGKillRequestEvent() { + } + + public DAGKillRequestEvent(TezDAGID dagID, long killRequestTime, boolean isSessionStopped) { + this.dagID = dagID; + this.killRequestTime = killRequestTime; + this.isSessionStopped = isSessionStopped; + } + + @Override + public HistoryEventType getEventType() { + return HistoryEventType.DAG_KILL_REQUEST; + } + + @Override + public boolean 
isRecoveryEvent() { + return true; + } + + @Override + public boolean isHistoryEvent() { + return false; + } + + @Override + public void toProtoStream(OutputStream outputStream) throws IOException { + toProto().writeDelimitedTo(outputStream); + } + + public RecoveryProtos.DAGKillRequestProto toProto() { + return RecoveryProtos.DAGKillRequestProto.newBuilder() + .setDagId(dagID.toString()) + .setKillRequestTime(killRequestTime) + .setIsSessionStopped(isSessionStopped) + .build(); + } + + @Override + public void fromProtoStream(InputStream inputStream) throws IOException { + RecoveryProtos.DAGKillRequestProto proto = + RecoveryProtos.DAGKillRequestProto.parseDelimitedFrom(inputStream); + if (proto == null) { + throw new IOException("No data found in stream"); + } + fromProto(proto); + } + + public void fromProto(RecoveryProtos.DAGKillRequestProto proto) { + this.dagID = TezDAGID.fromString(proto.getDagId()); + this.killRequestTime = proto.getKillRequestTime(); + this.isSessionStopped = proto.getIsSessionStopped(); + } + + @Override + public void toSummaryProtoStream(OutputStream outputStream) + throws IOException { + ProtoUtils.toSummaryEventProto(dagID, killRequestTime, + HistoryEventType.DAG_KILL_REQUEST, isSessionStopped ? 
new byte[]{1} : new byte[]{0}) + .writeDelimitedTo(outputStream); + } + + @Override + public void fromSummaryProtoStream(SummaryEventProto proto) + throws IOException { + this.dagID = TezDAGID.fromString(proto.getDagId()); + this.killRequestTime = proto.getTimestamp(); + if (proto.getEventPayload().byteAt(0) == 1) { + this.isSessionStopped = true; + } else { + this.isSessionStopped = false; + } + } + + @Override + public boolean writeToRecoveryImmediately() { + return false; + } + + public TezDAGID getDagID() { + return dagID; + } + + public long getKillRequestTime() { + return killRequestTime; + } + + public boolean isSessionStopped() { + return isSessionStopped; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/68f17cf7/tez-dag/src/main/proto/HistoryEvents.proto ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto index 45e9582..5ff017a 100644 --- a/tez-dag/src/main/proto/HistoryEvents.proto +++ b/tez-dag/src/main/proto/HistoryEvents.proto @@ -77,6 +77,12 @@ message DAGFinishedProto { optional TezCountersProto counters = 5; } +message DAGKillRequestProto { + optional string dag_id = 1; + optional int64 kill_request_time = 2; + optional bool isSessionStopped = 3; +} + message VertexInitializedProto { optional string vertex_name = 1; optional string vertex_id = 2; http://git-wip-us.apache.org/repos/asf/tez/blob/68f17cf7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java index a0d5fb5..d6ef9c0 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java @@ -29,6 +29,7 @@ import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.SystemClock; @@ -39,6 +40,7 @@ import org.apache.tez.dag.app.ClusterInfo; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAGState; +import org.apache.tez.dag.app.dag.DAGTerminationCause; import org.apache.tez.dag.app.dag.VertexState; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished; import org.apache.tez.dag.app.dag.event.DAGEvent; @@ -49,6 +51,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex; import org.apache.tez.dag.history.events.DAGCommitStartedEvent; import org.apache.tez.dag.history.events.DAGFinishedEvent; import org.apache.tez.dag.history.events.DAGInitializedEvent; +import org.apache.tez.dag.history.events.DAGKillRequestEvent; import org.apache.tez.dag.history.events.DAGStartedEvent; import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent; import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent; @@ -155,6 +158,10 @@ public class TestDAGRecovery { assertEquals(tezCounters, dag.fullCounters); } + private void restoreFromDAGKillRequestEvent() { + dag.restoreFromEvent(new DAGKillRequestEvent(dag.getID(), 0L, false)); + } + /** * New -> RecoverTransition */ @@ -174,6 +181,18 @@ public class TestDAGRecovery { } /** + * New -> restoreFromDAGKillRequested -> RecoverTransition + */ + @Test(timeout = 5000) + public void testDAGRecovery_FromNewToKilled() { + restoreFromDAGKillRequestEvent(); + assertNewState(); + dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>())); + assertEquals(DAGState.KILLED, dag.getState()); + 
assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); + } + + /** * restoreFromDAGInitializedEvent -> RecoverTransition */ @Test(timeout = 5000) @@ -198,6 +217,18 @@ public class TestDAGRecovery { } /** + * restoreFromDAGInitializedEvent -> restoreFromDAGKillRequested -> RecoverTransition + */ + @Test(timeout = 5000) + public void testDAGRecovery_FromInitedToKilled() { + restoreFromDAGInitializedEvent(); + restoreFromDAGKillRequestEvent(); + dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>())); + assertEquals(DAGState.KILLED, dag.getState()); + assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); + } + + /** * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> * RecoverTransition */ @@ -224,6 +255,34 @@ public class TestDAGRecovery { } /** + * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> restoreFromDAGKillRequested + * RecoverTransition + */ + @Test(timeout = 5000) + public void testDAGRecovery_FromStartedtoKilled() { + assertNewState(); + restoreFromDAGInitializedEvent(); + restoreFromDAGStartedEvent(); + restoreFromDAGKillRequestEvent(); + dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>())); + assertEquals(DAGState.KILLED, dag.getState()); + assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause()); + // send recover event to all the vertices with desired state of KILLED + ArgumentCaptor<AbstractEvent> eventCaptor = + ArgumentCaptor.forClass(AbstractEvent.class); + verify(mockEventHandler, times(7)).handle(eventCaptor.capture()); + List<AbstractEvent> events = eventCaptor.getAllValues(); + assertEquals(7, events.size()); + for (int i=0;i<6;++i) { + AbstractEvent vEvent = events.get(i); + assertTrue(vEvent instanceof VertexEventRecoverVertex); + VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vEvent; + assertEquals(VertexState.KILLED, recoverEvent.getDesiredState()); + } + assertTrue(events.get(6) instanceof DAGAppMasterEventDAGFinished); + 
} + + /** * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> * restoreFromDAGFinishedEvent (SUCCEEDED) -> RecoverTransition */ @@ -323,6 +382,42 @@ public class TestDAGRecovery { } /** + * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent + * --> restoreFromDAGKillRequestEvent --> + * restoreFromDAGFinishedEvent -> RecoverTransition + */ + @Test(timeout = 5000) + public void testDAGRecovery_Finished_KILLED_WithKillRequest() { + // same behavior as without DAGKillRequestEvent because DAGFinishedEvent is seen + assertNewState(); + restoreFromDAGInitializedEvent(); + restoreFromDAGStartedEvent(); + restoreFromDAGKillRequestEvent(); + restoreFromDAGFinishedEvent(DAGState.KILLED); + + dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>())); + assertEquals(DAGState.KILLED, dag.getState()); + assertEquals(tezCounters, dag.getAllCounters()); + // recover all the vertices to KILLED + ArgumentCaptor<Event> eventCaptor = ArgumentCaptor.forClass(Event.class); + verify(mockEventHandler, times(7)).handle(eventCaptor.capture()); + List<Event> events = eventCaptor.getAllValues(); + int i = 0; + for (; i < 6; ++i) { + assertTrue(events.get(i) instanceof VertexEventRecoverVertex); + VertexEventRecoverVertex recoverEvent = + (VertexEventRecoverVertex) events.get(i); + assertEquals(VertexState.KILLED, recoverEvent.getDesiredState()); + } + + // send DAGAppMasterEventDAGFinished at last + assertTrue(events.get(i) instanceof DAGAppMasterEventDAGFinished); + DAGAppMasterEventDAGFinished dagFinishedEvent = + (DAGAppMasterEventDAGFinished) events.get(i); + assertEquals(DAGState.KILLED, dagFinishedEvent.getDAGState()); + } + + /** * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> -> * restoreFromDAGFinishedEvent -> RecoverTransition */ http://git-wip-us.apache.org/repos/asf/tez/blob/68f17cf7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java ---------------------------------------------------------------------- 
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java index 6d92d5e..658872e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.records.DAGProtos.EdgePlan; @@ -56,9 +57,13 @@ import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ClusterInfo; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TaskHeartbeatHandler; +import org.apache.tez.dag.app.dag.DAGState; +import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.VertexState; import org.apache.tez.dag.app.dag.VertexTerminationCause; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; import org.apache.tez.dag.app.dag.event.DAGEvent; +import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent; import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; @@ -72,6 +77,8 @@ import org.apache.tez.dag.app.dag.event.VertexEventType; import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source; import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter; import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.events.DAGInitializedEvent; +import org.apache.tez.dag.history.events.DAGStartedEvent; import 
org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent; import org.apache.tez.dag.history.events.VertexFinishedEvent; import org.apache.tez.dag.history.events.VertexInitializedEvent; @@ -401,6 +408,7 @@ public class TestVertexRecovery { public void setUp() throws IOException { dispatcher = new DrainDispatcher(); + dispatcher.register(DAGAppMasterEventType.class, mock(EventHandler.class)); dispatcher.register(DAGEventType.class, mock(EventHandler.class)); vertexEventHandler = new VertexEventHanlder(); dispatcher.register(VertexEventType.class, vertexEventHandler); @@ -423,7 +431,8 @@ public class TestVertexRecovery { ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10)); doReturn(clusterInfo).when(mockAppContext).getClusterInfo(); - dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT)); + dag.restoreFromEvent(new DAGInitializedEvent(dag.getID(), 0L, "user", "dagName", null)); + dag.restoreFromEvent(new DAGStartedEvent(dag.getID(), 0L, "user", "dagName")); LOG.info("finish setUp"); } @@ -869,6 +878,33 @@ public class TestVertexRecovery { @Test(timeout = 5000) public void testRecovery_VertexManagerErrorOnRecovery() { + // In order to simulate the behavior that VertexManagerError happens in recovering stage, need to start the recovering from + // vertex and disable the event handling of DAG (use mock here). 
+ dispatcher = new DrainDispatcher(); + dispatcher.register(DAGEventType.class, mock(EventHandler.class)); + vertexEventHandler = new VertexEventHanlder(); + dispatcher.register(VertexEventType.class, vertexEventHandler); + taskEventHandler = new TaskEventHandler(); + dispatcher.register(TaskEventType.class, taskEventHandler); + dispatcher.register(TaskAttemptEventType.class, + new TaskAttemptEventHandler()); + dispatcher.init(new Configuration()); + dispatcher.start(); + mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS); + DAGPlan dagPlan = createDAGPlan(); + dag = + new DAGImpl(dagId, new Configuration(), dagPlan, + dispatcher.getEventHandler(), mock(TaskAttemptListener.class), + new Credentials(), new SystemClock(), user, + mock(TaskHeartbeatHandler.class), mockAppContext); + when(mockAppContext.getCurrentDAG()).thenReturn(dag); + ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10)); + doReturn(clusterInfo).when(mockAppContext).getClusterInfo(); + dag.restoreFromEvent(new DAGInitializedEvent(dag.getID(), 0L, "user", "dagName", null)); + dag.restoreFromEvent(new DAGStartedEvent(dag.getID(), 0L, "user", "dagName")); + LOG.info("finish setUp"); + + /////////////////// Start the recover //////////////////////// VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); restoreFromInitializedEvent(vertex1); vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(), @@ -1115,4 +1151,124 @@ public class TestVertexRecovery { assertTaskRecoveredEventSent(vertex2); assertTaskRecoveredEventSent(vertex3); } + + /** + * vertex1 (New) -> restoreFromInitialized -> restoreFromVertexStarted -> + * restoreFromVertexFinished (KILLED) + * vertex2 (New) -> restoreFromInitialized -> restoreFromVertexStarted -> + * restoreFromVertexFinished (KILLED) + * vertex3 (New) -> restoreFromInitialized -> restoreFromVertexStarted -> + * restoreFromVertexFinished (KILLED) + */ + @Test(timeout = 5000) + public void testRecovery_KilledBeforeTaskStarted() { + 
VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); + restoreFromInitializedEvent(vertex1); + VertexState recoveredState = vertex1.restoreFromEvent(new VertexStartedEvent(vertex1.getVertexId(), + initRequestedTime + 100L, initRequestedTime + 200L)); + recoveredState = vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(), + "vertex1", 1, initRequestedTime, initedTime, initRequestedTime + 300L, + initRequestedTime + 400L, initRequestedTime + 500L, + VertexState.KILLED, "", new TezCounters(), new VertexStats(), null)); + assertEquals(VertexState.KILLED, recoveredState); + + VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2"); + restoreFromInitializedEvent(vertex2); + recoveredState = vertex2.restoreFromEvent(new VertexStartedEvent(vertex2.getVertexId(), + initRequestedTime + 100L, initRequestedTime + 200L)); + recoveredState = vertex2.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(), + "vertex2", 1, initRequestedTime, initedTime, initRequestedTime + 300L, + initRequestedTime + 400L, initRequestedTime + 500L, + VertexState.KILLED, "", new TezCounters(), new VertexStats(), null)); + assertEquals(VertexState.KILLED, recoveredState); + + VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); + restoreFromInitializedEvent(vertex3); + recoveredState = vertex3.restoreFromEvent(new VertexStartedEvent(vertex3.getVertexId(), + initRequestedTime + 100L, initRequestedTime + 200L)); + recoveredState = vertex3.restoreFromEvent(new VertexFinishedEvent(vertex3.getVertexId(), + "vertex3", 1, initRequestedTime, initedTime, initRequestedTime + 300L, + initRequestedTime + 400L, initRequestedTime + 500L, + VertexState.KILLED, "", new TezCounters(), new VertexStats(), null)); + assertEquals(VertexState.KILLED, recoveredState); + + // start the recovering, send RecoverEvent to its root vertices (v1, v2) + dag.handle(new DAGEventRecoverEvent(dag.getID(), null)); + dispatcher.await(); + // recover v1 to KILLED directly and also its tasks are 
recovered to KILLED + assertEquals(VertexState.KILLED, vertex1.getState()); + for (Task task : vertex1.tasks.values()) { + assertEquals(TaskState.KILLED, task.getState()); + } + // recover v2 to KILLED directly and also its tasks are recovered to KILLED + assertEquals(VertexState.KILLED, vertex2.getState()); + for (Task task : vertex2.tasks.values()) { + assertEquals(TaskState.KILLED, task.getState()); + } + // recover v3 to KILLED directly and also its tasks are recovered to KILLED + assertEquals(VertexState.KILLED, vertex3.getState()); + for (Task task : vertex3.tasks.values()) { + assertEquals(TaskState.KILLED, task.getState()); + } + } + + /** + * vertex1 (New) -> restoreFromInitialized -> restoreFromVertexStarted -> + * restoreFromVertexFinished (FAILED) + * vertex2 (New) -> restoreFromInitialized -> restoreFromVertexStarted + * vertex3 (New) -> restoreFromInitialized -> restoreFromVertexStarted -> + * restoreFromVertexFinished (FAILED) + */ + @Test(timeout = 5000) + public void testRecovery_FailedBeforeTaskStarted() { + VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1"); + restoreFromInitializedEvent(vertex1); + VertexState recoveredState = vertex1.restoreFromEvent(new VertexStartedEvent(vertex1.getVertexId(), + initRequestedTime + 100L, initRequestedTime + 200L)); + recoveredState = vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(), + "vertex1", 1, initRequestedTime, initedTime, initRequestedTime + 300L, + initRequestedTime + 400L, initRequestedTime + 500L, + VertexState.FAILED, "", new TezCounters(), new VertexStats(), null)); + assertEquals(VertexState.FAILED, recoveredState); + + VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2"); + restoreFromInitializedEvent(vertex2); + recoveredState = vertex2.restoreFromEvent(new VertexStartedEvent(vertex2.getVertexId(), + initRequestedTime + 100L, initRequestedTime + 200L)); + recoveredState = vertex2.restoreFromEvent(new VertexFinishedEvent(vertex2.getVertexId(), + "vertex2", 1, 
initRequestedTime, initedTime, initRequestedTime + 300L, + initRequestedTime + 400L, initRequestedTime + 500L, + VertexState.FAILED, "", new TezCounters(), new VertexStats(), null)); + assertEquals(VertexState.FAILED, recoveredState); + + VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3"); + restoreFromInitializedEvent(vertex3); + recoveredState = vertex3.restoreFromEvent(new VertexStartedEvent(vertex3.getVertexId(), + initRequestedTime + 100L, initRequestedTime + 200L)); + recoveredState = vertex3.restoreFromEvent(new VertexFinishedEvent(vertex3.getVertexId(), + "vertex3", 1, initRequestedTime, initedTime, initRequestedTime + 300L, + initRequestedTime + 400L, initRequestedTime + 500L, + VertexState.FAILED, "", new TezCounters(), new VertexStats(), null)); + assertEquals(VertexState.FAILED, recoveredState); + + // start the recovering from DAG + dag.handle(new DAGEventRecoverEvent(dag.getID(), null)); + dispatcher.await(); + // recover v1 to FAILED directly and also its tasks are recovered to FAILED + assertEquals(VertexState.FAILED, vertex1.getState()); + for (Task task : vertex1.tasks.values()) { + assertEquals(TaskState.FAILED, task.getState()); + } + // recover v2 to FAILED directly and also its tasks are recovered to FAILED + assertEquals(VertexState.FAILED, vertex2.getState()); + for (Task task : vertex2.tasks.values()) { + assertEquals(TaskState.FAILED, task.getState()); + } + + // recover v3 to FAILED directly and also its tasks are recovered to FAILED + assertEquals(VertexState.FAILED, vertex3.getState()); + for (Task task : vertex3.tasks.values()) { + assertEquals(TaskState.FAILED, task.getState()); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/68f17cf7/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java 
b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index 57545d9..4f4dfe7 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -207,6 +207,18 @@ public class TestHistoryEventsProtoConversion { logEvents(event, deserializedEvent); } + private void testDAGKillRequestEvent() throws Exception { + DAGKillRequestEvent event = + new DAGKillRequestEvent(TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l,false); + DAGKillRequestEvent deserializedEvent = (DAGKillRequestEvent) + testProtoConversion(event); + Assert.assertEquals(event.getDagID(), + deserializedEvent.getDagID()); + Assert.assertEquals(event.getKillRequestTime(), deserializedEvent.getKillRequestTime()); + Assert.assertEquals(event.isSessionStopped(), deserializedEvent.isSessionStopped()); + logEvents(event, deserializedEvent); + } + private void testDAGFinishedEvent() throws Exception { { DAGFinishedEvent event = new DAGFinishedEvent( @@ -707,6 +719,9 @@ public class TestHistoryEventsProtoConversion { case DAG_RECOVERED: testDAGRecoveredEvent(); break; + case DAG_KILL_REQUEST: + testDAGKillRequestEvent(); + break; default: throw new Exception("Unhandled Event type in Unit tests: " + eventType); } http://git-wip-us.apache.org/repos/asf/tez/blob/68f17cf7/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java index 9439d4e..0beda20 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java +++ 
b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java @@ -44,6 +44,7 @@ import org.apache.tez.dag.history.events.ContainerStoppedEvent; import org.apache.tez.dag.history.events.DAGCommitStartedEvent; import org.apache.tez.dag.history.events.DAGFinishedEvent; import org.apache.tez.dag.history.events.DAGInitializedEvent; +import org.apache.tez.dag.history.events.DAGKillRequestEvent; import org.apache.tez.dag.history.events.DAGRecoveredEvent; import org.apache.tez.dag.history.events.DAGStartedEvent; import org.apache.tez.dag.history.events.DAGSubmittedEvent; @@ -185,6 +186,9 @@ public class TestHistoryEventJsonConversion { event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID, dagPlan.getName(), user, 1l); break; + case DAG_KILL_REQUEST: + event = new DAGKillRequestEvent(); + break; default: Assert.fail("Unhandled event type " + eventType); } http://git-wip-us.apache.org/repos/asf/tez/blob/68f17cf7/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index bfa4fa1..a6f5f84 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -50,6 +50,7 @@ import org.apache.tez.dag.history.events.ContainerStoppedEvent; import org.apache.tez.dag.history.events.DAGCommitStartedEvent; import org.apache.tez.dag.history.events.DAGFinishedEvent; import 
org.apache.tez.dag.history.events.DAGInitializedEvent; +import org.apache.tez.dag.history.events.DAGKillRequestEvent; import org.apache.tez.dag.history.events.DAGRecoveredEvent; import org.apache.tez.dag.history.events.DAGStartedEvent; import org.apache.tez.dag.history.events.DAGSubmittedEvent; @@ -190,6 +191,9 @@ public class TestHistoryEventTimelineConversion { event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID, dagPlan.getName(), user, random.nextLong()); break; + case DAG_KILL_REQUEST: + event = new DAGKillRequestEvent(); + break; default: Assert.fail("Unhandled event type " + eventType); }
