YARN-2888. Corrective mechanisms for rebalancing NM container queues. (asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f0ac18d0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f0ac18d0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f0ac18d0 Branch: refs/heads/trunk Commit: f0ac18d001d97914a9ee810b1fab56c5cebff830 Parents: 1f2794b Author: Arun Suresh <asur...@apache.org> Authored: Fri May 13 13:38:36 2016 -0700 Committer: Arun Suresh <asur...@apache.org> Committed: Fri May 13 13:38:36 2016 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 48 +++- .../yarn/conf/TestYarnConfigurationFields.java | 14 +- .../protocolrecords/NodeHeartbeatResponse.java | 4 + .../impl/pb/NodeHeartbeatResponsePBImpl.java | 42 +++ .../api/records/ContainerQueuingLimit.java | 44 +++ .../impl/pb/ContainerQueuingLimitPBImpl.java | 80 ++++++ .../yarn_server_common_service_protos.proto | 6 + .../hadoop/yarn/server/nodemanager/Context.java | 4 +- .../yarn/server/nodemanager/NodeManager.java | 8 +- .../nodemanager/NodeStatusUpdaterImpl.java | 171 ++++++------ .../containermanager/ContainerManager.java | 42 +++ .../containermanager/ContainerManagerImpl.java | 12 +- .../queuing/QueuingContainerManagerImpl.java | 38 +++ .../amrmproxy/BaseAMRMProxyTest.java | 3 +- .../server/resourcemanager/ClusterMonitor.java | 6 +- .../DistributedSchedulingService.java | 194 +++++++------ .../yarn/server/resourcemanager/RMContext.java | 4 + .../server/resourcemanager/RMContextImpl.java | 14 + .../server/resourcemanager/ResourceManager.java | 2 + .../resourcemanager/ResourceTrackerService.java | 7 + .../distributed/NodeQueueLoadMonitor.java | 271 +++++++++++++++++++ .../distributed/QueueLimitCalculator.java | 125 +++++++++ .../scheduler/distributed/TopKNodeSelector.java | 223 --------------- .../distributed/TestNodeQueueLoadMonitor.java | 195 +++++++++++++ .../distributed/TestTopKNodeSelector.java | 147 ---------- 25 files 
changed, 1156 insertions(+), 548 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index e9c8ea9..0b150c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -344,17 +344,47 @@ public class YarnConfiguration extends Configuration { YARN_PREFIX + "distributed-scheduling.top-k"; public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10; - /** Frequency for computing Top K Best Nodes */ - public static final String DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS = - YARN_PREFIX + "distributed-scheduling.top-k-compute-interval-ms"; - public static final long DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT = 1000; - - /** Comparator for determining Node Load for Distributed Scheduling */ - public static final String DIST_SCHEDULING_TOP_K_COMPARATOR = - YARN_PREFIX + "distributed-scheduling.top-k-comparator"; - public static final String DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT = + /** Frequency for computing least loaded NMs. */ + public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS = + YARN_PREFIX + "nm-container-queuing.sorting-nodes-interval-ms"; + public static final long + NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT = 1000; + + /** Comparator for determining Node Load for Distributed Scheduling. 
*/ + public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR = + YARN_PREFIX + "nm-container-queuing.load-comparator"; + public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT = "QUEUE_LENGTH"; + /** Value of standard deviation used for calculation of queue limit + * thresholds. */ + public static final String NM_CONTAINER_QUEUING_LIMIT_STDEV = + YARN_PREFIX + "nm-container-queuing.queue-limit-stdev"; + public static final float NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT = + 1.0f; + + /** Min length of container queue at NodeManager. */ + public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH = + YARN_PREFIX + "nm-container-queuing.min-queue-length"; + public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT = 1; + + /** Max length of container queue at NodeManager. */ + public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH = + YARN_PREFIX + "nm-container-queuing.max-queue-length"; + public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT = 10; + + /** Min wait time of container queue at NodeManager. */ + public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS = + YARN_PREFIX + "nm-container-queuing.min-queue-wait-time-ms"; + public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT = + 1; + + /** Max wait time of container queue at NodeManager. */ + public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS = + YARN_PREFIX + "nm-container-queuing.max-queue-wait-time-ms"; + public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT = + 10; + /** * Enable/disable intermediate-data encryption at YARN level. 
For now, this * only is used by the FileSystemRMStateStore to setup right file-system http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index c92a276..61b698d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -135,9 +135,19 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { configurationPrefixToSkipCompare .add(YarnConfiguration.DIST_SCHEDULING_TOP_K); configurationPrefixToSkipCompare - .add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS); + .add(YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS); configurationPrefixToSkipCompare - .add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR); + .add(YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS); + configurationPrefixToSkipCompare + .add(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV); // Set by container-executor.cfg 
configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index f8a1320..bd04a0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -78,4 +79,7 @@ public interface NodeHeartbeatResponse { List<Container> getContainersToDecrease(); void addAllContainersToDecrease(Collection<Container> containersToDecrease); + + ContainerQueuingLimit getContainerQueuingLimit(); + void setContainerQueuingLimit(ContainerQueuingLimit containerQueuingLimit); } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 224e50b..a259158 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; @@ -46,8 +47,10 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatR import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.impl.pb.ContainerQueuingLimitPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -65,6 +68,7 @@ public class NodeHeartbeatResponsePBImpl extends private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; + private ContainerQueuingLimit containerQueuingLimit = null; private List<Container> containersToDecrease = null; private List<SignalContainerRequest> containersToSignal = null; @@ -102,6 +106,10 @@ public class NodeHeartbeatResponsePBImpl extends builder.setNmTokenMasterKey( convertToProtoFormat(this.nmTokenMasterKey)); } + if (this.containerQueuingLimit != null) { + builder.setContainerQueuingLimit( + convertToProtoFormat(this.containerQueuingLimit)); + } if (this.systemCredentials != null) { addSystemCredentialsToProto(); } @@ -197,6 +205,30 @@ public class NodeHeartbeatResponsePBImpl extends } @Override + public ContainerQueuingLimit getContainerQueuingLimit() { + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? 
proto : builder; + if (this.containerQueuingLimit != null) { + return this.containerQueuingLimit; + } + if (!p.hasContainerQueuingLimit()) { + return null; + } + this.containerQueuingLimit = + convertFromProtoFormat(p.getContainerQueuingLimit()); + return this.containerQueuingLimit; + } + + @Override + public void setContainerQueuingLimit(ContainerQueuingLimit + containerQueuingLimit) { + maybeInitBuilder(); + if (containerQueuingLimit == null) { + builder.clearContainerQueuingLimit(); + } + this.containerQueuingLimit = containerQueuingLimit; + } + + @Override public NodeAction getNodeAction() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; if (!p.hasNodeAction()) { @@ -638,6 +670,16 @@ public class NodeHeartbeatResponsePBImpl extends builder.addAllContainersToSignal(iterable); } + private ContainerQueuingLimit convertFromProtoFormat( + ContainerQueuingLimitProto p) { + return new ContainerQueuingLimitPBImpl(p); + } + + private ContainerQueuingLimitProto convertToProtoFormat( + ContainerQueuingLimit c) { + return ((ContainerQueuingLimitPBImpl)c).getProto(); + } + private SignalContainerRequestPBImpl convertFromProtoFormat( SignalContainerRequestProto p) { return new SignalContainerRequestPBImpl(p); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java new file mode 100644 index 0000000..a8ae927 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records; + +import org.apache.hadoop.yarn.util.Records; + +/** + * Used to hold max wait time / queue length information to be + * passed back to the NodeManager. 
+ */ +public abstract class ContainerQueuingLimit { + + public static ContainerQueuingLimit newInstance() { + ContainerQueuingLimit containerQueuingLimit = + Records.newRecord(ContainerQueuingLimit.class); + containerQueuingLimit.setMaxQueueLength(-1); + containerQueuingLimit.setMaxQueueWaitTimeInMs(-1); + return containerQueuingLimit; + } + + public abstract int getMaxQueueLength(); + + public abstract void setMaxQueueLength(int queueLength); + + public abstract int getMaxQueueWaitTimeInMs(); + + public abstract void setMaxQueueWaitTimeInMs(int waitTime); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java new file mode 100644 index 0000000..d071a08 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; + +/** + * Implementation of ContainerQueuingLimit interface. + */ +public class ContainerQueuingLimitPBImpl extends ContainerQueuingLimit { + + private ContainerQueuingLimitProto proto = + ContainerQueuingLimitProto.getDefaultInstance(); + private ContainerQueuingLimitProto.Builder builder = null; + private boolean viaProto = false; + + public ContainerQueuingLimitPBImpl() { + builder = ContainerQueuingLimitProto.newBuilder(); + } + + public ContainerQueuingLimitPBImpl(ContainerQueuingLimitProto proto) { + this.proto = proto; + this.viaProto = true; + } + + public ContainerQueuingLimitProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ContainerQueuingLimitProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int getMaxQueueWaitTimeInMs() { + ContainerQueuingLimitProtoOrBuilder p = viaProto ? 
proto : builder; + return p.getMaxQueueWaitTimeInMs(); + } + + @Override + public void setMaxQueueWaitTimeInMs(int waitTime) { + maybeInitBuilder(); + builder.setMaxQueueWaitTimeInMs(waitTime); + } + + @Override + public int getMaxQueueLength() { + ContainerQueuingLimitProtoOrBuilder p = viaProto ? proto : builder; + return p.getMaxQueueLength(); + } + + @Override + public void setMaxQueueLength(int queueLength) { + maybeInitBuilder(); + builder.setMaxQueueLength(queueLength); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 786d8ee..a977653 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -100,6 +100,12 @@ message NodeHeartbeatResponseProto { optional bool areNodeLabelsAcceptedByRM = 11 [default = false]; repeated ContainerProto containers_to_decrease = 12; repeated SignalContainerRequestProto containers_to_signal = 13; + optional ContainerQueuingLimitProto container_queuing_limit = 14; +} + +message ContainerQueuingLimitProto { + optional int32 max_queue_length = 1; + optional int32 max_queue_wait_time_in_ms = 2; } message SystemCredentialsForAppsProto { 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 626d0a1..cfcf1bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -23,13 +23,13 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -82,7 +82,7 @@ public interface Context { NodeHealthStatus getNodeHealthStatus(); - ContainerManagementProtocol 
getContainerManager(); + ContainerManager getContainerManager(); NodeResourceMonitor getNodeResourceMonitor(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index b48706d..6ca7ffe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -47,7 +47,6 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; -import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -60,6 +59,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -465,7 +465,7 @@ public class NodeManager extends CompositeService private final NMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInNM nmTokenSecretManager; - private ContainerManagementProtocol containerManager; + private ContainerManager containerManager; private NodeResourceMonitor nodeResourceMonitor; private final LocalDirsHandlerService dirsHandler; private final ApplicationACLsManager aclsManager; @@ -555,11 +555,11 @@ public class NodeManager extends CompositeService } @Override - public ContainerManagementProtocol getContainerManager() { + public ContainerManager getContainerManager() { return this.containerManager; } - public void setContainerManager(ContainerManagementProtocol containerManager) { + public void setContainerManager(ContainerManager containerManager) { this.containerManager = containerManager; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index c0f02e9..9b74f8d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -71,13 +71,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; + +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; @@ -401,8 +402,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements LOG.info(successfullRegistrationMsg); LOG.info("Notifying ContainerManager to unblock new container-requests"); - ((ContainerManagerImpl) this.context.getContainerManager()) - .setBlockNewContainerRequests(false); + this.context.getContainerManager().setBlockNewContainerRequests(false); } private List<ApplicationId> createKeepAliveApplicationList() { @@ -465,10 +465,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements * @return Resource utilization of all the containers. 
*/ private ResourceUtilization getContainersUtilization() { - ContainerManagerImpl containerManager = - (ContainerManagerImpl) this.context.getContainerManager(); ContainersMonitor containersMonitor = - containerManager.getContainersMonitor(); + this.context.getContainerManager().getContainersMonitor(); return containersMonitor.getContainersUtilization(); } @@ -735,7 +733,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements Set<NodeLabel> nodeLabelsForHeartbeat = nodeLabelsHandler.getNodeLabelsForHeartbeat(); NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID); - NodeHeartbeatRequest request = NodeHeartbeatRequest.newInstance(nodeStatus, NodeStatusUpdaterImpl.this.context @@ -760,82 +757,70 @@ public class NodeStatusUpdaterImpl extends AbstractService implements nextHeartBeatInterval = response.getNextHeartBeatInterval(); updateMasterKeys(response); - if (response.getNodeAction() == NodeAction.SHUTDOWN) { - LOG.warn("Recieved SHUTDOWN signal from Resourcemanager as part of" - + " heartbeat, hence shutting down."); - LOG.warn("Message from ResourceManager: " - + response.getDiagnosticsMessage()); - context.setDecommissioned(true); - dispatcher.getEventHandler().handle( - new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); - break; - } - if (response.getNodeAction() == NodeAction.RESYNC) { - LOG.warn("Node is out of sync with ResourceManager," - + " hence resyncing."); - LOG.warn("Message from ResourceManager: " - + response.getDiagnosticsMessage()); - // Invalidate the RMIdentifier while resync - NodeStatusUpdaterImpl.this.rmIdentifier = - ResourceManagerConstants.RM_INVALID_IDENTIFIER; - dispatcher.getEventHandler().handle( - new NodeManagerEvent(NodeManagerEventType.RESYNC)); - pendingCompletedContainers.clear(); - break; - } - - nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(response); - - // Explicitly put this method after checking the resync response. 
We - // don't want to remove the completed containers before resync - // because these completed containers will be reported back to RM - // when NM re-registers with RM. - // Only remove the cleanedup containers that are acked - removeOrTrackCompletedContainersFromContext(response + if (!handleShutdownOrResyncCommand(response)) { + nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels( + response); + + // Explicitly put this method after checking the resync + // response. We + // don't want to remove the completed containers before resync + // because these completed containers will be reported back to RM + // when NM re-registers with RM. + // Only remove the cleanedup containers that are acked + removeOrTrackCompletedContainersFromContext(response .getContainersToBeRemovedFromNM()); - logAggregationReportForAppsTempList.clear(); - lastHeartbeatID = response.getResponseId(); - List<ContainerId> containersToCleanup = response - .getContainersToCleanup(); - if (!containersToCleanup.isEmpty()) { - dispatcher.getEventHandler().handle( - new CMgrCompletedContainersEvent(containersToCleanup, - CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER)); - } - List<ApplicationId> appsToCleanup = - response.getApplicationsToCleanup(); - //Only start tracking for keepAlive on FINISH_APP - trackAppsForKeepAlive(appsToCleanup); - if (!appsToCleanup.isEmpty()) { - dispatcher.getEventHandler().handle( - new CMgrCompletedAppsEvent(appsToCleanup, - CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); - } - - Map<ApplicationId, ByteBuffer> systemCredentials = - response.getSystemCredentialsForApps(); - if (systemCredentials != null && !systemCredentials.isEmpty()) { - ((NMContext) context) - .setSystemCrendentialsForApps(parseCredentials(systemCredentials)); - } + logAggregationReportForAppsTempList.clear(); + lastHeartbeatID = response.getResponseId(); + List<ContainerId> containersToCleanup = response + .getContainersToCleanup(); + if (!containersToCleanup.isEmpty()) { + 
dispatcher.getEventHandler().handle( + new CMgrCompletedContainersEvent(containersToCleanup, + CMgrCompletedContainersEvent.Reason + .BY_RESOURCEMANAGER)); + } + List<ApplicationId> appsToCleanup = + response.getApplicationsToCleanup(); + //Only start tracking for keepAlive on FINISH_APP + trackAppsForKeepAlive(appsToCleanup); + if (!appsToCleanup.isEmpty()) { + dispatcher.getEventHandler().handle( + new CMgrCompletedAppsEvent(appsToCleanup, + CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); + } + Map<ApplicationId, ByteBuffer> systemCredentials = + response.getSystemCredentialsForApps(); + if (systemCredentials != null && !systemCredentials.isEmpty()) { + ((NMContext) context).setSystemCrendentialsForApps( + parseCredentials(systemCredentials)); + } + List<org.apache.hadoop.yarn.api.records.Container> + containersToDecrease = response.getContainersToDecrease(); + if (!containersToDecrease.isEmpty()) { + dispatcher.getEventHandler().handle( + new CMgrDecreaseContainersResourceEvent( + containersToDecrease) + ); + } - List<org.apache.hadoop.yarn.api.records.Container> - containersToDecrease = response.getContainersToDecrease(); - if (!containersToDecrease.isEmpty()) { - dispatcher.getEventHandler().handle( - new CMgrDecreaseContainersResourceEvent(containersToDecrease) - ); - } + // SignalContainer request originally comes from end users via + // ClientRMProtocol's SignalContainer. Forward the request to + // ContainerManager which will dispatch the event to + // ContainerLauncher. + List<SignalContainerRequest> containersToSignal = response + .getContainersToSignalList(); + if (containersToSignal.size() != 0) { + dispatcher.getEventHandler().handle( + new CMgrSignalContainersEvent(containersToSignal)); + } - // SignalContainer request originally comes from end users via - // ClientRMProtocol's SignalContainer. Forward the request to - // ContainerManager which will dispatch the event to ContainerLauncher. 
- List<SignalContainerRequest> containersToSignal = response - .getContainersToSignalList(); - if (containersToSignal.size() != 0) { - dispatcher.getEventHandler().handle( - new CMgrSignalContainersEvent(containersToSignal)); + // Update QueuingLimits if ContainerManager supports queuing + ContainerQueuingLimit queuingLimit = + response.getContainerQueuingLimit(); + if (queuingLimit != null) { + context.getContainerManager().updateQueuingLimit(queuingLimit); + } } } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM @@ -883,6 +868,34 @@ public class NodeStatusUpdaterImpl extends AbstractService implements statusUpdater.start(); } + private boolean handleShutdownOrResyncCommand( + NodeHeartbeatResponse response) { + if (response.getNodeAction() == NodeAction.SHUTDOWN) { + LOG.warn("Recieved SHUTDOWN signal from Resourcemanager as part of" + + " heartbeat, hence shutting down."); + LOG.warn("Message from ResourceManager: " + + response.getDiagnosticsMessage()); + context.setDecommissioned(true); + dispatcher.getEventHandler().handle( + new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); + return true; + } + if (response.getNodeAction() == NodeAction.RESYNC) { + LOG.warn("Node is out of sync with ResourceManager," + + " hence resyncing."); + LOG.warn("Message from ResourceManager: " + + response.getDiagnosticsMessage()); + // Invalidate the RMIdentifier while resync + NodeStatusUpdaterImpl.this.rmIdentifier = + ResourceManagerConstants.RM_INVALID_IDENTIFIER; + dispatcher.getEventHandler().handle( + new NodeManagerEvent(NodeManagerEventType.RESYNC)); + pendingCompletedContainers.clear(); + return true; + } + return false; + } + private List<LogAggregationReport> getLogAggregationReportsForApps( ConcurrentLinkedQueue<LogAggregationReport> lastestLogAggregationStatus) { LogAggregationReport status; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java new file mode 100644 index 0000000..0da02b3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager; + +import org.apache.hadoop.service.ServiceStateChangeListener; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; +import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor + .ContainersMonitor; + +/** + * The ContainerManager is an entity that manages the life cycle of Containers. + */ +public interface ContainerManager extends ServiceStateChangeListener, + ContainerManagementProtocol, + EventHandler<ContainerManagerEvent> { + + ContainersMonitor getContainersMonitor(); + + void updateQueuingLimit(ContainerQueuingLimit queuingLimit); + + void setBlockNewContainerRequests(boolean blockNewContainerRequests); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 4383d2b..e1c4131 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -53,7 +53,6 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.Service; -import org.apache.hadoop.service.ServiceStateChangeListener; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; @@ -95,6 +94,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Containe import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.ContainerType; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent; @@ -150,8 +150,7 @@ import com.google.protobuf.ByteString; import org.apache.hadoop.yarn.util.resource.Resources; public class ContainerManagerImpl extends CompositeService implements - ServiceStateChangeListener, ContainerManagementProtocol, - EventHandler<ContainerManagerEvent> { + ContainerManager { /** * Extra duration to wait for applications to be killed on shutdown. 
@@ -410,6 +409,7 @@ public class ContainerManagerImpl extends CompositeService implements } } + @Override public ContainersMonitor getContainersMonitor() { return this.containersMonitor; } @@ -1398,6 +1398,7 @@ public class ContainerManagerImpl extends CompositeService implements } } + @Override public void setBlockNewContainerRequests(boolean blockNewContainerRequests) { this.blockNewContainerRequests.set(blockNewContainerRequests); } @@ -1434,4 +1435,9 @@ public class ContainerManagerImpl extends CompositeService implements protected boolean isServiceStopped() { return serviceStopped; } + + @Override + public void updateQueuingLimit(ContainerQueuingLimit queuingLimit) { + LOG.trace("Implementation does not support queuing of Containers !!"); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java index 94d3172..5b1b77a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.event.EventHandler; 
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -83,6 +84,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl { private Queue<AllocatedContainerInfo> queuedOpportunisticContainers; private Set<ContainerId> opportunisticContainersToKill; + private final ContainerQueuingLimit queuingLimit; public QueuingContainerManagerImpl(Context context, ContainerExecutor exec, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, @@ -95,6 +97,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl { this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>(); this.opportunisticContainersToKill = Collections.synchronizedSet( new HashSet<ContainerId>()); + this.queuingLimit = ContainerQueuingLimit.newInstance(); } @Override @@ -526,6 +529,41 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl { } } + @Override + public void updateQueuingLimit(ContainerQueuingLimit limit) { + this.queuingLimit.setMaxQueueLength(limit.getMaxQueueLength()); + // TODO: Include wait time as well once it is implemented + if (this.queuingLimit.getMaxQueueLength() > -1) { + shedQueuedOpportunisticContainers(); + } + } + + private void shedQueuedOpportunisticContainers() { + int numAllowed = this.queuingLimit.getMaxQueueLength(); + Iterator<AllocatedContainerInfo> containerIter = + queuedOpportunisticContainers.iterator(); + while (containerIter.hasNext()) { + AllocatedContainerInfo cInfo = containerIter.next(); + if (numAllowed <= 0) { + containerIter.remove(); + ContainerTokenIdentifier containerTokenIdentifier = this.context + 
.getQueuingContext().getQueuedContainers().remove( + cInfo.getContainerTokenIdentifier().getContainerID()); + // The Container might have already started while we were + // iterating.. + if (containerTokenIdentifier != null) { + this.context.getQueuingContext().getKilledQueuedContainers() + .putIfAbsent(cInfo.getContainerTokenIdentifier(), + "Container De-queued to meet global queuing limits. " + + "Max Queue length[" + + this.queuingLimit.getMaxQueueLength() + "]"); + } + } + numAllowed--; + } + } + + static class AllocatedContainerInfo { private final ContainerTokenIdentifier containerTokenIdentifier; private final StartContainerRequest startRequest; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index ce405f8..6ff74c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -642,7 +643,7 @@ public abstract class BaseAMRMProxyTest { } @Override - public ContainerManagementProtocol getContainerManager() { + public ContainerManager getContainerManager() { return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java index 4fd62d0..5fb05ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java @@ -24,13 +24,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import java.util.List; +/** + * Implementations of this class are notified of changes to the cluster's state, + * such as node addition, removal and updates. 
+ */ public interface ClusterMonitor { void addNode(List<NMContainerStatus> containerStatuses, RMNode rmNode); void removeNode(RMNode removedRMNode); - void nodeUpdate(RMNode rmNode); + void updateNode(RMNode rmNode); void updateNodeResource(RMNode rmNode, ResourceOption resourceOption); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java index 170d91a..a93f683 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java @@ -46,8 +46,10 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed - .TopKNodeSelector; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor; + + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -57,7 +59,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretMan import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -76,30 +77,64 @@ public class DistributedSchedulingService extends ApplicationMasterService private static final Log LOG = LogFactory.getLog(DistributedSchedulingService.class); - private final TopKNodeSelector clusterMonitor; + private final NodeQueueLoadMonitor nodeMonitor; private final ConcurrentHashMap<String, Set<NodeId>> rackToNode = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, Set<NodeId>> hostToNode = new ConcurrentHashMap<>(); + private final int k; public DistributedSchedulingService(RMContext rmContext, YarnScheduler scheduler) { super(DistributedSchedulingService.class.getName(), rmContext, scheduler); - int k = rmContext.getYarnConfiguration().getInt( + this.k = rmContext.getYarnConfiguration().getInt( YarnConfiguration.DIST_SCHEDULING_TOP_K, YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT); - long topKComputationInterval = rmContext.getYarnConfiguration().getLong( - YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS, - YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT); - TopKNodeSelector.TopKComparator comparator = - TopKNodeSelector.TopKComparator.valueOf( + long nodeSortInterval = rmContext.getYarnConfiguration().getLong( + YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, + YarnConfiguration. 
+ NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT); + NodeQueueLoadMonitor.LoadComparator comparator = + NodeQueueLoadMonitor.LoadComparator.valueOf( rmContext.getYarnConfiguration().get( - YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR, - YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT)); - TopKNodeSelector topKSelector = - new TopKNodeSelector(k, topKComputationInterval, comparator); - this.clusterMonitor = topKSelector; + YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR, + YarnConfiguration. + NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT)); + + NodeQueueLoadMonitor topKSelector = + new NodeQueueLoadMonitor(nodeSortInterval, comparator); + + float sigma = rmContext.getYarnConfiguration() + .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV, + YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT); + + int limitMin, limitMax; + + if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) { + limitMin = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH, + YarnConfiguration. + NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT); + limitMax = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH, + YarnConfiguration. + NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT); + } else { + limitMin = rmContext.getYarnConfiguration() + .getInt( + YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS, + YarnConfiguration. + NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT); + limitMax = rmContext.getYarnConfiguration() + .getInt( + YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS, + YarnConfiguration. 
+ NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT); + } + + topKSelector.initThresholdCalculator(sigma, limitMin, limitMax); + this.nodeMonitor = topKSelector; } @Override @@ -189,7 +224,7 @@ public class DistributedSchedulingService extends ApplicationMasterService // Set nodes to be used for scheduling dsResp.setNodesForScheduling( - new ArrayList<>(this.clusterMonitor.selectNodes())); + this.nodeMonitor.selectLeastLoadedNodes(this.k)); return dsResp; } @@ -201,7 +236,7 @@ public class DistributedSchedulingService extends ApplicationMasterService (DistSchedAllocateResponse.class); dsResp.setAllocateResponse(response); dsResp.setNodesForScheduling( - new ArrayList<>(this.clusterMonitor.selectNodes())); + this.nodeMonitor.selectLeastLoadedNodes(this.k)); return dsResp; } @@ -229,67 +264,72 @@ public class DistributedSchedulingService extends ApplicationMasterService @Override public void handle(SchedulerEvent event) { switch (event.getType()) { - case NODE_ADDED: - if (!(event instanceof NodeAddedSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; - clusterMonitor.addNode(nodeAddedEvent.getContainerReports(), - nodeAddedEvent.getAddedRMNode()); - addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(), - nodeAddedEvent.getAddedRMNode().getNodeID()); - addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(), - nodeAddedEvent.getAddedRMNode().getNodeID()); - break; - case NODE_REMOVED: - if (!(event instanceof NodeRemovedSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeRemovedSchedulerEvent nodeRemovedEvent = - (NodeRemovedSchedulerEvent)event; - clusterMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode()); - removeFromMapping(rackToNode, - nodeRemovedEvent.getRemovedRMNode().getRackName(), - nodeRemovedEvent.getRemovedRMNode().getNodeID()); - removeFromMapping(hostToNode, - 
nodeRemovedEvent.getRemovedRMNode().getHostName(), - nodeRemovedEvent.getRemovedRMNode().getNodeID()); - break; - case NODE_UPDATE: - if (!(event instanceof NodeUpdateSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; - clusterMonitor.nodeUpdate(nodeUpdatedEvent.getRMNode()); - break; - case NODE_RESOURCE_UPDATE: - if (!(event instanceof NodeResourceUpdateSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = - (NodeResourceUpdateSchedulerEvent)event; - clusterMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), - nodeResourceUpdatedEvent.getResourceOption()); - break; - - // <-- IGNORED EVENTS : START --> - case APP_ADDED: - break; - case APP_REMOVED: - break; - case APP_ATTEMPT_ADDED: - break; - case APP_ATTEMPT_REMOVED: - break; - case CONTAINER_EXPIRED: - break; - case NODE_LABELS_UPDATE: - break; - // <-- IGNORED EVENTS : END --> - default: - LOG.error("Unknown event arrived at DistributedSchedulingService: " - + event.toString()); + case NODE_ADDED: + if (!(event instanceof NodeAddedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event; + nodeMonitor.addNode(nodeAddedEvent.getContainerReports(), + nodeAddedEvent.getAddedRMNode()); + addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(), + nodeAddedEvent.getAddedRMNode().getNodeID()); + addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(), + nodeAddedEvent.getAddedRMNode().getNodeID()); + break; + case NODE_REMOVED: + if (!(event instanceof NodeRemovedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeRemovedSchedulerEvent nodeRemovedEvent = + (NodeRemovedSchedulerEvent) event; + 
nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode()); + removeFromMapping(rackToNode, + nodeRemovedEvent.getRemovedRMNode().getRackName(), + nodeRemovedEvent.getRemovedRMNode().getNodeID()); + removeFromMapping(hostToNode, + nodeRemovedEvent.getRemovedRMNode().getHostName(), + nodeRemovedEvent.getRemovedRMNode().getNodeID()); + break; + case NODE_UPDATE: + if (!(event instanceof NodeUpdateSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent) + event; + nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode()); + break; + case NODE_RESOURCE_UPDATE: + if (!(event instanceof NodeResourceUpdateSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = + (NodeResourceUpdateSchedulerEvent) event; + nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), + nodeResourceUpdatedEvent.getResourceOption()); + break; + + // <-- IGNORED EVENTS : START --> + case APP_ADDED: + break; + case APP_REMOVED: + break; + case APP_ATTEMPT_ADDED: + break; + case APP_ATTEMPT_REMOVED: + break; + case CONTAINER_EXPIRED: + break; + case NODE_LABELS_UPDATE: + break; + // <-- IGNORED EVENTS : END --> + default: + LOG.error("Unknown event arrived at DistributedSchedulingService: " + + event.toString()); } + } + public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { + return nodeMonitor.getThresholdCalculator(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index f50da3b..b063ecb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -139,4 +141,6 @@ public interface RMContext { void setLeaderElectorService(LeaderElectorService elector); LeaderElectorService getLeaderElectorService(); + + QueueLimitCalculator getNodeManagerQueueLimitCalculator(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index ec2aeb7..d228388 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -74,6 +76,8 @@ public class RMContextImpl implements RMContext { private SystemMetricsPublisher systemMetricsPublisher; private LeaderElectorService elector; + private QueueLimitCalculator queueLimitCalculator; + /** * Default constructor. To be used in conjunction with setter methods for * individual fields. 
@@ -472,4 +476,14 @@ public class RMContextImpl implements RMContext { public void setQueuePlacementManager(PlacementManager placementMgr) { this.activeServiceContext.setQueuePlacementManager(placementMgr); } + + @Override + public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { + return this.queueLimitCalculator; + } + + public void setContainerQueueLimitCalculator( + QueueLimitCalculator limitCalculator) { + this.queueLimitCalculator = limitCalculator; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 6c80a58..f9d3325 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -1154,6 +1154,8 @@ public class ResourceManager extends CompositeService implements Recoverable { addService(distSchedulerEventDispatcher); rmDispatcher.register(SchedulerEventType.class, distSchedulerEventDispatcher); + this.rmContext.setContainerQueueLimitCalculator( + distributedSchedulingService.getNodeManagerQueueLimitCalculator()); return distributedSchedulingService; } return new ApplicationMasterService(this.rmContext, scheduler); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index b0bc565..d306b60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -536,6 +536,13 @@ public class ResourceTrackerService extends AbstractService implements } } + // 6. Send Container Queuing Limits back to the Node. This will be used by + // the node to truncate the number of Containers queued for execution. 
+ if (this.rmContext.getNodeManagerQueueLimitCalculator() != null) { + nodeHeartBeatResponse.setContainerQueuingLimit( + this.rmContext.getNodeManagerQueueLimitCalculator() + .createContainerQueuingLimit()); + } return nodeHeartBeatResponse; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java new file mode 100644 index 0000000..21f4f6e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java @@ -0,0 +1,271 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * The NodeQueueLoadMonitor keeps track of load metrics (such as queue length
 * and total wait time) associated with Container Queues on the Node Manager.
 * It uses this information to periodically sort the Nodes from least to most
 * loaded.
 *
 * <p>Locking: every mutation of {@code clusterNodes} (and of the
 * {@link ClusterNode} objects stored in it) happens while holding the
 * {@code clusterNodes} monitor; the published ordering in {@code sortedNodes}
 * is rebuilt and read while holding the {@code sortedNodes} monitor.
 */
public class NodeQueueLoadMonitor implements ClusterMonitor {

  final static Log LOG = LogFactory.getLog(NodeQueueLoadMonitor.class);

  /**
   * The comparator used to specify the metric against which the load
   * of two Nodes is compared.
   */
  public enum LoadComparator implements Comparator<ClusterNode> {
    QUEUE_LENGTH,
    QUEUE_WAIT_TIME;

    /**
     * Orders nodes by ascending metric value. Nodes with the same metric are
     * ordered most-recently-updated first.
     *
     * <p>Unlike a bare {@code ? +1 : -1} chain, this returns 0 for nodes that
     * tie on both metric and timestamp, and is antisymmetric, as required by
     * the {@link Comparator} contract. A comparator that breaks the contract
     * can make {@code Arrays.sort} throw
     * {@code IllegalArgumentException}, which would abort the periodic
     * re-sorting task.
     *
     * @param o1 first node
     * @param o2 second node
     * @return negative, zero or positive as {@code o1} sorts before, equal to
     *         or after {@code o2}
     */
    @Override
    public int compare(ClusterNode o1, ClusterNode o2) {
      int byMetric = Integer.compare(getMetric(o1), getMetric(o2));
      if (byMetric != 0) {
        return byMetric;
      }
      // Tie on the metric: the node with the newer timestamp comes first.
      return Double.compare(o2.timestamp, o1.timestamp);
    }

    /**
     * Returns the value of the metric this comparator is keyed on.
     *
     * @param c node to read the metric from
     * @return the queue length for {@link #QUEUE_LENGTH}, otherwise the
     *         queue wait time
     */
    public int getMetric(ClusterNode c) {
      return (this == QUEUE_LENGTH) ? c.queueLength : c.queueWaitTime;
    }
  }

  /**
   * Mutable snapshot of the last queue metrics recorded for one node,
   * together with the time at which they were recorded.
   */
  static class ClusterNode {
    int queueLength = 0;
    int queueWaitTime = -1;
    // Wall-clock time (System.currentTimeMillis()) of the last update.
    double timestamp;
    final NodeId nodeId;

    public ClusterNode(NodeId nodeId) {
      this.nodeId = nodeId;
      updateTimestamp();
    }

    public ClusterNode setQueueLength(int qLength) {
      this.queueLength = qLength;
      return this;
    }

    public ClusterNode setQueueWaitTime(int wTime) {
      this.queueWaitTime = wTime;
      return this;
    }

    public ClusterNode updateTimestamp() {
      this.timestamp = System.currentTimeMillis();
      return this;
    }
  }

  // NOTE(review): nothing in this class shuts the executor down; confirm
  // whether the owner is expected to manage its lifecycle.
  private final ScheduledExecutorService scheduledExecutor;

  private final List<NodeId> sortedNodes;
  private final Map<NodeId, ClusterNode> clusterNodes =
      new ConcurrentHashMap<>();
  private final LoadComparator comparator;
  // Written by initThresholdCalculator() and read from the scheduler thread,
  // hence volatile so that the periodic task observes the assignment.
  private volatile QueueLimitCalculator thresholdCalculator;

  /**
   * Periodic task: rebuilds the sorted node list and, if a threshold
   * calculator has been initialised, asks it to refresh its statistics. Both
   * steps run while holding the {@code sortedNodes} monitor.
   */
  Runnable computeTask = new Runnable() {
    @Override
    public void run() {
      synchronized (sortedNodes) {
        sortedNodes.clear();
        sortedNodes.addAll(sortNodes());
        QueueLimitCalculator calculator = thresholdCalculator;
        if (calculator != null) {
          calculator.update();
        }
      }
    }
  };

  /**
   * Creates a monitor without a background scheduler; callers are expected
   * to invoke {@code computeTask} themselves.
   *
   * @param comparator metric used to order the nodes
   */
  @VisibleForTesting
  NodeQueueLoadMonitor(LoadComparator comparator) {
    this.sortedNodes = new ArrayList<>();
    this.comparator = comparator;
    this.scheduledExecutor = null;
  }

  /**
   * Creates a monitor that re-sorts the nodes at a fixed rate.
   *
   * @param nodeComputationInterval initial delay and period, in
   *                                milliseconds, of the re-sorting task
   * @param comparator metric used to order the nodes
   */
  public NodeQueueLoadMonitor(long nodeComputationInterval,
      LoadComparator comparator) {
    this.sortedNodes = new ArrayList<>();
    this.scheduledExecutor = Executors.newScheduledThreadPool(1);
    this.comparator = comparator;
    this.scheduledExecutor.scheduleAtFixedRate(computeTask,
        nodeComputationInterval, nodeComputationInterval,
        TimeUnit.MILLISECONDS);
  }

  /**
   * Returns the live sorted list (not a copy). Callers should hold the
   * list's monitor while reading it.
   */
  List<NodeId> getSortedNodes() {
    return sortedNodes;
  }

  public QueueLimitCalculator getThresholdCalculator() {
    return thresholdCalculator;
  }

  Map<NodeId, ClusterNode> getClusterNodes() {
    return clusterNodes;
  }

  Comparator<ClusterNode> getComparator() {
    return comparator;
  }

  /**
   * Creates the {@link QueueLimitCalculator} bound to this monitor.
   *
   * @param sigma passed through to the calculator
   * @param limitMin passed through to the calculator as its lower bound
   * @param limitMax passed through to the calculator as its upper bound
   */
  public void initThresholdCalculator(float sigma, int limitMin, int limitMax) {
    this.thresholdCalculator =
        new QueueLimitCalculator(this, sigma, limitMin, limitMax);
  }

  @Override
  public void addNode(List<NMContainerStatus> containerStatuses, RMNode
      rmNode) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node added event from: " + rmNode.getNode().getName());
    }
    // Ignoring this currently : at least one NODE_UPDATE heartbeat is
    // required to ensure node eligibility.
  }

  @Override
  public void removeNode(RMNode removedRMNode) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node delete event for: " + removedRMNode.getNode().getName());
    }
    synchronized (this.clusterNodes) {
      // remove() reports whether the node was tracked, so no separate
      // containsKey() lookup is needed.
      if (this.clusterNodes.remove(removedRMNode.getNodeID()) != null) {
        LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID());
      } else {
        LOG.debug("Node not in list!");
      }
    }
  }

  /**
   * Records the queue metrics carried by a node heartbeat.
   *
   * <p>A node reporting an estimated wait time of -1 is only tracked when the
   * comparator is {@link LoadComparator#QUEUE_LENGTH}; otherwise it is
   * ignored if unknown, or dropped if it was previously tracked.
   *
   * @param rmNode node whose queued-container status is read
   */
  @Override
  public void updateNode(RMNode rmNode) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node update event from: " + rmNode.getNodeID());
    }
    QueuedContainersStatus queuedContainersStatus =
        rmNode.getQueuedContainersStatus();
    int estimatedQueueWaitTime =
        queuedContainersStatus.getEstimatedQueueWaitTime();
    int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
    // Add nodes to clusterNodes. If estimatedQueueTime is -1, ignore node
    // UNLESS comparator is based on queue length.
    boolean eligible = estimatedQueueWaitTime != -1
        || comparator == LoadComparator.QUEUE_LENGTH;
    synchronized (this.clusterNodes) {
      ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
      if (currentNode == null) {
        if (eligible) {
          this.clusterNodes.put(rmNode.getNodeID(),
              new ClusterNode(rmNode.getNodeID())
                  .setQueueWaitTime(estimatedQueueWaitTime)
                  .setQueueLength(waitQueueLength));
          LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] "
              + "with queue wait time [" + estimatedQueueWaitTime + "] and "
              + "wait queue length [" + waitQueueLength + "]");
        } else {
          LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "] "
              + "with queue wait time [" + estimatedQueueWaitTime + "] and "
              + "wait queue length [" + waitQueueLength + "]");
        }
      } else {
        if (eligible) {
          currentNode
              .setQueueWaitTime(estimatedQueueWaitTime)
              .setQueueLength(waitQueueLength)
              .updateTimestamp();
          LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "] "
              + "with queue wait time [" + estimatedQueueWaitTime + "] and "
              + "wait queue length [" + waitQueueLength + "]");
        } else {
          this.clusterNodes.remove(rmNode.getNodeID());
          // Logs the last values recorded for the node being dropped.
          LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "] "
              + "with queue wait time [" + currentNode.queueWaitTime + "] and "
              + "wait queue length [" + currentNode.queueLength + "]");
        }
      }
    }
  }

  @Override
  public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node resource update event from: " + rmNode.getNodeID());
    }
    // Ignoring this currently.
  }

  /**
   * Returns all Node Ids as ordered list from Least to Most Loaded.
   * @return ordered list of nodes
   */
  public List<NodeId> selectNodes() {
    return selectLeastLoadedNodes(-1);
  }

  /**
   * Returns 'K' of the least Loaded Node Ids as ordered list. The returned
   * list is a copy and is not affected by later re-sorting.
   * @param k max number of nodes to return; a negative value, or a value not
   *          smaller than the number of sorted nodes, returns all of them
   * @return ordered list of nodes
   */
  public List<NodeId> selectLeastLoadedNodes(int k) {
    synchronized (this.sortedNodes) {
      if (k >= 0 && k < this.sortedNodes.size()) {
        // Copy only the first k entries instead of copying everything and
        // handing out a view over the copy.
        return new ArrayList<>(this.sortedNodes.subList(0, k));
      }
      return new ArrayList<>(this.sortedNodes);
    }
  }

  /**
   * Builds the list of tracked node ids ordered by the configured
   * comparator. Holds the {@code clusterNodes} monitor so that the node
   * metrics cannot change while the sort is running.
   *
   * @return node ids from least to most loaded
   */
  private List<NodeId> sortNodes() {
    synchronized (this.clusterNodes) {
      ClusterNode[] nodes =
          this.clusterNodes.values().toArray(new ClusterNode[0]);
      // Sorting an array and then projecting it to node ids avoids the extra
      // write-back pass that Collections.sort performs on its input list.
      Arrays.sort(nodes, comparator);
      List<NodeId> retList = new ArrayList<>(nodes.length);
      for (ClusterNode node : nodes) {
        retList.add(node.nodeId);
      }
      return retList;
    }
  }

}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java new file mode 100644 index 0000000..ab3a577 --- /dev/null +++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor.ClusterNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor.LoadComparator; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This class interacts with the NodeQueueLoadMonitor to keep track of the + * mean and standard deviation of the configured metrics (queue length or queue + * wait time) used to characterize the queue load of a specific node. + * The NodeQueueLoadMonitor triggers an update (by calling the + * <code>update()</code> method) every time it performs a re-ordering of + * all nodes. 
+ */ +public class QueueLimitCalculator { + + class Stats { + private final AtomicInteger mean = new AtomicInteger(0); + private final AtomicInteger stdev = new AtomicInteger(0); + + /** + * Not thread safe. Caller should synchronize on sorted nodes list. + */ + void update() { + List<NodeId> sortedNodes = nodeSelector.getSortedNodes(); + if (sortedNodes.size() > 0) { + // Calculate mean + int sum = 0; + for (NodeId n : sortedNodes) { + sum += getMetric(getNode(n)); + } + mean.set(sum / sortedNodes.size()); + + // Calculate stdev + int sqrSumMean = 0; + for (NodeId n : sortedNodes) { + int val = getMetric(getNode(n)); + sqrSumMean += Math.pow(val - mean.get(), 2); + } + stdev.set( + (int) Math.round(Math.sqrt( + sqrSumMean / (float) sortedNodes.size()))); + } + } + + private ClusterNode getNode(NodeId nId) { + return nodeSelector.getClusterNodes().get(nId); + } + + private int getMetric(ClusterNode cn) { + return (cn != null) ? ((LoadComparator)nodeSelector.getComparator()) + .getMetric(cn) : 0; + } + + public int getMean() { + return mean.get(); + } + + public int getStdev() { + return stdev.get(); + } + } + + private final NodeQueueLoadMonitor nodeSelector; + private final float sigma; + private final int rangeMin; + private final int rangeMax; + private final Stats stats = new Stats(); + + QueueLimitCalculator(NodeQueueLoadMonitor selector, float sigma, + int rangeMin, int rangeMax) { + this.nodeSelector = selector; + this.sigma = sigma; + this.rangeMax = rangeMax; + this.rangeMin = rangeMin; + } + + private int determineThreshold() { + return (int) (stats.getMean() + sigma * stats.getStdev()); + } + + void update() { + this.stats.update(); + } + + private int getThreshold() { + int thres = determineThreshold(); + return Math.min(rangeMax, Math.max(rangeMin, thres)); + } + + public ContainerQueuingLimit createContainerQueuingLimit() { + ContainerQueuingLimit containerQueuingLimit = + ContainerQueuingLimit.newInstance(); + if (nodeSelector.getComparator() == 
LoadComparator.QUEUE_WAIT_TIME) { + containerQueuingLimit.setMaxQueueWaitTimeInMs(getThreshold()); + containerQueuingLimit.setMaxQueueLength(-1); + } else { + containerQueuingLimit.setMaxQueueWaitTimeInMs(-1); + containerQueuingLimit.setMaxQueueLength(getThreshold()); + } + return containerQueuingLimit; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org