This is an automated email from the ASF dual-hosted git repository.
abmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4d3c580 YARN-9859. Refactoring of OpportunisticContainerAllocator. Contributed by Abhishek Modi.
4d3c580 is described below
commit 4d3c580b03475a6ec9323d11e6875c542f8e3f6d
Author: Abhishek Modi <[email protected]>
AuthorDate: Mon Sep 30 23:40:15 2019 +0530
YARN-9859. Refactoring of OpportunisticContainerAllocator. Contributed by Abhishek Modi.
---
...DistributedOpportunisticContainerAllocator.java | 357 +++++++++++++++++++++
.../scheduler/OpportunisticContainerAllocator.java | 347 +++-----------------
.../TestOpportunisticContainerAllocator.java | 2 +-
.../yarn/server/nodemanager/NodeManager.java | 3 +-
.../scheduler/TestDistributedScheduler.java | 4 +-
.../OpportunisticContainerAllocatorAMService.java | 8 +-
6 files changed, 416 insertions(+), 305 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/DistributedOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/DistributedOpportunisticContainerAllocator.java
new file mode 100644
index 0000000..da90167
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/DistributedOpportunisticContainerAllocator.java
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.scheduler;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
+import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
+import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * <p>
+ * The DistributedOpportunisticContainerAllocator allocates containers on a
+ * given list of nodes, after modifying the container sizes to respect the
+ * limits set by the ResourceManager. It tries to distribute the containers
+ * as evenly as possible.
+ * </p>
+ */
+public class DistributedOpportunisticContainerAllocator
+ extends OpportunisticContainerAllocator {
+
+ private static final int NODE_LOCAL_LOOP = 0;
+ private static final int RACK_LOCAL_LOOP = 1;
+ private static final int OFF_SWITCH_LOOP = 2;
+
+ private static final Logger LOG =
+      LoggerFactory.getLogger(DistributedOpportunisticContainerAllocator.class);
+
+ /**
+ * Create a new Opportunistic Container Allocator.
+ * @param tokenSecretManager TokenSecretManager
+ */
+ public DistributedOpportunisticContainerAllocator(
+ BaseContainerTokenSecretManager tokenSecretManager) {
+ super(tokenSecretManager);
+ }
+
+ /**
+ * Create a new Opportunistic Container Allocator.
+ * @param tokenSecretManager TokenSecretManager
+ * @param maxAllocationsPerAMHeartbeat max number of containers to be
+ * allocated in one AM heartbeat
+ */
+ public DistributedOpportunisticContainerAllocator(
+ BaseContainerTokenSecretManager tokenSecretManager,
+ int maxAllocationsPerAMHeartbeat) {
+ super(tokenSecretManager, maxAllocationsPerAMHeartbeat);
+ }
+
+ @Override
+ public List<Container> allocateContainers(ResourceBlacklistRequest blackList,
+ List<ResourceRequest> oppResourceReqs,
+ ApplicationAttemptId applicationAttemptId,
+ OpportunisticContainerContext opportContext, long rmIdentifier,
+ String appSubmitter) throws YarnException {
+
+ // Update black list.
+ updateBlacklist(blackList, opportContext);
+
+ // Add OPPORTUNISTIC requests to the outstanding ones.
+ opportContext.addToOutstandingReqs(oppResourceReqs);
+ Set<String> nodeBlackList = new HashSet<>(opportContext.getBlacklist());
+ Set<String> allocatedNodes = new HashSet<>();
+ List<Container> allocatedContainers = new ArrayList<>();
+
+ // Satisfy the outstanding OPPORTUNISTIC requests.
+ boolean continueLoop = true;
+ while (continueLoop) {
+ continueLoop = false;
+ List<Map<Resource, List<Allocation>>> allocations = new ArrayList<>();
+ for (SchedulerRequestKey schedulerKey :
+ opportContext.getOutstandingOpReqs().descendingKeySet()) {
+ // Allocated containers :
+ // Key = Requested Capability,
+ // Value = List of Containers of given cap (the actual container size
+ // might be different than what is requested, which is why
+ // we need the requested capability (key) to match against
+ // the outstanding reqs)
+ int remAllocs = -1;
+ int maxAllocationsPerAMHeartbeat = getMaxAllocationsPerAMHeartbeat();
+ if (maxAllocationsPerAMHeartbeat > 0) {
+ remAllocs =
+ maxAllocationsPerAMHeartbeat - allocatedContainers.size()
+ - getTotalAllocations(allocations);
+ if (remAllocs <= 0) {
+ LOG.info("Not allocating more containers as we have reached max "
+ + "allocations per AM heartbeat {}",
+ maxAllocationsPerAMHeartbeat);
+ break;
+ }
+ }
+ Map<Resource, List<Allocation>> allocation = allocate(
+ rmIdentifier, opportContext, schedulerKey, applicationAttemptId,
+ appSubmitter, nodeBlackList, allocatedNodes, remAllocs);
+ if (allocation.size() > 0) {
+ allocations.add(allocation);
+ continueLoop = true;
+ }
+ }
+ matchAllocation(allocations, allocatedContainers, opportContext);
+ }
+
+ return allocatedContainers;
+ }
+
+ private Map<Resource, List<Allocation>> allocate(long rmIdentifier,
+ OpportunisticContainerContext appContext, SchedulerRequestKey schedKey,
+ ApplicationAttemptId appAttId, String userName, Set<String> blackList,
+ Set<String> allocatedNodes, int maxAllocations)
+ throws YarnException {
+ Map<Resource, List<Allocation>> containers = new HashMap<>();
+ for (EnrichedResourceRequest enrichedAsk :
+ appContext.getOutstandingOpReqs().get(schedKey).values()) {
+ int remainingAllocs = -1;
+ if (maxAllocations > 0) {
+ int totalAllocated = 0;
+ for (List<Allocation> allocs : containers.values()) {
+ totalAllocated += allocs.size();
+ }
+ remainingAllocs = maxAllocations - totalAllocated;
+ if (remainingAllocs <= 0) {
+ LOG.info("Not allocating more containers as max allocations per AM "
+              + "heartbeat {} has been reached", getMaxAllocationsPerAMHeartbeat());
+ break;
+ }
+ }
+ allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
+ appContext.getContainerIdGenerator(), blackList, allocatedNodes,
+ appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk,
+ remainingAllocs);
+ ResourceRequest anyAsk = enrichedAsk.getRequest();
+ if (!containers.isEmpty()) {
+ LOG.info("Opportunistic allocation requested for [priority={}, "
+ + "allocationRequestId={}, num_containers={}, capability={}] "
+ + "allocated = {}", anyAsk.getPriority(),
+ anyAsk.getAllocationRequestId(), anyAsk.getNumContainers(),
+ anyAsk.getCapability(), containers.keySet());
+ }
+ }
+ return containers;
+ }
+
+ private void allocateContainersInternal(long rmIdentifier,
+ AllocationParams appParams, ContainerIdGenerator idCounter,
+ Set<String> blacklist, Set<String> allocatedNodes,
+ ApplicationAttemptId id, Map<String, RemoteNode> allNodes,
+ String userName, Map<Resource, List<Allocation>> allocations,
+ EnrichedResourceRequest enrichedAsk, int maxAllocations)
+ throws YarnException {
+ if (allNodes.size() == 0) {
+ LOG.info("No nodes currently available to " +
+ "allocate OPPORTUNISTIC containers.");
+ return;
+ }
+ ResourceRequest anyAsk = enrichedAsk.getRequest();
+ int toAllocate = anyAsk.getNumContainers()
+ - (allocations.isEmpty() ? 0 :
+ allocations.get(anyAsk.getCapability()).size());
+ toAllocate = Math.min(toAllocate,
+ appParams.getMaxAllocationsPerSchedulerKeyPerRound());
+ if (maxAllocations >= 0) {
+ toAllocate = Math.min(maxAllocations, toAllocate);
+ }
+ int numAllocated = 0;
+ // Node Candidates are selected as follows:
+ // * Node local candidates selected in loop == 0
+ // * Rack local candidates selected in loop == 1
+ // * From loop == 2 onwards, we revert to off switch allocations.
+ int loopIndex = OFF_SWITCH_LOOP;
+ if (enrichedAsk.getNodeLocations().size() > 0) {
+ loopIndex = NODE_LOCAL_LOOP;
+ }
+ while (numAllocated < toAllocate) {
+ Collection<RemoteNode> nodeCandidates =
+ findNodeCandidates(loopIndex, allNodes, blacklist, allocatedNodes,
+ enrichedAsk);
+ for (RemoteNode rNode : nodeCandidates) {
+ String rNodeHost = rNode.getNodeId().getHost();
+ // Ignore black list
+ if (blacklist.contains(rNodeHost)) {
+        LOG.info("Nodes for scheduling include a blacklisted node" +
+            " [" + rNodeHost + "].");
+ continue;
+ }
+ String location = ResourceRequest.ANY;
+ if (loopIndex == NODE_LOCAL_LOOP) {
+ if (enrichedAsk.getNodeLocations().contains(rNodeHost)) {
+ location = rNodeHost;
+ } else {
+ continue;
+ }
+ } else if (allocatedNodes.contains(rNodeHost)) {
+ LOG.info("Opportunistic container has already been allocated on {}.",
+ rNodeHost);
+ continue;
+ }
+ if (loopIndex == RACK_LOCAL_LOOP) {
+ if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) {
+ location = rNode.getRackName();
+ } else {
+ continue;
+ }
+ }
+ Container container = createContainer(rmIdentifier, appParams,
+ idCounter, id, userName, allocations, location,
+ anyAsk, rNode);
+ numAllocated++;
+ updateMetrics(loopIndex);
+ allocatedNodes.add(rNodeHost);
+ LOG.info("Allocated [" + container.getId() + "] as opportunistic at " +
+ "location [" + location + "]");
+ if (numAllocated >= toAllocate) {
+ break;
+ }
+ }
+ if (loopIndex == NODE_LOCAL_LOOP &&
+ enrichedAsk.getRackLocations().size() > 0) {
+ loopIndex = RACK_LOCAL_LOOP;
+ } else {
+ loopIndex++;
+ }
+ // Handle case where there are no nodes remaining after blacklist is
+ // considered.
+ if (loopIndex > OFF_SWITCH_LOOP && numAllocated == 0) {
+ LOG.warn("Unable to allocate any opportunistic containers.");
+ break;
+ }
+ }
+ }
+
+
+
+ private void updateMetrics(int loopIndex) {
+ OpportunisticSchedulerMetrics metrics =
+ OpportunisticSchedulerMetrics.getMetrics();
+ if (loopIndex == NODE_LOCAL_LOOP) {
+ metrics.incrNodeLocalOppContainers();
+ } else if (loopIndex == RACK_LOCAL_LOOP) {
+ metrics.incrRackLocalOppContainers();
+ } else {
+ metrics.incrOffSwitchOppContainers();
+ }
+ }
+
+ private Collection<RemoteNode> findNodeCandidates(int loopIndex,
+ Map<String, RemoteNode> allNodes, Set<String> blackList,
+ Set<String> allocatedNodes, EnrichedResourceRequest enrichedRR) {
+ LinkedList<RemoteNode> retList = new LinkedList<>();
+ String partition = getRequestPartition(enrichedRR);
+ if (loopIndex > 1) {
+ for (RemoteNode remoteNode : allNodes.values()) {
+        if (StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) {
+ retList.add(remoteNode);
+ }
+ }
+ return retList;
+ } else {
+
+ int numContainers = enrichedRR.getRequest().getNumContainers();
+ while (numContainers > 0) {
+ if (loopIndex == 0) {
+ // Node local candidates
+ numContainers = collectNodeLocalCandidates(
+ allNodes, enrichedRR, retList, numContainers);
+ } else {
+ // Rack local candidates
+ numContainers =
+ collectRackLocalCandidates(allNodes, enrichedRR, retList,
+ blackList, allocatedNodes, numContainers);
+ }
+ if (numContainers == enrichedRR.getRequest().getNumContainers()) {
+ // If there is no change in numContainers, then there is no point
+ // in looping again.
+ break;
+ }
+ }
+ return retList;
+ }
+ }
+
+ private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes,
+ EnrichedResourceRequest enrichedRR, LinkedList<RemoteNode> retList,
+ Set<String> blackList, Set<String> allocatedNodes, int numContainers) {
+ String partition = getRequestPartition(enrichedRR);
+ for (RemoteNode rNode : allNodes.values()) {
+ if (StringUtils.equals(partition, getRemoteNodePartition(rNode)) &&
+ enrichedRR.getRackLocations().contains(rNode.getRackName())) {
+ String rHost = rNode.getNodeId().getHost();
+ if (blackList.contains(rHost)) {
+ continue;
+ }
+ if (allocatedNodes.contains(rHost)) {
+ retList.addLast(rNode);
+ } else {
+ retList.addFirst(rNode);
+ numContainers--;
+ }
+ }
+ if (numContainers == 0) {
+ break;
+ }
+ }
+ return numContainers;
+ }
+
+ private int collectNodeLocalCandidates(Map<String, RemoteNode> allNodes,
+ EnrichedResourceRequest enrichedRR, List<RemoteNode> retList,
+ int numContainers) {
+ String partition = getRequestPartition(enrichedRR);
+ for (String nodeName : enrichedRR.getNodeLocations()) {
+ RemoteNode remoteNode = allNodes.get(nodeName);
+ if (remoteNode != null &&
+ StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) {
+ retList.add(remoteNode);
+ numContainers--;
+ }
+ if (numContainers == 0) {
+ break;
+ }
+ }
+ return numContainers;
+ }
+}
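For reviewers, one detail of the new file worth calling out: collectRackLocalCandidates spreads opportunistic containers by pushing hosts that already received an allocation in this round to the back of the candidate list (addLast), while untouched hosts go to the front (addFirst) and are tried first. A minimal standalone sketch of that ordering; the class and method names below are illustrative and not part of the patch:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Set;

    // Illustrative sketch only -- not part of this commit.
    public class SpreadOrderSketch {
      // Mirrors the ordering in collectRackLocalCandidates: hosts that
      // already hold an opportunistic container sink to the tail, fresh
      // hosts surface at the head and are therefore tried first.
      static List<String> order(List<String> rackHosts, Set<String> allocated) {
        LinkedList<String> ret = new LinkedList<>();
        for (String host : rackHosts) {
          if (allocated.contains(host)) {
            ret.addLast(host);
          } else {
            ret.addFirst(host);
          }
        }
        return ret;
      }

      public static void main(String[] args) {
        Set<String> allocated = new HashSet<>(Arrays.asList("host1"));
        // Prints [host3, host2, host1]: the already-used host drops to the end.
        System.out.println(order(
            Arrays.asList("host1", "host2", "host3"), allocated));
      }
    }

This ordering is what lets the allocator "distribute the containers as evenly as possible", as the class javadoc promises.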
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 0ce1976..4a17a65 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.scheduler;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.Time;
@@ -38,21 +37,15 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
-import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -61,16 +54,11 @@ import java.util.concurrent.atomic.AtomicLong;
/**
* <p>
- * The OpportunisticContainerAllocator allocates containers on a given list of
- * nodes, after modifying the container sizes to respect the limits set by the
- * ResourceManager. It tries to distribute the containers as evenly as possible.
+ * Base abstract class for Opportunistic container allocators. It provides
+ * the common functions required for Opportunistic container allocation.
* </p>
*/
-public class OpportunisticContainerAllocator {
-
- private static final int NODE_LOCAL_LOOP = 0;
- private static final int RACK_LOCAL_LOOP = 1;
- private static final int OFF_SWITCH_LOOP = 2;
+public abstract class OpportunisticContainerAllocator {
private int maxAllocationsPerAMHeartbeat = -1;
@@ -212,9 +200,6 @@ public class OpportunisticContainerAllocator {
}
}
- private static final Logger LOG =
- LoggerFactory.getLogger(OpportunisticContainerAllocator.class);
-
private static final ResourceCalculator RESOURCE_CALCULATOR =
new DominantResourceCalculator();
@@ -238,26 +223,30 @@ public class OpportunisticContainerAllocator {
}
}
- static class EnrichedResourceRequest {
+ /**
+ * This class encapsulates Resource Request and provides requests per
+ * node and rack.
+ */
+ public static class EnrichedResourceRequest {
private final Map<String, AtomicInteger> nodeLocations = new HashMap<>();
private final Map<String, AtomicInteger> rackLocations = new HashMap<>();
private final ResourceRequest request;
private final long timestamp;
- EnrichedResourceRequest(ResourceRequest request) {
+ public EnrichedResourceRequest(ResourceRequest request) {
this.request = request;
timestamp = Time.monotonicNow();
}
- long getTimestamp() {
+ public long getTimestamp() {
return timestamp;
}
- ResourceRequest getRequest() {
+ public ResourceRequest getRequest() {
return request;
}
- void addLocation(String location, int count) {
+ public void addLocation(String location, int count) {
Map<String, AtomicInteger> m = rackLocations;
if (!location.startsWith("/")) {
m = nodeLocations;
@@ -269,7 +258,7 @@ public class OpportunisticContainerAllocator {
}
}
- void removeLocation(String location) {
+ public void removeLocation(String location) {
Map<String, AtomicInteger> m = rackLocations;
AtomicInteger count = m.get(location);
if (count == null) {
@@ -284,14 +273,15 @@ public class OpportunisticContainerAllocator {
}
}
- Set<String> getNodeLocations() {
+ public Set<String> getNodeLocations() {
return nodeLocations.keySet();
}
- Set<String> getRackLocations() {
+ public Set<String> getRackLocations() {
return rackLocations.keySet();
}
}
+
/**
* Create a new Opportunistic Container Allocator.
* @param tokenSecretManager TokenSecretManager
@@ -320,6 +310,14 @@ public class OpportunisticContainerAllocator {
}
/**
+ * Get the Max Allocations per AM heartbeat.
+ * @return maxAllocationsPerAMHeartbeat.
+ */
+ public int getMaxAllocationsPerAMHeartbeat() {
+ return this.maxAllocationsPerAMHeartbeat;
+ }
+
+ /**
* Allocate OPPORTUNISTIC containers.
* @param blackList Resource BlackList Request
* @param oppResourceReqs Opportunistic Resource Requests
@@ -330,72 +328,37 @@ public class OpportunisticContainerAllocator {
* @return List of Containers.
* @throws YarnException YarnException
*/
- public List<Container> allocateContainers(ResourceBlacklistRequest blackList,
+ public abstract List<Container> allocateContainers(
+ ResourceBlacklistRequest blackList,
List<ResourceRequest> oppResourceReqs,
ApplicationAttemptId applicationAttemptId,
OpportunisticContainerContext opportContext, long rmIdentifier,
- String appSubmitter) throws YarnException {
+ String appSubmitter) throws YarnException;
+
- // Update black list.
+ protected void updateBlacklist(ResourceBlacklistRequest blackList,
+ OpportunisticContainerContext oppContext) {
if (blackList != null) {
- opportContext.getBlacklist().removeAll(blackList.getBlacklistRemovals());
- opportContext.getBlacklist().addAll(blackList.getBlacklistAdditions());
+ oppContext.getBlacklist().removeAll(blackList.getBlacklistRemovals());
+ oppContext.getBlacklist().addAll(blackList.getBlacklistAdditions());
}
+ }
- // Add OPPORTUNISTIC requests to the outstanding ones.
- opportContext.addToOutstandingReqs(oppResourceReqs);
- Set<String> nodeBlackList = new HashSet<>(opportContext.getBlacklist());
- Set<String> allocatedNodes = new HashSet<>();
- List<Container> allocatedContainers = new ArrayList<>();
-
- // Satisfy the outstanding OPPORTUNISTIC requests.
- boolean continueLoop = true;
- while (continueLoop) {
- continueLoop = false;
- List<Map<Resource, List<Allocation>>> allocations = new ArrayList<>();
- for (SchedulerRequestKey schedulerKey :
- opportContext.getOutstandingOpReqs().descendingKeySet()) {
- // Allocated containers :
- // Key = Requested Capability,
- // Value = List of Containers of given cap (the actual container size
- // might be different than what is requested, which is why
- // we need the requested capability (key) to match against
- // the outstanding reqs)
- int remAllocs = -1;
- if (maxAllocationsPerAMHeartbeat > 0) {
- remAllocs =
- maxAllocationsPerAMHeartbeat - allocatedContainers.size()
- - getTotalAllocations(allocations);
- if (remAllocs <= 0) {
- LOG.info("Not allocating more containers as we have reached max "
- + "allocations per AM heartbeat {}",
- maxAllocationsPerAMHeartbeat);
- break;
- }
- }
- Map<Resource, List<Allocation>> allocation = allocate(
- rmIdentifier, opportContext, schedulerKey, applicationAttemptId,
- appSubmitter, nodeBlackList, allocatedNodes, remAllocs);
- if (allocation.size() > 0) {
- allocations.add(allocation);
- continueLoop = true;
- }
- }
- for (Map<Resource, List<Allocation>> allocation : allocations) {
- for (Map.Entry<Resource, List<Allocation>> e : allocation.entrySet()) {
- opportContext.matchAllocationToOutstandingRequest(
- e.getKey(), e.getValue());
- for (Allocation alloc : e.getValue()) {
- allocatedContainers.add(alloc.getContainer());
- }
+ protected void matchAllocation(List<Map<Resource,
+ List<Allocation>>> allocations, List<Container> allocatedContainers,
+ OpportunisticContainerContext oppContext) {
+ for (Map<Resource, List<Allocation>> allocation : allocations) {
+ for (Map.Entry<Resource, List<Allocation>> e : allocation.entrySet()) {
+ oppContext.matchAllocationToOutstandingRequest(
+ e.getKey(), e.getValue());
+ for (Allocation alloc : e.getValue()) {
+ allocatedContainers.add(alloc.getContainer());
}
}
}
-
- return allocatedContainers;
}
- private int getTotalAllocations(
+ protected int getTotalAllocations(
List<Map<Resource, List<Allocation>>> allocations) {
int totalAllocs = 0;
for (Map<Resource, List<Allocation>> allocation : allocations) {
@@ -406,223 +369,8 @@ public class OpportunisticContainerAllocator {
return totalAllocs;
}
- private Map<Resource, List<Allocation>> allocate(long rmIdentifier,
- OpportunisticContainerContext appContext, SchedulerRequestKey schedKey,
- ApplicationAttemptId appAttId, String userName, Set<String> blackList,
- Set<String> allocatedNodes, int maxAllocations)
- throws YarnException {
- Map<Resource, List<Allocation>> containers = new HashMap<>();
- for (EnrichedResourceRequest enrichedAsk :
- appContext.getOutstandingOpReqs().get(schedKey).values()) {
- int remainingAllocs = -1;
- if (maxAllocations > 0) {
- int totalAllocated = 0;
- for (List<Allocation> allocs : containers.values()) {
- totalAllocated += allocs.size();
- }
- remainingAllocs = maxAllocations - totalAllocated;
- if (remainingAllocs <= 0) {
- LOG.info("Not allocating more containers as max allocations per AM "
- + "heartbeat {} has reached", maxAllocationsPerAMHeartbeat);
- break;
- }
- }
- allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
- appContext.getContainerIdGenerator(), blackList, allocatedNodes,
- appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk,
- remainingAllocs);
- ResourceRequest anyAsk = enrichedAsk.getRequest();
- if (!containers.isEmpty()) {
- LOG.info("Opportunistic allocation requested for [priority={}, "
- + "allocationRequestId={}, num_containers={}, capability={}] "
- + "allocated = {}", anyAsk.getPriority(),
- anyAsk.getAllocationRequestId(), anyAsk.getNumContainers(),
- anyAsk.getCapability(), containers.keySet());
- }
- }
- return containers;
- }
-
- private void allocateContainersInternal(long rmIdentifier,
- AllocationParams appParams, ContainerIdGenerator idCounter,
- Set<String> blacklist, Set<String> allocatedNodes,
- ApplicationAttemptId id, Map<String, RemoteNode> allNodes,
- String userName, Map<Resource, List<Allocation>> allocations,
- EnrichedResourceRequest enrichedAsk, int maxAllocations)
- throws YarnException {
- if (allNodes.size() == 0) {
- LOG.info("No nodes currently available to " +
- "allocate OPPORTUNISTIC containers.");
- return;
- }
- ResourceRequest anyAsk = enrichedAsk.getRequest();
- int toAllocate = anyAsk.getNumContainers()
- - (allocations.isEmpty() ? 0 :
- allocations.get(anyAsk.getCapability()).size());
- toAllocate = Math.min(toAllocate,
- appParams.getMaxAllocationsPerSchedulerKeyPerRound());
- if (maxAllocations >= 0) {
- toAllocate = Math.min(maxAllocations, toAllocate);
- }
- int numAllocated = 0;
- // Node Candidates are selected as follows:
- // * Node local candidates selected in loop == 0
- // * Rack local candidates selected in loop == 1
- // * From loop == 2 onwards, we revert to off switch allocations.
- int loopIndex = OFF_SWITCH_LOOP;
- if (enrichedAsk.getNodeLocations().size() > 0) {
- loopIndex = NODE_LOCAL_LOOP;
- }
- while (numAllocated < toAllocate) {
- Collection<RemoteNode> nodeCandidates =
- findNodeCandidates(loopIndex, allNodes, blacklist, allocatedNodes,
- enrichedAsk);
- for (RemoteNode rNode : nodeCandidates) {
- String rNodeHost = rNode.getNodeId().getHost();
- // Ignore black list
- if (blacklist.contains(rNodeHost)) {
- LOG.info("Nodes for scheduling has a blacklisted node" +
- " [" + rNodeHost + "]..");
- continue;
- }
- String location = ResourceRequest.ANY;
- if (loopIndex == NODE_LOCAL_LOOP) {
- if (enrichedAsk.getNodeLocations().contains(rNodeHost)) {
- location = rNodeHost;
- } else {
- continue;
- }
- } else if (allocatedNodes.contains(rNodeHost)) {
- LOG.info("Opportunistic container has already been allocated on {}.",
- rNodeHost);
- continue;
- }
- if (loopIndex == RACK_LOCAL_LOOP) {
- if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) {
- location = rNode.getRackName();
- } else {
- continue;
- }
- }
- Container container = createContainer(rmIdentifier, appParams,
- idCounter, id, userName, allocations, location,
- anyAsk, rNode);
- numAllocated++;
- updateMetrics(loopIndex);
- allocatedNodes.add(rNodeHost);
- LOG.info("Allocated [" + container.getId() + "] as opportunistic at " +
- "location [" + location + "]");
- if (numAllocated >= toAllocate) {
- break;
- }
- }
- if (loopIndex == NODE_LOCAL_LOOP &&
- enrichedAsk.getRackLocations().size() > 0) {
- loopIndex = RACK_LOCAL_LOOP;
- } else {
- loopIndex++;
- }
- // Handle case where there are no nodes remaining after blacklist is
- // considered.
- if (loopIndex > OFF_SWITCH_LOOP && numAllocated == 0) {
- LOG.warn("Unable to allocate any opportunistic containers.");
- break;
- }
- }
- }
-
- private void updateMetrics(int loopIndex) {
- OpportunisticSchedulerMetrics metrics =
- OpportunisticSchedulerMetrics.getMetrics();
- if (loopIndex == NODE_LOCAL_LOOP) {
- metrics.incrNodeLocalOppContainers();
- } else if (loopIndex == RACK_LOCAL_LOOP) {
- metrics.incrRackLocalOppContainers();
- } else {
- metrics.incrOffSwitchOppContainers();
- }
- }
-
- private Collection<RemoteNode> findNodeCandidates(int loopIndex,
- Map<String, RemoteNode> allNodes, Set<String> blackList,
- Set<String> allocatedNodes, EnrichedResourceRequest enrichedRR) {
- LinkedList<RemoteNode> retList = new LinkedList<>();
- String partition = getRequestPartition(enrichedRR);
- if (loopIndex > 1) {
- for (RemoteNode remoteNode : allNodes.values()) {
-      if (StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) {
- retList.add(remoteNode);
- }
- }
- return retList;
- } else {
-
- int numContainers = enrichedRR.getRequest().getNumContainers();
- while (numContainers > 0) {
- if (loopIndex == 0) {
- // Node local candidates
- numContainers = collectNodeLocalCandidates(
- allNodes, enrichedRR, retList, numContainers);
- } else {
- // Rack local candidates
- numContainers =
- collectRackLocalCandidates(allNodes, enrichedRR, retList,
- blackList, allocatedNodes, numContainers);
- }
- if (numContainers == enrichedRR.getRequest().getNumContainers()) {
- // If there is no change in numContainers, then there is no point
- // in looping again.
- break;
- }
- }
- return retList;
- }
- }
-
- private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes,
- EnrichedResourceRequest enrichedRR, LinkedList<RemoteNode> retList,
- Set<String> blackList, Set<String> allocatedNodes, int numContainers) {
- String partition = getRequestPartition(enrichedRR);
- for (RemoteNode rNode : allNodes.values()) {
- if (StringUtils.equals(partition, getRemoteNodePartition(rNode)) &&
- enrichedRR.getRackLocations().contains(rNode.getRackName())) {
- String rHost = rNode.getNodeId().getHost();
- if (blackList.contains(rHost)) {
- continue;
- }
- if (allocatedNodes.contains(rHost)) {
- retList.addLast(rNode);
- } else {
- retList.addFirst(rNode);
- numContainers--;
- }
- }
- if (numContainers == 0) {
- break;
- }
- }
- return numContainers;
- }
-
- private int collectNodeLocalCandidates(Map<String, RemoteNode> allNodes,
- EnrichedResourceRequest enrichedRR, List<RemoteNode> retList,
- int numContainers) {
- String partition = getRequestPartition(enrichedRR);
- for (String nodeName : enrichedRR.getNodeLocations()) {
- RemoteNode remoteNode = allNodes.get(nodeName);
- if (remoteNode != null &&
- StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) {
- retList.add(remoteNode);
- numContainers--;
- }
- if (numContainers == 0) {
- break;
- }
- }
- return numContainers;
- }
-
- private Container createContainer(long rmIdentifier,
+ @SuppressWarnings("checkstyle:parameternumber")
+ protected Container createContainer(long rmIdentifier,
AllocationParams appParams, ContainerIdGenerator idCounter,
ApplicationAttemptId id, String userName,
Map<Resource, List<Allocation>> allocations, String location,
@@ -654,6 +402,7 @@ public class OpportunisticContainerAllocator {
SchedulerRequestKey.create(rr), userName, node, cId, capability);
}
+ @SuppressWarnings("checkstyle:parameternumber")
private Container createContainer(long rmIdentifier, long tokenExpiry,
SchedulerRequestKey schedulerKey, String userName, RemoteNode node,
ContainerId cId, Resource capability) {
@@ -718,7 +467,7 @@ public class OpportunisticContainerAllocator {
return partitionedRequests;
}
- private String getRequestPartition(EnrichedResourceRequest enrichedRR) {
+ protected String getRequestPartition(EnrichedResourceRequest enrichedRR) {
String partition = enrichedRR.getRequest().getNodeLabelExpression();
if (partition == null) {
partition = CommonNodeLabelsManager.NO_LABEL;
@@ -726,7 +475,7 @@ public class OpportunisticContainerAllocator {
return partition;
}
- private String getRemoteNodePartition(RemoteNode node) {
+ protected String getRemoteNodePartition(RemoteNode node) {
String partition = node.getNodePartition();
if (partition == null) {
partition = CommonNodeLabelsManager.NO_LABEL;
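Taken together, this hunk turns the old concrete allocator into a template: the base class keeps the shared machinery (updateBlacklist, matchAllocation, getTotalAllocations, createContainer, and the partition helpers) and defers placement policy to subclasses through the now-abstract allocateContainers. A hedged sketch of what a custom subclass could look like; the class below is hypothetical and not part of this commit:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
    import org.apache.hadoop.yarn.api.records.ResourceRequest;
    import org.apache.hadoop.yarn.exceptions.YarnException;
    import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
    import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
    import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;

    // Hypothetical subclass, shown only to illustrate the extension point.
    public class NoOpOpportunisticContainerAllocator
        extends OpportunisticContainerAllocator {

      public NoOpOpportunisticContainerAllocator(
          BaseContainerTokenSecretManager tokenSecretManager) {
        super(tokenSecretManager);
      }

      @Override
      public List<Container> allocateContainers(
          ResourceBlacklistRequest blackList,
          List<ResourceRequest> oppResourceReqs,
          ApplicationAttemptId applicationAttemptId,
          OpportunisticContainerContext opportContext, long rmIdentifier,
          String appSubmitter) throws YarnException {
        // Reuse a protected helper from the base class, then decline to
        // place anything; a real policy would select nodes here.
        updateBlacklist(blackList, opportContext);
        return new ArrayList<>();
      }
    }

DistributedOpportunisticContainerAllocator, added above, is the first (and currently the only) such subclass.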
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
index 548ddad..6a91f41 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
@@ -98,7 +98,7 @@ public class TestOpportunisticContainerAllocator {
return new byte[]{1, 2};
}
};
- allocator = new OpportunisticContainerAllocator(secMan);
+ allocator = new DistributedOpportunisticContainerAllocator(secMan);
oppCntxt = new OpportunisticContainerContext();
oppCntxt.getAppParams().setMinResource(Resource.newInstance(1024, 1));
oppCntxt.getAppParams().setIncrementResource(Resource.newInstance(512, 1));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index db3aaca..4bbae34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecret
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
+import org.apache.hadoop.yarn.server.scheduler.DistributedOpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.state.MultiStateTransitionListener;
@@ -479,7 +480,7 @@ public class NodeManager extends CompositeService
YarnConfiguration.
DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT);
((NMContext) context).setQueueableContainerAllocator(
- new OpportunisticContainerAllocator(
+ new DistributedOpportunisticContainerAllocator(
context.getContainerTokenSecretManager(),
maxAllocationsPerAMHeartbeat));
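The NodeManager resolves the per-AM-heartbeat allocation cap from configuration before constructing the allocator, exactly as before; only the concrete class changes. A small sketch of that lookup using the constants referenced above (the helper class name is made up for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    // Illustrative helper, not part of the patch.
    public class HeartbeatCapSketch {
      // Resolves the max opportunistic allocations per AM heartbeat,
      // falling back to the YARN default when unset.
      static int maxAllocationsPerAMHeartbeat(Configuration conf) {
        return conf.getInt(
            YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT,
            YarnConfiguration.
                DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT);
      }
    }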
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
index dee2a20..5a0715e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.scheduler.DistributedOpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
@@ -232,7 +233,8 @@ public class TestDistributedScheduler {
};
nmContainerTokenSecretManager.setMasterKey(mKey);
OpportunisticContainerAllocator containerAllocator =
- new OpportunisticContainerAllocator(nmContainerTokenSecretManager);
+ new DistributedOpportunisticContainerAllocator(
+ nmContainerTokenSecretManager);
NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
new NMTokenSecretManagerInNM();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index a360ed2..4475caf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
+import org.apache.hadoop.yarn.server.scheduler.DistributedOpportunisticContainerAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -233,9 +234,10 @@ public class OpportunisticContainerAllocatorAMService
YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT,
YarnConfiguration.
DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT);
- this.oppContainerAllocator = new OpportunisticContainerAllocator(
- rmContext.getContainerTokenSecretManager(),
- maxAllocationsPerAMHeartbeat);
+ this.oppContainerAllocator =
+ new DistributedOpportunisticContainerAllocator(
+ rmContext.getContainerTokenSecretManager(),
+ maxAllocationsPerAMHeartbeat);
this.k = rmContext.getYarnConfiguration().getInt(
YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]