Repository: tez Updated Branches: refs/heads/master 1f2a93563 -> 4ce6ea6ed
TEZ-3644. Cleanup container list stored in AMNode. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4ce6ea6e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4ce6ea6e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4ce6ea6e Branch: refs/heads/master Commit: 4ce6ea6ed867a67600dbc36a2f56c37bbec3d708 Parents: 1f2a935 Author: Siddharth Seth <[email protected]> Authored: Thu Mar 2 16:02:16 2017 -0800 Committer: Siddharth Seth <[email protected]> Committed: Thu Mar 2 16:02:16 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../dag/app/rm/container/AMContainerImpl.java | 62 +++++++-- .../org/apache/tez/dag/app/rm/node/AMNode.java | 3 + .../rm/node/AMNodeEventContainerCompleted.java | 37 ++++++ .../tez/dag/app/rm/node/AMNodeEventType.java | 5 +- .../apache/tez/dag/app/rm/node/AMNodeImpl.java | 67 ++++++++-- .../tez/dag/app/rm/node/AMNodeTracker.java | 5 +- .../dag/app/rm/node/PerSourceNodeTracker.java | 11 +- .../dag/app/rm/container/TestAMContainer.java | 128 ++++++++++++------- .../tez/dag/app/rm/node/TestAMNodeTracker.java | 73 +++++++++++ 10 files changed, 322 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b8465de..07841bf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3644. Cleanup container list stored in AMNode. TEZ-3646. IFile.Writer has an extra output stream flush call TEZ-3643. Long running AMs can go out of memory due to retained AMContainer instances. TEZ-3637. 
TezMerger logs too much at INFO level http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index ac429c7..18e72a7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -32,9 +32,12 @@ import org.apache.tez.Utils; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; +import org.apache.tez.dag.app.rm.node.AMNodeEventContainerCompleted; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.common.ContainerSignatureMatcher; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; +import org.apache.tez.state.OnStateChangedCallback; +import org.apache.tez.state.StateMachineTez; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.security.Credentials; @@ -48,7 +51,6 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; -import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.dag.app.AppContext; @@ -118,6 +120,8 @@ public class AMContainerImpl implements AMContainer { private Credentials credentials; private boolean credentialsChanged = false; + + private boolean completedMessageSent = false; // TODO Consider registering with the TAL, 
instead of the TAL pulling. // Possibly after splitting TAL and ContainerListener. @@ -127,8 +131,11 @@ public class AMContainerImpl implements AMContainer { // TODO Create a generic ERROR state. Container tries informing relevant components in this case. + private final NonRunningStateEnteredCallback NON_RUNNING_STATE_ENTERED_CALLBACK = new NonRunningStateEnteredCallback(); + + private final StateMachineTez<AMContainerState, AMContainerEventType, AMContainerEvent, AMContainerImpl> + stateMachine; - private final StateMachine<AMContainerState, AMContainerEventType, AMContainerEvent> stateMachine; private static final StateMachineFactory <AMContainerImpl, AMContainerState, AMContainerEventType, AMContainerEvent> stateMachineFactory = @@ -328,7 +335,19 @@ public class AMContainerImpl implements AMContainer { this.schedulerId = schedulerId; this.launcherId = launcherId; this.taskCommId = taskCommId; - this.stateMachine = stateMachineFactory.make(this); + this.stateMachine = new StateMachineTez<>(stateMachineFactory.make(this), this); + augmentStateMachine(); + } + + + private void augmentStateMachine() { + stateMachine + .registerStateEnteredCallback(AMContainerState.STOP_REQUESTED, + NON_RUNNING_STATE_ENTERED_CALLBACK) + .registerStateEnteredCallback(AMContainerState.STOPPING, + NON_RUNNING_STATE_ENTERED_CALLBACK) + .registerStateEnteredCallback(AMContainerState.COMPLETED, + NON_RUNNING_STATE_ENTERED_CALLBACK); } @Override @@ -422,7 +441,7 @@ public class AMContainerImpl implements AMContainer { LOG.error("Can't handle event " + event.getType() + " at current state " + oldState + " for ContainerId " + this.containerId, e); - inError = true; + setError(); // TODO Can't set state to COMPLETED. Add a default error state. } if (oldState != getState()) { @@ -482,7 +501,7 @@ public class AMContainerImpl implements AMContainer { msg, e)); // We have not registered with any of the listeners etc yet. Send out a deallocateContainer // message and return. 
The AM will shutdown shortly. - container.inError = true; + container.setError(); container.deAllocate(); return; } @@ -515,7 +534,7 @@ public class AMContainerImpl implements AMContainer { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent; - container.inError = true; + container.setError(); container.registerFailedAttempt(event.getTaskAttemptId()); container.maybeSendNodeFailureForFailedAssignment(event .getTaskAttemptId()); @@ -961,7 +980,7 @@ public class AMContainerImpl implements AMContainer { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent; - container.inError = true; + container.setError(); String errorMessage = "AttemptId: " + event.getTaskAttemptId() + " cannot be allocated to container: " + container.getContainerId() + " in " + container.getState() + " state"; @@ -1032,7 +1051,7 @@ public class AMContainerImpl implements AMContainer { @Override public void transition(AMContainerImpl container, AMContainerEvent cEvent) { - container.inError = true; + container.setError(); } } @@ -1046,7 +1065,7 @@ public class AMContainerImpl implements AMContainer { // think the container is still around and assign a task to it. The task // ends up getting a CONTAINER_KILLED message. Task could handle this by // asking for a reschedule in this case. Will end up FAILING the task instead of KILLING it. 
- container.inError = true; + container.setError(); AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent; String errorMessage = "AttemptId: " + event.getTaskAttemptId() + " cannot be allocated to container: " + container.getContainerId() @@ -1058,9 +1077,19 @@ public class AMContainerImpl implements AMContainer { } } + private static class NonRunningStateEnteredCallback + implements OnStateChangedCallback<AMContainerState, AMContainerImpl> { + + @Override + public void onStateChanged(AMContainerImpl amContainer, + AMContainerState amContainerState) { + amContainer.handleNonRunningStateEntered(); + } + } + private void handleExtraTAAssign( AMContainerEventAssignTA event, TezTaskAttemptID currentTaId) { - this.inError = true; + setError(); String errorMessage = "AMScheduler Error: Multiple simultaneous " + "taskAttempt allocations to: " + this.getContainerId() + ". Attempts: " + currentTaId + ", " + event.getTaskAttemptId() + @@ -1078,6 +1107,19 @@ public class AMContainerImpl implements AMContainer { this.unregisterFromContainerListener(); } + private void setError() { + this.inError = true; + handleNonRunningStateEntered(); + } + + private void handleNonRunningStateEntered() { + if (!completedMessageSent) { + completedMessageSent = true; + sendEvent(new AMNodeEventContainerCompleted(getContainer().getNodeId(), + schedulerId, containerId)); + } + } + protected void registerFailedAttempt(TezTaskAttemptID taId) { failedAssignments.add(taId); } http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java index 1c34816..bc01e04 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNode.java @@ -23,6 
+23,7 @@ import java.util.List; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tez.dag.app.dag.DAG; public interface AMNode extends EventHandler<AMNodeEvent> { @@ -33,4 +34,6 @@ public interface AMNode extends EventHandler<AMNodeEvent> { public boolean isUnhealthy(); public boolean isBlacklisted(); public boolean isUsable(); + + void dagComplete(DAG dag); } http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerCompleted.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerCompleted.java new file mode 100644 index 0000000..f999c3a --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventContainerCompleted.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.tez.dag.app.rm.node; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; + +public class AMNodeEventContainerCompleted extends AMNodeEvent { + + private final ContainerId containerId; + + public AMNodeEventContainerCompleted( + NodeId nodeId, + int schedulerId, ContainerId containerId) { + super(nodeId, schedulerId, AMNodeEventType.N_CONTAINER_COMPLETED); + this.containerId = containerId; + } + + public ContainerId getContainerId() { + return containerId; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java index 86087d0..a141124 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java @@ -21,7 +21,10 @@ package org.apache.tez.dag.app.rm.node; public enum AMNodeEventType { //Producer: Scheduler N_CONTAINER_ALLOCATED, - + + //Producer: Container + N_CONTAINER_COMPLETED, + //Producer: TaskSchedulerEventHandler N_TA_SUCCEEDED, http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java index bcc38c6..f4ad032 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java @@ -19,6 +19,8 @@ package org.apache.tez.dag.app.rm.node; import java.util.EnumSet; +import java.util.HashSet; +import 
java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -26,6 +28,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import org.apache.tez.dag.app.dag.DAG; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -60,20 +63,19 @@ public class AMNodeImpl implements AMNode { private boolean blacklistingEnabled; private boolean ignoreBlacklisting = false; private boolean nodeUpdatesRescheduleEnabled; - private Set<TezTaskAttemptID> failedAttemptIds = Sets.newHashSet(); + private final Set<TezTaskAttemptID> failedAttemptIds = Sets.newHashSet(); @SuppressWarnings("rawtypes") protected EventHandler eventHandler; @VisibleForTesting - final List<ContainerId> containers = new LinkedList<ContainerId>(); + final Set<ContainerId> containers = new LinkedHashSet<>(); + final Set<ContainerId> completedContainers = new HashSet<>(); int numFailedTAs = 0; int numSuccessfulTAs = 0; - - //Book-keeping only. In case of Health status change. - private final List<ContainerId> pastContainers = new LinkedList<ContainerId>(); - + private static final ContainerCompletedTransition CONTAINER_COMPLETED_TRANSITION = + new ContainerCompletedTransition(); private final StateMachine<AMNodeState, AMNodeEventType, AMNodeEvent> stateMachine; @@ -103,6 +105,8 @@ public class AMNodeImpl implements AMNode { new IgnoreBlacklistingStateChangeTransition(true)) .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE, AMNodeEventType.N_TURNED_HEALTHY) + .addTransition(AMNodeState.ACTIVE, AMNodeState.ACTIVE, + AMNodeEventType.N_CONTAINER_COMPLETED, CONTAINER_COMPLETED_TRANSITION) // Transitions from BLACKLISTED state. 
.addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED, @@ -120,6 +124,8 @@ public class AMNodeImpl implements AMNode { .addTransition(AMNodeState.BLACKLISTED, AMNodeState.FORCED_ACTIVE, AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, new IgnoreBlacklistingStateChangeTransition(true)) + .addTransition(AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED, + AMNodeEventType.N_CONTAINER_COMPLETED, CONTAINER_COMPLETED_TRANSITION) .addTransition( AMNodeState.BLACKLISTED, AMNodeState.BLACKLISTED, @@ -142,6 +148,8 @@ public class AMNodeImpl implements AMNode { EnumSet.of(AMNodeState.BLACKLISTED, AMNodeState.ACTIVE), AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED, new IgnoreBlacklistingDisabledTransition()) + .addTransition(AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE, + AMNodeEventType.N_CONTAINER_COMPLETED, CONTAINER_COMPLETED_TRANSITION) .addTransition( AMNodeState.FORCED_ACTIVE, AMNodeState.FORCED_ACTIVE, @@ -168,6 +176,8 @@ public class AMNodeImpl implements AMNode { EnumSet.of(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE), AMNodeEventType.N_TURNED_HEALTHY, new NodeTurnedHealthyTransition()) .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, + AMNodeEventType.N_CONTAINER_COMPLETED, CONTAINER_COMPLETED_TRANSITION) + .addTransition(AMNodeState.UNHEALTHY, AMNodeState.UNHEALTHY, AMNodeEventType.N_TURNED_UNHEALTHY, new GenericErrorTransition()) .installTopology(); @@ -259,7 +269,6 @@ public class AMNodeImpl implements AMNode { sendEvent(new AMContainerEventNodeFailed(c, "Node blacklisted")); } // these containers are not useful anymore - pastContainers.addAll(containers); containers.clear(); sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true, schedulerId)); } @@ -295,9 +304,9 @@ public class AMNodeImpl implements AMNode { @Override public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) { AMNodeEventTaskAttemptEnded event = (AMNodeEventTaskAttemptEnded) nEvent; - LOG.info("Attempt failed on node: " + node.getNodeId() + " 
TA: " - + event.getTaskAttemptId() + " failed: " + event.failed() - + " container: " + event.getContainerId() + " numFailedTAs: " + LOG.info("Attempt " + (event.failed() ? "failed" : "killed") + " on node: " + node.getNodeId() + + " TA: " + event.getTaskAttemptId() + + ", container: " + event.getContainerId() + ", numFailedTAs: " + node.numFailedTAs); if (event.failed()) { // ignore duplicate attempt ids @@ -381,8 +390,6 @@ public class AMNodeImpl implements AMNode { AMNodeEventContainerAllocated event = (AMNodeEventContainerAllocated) nEvent; node.sendEvent(new AMContainerEvent(event.getContainerId(), AMContainerEventType.C_STOP_REQUEST)); - // ZZZ CReuse: Should the scheduler check node state before scheduling a - // container on it ? } } @@ -434,7 +441,6 @@ public class AMNodeImpl implements AMNode { MultipleArcTransition<AMNodeImpl, AMNodeEvent, AMNodeState> { @Override public AMNodeState transition(AMNodeImpl node, AMNodeEvent nEvent) { - node.pastContainers.addAll(node.containers); node.containers.clear(); if (node.ignoreBlacklisting) { return AMNodeState.FORCED_ACTIVE; @@ -444,6 +450,17 @@ public class AMNodeImpl implements AMNode { } } + protected static class ContainerCompletedTransition + implements SingleArcTransition<AMNodeImpl, AMNodeEvent> { + + @Override + public void transition(AMNodeImpl amNode, AMNodeEvent amNodeEvent) { + AMNodeEventContainerCompleted cc = + (AMNodeEventContainerCompleted) amNodeEvent; + amNode.completedContainers.add(cc.getContainerId()); + } + } + @Override public boolean isUnhealthy() { this.readLock.lock(); @@ -468,4 +485,28 @@ public class AMNodeImpl implements AMNode { public boolean isUsable() { return !(isUnhealthy() || isBlacklisted()); } + + @Override + public void dagComplete(DAG dag) { + this.writeLock.lock(); + try { + int countBefore = containers.size(); + int countCompleted = completedContainers.size(); + + + // Actual functionality.
+ containers.removeAll(completedContainers); + completedContainers.clear(); + + int countAfter = containers.size(); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Node {}, cleaning up knownContainers. current={}, completed={}, postCleanup={}", + getNodeId(), countBefore, countCompleted, countAfter); + } + + } finally { + this.writeLock.unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java index fdc8a4c..1536170 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java @@ -101,6 +101,7 @@ public class AMNodeTracker extends AbstractService implements // No synchronization required until there's multiple dispatchers. 
switch (rEvent.getType()) { case N_CONTAINER_ALLOCATED: + case N_CONTAINER_COMPLETED: case N_TA_SUCCEEDED: case N_TA_ENDED: case N_IGNORE_BLACKLISTING_ENABLED: @@ -140,7 +141,9 @@ public class AMNodeTracker extends AbstractService implements } public void dagComplete(DAG dag) { - // TODO TEZ-2337 Maybe reset failures from previous DAGs + for (PerSourceNodeTracker perSourceNodeTracker : perSourceNodeTrackers.values()) { + perSourceNodeTracker.dagComplete(dag); + } } private PerSourceNodeTracker getAndCreateIfNeededPerSourceTracker(int schedulerId) { http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java index 72c3230..74c6176 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/PerSourceNodeTracker.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.dag.DAG; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,7 +107,8 @@ public class PerSourceNodeTracker { } break; default: - nodeMap.get(nodeId).handle(rEvent); + amNode = nodeMap.get(nodeId); + amNode.handle(rEvent); } } @@ -186,6 +188,13 @@ public class PerSourceNodeTracker { } } + public void dagComplete(DAG dag) { + for (AMNode amNode : nodeMap.values()) { + amNode.dagComplete(dag); + } + // TODO TEZ-2337 Maybe reset failures from previous DAGs + } + @SuppressWarnings("unchecked") private void sendEvent(Event<?> event) { this.eventHandler.handle(event); 
http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index 4d1bbae..1b9df99 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.app.TaskCommunicatorWrapper; +import org.apache.tez.dag.app.rm.node.AMNodeEventType; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.serviceplugins.api.ServicePluginException; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; @@ -146,7 +147,9 @@ public class TestAMContainer { wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); - wc.verifyNoOutgoingEvents(); + List<Event> outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + AMNodeEventType.N_CONTAINER_COMPLETED); verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null); verify(wc.chh).unregister(wc.containerID); @@ -196,7 +199,9 @@ public class TestAMContainer { wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); - wc.verifyNoOutgoingEvents(); + List<Event> outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + AMNodeEventType.N_CONTAINER_COMPLETED); verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null); 
verify(wc.chh).unregister(wc.containerID); @@ -266,7 +271,9 @@ public class TestAMContainer { wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); - wc.verifyNoOutgoingEvents(); + List<Event> outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + AMNodeEventType.N_CONTAINER_COMPLETED); verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null); verify(wc.chh).unregister(wc.containerID); @@ -288,9 +295,11 @@ public class TestAMContainer { wc.stopRequest(); wc.verifyState(AMContainerState.STOP_REQUESTED); // Event to NM to stop the container. - wc.verifyCountAndGetOutgoingEvents(1); - assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() == - ContainerLauncherEventType.CONTAINER_STOP_REQUEST); + + List<Event> outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + ContainerLauncherEventType.CONTAINER_STOP_REQUEST, + AMNodeEventType.N_CONTAINER_COMPLETED); wc.nmStopSent(); wc.verifyState(AMContainerState.STOPPING); @@ -323,9 +332,10 @@ public class TestAMContainer { wc.stopRequest(); wc.verifyState(AMContainerState.STOP_REQUESTED); // Event to NM to stop the container. - wc.verifyCountAndGetOutgoingEvents(1); - assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() == - ContainerLauncherEventType.CONTAINER_STOP_REQUEST); + List<Event> outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + ContainerLauncherEventType.CONTAINER_STOP_REQUEST, + AMNodeEventType.N_CONTAINER_COMPLETED); wc.nmStopFailed(); wc.verifyState(AMContainerState.STOPPING); @@ -366,11 +376,12 @@ public class TestAMContainer { "Multiple simultaneous taskAttempt"); verify(wc.chh).unregister(wc.containerID); // 1 for NM stop request. 2 TERMINATING to TaskAttempt. 
- outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(4); verifyUnOrderedOutgoingEventTypes(outgoingEvents, ContainerLauncherEventType.CONTAINER_STOP_REQUEST, TaskAttemptEventType.TA_CONTAINER_TERMINATING, - TaskAttemptEventType.TA_CONTAINER_TERMINATING); + TaskAttemptEventType.TA_CONTAINER_TERMINATING, + AMNodeEventType.N_CONTAINER_COMPLETED); assertTrue(wc.amContainer.isInErrorState()); wc.nmStopSent(); @@ -405,11 +416,12 @@ public class TestAMContainer { "Multiple simultaneous taskAttempt"); verify(wc.chh).unregister(wc.containerID); // 1 for NM stop request. 2 TERMINATING to TaskAttempt. - outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(4); verifyUnOrderedOutgoingEventTypes(outgoingEvents, ContainerLauncherEventType.CONTAINER_STOP_REQUEST, TaskAttemptEventType.TA_CONTAINER_TERMINATING, - TaskAttemptEventType.TA_CONTAINER_TERMINATING); + TaskAttemptEventType.TA_CONTAINER_TERMINATING, + AMNodeEventType.N_CONTAINER_COMPLETED); assertTrue(wc.amContainer.isInErrorState()); wc.nmStopSent(); @@ -442,10 +454,11 @@ public class TestAMContainer { "timed out"); verify(wc.chh).unregister(wc.containerID); // 1 to TA, 1 for RM de-allocate. - outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); verifyUnOrderedOutgoingEventTypes(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATING, - ContainerLauncherEventType.CONTAINER_STOP_REQUEST); + ContainerLauncherEventType.CONTAINER_STOP_REQUEST, + AMNodeEventType.N_CONTAINER_COMPLETED); // TODO Should this be an RM DE-ALLOCATE instead ? wc.containerCompleted(); @@ -477,10 +490,11 @@ public class TestAMContainer { "received a STOP_REQUEST"); verify(wc.chh).unregister(wc.containerID); // 1 to TA, 1 for RM de-allocate. 
- outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); verifyUnOrderedOutgoingEventTypes(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATING, - ContainerLauncherEventType.CONTAINER_STOP_REQUEST); + ContainerLauncherEventType.CONTAINER_STOP_REQUEST, + AMNodeEventType.N_CONTAINER_COMPLETED); // TODO Should this be an RM DE-ALLOCATE instead ? wc.containerCompleted(); @@ -511,10 +525,11 @@ public class TestAMContainer { verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.LAUNCH_FAILED, "launchFailed"); - outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); verifyUnOrderedOutgoingEventTypes(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATING, - AMSchedulerEventType.S_CONTAINER_DEALLOCATE); + AMSchedulerEventType.S_CONTAINER_DEALLOCATE, + AMNodeEventType.N_CONTAINER_COMPLETED); for (Event e : outgoingEvents) { if (e.getType() == TaskAttemptEventType.TA_CONTAINER_TERMINATING) { Assert.assertEquals(TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED, @@ -538,7 +553,9 @@ public class TestAMContainer { wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); - wc.verifyNoOutgoingEvents(); + List<Event> outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + AMNodeEventType.N_CONTAINER_COMPLETED); assertFalse(wc.amContainer.isInErrorState()); } @@ -561,9 +578,10 @@ public class TestAMContainer { verify(wc.tal).registerRunningContainer(wc.containerID, 0); verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null); - outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); verifyUnOrderedOutgoingEventTypes(outgoingEvents, - TaskAttemptEventType.TA_CONTAINER_TERMINATED); + TaskAttemptEventType.TA_CONTAINER_TERMINATED, + 
AMNodeEventType.N_CONTAINER_COMPLETED); Assert.assertEquals(TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED, ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause()); @@ -591,9 +609,10 @@ public class TestAMContainer { verify(wc.tal).registerRunningContainer(wc.containerID, 0); verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER, "DiskFailed"); - outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); verifyUnOrderedOutgoingEventTypes(outgoingEvents, - TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM); + TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM, + AMNodeEventType.N_CONTAINER_COMPLETED); Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR, ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause()); @@ -623,9 +642,10 @@ public class TestAMContainer { verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.NODE_FAILED, "NodeFailed"); - outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); verifyUnOrderedOutgoingEventTypes(outgoingEvents, - TaskAttemptEventType.TA_CONTAINER_TERMINATED); + TaskAttemptEventType.TA_CONTAINER_TERMINATED, + AMNodeEventType.N_CONTAINER_COMPLETED); Assert.assertEquals(TaskAttemptTerminationCause.NODE_FAILED, ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause()); @@ -656,11 +676,12 @@ public class TestAMContainer { verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); - wc.verifyCountAndGetOutgoingEvents(0); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + AMNodeEventType.N_CONTAINER_COMPLETED); assertFalse(wc.amContainer.isInErrorState()); - wc.verifyNoOutgoingEvents(); wc.verifyHistoryStopEvent(); assertFalse(wc.amContainer.isInErrorState()); @@ -685,9 
+706,10 @@ public class TestAMContainer { verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); - outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); verifyUnOrderedOutgoingEventTypes(outgoingEvents, - TaskAttemptEventType.TA_CONTAINER_TERMINATED); + TaskAttemptEventType.TA_CONTAINER_TERMINATED, + AMNodeEventType.N_CONTAINER_COMPLETED); assertFalse(wc.amContainer.isInErrorState()); @@ -722,11 +744,14 @@ public class TestAMContainer { verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); - outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); + + Event event = findEventByType(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM); Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION, - ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause()); + ((TaskAttemptEventContainerTerminatedBySystem)event).getTerminationCause()); verifyUnOrderedOutgoingEventTypes(outgoingEvents, - TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM); + TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM, + AMNodeEventType.N_CONTAINER_COMPLETED); assertFalse(wc.amContainer.isInErrorState()); @@ -761,9 +786,10 @@ public class TestAMContainer { verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); - outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); verifyUnOrderedOutgoingEventTypes(outgoingEvents, - TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM); + TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM, + AMNodeEventType.N_CONTAINER_COMPLETED); Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION, ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause()); @@ -799,11 +825,13 @@ public class 
TestAMContainer { verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); - outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); + Event event = findEventByType(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM); Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR, - ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause()); + ((TaskAttemptEventContainerTerminatedBySystem)event).getTerminationCause()); verifyUnOrderedOutgoingEventTypes(outgoingEvents, - TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM); + TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM, + AMNodeEventType.N_CONTAINER_COMPLETED); assertFalse(wc.amContainer.isInErrorState()); @@ -862,11 +890,12 @@ public class TestAMContainer { wc.nodeFailed(); // Expecting a complete event from the RM wc.verifyState(AMContainerState.STOPPING); - outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(4); verifyUnOrderedOutgoingEventTypes(outgoingEvents, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, - AMSchedulerEventType.S_CONTAINER_DEALLOCATE); + AMSchedulerEventType.S_CONTAINER_DEALLOCATE, + AMNodeEventType.N_CONTAINER_COMPLETED); for (Event event : outgoingEvents) { if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) { @@ -904,11 +933,12 @@ public class TestAMContainer { wc.nodeFailed(); // Expecting a complete event from the RM wc.verifyState(AMContainerState.STOPPING); - outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(4); verifyUnOrderedOutgoingEventTypes(outgoingEvents, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_NODE_FAILED, - AMSchedulerEventType.S_CONTAINER_DEALLOCATE); + AMSchedulerEventType.S_CONTAINER_DEALLOCATE, + AMNodeEventType.N_CONTAINER_COMPLETED); for 
(Event event : outgoingEvents) { if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) { @@ -945,12 +975,13 @@ public class TestAMContainer { wc.nodeFailed(); // Expecting a complete event from the RM wc.verifyState(AMContainerState.STOPPING); - outgoingEvents = wc.verifyCountAndGetOutgoingEvents(4); + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(5); verifyUnOrderedOutgoingEventTypes(outgoingEvents, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, - AMSchedulerEventType.S_CONTAINER_DEALLOCATE); + AMSchedulerEventType.S_CONTAINER_DEALLOCATE, + AMNodeEventType.N_CONTAINER_COMPLETED); for (Event event : outgoingEvents) { if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) { @@ -1439,6 +1470,15 @@ public class TestAMContainer { assertTrue("Found unexpected events: " + eventsCopy + " in outgoing event list", eventsCopy.isEmpty()); } + + private Event findEventByType(List<Event> events, Enum<?> type) { + for (Event event : events) { + if (event.getType() == type) { + return event; + } + } + return null; + } private LocalResource createLocalResource(String name) { LocalResource lr = LocalResource.newInstance(URL.newInstance(null, "localhost", 2321, name), http://git-wip-us.apache.org/repos/asf/tez/blob/4ce6ea6e/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java index e123dd1..11d3b7a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/node/TestAMNodeTracker.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.mock; import java.util.List; +import org.apache.tez.dag.app.dag.DAG; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -326,6 +327,78 @@ public class TestAMNodeTracker { } } + @Test(timeout = 10000L) + public void testNodeCompletedAndCleanup() { + AppContext appContext = mock(AppContext.class); + Configuration conf = new Configuration(false); + conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2); + TestEventHandler handler = new TestEventHandler(); + AMNodeTracker amNodeTracker = new AMNodeTracker(handler, appContext); + doReturn(amNodeTracker).when(appContext).getNodeTracker(); + AMContainerMap amContainerMap = mock(AMContainerMap.class); + TaskSchedulerManager taskSchedulerManager = + mock(TaskSchedulerManager.class); + dispatcher.register(AMNodeEventType.class, amNodeTracker); + dispatcher.register(AMContainerEventType.class, amContainerMap); + dispatcher.register(AMSchedulerEventType.class, taskSchedulerManager); + amNodeTracker.init(conf); + amNodeTracker.start(); + + try { + + NodeId nodeId = NodeId.newInstance("fakenode", 3333); + amNodeTracker.nodeSeen(nodeId, 0); + + AMNode amNode = amNodeTracker.get(nodeId, 0); + ContainerId[] containerIds = new ContainerId[7]; + + // Start 5 containers. + for (int i = 0; i < 5; i++) { + containerIds[i] = mock(ContainerId.class); + amNodeTracker + .handle(new AMNodeEventContainerAllocated(nodeId, 0, containerIds[i])); + } + assertEquals(5, amNode.getContainers().size()); + + // Finish 1st dag. + amNodeTracker.dagComplete(mock(DAG.class)); + assertEquals(5, amNode.getContainers().size()); + + + // Mark 2 as complete. Finish 2nd dag. + for (int i = 0; i < 2; i++) { + amNodeTracker.handle( + new AMNodeEventContainerCompleted(nodeId, 0, containerIds[i])); + } + amNodeTracker.dagComplete(mock(DAG.class)); + assertEquals(3, amNode.getContainers().size()); + + // Add 2 more containers, then finish the 3rd and 4th dags while all 5 containers are still running.
+ for (int i = 5; i < 7; i++) { + containerIds[i] = mock(ContainerId.class); + amNodeTracker + .handle(new AMNodeEventContainerAllocated(nodeId, 0, containerIds[i])); + } + assertEquals(5, amNode.getContainers().size()); + amNodeTracker.dagComplete(mock(DAG.class)); + assertEquals(5, amNode.getContainers().size()); + amNodeTracker.dagComplete(mock(DAG.class)); + assertEquals(5, amNode.getContainers().size()); + + for (int i = 2; i < 7; i++) { + amNodeTracker.handle( + new AMNodeEventContainerCompleted(nodeId, 0, containerIds[i])); + } + assertEquals(5, amNode.getContainers().size()); + amNodeTracker.dagComplete(mock(DAG.class)); + assertEquals(0, amNode.getContainers().size()); + + } finally { + amNodeTracker.stop(); + } + + } + @Test(timeout=10000) public void testNodeUnhealthyRescheduleTasksEnabled() throws Exception { _testNodeUnhealthyRescheduleTasks(true);
