Revert "YARN-4502. Fix two AM containers get allocated when AM restart. (Vinod Kumar Vavilapalli via wangda)"
This reverts commit 805a9ed85eb34c8125cfb7d26d07cdfac12b3579. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/adf260a7 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/adf260a7 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/adf260a7 Branch: refs/heads/HDFS-1312 Commit: adf260a728df427eb729abe8fb9ad7248991ea54 Parents: b08ecf5 Author: Wangda Tan <[email protected]> Authored: Mon Jan 18 16:50:45 2016 +0800 Committer: Wangda Tan <[email protected]> Committed: Mon Jan 18 16:50:45 2016 +0800 ---------------------------------------------------------------------- .../sls/scheduler/ResourceSchedulerWrapper.java | 4 +- hadoop-yarn-project/CHANGES.txt | 3 - .../ProportionalCapacityPreemptionPolicy.java | 6 +- .../rmcontainer/RMContainerImpl.java | 17 ++- .../scheduler/AbstractYarnScheduler.java | 44 ++------ .../scheduler/AppSchedulingInfo.java | 11 +- .../scheduler/ContainerPreemptEvent.java | 57 ++++++++++ .../scheduler/PreemptableResourceScheduler.java | 4 +- .../scheduler/SchedulerApplicationAttempt.java | 2 +- .../scheduler/capacity/CapacityScheduler.java | 72 ++++++------ .../scheduler/common/fica/FiCaSchedulerApp.java | 2 +- .../scheduler/event/ContainerPreemptEvent.java | 55 --------- .../event/ContainerRescheduledEvent.java | 35 ++++++ .../scheduler/event/SchedulerEventType.java | 10 +- .../scheduler/fair/FairScheduler.java | 34 ++++-- .../scheduler/fifo/FifoScheduler.java | 28 +++-- .../yarn/server/resourcemanager/MockRM.java | 3 +- .../resourcemanager/TestRMDispatcher.java | 13 +-- .../applicationsmanager/TestAMRestart.java | 6 +- ...estProportionalCapacityPreemptionPolicy.java | 6 +- .../scheduler/TestAbstractYarnScheduler.java | 113 ------------------- .../capacity/TestApplicationPriority.java | 6 +- .../capacity/TestCapacityScheduler.java | 8 +- .../scheduler/fair/TestFairScheduler.java | 16 +-- 24 files changed, 244 insertions(+), 311 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 1bc6f23..106525d 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -70,10 +70,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -957,7 +957,7 @@ final public class ResourceSchedulerWrapper } @Override - protected void completedContainerInternal(RMContainer rmContainer, + protected void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { // do nothing } http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a9f4a84..24f16b5 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -1271,9 +1271,6 @@ Release 2.8.0 - UNRELEASED YARN-4538. QueueMetrics pending cores and memory metrics wrong. (Bibin A Chundatt via wangda) - YARN-4502. Fix two AM containers get allocated when AM restart. - (Vinod Kumar Vavilapalli via wangda) - Release 2.7.3 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index c68bb8e..5df2be8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -50,7 +51,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -257,7 +257,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // kill it rmContext.getDispatcher().getEventHandler().handle( new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.KILL_PREEMPTED_CONTAINER)); + SchedulerEventType.KILL_CONTAINER)); preempted.remove(container); } else { if (preempted.get(container) != null) { @@ -764,7 +764,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic if (!observeOnly) { rmContext.getDispatcher().getEventHandler().handle( new ContainerPreemptEvent( - appId, c, SchedulerEventType.KILL_RESERVED_CONTAINER)); + appId, c, SchedulerEventType.DROP_RESERVATION)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 83876d0..96c4f27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; @@ -96,7 +97,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { .addTransition(RMContainerState.ALLOCATED, RMContainerState.EXPIRED, RMContainerEventType.EXPIRE, new FinishedTransition()) .addTransition(RMContainerState.ALLOCATED, RMContainerState.KILLED, - RMContainerEventType.KILL, new FinishedTransition()) + RMContainerEventType.KILL, new ContainerRescheduledTransition()) // Transitions from ACQUIRED state .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING, @@ -520,8 +521,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { - // Clear ResourceRequest stored in RMContainer, we don't need to remember - // this anymore. + // Clear ResourceRequest stored in RMContainer container.setResourceRequests(null); // Register with containerAllocationExpirer. @@ -597,6 +597,17 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { } } + private static final class ContainerRescheduledTransition extends + FinishedTransition { + + @Override + public void transition(RMContainerImpl container, RMContainerEvent event) { + // Tell scheduler to recover request of this container to app + container.eventHandler.handle(new ContainerRescheduledEvent(container)); + super.transition(container, event); + } + } + private static class FinishedTransition extends BaseTransition { @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 41a04f2..ed93acf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -511,28 +511,20 @@ public abstract class AbstractYarnScheduler * Recover resource request back from RMContainer when a container is * preempted before AM pulled the same. If container is pulled by * AM, then RMContainer will not have resource request to recover. - * @param rmContainer rmContainer + * @param rmContainer */ - private void recoverResourceRequestForContainer(RMContainer rmContainer) { + protected void recoverResourceRequestForContainer(RMContainer rmContainer) { List<ResourceRequest> requests = rmContainer.getResourceRequests(); // If container state is moved to ACQUIRED, request will be empty. if (requests == null) { return; } - - // Add resource request back to Scheduler ApplicationAttempt. - - // We lookup the application-attempt here again using - // getCurrentApplicationAttempt() because there is only one app-attempt at - // any point in the scheduler. But in corner cases, AMs can crash, - // corresponding containers get killed and recovered to the same-attempt, - // but because the app-attempt is extinguished right after, the recovered - // requests don't serve any purpose, but that's okay. - SchedulerApplicationAttempt schedulerAttempt = - getCurrentAttemptForContainer(rmContainer.getContainerId()); + // Add resource request back to Scheduler. + SchedulerApplicationAttempt schedulerAttempt + = getCurrentAttemptForContainer(rmContainer.getContainerId()); if (schedulerAttempt != null) { - schedulerAttempt.recoverResourceRequestsForContainer(requests); + schedulerAttempt.recoverResourceRequests(requests); } } @@ -567,30 +559,8 @@ public abstract class AbstractYarnScheduler } } - @VisibleForTesting - @Private - // clean up a completed container - public void completedContainer(RMContainer rmContainer, - ContainerStatus containerStatus, RMContainerEventType event) { - - if (rmContainer == null) { - LOG.info("Container " + containerStatus.getContainerId() - + " completed with event " + event - + ", but corresponding RMContainer doesn't exist."); - return; - } - - completedContainerInternal(rmContainer, containerStatus, event); - - // If the container is getting killed in ACQUIRED state, the requester (AM - // for regular containers and RM itself for AM container) will not know what - // happened. Simply add the ResourceRequest back again so that requester - // doesn't need to do anything conditionally. - recoverResourceRequestForContainer(rmContainer); - } - // clean up a completed container - protected abstract void completedContainerInternal(RMContainer rmContainer, + protected abstract void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event); protected void releaseContainers(List<ContainerId> containers, http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 631b418..973e9d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -289,15 +289,12 @@ public class AppSchedulingInfo { * application, by asking for more resources and releasing resources acquired * by the application. * - * @param requests - * resources to be acquired - * @param recoverPreemptedRequestForAContainer - * recover ResourceRequest on preemption + * @param requests resources to be acquired + * @param recoverPreemptedRequest recover ResourceRequest on preemption * @return true if any resource was updated, false otherwise */ public synchronized boolean updateResourceRequests( - List<ResourceRequest> requests, - boolean recoverPreemptedRequestForAContainer) { + List<ResourceRequest> requests, boolean recoverPreemptedRequest) { // Flag to track if any incoming requests update "ANY" requests boolean anyResourcesUpdated = false; @@ -318,7 +315,7 @@ public class AppSchedulingInfo { // Increment number of containers if recovering preempted resources ResourceRequest lastRequest = asks.get(resourceName); - if (recoverPreemptedRequestForAContainer && lastRequest != null) { + if (recoverPreemptedRequest && lastRequest != null) { request.setNumContainers(lastRequest.getNumContainers() + 1); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java new file mode 100644 index 0000000..7ab2758 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java @@ -0,0 +1,57 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; + +/** + * Simple event class used to communicate containers unreservations, preemption, killing + */ +public class ContainerPreemptEvent extends SchedulerEvent { + + private final ApplicationAttemptId aid; + private final RMContainer container; + + public ContainerPreemptEvent(ApplicationAttemptId aid, RMContainer container, + SchedulerEventType type) { + super(type); + this.aid = aid; + this.container = container; + } + + public RMContainer getContainer(){ + return this.container; + } + + public ApplicationAttemptId getAppId() { + return aid; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(super.toString()); + sb.append(" ").append(getAppId()); + sb.append(" ").append(getContainer().getContainerId()); + return sb.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java index 5a37295..c89696d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java @@ -31,7 +31,7 @@ public interface PreemptableResourceScheduler extends ResourceScheduler { * ask the scheduler to drop the reservation for the given container. * @param container Reference to reserved container allocation. */ - void killReservedContainer(RMContainer container); + void dropContainerReservation(RMContainer container); /** * Ask the scheduler to obtain back the container from a specific application @@ -45,6 +45,6 @@ public interface PreemptableResourceScheduler extends ResourceScheduler { * Ask the scheduler to forcibly interrupt the container given as input * @param container */ - void killPreemptedContainer(RMContainer container); + void killContainer(RMContainer container); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index d91c79e..b43c106 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -322,7 +322,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { return false; } - public synchronized void recoverResourceRequestsForContainer( + public synchronized void recoverResourceRequests( List<ResourceRequest> requests) { if (!isStopped) { appSchedulingInfo.updateResourceRequests(requests, true); http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 26b6a2b..84b7d9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -93,6 +93,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContai import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; @@ -114,14 +115,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -865,7 +865,7 @@ public class CapacityScheduler extends LOG.info("Skip killing " + rmContainer.getContainerId()); continue; } - super.completedContainer( + completedContainer( rmContainer, SchedulerUtils.createAbnormalContainerStatus( rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), @@ -874,7 +874,7 @@ public class CapacityScheduler extends // Release all reserved containers for (RMContainer rmContainer : attempt.getReservedContainers()) { - super.completedContainer( + completedContainer( rmContainer, SchedulerUtils.createAbnormalContainerStatus( rmContainer.getContainerId(), "Application Complete"), @@ -1047,7 +1047,7 @@ public class CapacityScheduler extends for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); RMContainer container = getRMContainer(containerId); - super.completedContainer(container, completedContainer, + completedContainer(container, completedContainer, RMContainerEventType.FINISHED); if (container != null) { releasedContainers++; @@ -1128,7 +1128,7 @@ public class CapacityScheduler extends // Unreserve container on this node RMContainer reservedContainer = node.getReservedContainer(); if (null != reservedContainer) { - killReservedContainer(reservedContainer); + dropContainerReservation(reservedContainer); } // Update node labels after we've done this @@ -1372,19 +1372,18 @@ public class CapacityScheduler extends ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent) event; ContainerId containerId = containerExpiredEvent.getContainerId(); - super.completedContainer(getRMContainer(containerId), + completedContainer(getRMContainer(containerId), SchedulerUtils.createAbnormalContainerStatus( containerId, SchedulerUtils.EXPIRED_CONTAINER), RMContainerEventType.EXPIRE); } break; - case KILL_RESERVED_CONTAINER: + case DROP_RESERVATION: { - ContainerPreemptEvent killReservedContainerEvent = - (ContainerPreemptEvent) event; - RMContainer container = killReservedContainerEvent.getContainer(); - killReservedContainer(container); + ContainerPreemptEvent dropReservationEvent = (ContainerPreemptEvent)event; + RMContainer container = dropReservationEvent.getContainer(); + dropContainerReservation(container); } break; case PREEMPT_CONTAINER: @@ -1396,11 +1395,19 @@ public class CapacityScheduler extends preemptContainer(aid, containerToBePreempted); } break; - case KILL_PREEMPTED_CONTAINER: + case KILL_CONTAINER: { ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event; RMContainer containerToBeKilled = killContainerEvent.getContainer(); - killPreemptedContainer(containerToBeKilled); + killContainer(containerToBeKilled); + } + break; + case CONTAINER_RESCHEDULED: + { + ContainerRescheduledEvent containerRescheduledEvent = + (ContainerRescheduledEvent) event; + RMContainer container = containerRescheduledEvent.getContainer(); + recoverResourceRequestForContainer(container); } break; default: @@ -1455,7 +1462,7 @@ public class CapacityScheduler extends // Remove running containers List<RMContainer> runningContainers = node.getRunningContainers(); for (RMContainer container : runningContainers) { - super.completedContainer(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.LOST_CONTAINER), @@ -1465,7 +1472,7 @@ public class CapacityScheduler extends // Remove reservations, if any RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - super.completedContainer(reservedContainer, + completedContainer(reservedContainer, SchedulerUtils.createAbnormalContainerStatus( reservedContainer.getContainerId(), SchedulerUtils.LOST_CONTAINER), @@ -1481,8 +1488,13 @@ public class CapacityScheduler extends @Lock(CapacityScheduler.class) @Override - protected synchronized void completedContainerInternal(RMContainer rmContainer, + protected synchronized void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { + if (rmContainer == null) { + LOG.info("Container " + containerStatus.getContainerId() + + " completed with event " + event); + return; + } Container container = rmContainer.getContainer(); @@ -1584,14 +1596,11 @@ public class CapacityScheduler extends } @Override - public void killReservedContainer(RMContainer container) { + public void dropContainerReservation(RMContainer container) { if(LOG.isDebugEnabled()){ - LOG.debug(SchedulerEventType.KILL_RESERVED_CONTAINER + ":" - + container.toString()); + LOG.debug("DROP_RESERVATION:" + container.toString()); } - // TODO: What happens if this is no longer a reserved container, for e.g if - // the reservation became an allocation. - super.completedContainer(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.UNRESERVED_CONTAINER), @@ -1601,24 +1610,23 @@ public class CapacityScheduler extends @Override public void preemptContainer(ApplicationAttemptId aid, RMContainer cont) { if(LOG.isDebugEnabled()){ - LOG.debug(SchedulerEventType.PREEMPT_CONTAINER + ": appAttempt:" - + aid.toString() + " container: " + cont.toString()); + LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() + + " container: " + cont.toString()); } FiCaSchedulerApp app = getApplicationAttempt(aid); if (app != null) { - app.preemptContainer(cont.getContainerId()); + app.addPreemptContainer(cont.getContainerId()); } } @Override - public void killPreemptedContainer(RMContainer cont) { + public void killContainer(RMContainer cont) { if (LOG.isDebugEnabled()) { - LOG.debug(SchedulerEventType.KILL_PREEMPTED_CONTAINER + ": container" - + cont.toString()); + LOG.debug("KILL_CONTAINER: container" + cont.toString()); } - super.completedContainer(cont, SchedulerUtils - .createPreemptedContainerStatus(cont.getContainerId(), - SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL); + completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus( + cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER), + RMContainerEventType.KILL); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 3960293..4b88415 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -301,7 +301,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { return ret; } - public synchronized void preemptContainer(ContainerId cont) { + public synchronized void addPreemptContainer(ContainerId cont) { // ignore already completed containers if (liveContainers.containsKey(cont)) { containersToPreempt.add(cont); http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerPreemptEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerPreemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerPreemptEvent.java deleted file mode 100644 index 4b0be0c..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerPreemptEvent.java +++ /dev/null @@ -1,55 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; - -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; - -/** - * Simple event class used to communicate containers unreservations, preemption, killing - */ -public class ContainerPreemptEvent extends SchedulerEvent { - - private final ApplicationAttemptId aid; - private final RMContainer container; - - public ContainerPreemptEvent(ApplicationAttemptId aid, RMContainer container, - SchedulerEventType type) { - super(type); - this.aid = aid; - this.container = container; - } - - public RMContainer getContainer(){ - return this.container; - } - - public ApplicationAttemptId getAppId() { - return aid; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(super.toString()); - sb.append(" ").append(getAppId()); - sb.append(" ").append(getContainer().getContainerId()); - return sb.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerRescheduledEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerRescheduledEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerRescheduledEvent.java new file mode 100644 index 0000000..de2ce36 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerRescheduledEvent.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + +public class ContainerRescheduledEvent extends SchedulerEvent { + + private RMContainer container; + + public ContainerRescheduledEvent(RMContainer container) { + super(SchedulerEventType.CONTAINER_RESCHEDULED); + this.container = container; + } + + public RMContainer getContainer() { + return container; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index edc148f..40dd66b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -38,9 +38,11 @@ public enum SchedulerEventType { // Source: ContainerAllocationExpirer CONTAINER_EXPIRED, + // Source: RMContainer + CONTAINER_RESCHEDULED, + // Source: SchedulingEditPolicy - KILL_RESERVED_CONTAINER, - PREEMPT_CONTAINER, // Mark a container for preemption in the near future - KILL_PREEMPTED_CONTAINER // Kill a container previously marked for - // preemption + DROP_RESERVATION, + PREEMPT_CONTAINER, + KILL_CONTAINER } http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 4df47cc..9c16e49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -497,7 +498,7 @@ public class FairScheduler extends // TODO: Not sure if this ever actually adds this to the list of cleanup // containers on the RMNode (see SchedulerNode.releaseContainer()). - super.completedContainer(container, status, RMContainerEventType.KILL); + completedContainer(container, status, RMContainerEventType.KILL); LOG.info("Killing container" + container + " (after waiting for preemption for " + (getClock().getTime() - time) + "ms)"); @@ -806,7 +807,7 @@ public class FairScheduler extends LOG.info("Skip killing " + rmContainer.getContainerId()); continue; } - super.completedContainer(rmContainer, + completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus( rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), @@ -815,7 +816,7 @@ public class FairScheduler extends // Release all reserved containers for (RMContainer rmContainer : attempt.getReservedContainers()) { - super.completedContainer(rmContainer, + completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus( rmContainer.getContainerId(), "Application Complete"), @@ -842,9 +843,13 @@ public class FairScheduler extends * Clean up a completed container. */ @Override - protected synchronized void completedContainerInternal( - RMContainer rmContainer, ContainerStatus containerStatus, - RMContainerEventType event) { + protected synchronized void completedContainer(RMContainer rmContainer, + ContainerStatus containerStatus, RMContainerEventType event) { + if (rmContainer == null) { + LOG.info("Container " + containerStatus.getContainerId() + + " completed with event " + event); + return; + } Container container = rmContainer.getContainer(); @@ -914,7 +919,7 @@ public class FairScheduler extends // Remove running containers List<RMContainer> runningContainers = node.getRunningContainers(); for (RMContainer container : runningContainers) { - super.completedContainer(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.LOST_CONTAINER), @@ -924,7 +929,7 @@ public class FairScheduler extends // Remove reservations, if any RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - super.completedContainer(reservedContainer, + completedContainer(reservedContainer, SchedulerUtils.createAbnormalContainerStatus( reservedContainer.getContainerId(), SchedulerUtils.LOST_CONTAINER), @@ -1052,7 +1057,7 @@ public class FairScheduler extends for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); LOG.debug("Container FINISHED: " + containerId); - super.completedContainer(getRMContainer(containerId), + completedContainer(getRMContainer(containerId), completedContainer, RMContainerEventType.FINISHED); } @@ -1297,12 +1302,21 @@ public class FairScheduler extends ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent)event; ContainerId containerId = containerExpiredEvent.getContainerId(); - super.completedContainer(getRMContainer(containerId), + completedContainer(getRMContainer(containerId), SchedulerUtils.createAbnormalContainerStatus( containerId, SchedulerUtils.EXPIRED_CONTAINER), RMContainerEventType.EXPIRE); break; + case CONTAINER_RESCHEDULED: + if (!(event instanceof ContainerRescheduledEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + ContainerRescheduledEvent containerRescheduledEvent = + (ContainerRescheduledEvent) event; + RMContainer container = containerRescheduledEvent.getContainer(); + recoverResourceRequestForContainer(container); + break; default: LOG.error("Unknown event arrived at FairScheduler: " + event.toString()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 5787ba6..8e75d11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -74,10 +74,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -467,7 +468,7 @@ public class FifoScheduler extends LOG.info("Skip killing " + container.getContainerId()); continue; } - super.completedContainer(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), RMContainerEventType.KILL); @@ -738,7 +739,7 @@ public class FifoScheduler extends for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); LOG.debug("Container FINISHED: " + containerId); - super.completedContainer(getRMContainer(containerId), + completedContainer(getRMContainer(containerId), completedContainer, RMContainerEventType.FINISHED); } @@ -857,13 +858,21 @@ public class FifoScheduler extends ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent) event; ContainerId containerid = containerExpiredEvent.getContainerId(); - super.completedContainer(getRMContainer(containerid), + completedContainer(getRMContainer(containerid), SchedulerUtils.createAbnormalContainerStatus( containerid, SchedulerUtils.EXPIRED_CONTAINER), RMContainerEventType.EXPIRE); } break; + case CONTAINER_RESCHEDULED: + { + ContainerRescheduledEvent containerRescheduledEvent = + (ContainerRescheduledEvent) event; + RMContainer container = containerRescheduledEvent.getContainer(); + recoverResourceRequestForContainer(container); + } + break; default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } @@ -871,9 +880,12 @@ public class FifoScheduler extends @Lock(FifoScheduler.class) @Override - protected synchronized void completedContainerInternal( - RMContainer rmContainer, ContainerStatus containerStatus, - RMContainerEventType event) { + protected synchronized void completedContainer(RMContainer rmContainer, + ContainerStatus containerStatus, RMContainerEventType event) { + if (rmContainer == null) { + LOG.info("Null container completed..."); + return; + } // Get the application for the finished container Container container = rmContainer.getContainer(); @@ -919,7 +931,7 @@ public class FifoScheduler extends } // Kill running containers for(RMContainer container : node.getRunningContainers()) { - super.completedContainer(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.LOST_CONTAINER), http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index a5d14c3..0372cd7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -275,8 +275,7 @@ public class MockRM extends ResourceManager { nm.nodeHeartbeat(true); } container = getResourceScheduler().getRMContainer(containerId); - System.out.println("Waiting for container " + containerId + " to be " - + containerState + ", container is null right now."); + System.out.println("Waiting for container " + containerId + " to be allocated."); Thread.sleep(100); if (timeoutMillisecs <= timeoutSecs * 100) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java index a54aeec..db7c96a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java @@ -29,8 +29,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.junit.Assert; @@ -55,11 +55,10 @@ public class TestRMDispatcher { ApplicationAttemptId appAttemptId = mock(ApplicationAttemptId.class); RMContainer container = mock(RMContainer.class); ContainerPreemptEvent event1 = new ContainerPreemptEvent( - appAttemptId, container, SchedulerEventType.KILL_RESERVED_CONTAINER); + appAttemptId, container, SchedulerEventType.DROP_RESERVATION); rmDispatcher.getEventHandler().handle(event1); - ContainerPreemptEvent event2 = - new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.KILL_PREEMPTED_CONTAINER); + ContainerPreemptEvent event2 = new ContainerPreemptEvent( + appAttemptId, container, SchedulerEventType.KILL_CONTAINER); rmDispatcher.getEventHandler().handle(event2); ContainerPreemptEvent event3 = new ContainerPreemptEvent( appAttemptId, container, SchedulerEventType.PREEMPT_CONTAINER); @@ -67,9 +66,9 @@ public class TestRMDispatcher { // Wait for events to be processed by scheduler dispatcher. Thread.sleep(1000); verify(sched, times(3)).handle(any(SchedulerEvent.class)); - verify(sched).killReservedContainer(container); + verify(sched).dropContainerReservation(container); verify(sched).preemptContainer(appAttemptId, container); - verify(sched).killPreemptedContainer(container); + verify(sched).killContainer(container); } catch (InterruptedException e) { Assert.fail(); } finally { http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 3d0c823..f1fe1ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -566,7 +566,7 @@ public class TestAMRestart { ContainerId amContainer = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); // Preempt the first attempt; - scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer)); + scheduler.killContainer(scheduler.getRMContainer(amContainer)); am1.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); @@ -582,7 +582,7 @@ public class TestAMRestart { // Preempt the second attempt. ContainerId amContainer2 = ContainerId.newContainerId(am2.getApplicationAttemptId(), 1); - scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer2)); + scheduler.killContainer(scheduler.getRMContainer(amContainer2)); am2.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry()); @@ -677,7 +677,7 @@ public class TestAMRestart { ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); // Forcibly preempt the am container; - scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer)); + scheduler.killContainer(scheduler.getRMContainer(amContainer)); am1.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index d96f09c..7a3ce56 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -23,7 +23,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_PREEMPTED_CONTAINER; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_CONTAINER; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.PREEMPT_CONTAINER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Proportion import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; @@ -76,7 +77,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; @@ -289,7 +289,7 @@ public class TestProportionalCapacityPreemptionPolicy { List<ContainerPreemptEvent> events = evtCaptor.getAllValues(); for (ContainerPreemptEvent e : events.subList(20, 20)) { assertEquals(appC, e.getAppId()); - assertEquals(KILL_PREEMPTED_CONTAINER, e.getType()); + assertEquals(KILL_CONTAINER, e.getType()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index fc2d9c4..7c33f78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -47,15 +46,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; @@ -498,114 +493,6 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase { } } - /** - * Test to verify that ResourceRequests recovery back to the right app-attempt - * after a container gets killed at ACQUIRED state: YARN-4502. - * - * @throws Exception - */ - @Test - public void testResourceRequestRecoveryToTheRightAppAttempt() - throws Exception { - - configureScheduler(); - YarnConfiguration conf = getConf(); - MockRM rm = new MockRM(conf); - try { - rm.start(); - RMApp rmApp = - rm.submitApp(200, "name", "user", - new HashMap<ApplicationAccessType, String>(), false, "default", -1, - null, "Test", false, true); - MockNM node = - new MockNM("127.0.0.1:1234", 10240, rm.getResourceTrackerService()); - node.registerNode(); - - MockAM am1 = MockRM.launchAndRegisterAM(rmApp, rm, node); - ApplicationAttemptId applicationAttemptOneID = - am1.getApplicationAttemptId(); - ContainerId am1ContainerID = - ContainerId.newContainerId(applicationAttemptOneID, 1); - - // allocate NUM_CONTAINERS containers - am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>()); - node.nodeHeartbeat(true); - - // wait for containers to be allocated. - List<Container> containers = - am1.allocate(new ArrayList<ResourceRequest>(), - new ArrayList<ContainerId>()).getAllocatedContainers(); - while (containers.size() != 1) { - node.nodeHeartbeat(true); - containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(), - new ArrayList<ContainerId>()).getAllocatedContainers()); - Thread.sleep(200); - } - - // launch a 2nd container, for testing running-containers transfer. - node.nodeHeartbeat(applicationAttemptOneID, 2, ContainerState.RUNNING); - ContainerId runningContainerID = - ContainerId.newContainerId(applicationAttemptOneID, 2); - rm.waitForState(node, runningContainerID, RMContainerState.RUNNING); - - // 3rd container is in Allocated state. - int ALLOCATED_CONTAINER_PRIORITY = 1047; - am1.allocate("127.0.0.1", 1024, 1, ALLOCATED_CONTAINER_PRIORITY, - new ArrayList<ContainerId>(), null); - node.nodeHeartbeat(true); - ContainerId allocatedContainerID = - ContainerId.newContainerId(applicationAttemptOneID, 3); - rm.waitForContainerAllocated(node, allocatedContainerID); - rm.waitForState(node, allocatedContainerID, RMContainerState.ALLOCATED); - RMContainer allocatedContainer = - rm.getResourceScheduler().getRMContainer(allocatedContainerID); - - // Capture scheduler app-attempt before AM crash. - SchedulerApplicationAttempt firstSchedulerAppAttempt = - ((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) rm - .getResourceScheduler()) - .getApplicationAttempt(applicationAttemptOneID); - - // AM crashes, and a new app-attempt gets created - node.nodeHeartbeat(applicationAttemptOneID, 1, ContainerState.COMPLETE); - rm.waitForState(node, am1ContainerID, RMContainerState.COMPLETED); - RMAppAttempt rmAppAttempt2 = MockRM.waitForAttemptScheduled(rmApp, rm); - ApplicationAttemptId applicationAttemptTwoID = - rmAppAttempt2.getAppAttemptId(); - Assert.assertEquals(2, applicationAttemptTwoID.getAttemptId()); - - // All outstanding allocated containers will be killed (irrespective of - // keep-alive of container across app-attempts) - Assert.assertEquals(RMContainerState.KILLED, - allocatedContainer.getState()); - - // The core part of this test - // The killed containers' ResourceRequests are recovered back to the - // original app-attempt, not the new one - for (ResourceRequest request : firstSchedulerAppAttempt - .getAppSchedulingInfo().getAllResourceRequests()) { - if (request.getPriority().getPriority() == 0) { - Assert.assertEquals(0, request.getNumContainers()); - } else if (request.getPriority().getPriority() == ALLOCATED_CONTAINER_PRIORITY) { - Assert.assertEquals(1, request.getNumContainers()); - } - } - - // Also, only one running container should be transferred after AM - // launches - MockRM.launchAM(rmApp, rm, node); - List<Container> transferredContainers = - rm.getResourceScheduler().getTransferredContainers( - applicationAttemptTwoID); - Assert.assertEquals(1, transferredContainers.size()); - Assert.assertEquals(runningContainerID, transferredContainers.get(0) - .getId()); - - } finally { - rm.stop(); - } - } - private void verifyMaximumResourceCapability( Resource expectedMaximumResource, YarnScheduler scheduler) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index e32a33b..2ad805a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -205,7 +205,7 @@ public class TestApplicationPriority { if (++counter > 2) { break; } - cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); + cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId())); } // check node report, 12 GB used and 4 GB available @@ -513,7 +513,7 @@ public class TestApplicationPriority { if (++counter > 2) { break; } - cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); + cs.killContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); iterator.remove(); } @@ -543,7 +543,7 @@ public class TestApplicationPriority { if (++counter > 1) { break; } - cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); + cs.killContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); iterator.remove(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index e139df6..7c95cdc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -1170,7 +1170,7 @@ public class TestCapacityScheduler { // kill the 3 containers for (Container c : allocatedContainers) { - cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); + cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId())); } // check values @@ -1179,7 +1179,7 @@ public class TestCapacityScheduler { Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3); // kill app0-attempt0 AM container - cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(app0 + cs.killContainer(schedulerAppAttempt.getRMContainer(app0 .getCurrentAppAttempt().getMasterContainer().getId())); // wait for app0 failed @@ -1202,7 +1202,7 @@ public class TestCapacityScheduler { allocatedContainers = am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1); for (Container c : allocatedContainers) { - cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); + cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId())); } // check values @@ -1251,7 +1251,7 @@ public class TestCapacityScheduler { } // Call killContainer to preempt the container - cs.killPreemptedContainer(rmContainer); + cs.killContainer(rmContainer); Assert.assertEquals(3, requests.size()); for (ResourceRequest request : requests) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/adf260a7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 1b1418a..430eba7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -18,11 +18,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.isA; @@ -50,7 +52,6 @@ import javax.xml.parsers.ParserConfigurationException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.GroupMappingServiceProvider; import org.apache.hadoop.yarn.MockApps; @@ -94,11 +95,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSch import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; @@ -4735,11 +4735,11 @@ public class TestFairScheduler extends FairSchedulerTestBase { } } } - + @Test(timeout = 5000) public void testRecoverRequestAfterPreemption() throws Exception { conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10); - + ControlledClock clock = new ControlledClock(); scheduler.setClock(clock); scheduler.init(conf); @@ -4779,7 +4779,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers() .size()); - SchedulerApplicationAttempt app = scheduler.getSchedulerApp(appAttemptId); + FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); // ResourceRequest will be empty once NodeUpdate is completed Assert.assertNull(app.getResourceRequest(priority, host)); @@ -4797,8 +4797,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.warnOrKillContainer(rmContainer); // Trigger container rescheduled event - scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer, - SchedulerEventType.KILL_PREEMPTED_CONTAINER)); + scheduler.handle(new ContainerRescheduledEvent(rmContainer)); List<ResourceRequest> requests = rmContainer.getResourceRequests(); // Once recovered, resource request will be present again in app @@ -4821,6 +4820,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { Assert.assertTrue(containers.size() == 1); } + @SuppressWarnings("resource") @Test public void testBlacklistNodes() throws Exception { scheduler.init(conf);
