This is an automated email from the ASF dual-hosted git repository.
wangda pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3d46141 YARN-10506. Update queue creation logic to use weight mode
and allow the flexible static/dynamic creation. (Contributed by Qi Zhu, Andras
Gyori)
3d46141 is described below
commit 3d46141583e357ee12181fc9084ef6234bedf3f3
Author: Wangda Tan <[email protected]>
AuthorDate: Fri Jan 15 14:20:08 2021 -0800
YARN-10506. Update queue creation logic to use weight mode and allow the
flexible static/dynamic creation. (Contributed by Qi Zhu, Andras Gyori)
Change-Id: I118862fd5e11ee6888275e2bcf667fedfa56c5d7
---
.../placement/ApplicationPlacementContext.java | 13 +
.../scheduler/capacity/AbstractCSQueue.java | 54 ++-
.../capacity/AbstractManagedParentQueue.java | 2 +-
.../scheduler/capacity/CSQueueUtils.java | 18 +-
.../scheduler/capacity/CapacityScheduler.java | 84 ++--
.../CapacitySchedulerAutoQueueHandler.java | 127 ++++++
.../capacity/CapacitySchedulerConfigValidator.java | 14 +-
.../capacity/CapacitySchedulerConfiguration.java | 27 +-
.../capacity/CapacitySchedulerQueueManager.java | 35 +-
.../scheduler/capacity/LeafQueue.java | 17 +-
.../scheduler/capacity/ParentQueue.java | 165 +++++++-
.../scheduler/capacity/PlanQueue.java | 3 +-
.../scheduler/capacity/QueueCapacities.java | 2 +-
.../TestCapacitySchedulerAutoQueueCreation.java | 25 +-
.../TestCapacitySchedulerNewQueueAutoCreation.java | 436 +++++++++++++++++++++
.../scheduler/capacity/TestLeafQueue.java | 4 +
16 files changed, 927 insertions(+), 99 deletions(-)
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java
index f2f92b8..3ae9ac4 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java
@@ -42,6 +42,10 @@ public class ApplicationPlacementContext {
return queue;
}
+ public void setQueue(String q) {
+ queue = q;
+ }
+
public String getParentQueue() {
return parentQueue;
}
@@ -49,4 +53,13 @@ public class ApplicationPlacementContext {
public boolean hasParentQueue() {
return parentQueue != null;
}
+
+ public String getFullQueuePath() {
+ if (parentQueue != null) {
+ return parentQueue + "." + queue;
+ } else {
+ return queue;
+ }
+ }
+
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 9e7b0d8..fd144f2 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -151,6 +151,14 @@ public abstract class AbstractCSQueue implements CSQueue {
private Map<String, Float> userWeights = new HashMap<String, Float>();
private int maxParallelApps;
+ // is it a dynamic queue?
+ private boolean dynamicQueue = false;
+
+ // When was an application last submitted to this queue?
+ // This property only applies to dynamic queues,
+ // and will be used to check when the queue needs to be removed.
+ private long lastSubmittedTimestamp;
+
public AbstractCSQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
this(cs, cs.getConfiguration(), queueName, parent, old);
@@ -172,7 +180,7 @@ public abstract class AbstractCSQueue implements CSQueue {
this.metrics = old != null ?
(CSQueueMetrics) old.getMetrics() :
CSQueueMetrics.forQueue(getQueuePath(), parent,
- cs.getConfiguration().getEnableUserMetrics(), cs.getConf());
+ cs.getConfiguration().getEnableUserMetrics(), configuration);
this.csContext = cs;
this.minimumAllocation = csContext.getMinimumResourceCapability();
@@ -192,6 +200,7 @@ public abstract class AbstractCSQueue implements CSQueue {
writeLock = lock.writeLock();
}
+ @VisibleForTesting
protected void setupConfigurableCapacities() {
setupConfigurableCapacities(csContext.getConfiguration());
}
@@ -345,11 +354,6 @@ public abstract class AbstractCSQueue implements CSQueue {
return defaultLabelExpression;
}
- void setupQueueConfigs(Resource clusterResource)
- throws IOException {
- setupQueueConfigs(clusterResource, csContext.getConfiguration());
- }
-
protected void setupQueueConfigs(Resource clusterResource,
CapacitySchedulerConfiguration configuration) throws
IOException {
@@ -405,7 +409,7 @@ public abstract class AbstractCSQueue implements CSQueue {
QueueState parentState = (parent == null) ? null : parent.getState();
initializeQueueState(previous, configuredState, parentState);
- authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf());
+ authorizer = YarnAuthorizationProvider.getInstance(configuration);
this.acls = configuration.getAcls(getQueuePath());
@@ -437,7 +441,7 @@ public abstract class AbstractCSQueue implements CSQueue {
}
this.reservationsContinueLooking =
- csContext.getConfiguration().getReservationContinueLook();
+ configuration.getReservationContinueLook();
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this,
configuration);
@@ -1609,4 +1613,38 @@ public abstract class AbstractCSQueue implements CSQueue
{
}
}
}
+
+ public boolean isDynamicQueue() {
+ readLock.lock();
+
+ try {
+ return dynamicQueue;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public void setDynamicQueue(boolean dynamicQueue) {
+ writeLock.lock();
+
+ try {
+ this.dynamicQueue = dynamicQueue;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public long getLastSubmittedTimestamp() {
+ return lastSubmittedTimestamp;
+ }
+
+ // Record the submission time so this queue isn't removed by idle timeout.
+ public void signalToSubmitToQueue() {
+ writeLock.lock();
+ try {
+ this.lastSubmittedTimestamp = System.currentTimeMillis();
+ } finally {
+ writeLock.unlock();
+ }
+ }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
index 7bdc311..a9e82a6 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
@@ -58,7 +58,7 @@ public abstract class AbstractManagedParentQueue extends
ParentQueue {
writeLock.lock();
try {
// Set new configs
- setupQueueConfigs(clusterResource);
+ setupQueueConfigs(clusterResource, csContext.getConfiguration());
} finally {
writeLock.unlock();
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index 3fc256b..a403476 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -22,6 +22,7 @@ import java.util.Set;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import
org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -40,7 +41,7 @@ public class CSQueueUtils {
float capacity, float maximumCapacity) {
if (maximumCapacity < 0.0f || maximumCapacity > 1.0f) {
throw new IllegalArgumentException(
- "Illegal value of maximumCapacity " + maximumCapacity +
+ "Illegal value of maximumCapacity " + maximumCapacity +
" used in call to setMaxCapacity for queue " + queuePath);
}
}
@@ -61,11 +62,11 @@ public class CSQueueUtils {
public static float computeAbsoluteMaximumCapacity(
float maximumCapacity, CSQueue parent) {
- float parentAbsMaxCapacity =
+ float parentAbsMaxCapacity =
(parent == null) ? 1.0f : parent.getAbsoluteMaximumCapacity();
return (parentAbsMaxCapacity * maximumCapacity);
}
-
+
public static void loadCapacitiesByLabelsFromConf(String queuePath,
QueueCapacities queueCapacities, CapacitySchedulerConfiguration csConf) {
queueCapacities.clearConfigurableFields();
@@ -312,4 +313,15 @@ public class CSQueueUtils {
}
}
}
+
+ public static ApplicationPlacementContext extractQueuePath(String queuePath) {
+ int parentQueueNameEndIndex = queuePath.lastIndexOf(".");
+ if (parentQueueNameEndIndex > -1) {
+ String parent = queuePath.substring(0, parentQueueNameEndIndex).trim();
+ String leaf = queuePath.substring(parentQueueNameEndIndex + 1).trim();
+ return new ApplicationPlacementContext(leaf, parent);
+ } else{
+ return new ApplicationPlacementContext(queuePath);
+ }
+ }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 86f3023..89c1cf7 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -230,6 +230,8 @@ public class CapacityScheduler extends
private AppPriorityACLsManager appPriorityACLManager;
private boolean multiNodePlacementEnabled;
+ private CapacitySchedulerAutoQueueHandler autoQueueHandler;
+
private static boolean printedVerboseLoggingForAsyncScheduling = false;
/**
@@ -340,6 +342,9 @@ public class CapacityScheduler extends
this.labelManager, this.appPriorityACLManager);
this.queueManager.setCapacitySchedulerContext(this);
+ this.autoQueueHandler = new CapacitySchedulerAutoQueueHandler(
+ this.queueManager, this.conf);
+
this.workflowPriorityMappingsMgr = new WorkflowPriorityMappingsManager();
this.activitiesManager = new ActivitiesManager(rmContext);
@@ -924,7 +929,7 @@ public class CapacityScheduler extends
private CSQueue getOrCreateQueueFromPlacementContext(ApplicationId
applicationId, String user, String queueName,
ApplicationPlacementContext placementContext,
- boolean isRecovery) {
+ boolean isRecovery) {
CSQueue queue = getQueue(queueName);
@@ -3329,44 +3334,6 @@ public class CapacityScheduler extends
return null;
}
- private LeafQueue autoCreateLeafQueue(
- ApplicationPlacementContext placementContext)
- throws IOException, YarnException {
-
- AutoCreatedLeafQueue autoCreatedLeafQueue = null;
-
- String leafQueueName = placementContext.getQueue();
- String parentQueueName = placementContext.getParentQueue();
-
- if (!StringUtils.isEmpty(parentQueueName)) {
- CSQueue parentQueue = getQueue(parentQueueName);
-
- if (parentQueue != null && conf.isAutoCreateChildQueueEnabled(
- parentQueue.getQueuePath())) {
-
- ManagedParentQueue autoCreateEnabledParentQueue =
- (ManagedParentQueue) parentQueue;
- autoCreatedLeafQueue = new AutoCreatedLeafQueue(this, leafQueueName,
- autoCreateEnabledParentQueue);
-
- addQueue(autoCreatedLeafQueue);
-
- } else{
- throw new SchedulerDynamicEditException(
- "Could not auto-create leaf queue for " + leafQueueName
- + ". Queue mapping specifies an invalid parent queue "
- + "which does not exist "
- + parentQueueName);
- }
- } else{
- throw new SchedulerDynamicEditException(
- "Could not auto-create leaf queue for " + leafQueueName
- + ". Queue mapping does not specify"
- + " which parent queue it needs to be created under.");
- }
- return autoCreatedLeafQueue;
- }
-
@Override
public void resetSchedulerMetrics() {
CapacitySchedulerMetrics.destroy();
@@ -3403,4 +3370,43 @@ public class CapacityScheduler extends
public void setQueueManager(CapacitySchedulerQueueManager qm) {
this.queueManager = qm;
}
+
+ private LeafQueue autoCreateLeafQueue(
+ ApplicationPlacementContext placementContext)
+ throws IOException, YarnException {
+ String leafQueueName = placementContext.getQueue();
+ String parentQueueName = placementContext.getParentQueue();
+
+ if (!StringUtils.isEmpty(parentQueueName)) {
+ CSQueue parentQueue = getQueue(parentQueueName);
+
+ if (parentQueue == null) {
+ throw new SchedulerDynamicEditException(
+ "Could not auto-create leaf queue for " + leafQueueName
+ + ". Queue mapping specifies an invalid parent queue "
+ + "which does not exist " + parentQueueName);
+ }
+
+ if (parentQueue != null &&
+ conf.isAutoCreateChildQueueEnabled(parentQueue.getQueuePath())) {
+ // Case 1: Handle ManagedParentQueue
+ AutoCreatedLeafQueue autoCreatedLeafQueue = null;
+ ManagedParentQueue autoCreateEnabledParentQueue =
+ (ManagedParentQueue) parentQueue;
+ autoCreatedLeafQueue = new AutoCreatedLeafQueue(this, leafQueueName,
+ autoCreateEnabledParentQueue);
+
+ addQueue(autoCreatedLeafQueue);
+ return autoCreatedLeafQueue;
+
+ } else {
+ return autoQueueHandler.autoCreateQueue(placementContext);
+ }
+ }
+
+ throw new SchedulerDynamicEditException(
+ "Could not auto-create leaf queue for " + leafQueueName
+ + ". Queue mapping does not specify"
+ + " which parent queue it needs to be created under.");
+ }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerAutoQueueHandler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerAutoQueueHandler.java
new file mode 100644
index 0000000..1730021
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerAutoQueueHandler.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Manages the validation and the creation of a Capacity Scheduler
+ * queue at runtime.
+ */
+public class CapacitySchedulerAutoQueueHandler {
+ private final CapacitySchedulerQueueManager queueManager;
+ private final CapacitySchedulerConfiguration conf;
+ private static final int MAXIMUM_DEPTH_ALLOWED = 2;
+
+ public CapacitySchedulerAutoQueueHandler(
+ CapacitySchedulerQueueManager queueManager,
+ CapacitySchedulerConfiguration conf) {
+ this.queueManager = queueManager;
+ this.conf = conf;
+ }
+
+ /**
+ * Creates a LeafQueue and its upper hierarchy given a path. A parent is
+ * eligible for creation if either the placement context creation flags are
+ * set, or the auto queue creation is enabled for the first static parent in
+ * the hierarchy.
+ *
+ * @param queue the application placement information of the queue
+ * @return LeafQueue part of a given queue path
+ * @throws YarnException if the given path is not eligible to be auto created
+ */
+ public LeafQueue autoCreateQueue(ApplicationPlacementContext queue)
+ throws YarnException {
+ ApplicationPlacementContext parentContext =
+ CSQueueUtils.extractQueuePath(queue.getParentQueue());
+ List<ApplicationPlacementContext> parentsToCreate = new ArrayList<>();
+
+ ApplicationPlacementContext queueCandidateContext = parentContext;
+ CSQueue existingQueueCandidate = getQueue(queueCandidateContext.getQueue());
+
+ while (existingQueueCandidate == null) {
+ parentsToCreate.add(queueCandidateContext);
+ queueCandidateContext = CSQueueUtils.extractQueuePath(
+ queueCandidateContext.getParentQueue());
+ existingQueueCandidate = getQueue(queueCandidateContext.getQueue());
+ }
+
+ // Reverse the collection to represent the hierarchy to be created
+ // from highest to lowest level
+ Collections.reverse(parentsToCreate);
+
+ if (!(existingQueueCandidate instanceof ParentQueue)) {
+ throw new SchedulerDynamicEditException(
+ "Could not auto create hierarchy of "
+ + queue.getFullQueuePath() + ". Queue "
+ + existingQueueCandidate.getQueuePath() +
+ " is not a ParentQueue."
+ );
+ }
+ ParentQueue existingParentQueue = (ParentQueue) existingQueueCandidate;
+ int depthLimit = extractDepthLimit(existingParentQueue);
+ // The number of levels to be created including the LeafQueue
+ // (which is last)
+ int levelsToCreate = parentsToCreate.size() + 1;
+
+ if (depthLimit == 0) {
+ throw new SchedulerDynamicEditException("Auto creation of queue " +
+ queue.getFullQueuePath() + " is not enabled under parent "
+ + existingParentQueue.getQueuePath());
+ }
+
+ if (levelsToCreate > depthLimit) {
+ throw new SchedulerDynamicEditException(
+ "Could not auto create queue " + queue.getFullQueuePath()
+ + ". In order to create the desired queue hierarchy, " +
+ levelsToCreate + " levels of queues would need " +
+ "to be created, which is above the limit.");
+ }
+
+ for (ApplicationPlacementContext current : parentsToCreate) {
+ existingParentQueue = existingParentQueue
+ .addDynamicParentQueue(current.getFullQueuePath());
+ queueManager.addQueue(existingParentQueue.getQueuePath(),
+ existingParentQueue);
+ }
+
+ LeafQueue leafQueue = existingParentQueue.addDynamicLeafQueue(
+ queue.getFullQueuePath());
+ queueManager.addQueue(leafQueue.getQueuePath(), leafQueue);
+
+ return leafQueue;
+ }
+
+ private int extractDepthLimit(ParentQueue parentQueue) {
+ if (parentQueue.isEligibleForAutoQueueCreation()) {
+ return MAXIMUM_DEPTH_ALLOWED;
+ } else {
+ return 0;
+ }
+ }
+
+ private CSQueue getQueue(String queue) {
+ return queue != null ? queueManager.getQueue(queue) : null;
+ }
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java
index c3b4df4..ef9f97a 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java
@@ -106,6 +106,10 @@ public final class CapacitySchedulerConfigValidator {
}
}
+ private static boolean isDynamicQueue(CSQueue csQueue) {
+ return ((AbstractCSQueue)csQueue).isDynamicQueue();
+ }
+
/**
* Ensure all existing queues are present. Queues cannot be deleted if its
not
* in Stopped state, Queue's cannot be moved from one hierarchy to other
also.
@@ -144,10 +148,12 @@ public final class CapacitySchedulerConfigValidator {
LOG.info("Deleting Queue " + queuePath + ", as it is not"
+ " present in the modified capacity configuration xml");
} else {
- throw new IOException(oldQueue.getQueuePath() + " cannot be"
- + " deleted from the capacity scheduler configuration, as the"
- + " queue is not yet in stopped state. Current State : "
- + oldQueue.getState());
+ if (!isDynamicQueue(oldQueue)) {
+ throw new IOException(oldQueue.getQueuePath() + " cannot be"
+ + " deleted from the capacity scheduler configuration, as the"
+ + " queue is not yet in stopped state. Current State : "
+ + oldQueue.getState());
+ }
}
} else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
//Queue's cannot be moved from one hierarchy to other
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 9188cec..abbc2d7 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -526,7 +526,7 @@ public class CapacitySchedulerConfiguration extends
ReservationSchedulerConfigur
throwExceptionForUnexpectedWeight(weight, queue, label);
return weight;
}
-
+
public float getNonLabeledQueueCapacity(String queue) {
String configuredCapacity = get(getQueuePrefix(queue) + CAPACITY);
boolean absoluteResourceConfigured = (configuredCapacity != null)
@@ -2009,6 +2009,17 @@ public class CapacitySchedulerConfiguration extends
ReservationSchedulerConfigur
AUTO_CREATE_CHILD_QUEUE_PREFIX + "enabled";
@Private
+ private static final String AUTO_QUEUE_CREATION_V2_PREFIX =
+ "auto-queue-creation-v2";
+
+ @Private
+ public static final String AUTO_QUEUE_CREATION_V2_ENABLED =
+ AUTO_QUEUE_CREATION_V2_PREFIX + ".enabled";
+
+ @Private
+ public static final boolean DEFAULT_AUTO_QUEUE_CREATION_ENABLED = false;
+
+ @Private
public static final String AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX =
"leaf-queue-template";
@@ -2044,6 +2055,20 @@ public class CapacitySchedulerConfiguration extends
ReservationSchedulerConfigur
autoCreationEnabled);
}
+ public void setAutoQueueCreationV2Enabled(String queuePath,
+ boolean autoQueueCreation) {
+ setBoolean(
+ getQueuePrefix(queuePath) + AUTO_QUEUE_CREATION_V2_ENABLED,
+ autoQueueCreation);
+ }
+
+ public boolean isAutoQueueCreationV2Enabled(String queuePath) {
+ boolean isAutoQueueCreation = getBoolean(
+ getQueuePrefix(queuePath) + AUTO_QUEUE_CREATION_V2_ENABLED,
+ DEFAULT_AUTO_QUEUE_CREATION_ENABLED);
+ return isAutoQueueCreation;
+ }
+
/**
* Get the auto created leaf queue's template configuration prefix
* Leaf queue's template capacities are configured at the parent queue
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
index a3d6571..c5ce700 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -20,6 +20,8 @@ package
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -176,7 +178,7 @@ public class CapacitySchedulerQueueManager implements
SchedulerQueueManager<
throws IOException {
// Parse new queues
CSQueueStore newQueues = new CSQueueStore();
- CSQueue newRoot = parseQueue(this.csContext, newConf, null,
+ CSQueue newRoot = parseQueue(this.csContext, newConf, null,
CapacitySchedulerConfiguration.ROOT, newQueues, queues, NOOP);
// When failing over, if using configuration store, don't validate queue
@@ -212,7 +214,7 @@ public class CapacitySchedulerQueueManager implements
SchedulerQueueManager<
* @param conf the CapacitySchedulerConfiguration
* @param parent the parent queue
* @param queueName the queue name
- * @param queues all the queues
+ * @param newQueues all the queues
* @param oldQueues the old queues
* @param hook the queue hook
* @return the CSQueue
@@ -222,18 +224,28 @@ public class CapacitySchedulerQueueManager implements
SchedulerQueueManager<
CapacitySchedulerContext csContext,
CapacitySchedulerConfiguration conf,
CSQueue parent, String queueName,
- CSQueueStore queues,
+ CSQueueStore newQueues,
CSQueueStore oldQueues,
QueueHook hook) throws IOException {
CSQueue queue;
String fullQueueName = (parent == null) ?
queueName :
(parent.getQueuePath() + "." + queueName);
- String[] childQueueNames = conf.getQueues(fullQueueName);
+ String[] staticChildQueueNames = conf.getQueues(fullQueueName);
+ List<String> childQueueNames = staticChildQueueNames != null ?
+ Arrays.asList(staticChildQueueNames) : Collections.emptyList();
+
boolean isReservableQueue = conf.isReservable(fullQueueName);
boolean isAutoCreateEnabled = conf.isAutoCreateChildQueueEnabled(
fullQueueName);
- if (childQueueNames == null || childQueueNames.length == 0) {
+ boolean isDynamicParent = false;
+
+ CSQueue oldQueue = oldQueues.get(fullQueueName);
+ if (oldQueue instanceof ParentQueue) {
+ isDynamicParent = ((ParentQueue) oldQueue).isDynamicQueue();
+ }
+
+ if (childQueueNames.size() == 0 && !isDynamicParent) {
if (null == parent) {
throw new IllegalStateException(
"Queue configuration missing child queue names for " + queueName);
@@ -258,7 +270,7 @@ public class CapacitySchedulerQueueManager implements
SchedulerQueueManager<
}
childQueues.add(resQueue);
((PlanQueue) queue).setChildQueues(childQueues);
- queues.add(resQueue);
+ newQueues.add(resQueue);
} else if (isAutoCreateEnabled) {
queue = new ManagedParentQueue(csContext, queueName, parent,
@@ -291,14 +303,14 @@ public class CapacitySchedulerQueueManager implements
SchedulerQueueManager<
List<CSQueue> childQueues = new ArrayList<>();
for (String childQueueName : childQueueNames) {
CSQueue childQueue = parseQueue(csContext, conf, queue, childQueueName,
- queues, oldQueues, hook);
+ newQueues, oldQueues, hook);
childQueues.add(childQueue);
}
parentQueue.setChildQueues(childQueues);
}
- queues.add(queue);
+ newQueues.add(queue);
LOG.info("Initialized queue: " + fullQueueName);
return queue;
@@ -320,11 +332,12 @@ public class CapacitySchedulerQueueManager implements
SchedulerQueueManager<
}
}
- for (CSQueue queue: existingQueues.getQueues()) {
- if (newQueues.get(queue.getQueuePath()) == null && !(
+ for (CSQueue queue : existingQueues.getQueues()) {
+ if (!((AbstractCSQueue) queue).isDynamicQueue() && newQueues.get(
+ queue.getQueuePath()) == null && !(
queue instanceof AutoCreatedLeafQueue && conf
.isAutoCreateChildQueueEnabled(
- queue.getParent().getQueuePath()))) {
+ queue.getParent().getQueuePath()))) {
existingQueues.remove(queue);
}
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 1e6f581..15c321f 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -168,11 +168,6 @@ public class LeafQueue extends AbstractCSQueue {
}
- protected void setupQueueConfigs(Resource clusterResource)
- throws IOException {
- setupQueueConfigs(clusterResource, csContext.getConfiguration());
- }
-
@SuppressWarnings("checkstyle:nowhitespaceafter")
protected void setupQueueConfigs(Resource clusterResource,
CapacitySchedulerConfiguration conf) throws
@@ -529,6 +524,13 @@ public class LeafQueue extends AbstractCSQueue {
writeLock.lock();
try {
+ // We skip reinitialization for dynamic queues. When this is called and
+ // the new queue is different from this queue, we make this queue a
+ // static queue.
+ if (newlyParsedQueue != this) {
+ this.setDynamicQueue(false);
+ }
+
// Sanity check
if (!(newlyParsedQueue instanceof LeafQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) {
@@ -552,11 +554,6 @@ public class LeafQueue extends AbstractCSQueue {
}
setupQueueConfigs(clusterResource, configuration);
-
- // queue metrics are updated, more resource may be available
- // activate the pending applications if possible
- activateApplications();
-
} finally {
writeLock.unlock();
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index fc848c6..0a2f082 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -31,6 +31,7 @@ import
org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -108,11 +109,18 @@ public class ParentQueue extends AbstractCSQueue {
public ParentQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
+ this(cs, cs.getConfiguration(), queueName, parent, old);
+ }
+
+ private ParentQueue(CapacitySchedulerContext cs,
+ CapacitySchedulerConfiguration csConf, String queueName, CSQueue parent,
+ CSQueue old)
+ throws IOException {
super(cs, queueName, parent, old);
this.scheduler = cs;
this.rootQueue = (parent == null);
- float rawCapacity =
cs.getConfiguration().getNonLabeledQueueCapacity(getQueuePath());
+ float rawCapacity = csConf.getNonLabeledQueueCapacity(getQueuePath());
if (rootQueue &&
(rawCapacity !=
CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) {
@@ -125,7 +133,7 @@ public class ParentQueue extends AbstractCSQueue {
this.allowZeroCapacitySum =
cs.getConfiguration().getAllowZeroCapacitySum(getQueuePath());
- setupQueueConfigs(cs.getClusterResource());
+ setupQueueConfigs(cs.getClusterResource(), csConf);
LOG.info("Initialized parent-queue " + queueName +
" name=" + queueName +
@@ -139,11 +147,12 @@ public class ParentQueue extends AbstractCSQueue {
queueOrderingPolicy.getConfigName();
}
- protected void setupQueueConfigs(Resource clusterResource)
+ protected void setupQueueConfigs(Resource clusterResource,
+ CapacitySchedulerConfiguration csConf)
throws IOException {
writeLock.lock();
try {
- super.setupQueueConfigs(clusterResource);
+ super.setupQueueConfigs(clusterResource, csConf);
StringBuilder aclsString = new StringBuilder();
for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
@@ -158,7 +167,7 @@ public class ParentQueue extends AbstractCSQueue {
}
// Initialize queue ordering policy
- queueOrderingPolicy =
csContext.getConfiguration().getQueueOrderingPolicy(
+ queueOrderingPolicy = csConf.getQueueOrderingPolicy(
getQueuePath(), parent == null ?
null :
((ParentQueue) parent).getQueueOrderingPolicyConfigName());
@@ -247,14 +256,11 @@ public class ParentQueue extends AbstractCSQueue {
+ "double check, details:" + diagMsg.toString());
}
- if (weightIsSet) {
+ if (weightIsSet || queues.isEmpty()) {
return QueueCapacityType.WEIGHT;
} else if (absoluteMinResSet) {
return QueueCapacityType.ABSOLUTE_RESOURCE;
- } else if (percentageIsSet) {
- return QueueCapacityType.PERCENT;
} else {
- // When all values equals to 0, consider it is a percent mode.
return QueueCapacityType.PERCENT;
}
}
@@ -464,12 +470,132 @@ public class ParentQueue extends AbstractCSQueue {
"numApps=" + getNumApplications() + ", " +
"numContainers=" + getNumContainers();
}
+
+  /**
+   * Builds the configuration handed to the constructor of an auto-created
+   * child queue.
+   *
+   * @param childQueuePath path of the queue that is about to be created
+   * @param isLeaf true when the new queue is a leaf queue
+   * @return a copy of the scheduler configuration; for leaf queues the
+   *         maximum AM resource percent of the new queue is set to 0.5
+   */
+  private CapacitySchedulerConfiguration getConfForAutoCreatedQueue(
+      String childQueuePath, boolean isLeaf) {
+    // Start from a copy of the existing scheduler configuration
+    CapacitySchedulerConfiguration queueConf =
+        new CapacitySchedulerConfiguration(csContext.getConfiguration(), false);
+    if (!isLeaf) {
+      // Parent queues use the copied configuration unchanged
+      return queueConf;
+    }
+
+    // FIXME: Ideally we should disable user limit factor, see YARN-10531
+
+    // Set Max AM percentage to a higher value
+    queueConf.setMaximumApplicationMasterResourcePerQueuePercent(
+        childQueuePath, 0.5f);
+    return queueConf;
+  }
+
+  /**
+   * Instantiates a dynamic child queue of this parent. The queue is only
+   * constructed and flagged as dynamic here; it is not added to the list of
+   * child queues.
+   *
+   * @param childQueuePath dot-separated path of the queue to create; the
+   *                       part after the last dot becomes the short name
+   * @param isLeaf true to create a LeafQueue, false to create a ParentQueue
+   * @return the newly constructed queue, marked as dynamic
+   * @throws SchedulerDynamicEditException if the queue constructor fails
+   *         with an IOException; that exception is attached as the cause
+   */
+  private CSQueue createNewQueue(String childQueuePath, boolean isLeaf)
+      throws SchedulerDynamicEditException {
+    try {
+      AbstractCSQueue childQueue;
+      String queueShortName = childQueuePath.substring(
+          childQueuePath.lastIndexOf(".") + 1);
+      CapacitySchedulerConfiguration childConf =
+          getConfForAutoCreatedQueue(childQueuePath, isLeaf);
+
+      if (isLeaf) {
+        childQueue = new LeafQueue(csContext, childConf, queueShortName,
+            this, null);
+      } else {
+        childQueue = new ParentQueue(csContext, childConf, queueShortName,
+            this, null);
+      }
+      childQueue.setDynamicQueue(true);
+      // It should be sufficient now, we don't need to set more, because
+      // weights related setup will be handled in updateClusterResources
+
+      return childQueue;
+    } catch (IOException e) {
+      // SchedulerDynamicEditException is built from a message only, so the
+      // original failure is attached via initCause to keep its stack trace.
+      SchedulerDynamicEditException editException =
+          new SchedulerDynamicEditException(e.toString());
+      editException.initCause(e);
+      throw editException;
+    }
+  }
+
+  /**
+   * Adds a dynamic parent queue as a child of this queue.
+   *
+   * @param queuePath path of the parent queue to create
+   * @return the created queue, or the queue already registered under that
+   *         path
+   * @throws SchedulerDynamicEditException if the queue cannot be created
+   */
+  public ParentQueue addDynamicParentQueue(String queuePath)
+      throws SchedulerDynamicEditException {
+    CSQueue createdQueue = addDynamicChildQueue(queuePath, false);
+    return (ParentQueue) createdQueue;
+  }
+
+  /**
+   * Adds a dynamic leaf queue as a child of this queue.
+   *
+   * @param queuePath path of the leaf queue to create
+   * @return the created queue, or the queue already registered under that
+   *         path
+   * @throws SchedulerDynamicEditException if the queue cannot be created
+   */
+  public LeafQueue addDynamicLeafQueue(String queuePath)
+      throws SchedulerDynamicEditException {
+    CSQueue createdQueue = addDynamicChildQueue(queuePath, true);
+    return (LeafQueue) createdQueue;
+  }
+
+  /**
+   * Creates a dynamic child queue under this parent, adds it to the child
+   * queues and triggers updateClusterResource so that the effective min/max
+   * resources are recalculated. Runs under this queue's write lock.
+   *
+   * @param childQueuePath path of the queue to create
+   * @param isLeaf true to create a leaf queue, false to create a parent queue
+   * @return the newly created queue, or the existing queue if the queue
+   *         manager already knows a queue under childQueuePath
+   * @throws SchedulerDynamicEditException if the children of this queue are
+   *         not in weight mode, or if constructing the queue fails. When the
+   *         capacity-type check itself failed, that IOException is attached
+   *         as the cause.
+   */
+  private CSQueue addDynamicChildQueue(String childQueuePath, boolean isLeaf)
+      throws SchedulerDynamicEditException {
+    writeLock.lock();
+    try {
+      // Check if queue exists, if queue exists, write a warning message (this
+      // should not happen, since it will be handled before calling this
+      // method), but we will move on.
+      CSQueue queue =
+          csContext.getCapacitySchedulerQueueManager().getQueueByFullName(
+              childQueuePath);
+      if (queue != null) {
+        LOG.warn(
+            "This should not happen, trying to create queue=" + childQueuePath
+                + ", however the queue already exists");
+        return queue;
+      }
+
+      // First, check if we allow creation or not
+      boolean weightsAreUsed = false;
+      IOException capacityTypeFailure = null;
+      try {
+        weightsAreUsed = getCapacityConfigurationTypeForQueues(childQueues)
+            == QueueCapacityType.WEIGHT;
+      } catch (IOException e) {
+        LOG.warn("Caught Exception during auto queue creation", e);
+        // Remember the failure so the caller can see why creation was denied
+        capacityTypeFailure = e;
+      }
+      if (!weightsAreUsed) {
+        SchedulerDynamicEditException rejection =
+            new SchedulerDynamicEditException(
+                "Trying to create new queue=" + childQueuePath
+                    + " but not all the queues under parent="
+                    + this.getQueuePath()
+                    + " are using weight-based capacity. Failed to created queue");
+        if (capacityTypeFailure != null) {
+          rejection.initCause(capacityTypeFailure);
+        }
+        throw rejection;
+      }
+
+      CSQueue newQueue = createNewQueue(childQueuePath, isLeaf);
+      this.childQueues.add(newQueue);
+
+      // Call updateClusterResource, which will deal with all
+      // effectiveMin/MaxResource calculation
+      this.updateClusterResource(csContext.getClusterResource(),
+          new ResourceLimits(this.csContext.getClusterResource()));
+
+      return newQueue;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Tells whether child queues may be added to this queue dynamically.
+   * A dynamic queue is always eligible; any other queue is eligible only if
+   * auto queue creation v2 is enabled for its queue path in the scheduler
+   * configuration.
+   *
+   * @return true if additional child queues can be created dynamically
+   *         under this queue, false otherwise
+   */
+  public boolean isEligibleForAutoQueueCreation() {
+    if (isDynamicQueue()) {
+      return true;
+    }
+    return csContext.getConfiguration()
+        .isAutoQueueCreationV2Enabled(getQueuePath());
+  }
@Override
public void reinitialize(CSQueue newlyParsedQueue,
Resource clusterResource) throws IOException {
writeLock.lock();
try {
+ // Reinitialize is skipped for dynamic queues. If we get here with a newly
+ // parsed queue that is not this very instance, convert this queue into a
+ // static queue.
+ if (newlyParsedQueue != this) {
+ this.setDynamicQueue(false);
+ }
+
// Sanity check
if (!(newlyParsedQueue instanceof ParentQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) {
@@ -481,7 +607,7 @@ public class ParentQueue extends AbstractCSQueue {
ParentQueue newlyParsedParentQueue = (ParentQueue) newlyParsedQueue;
// Set new configs
- setupQueueConfigs(clusterResource);
+ setupQueueConfigs(clusterResource, csContext.getConfiguration());
// Re-configure existing child queues and add new ones
// The CS has already checked to ensure all existing child queues are present!
@@ -537,6 +663,10 @@ public class ParentQueue extends AbstractCSQueue {
Map.Entry<String, CSQueue> e = itr.next();
String queueName = e.getKey();
if (!newChildQueues.containsKey(queueName)) {
+ if (((AbstractCSQueue)e.getValue()).isDynamicQueue()) {
+ // Don't remove dynamic queue if we cannot find it in the config.
+ continue;
+ }
itr.remove();
}
}
@@ -1045,11 +1175,26 @@ public class ParentQueue extends AbstractCSQueue {
// below calculation for effective capacities
updateAbsoluteCapacities();
+ // Normalize every dynamic queue's weight to 1 for all accessible node
+ // labels. This is important because the existing node labels could keep
+ // changing when a new node is added or the node label mapping changes. We
+ // need this to ensure auto-created queues can access all labels.
+ for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
+ for (CSQueue queue : childQueues) {
+ // For dynamic queue, we will set weight to 1 every time, because it
+ // is possible new labels added to the parent.
+ if (((AbstractCSQueue) queue).isDynamicQueue()) {
+ queue.getQueueCapacities().setWeight(nodeLabel, 1f);
+ }
+ }
+ }
+
// Normalize weight of children
if (getCapacityConfigurationTypeForQueues(childQueues)
== QueueCapacityType.WEIGHT) {
for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
float sumOfWeight = 0;
+
for (CSQueue queue : childQueues) {
float weight = Math.max(0,
queue.getQueueCapacities().getWeight(nodeLabel));
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
index 79afcdc..4dd3317 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
@@ -46,6 +46,7 @@ public class PlanQueue extends AbstractManagedParentQueue {
public PlanQueue(CapacitySchedulerContext cs, String queueName,
CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
+ updateAbsoluteCapacities();
this.schedulerContext = cs;
// Set the reservation queue attributes for the Plan
@@ -100,7 +101,7 @@ public class PlanQueue extends AbstractManagedParentQueue {
}
// Set new configs
- setupQueueConfigs(clusterResource);
+ setupQueueConfigs(clusterResource, csContext.getConfiguration());
updateQuotas(newlyParsedParentQueue.userLimit,
newlyParsedParentQueue.userLimitFactor,
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
index 46bb0ca..86d35d6 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
@@ -82,7 +82,7 @@ public class QueueCapacities {
.append("reserved_cap=" + capacitiesArr[7] + "%, ")
.append("abs_reserved_cap=" + capacitiesArr[8] + "%, ")
.append("weight=" + capacitiesArr[9] + "w, ")
- .append("normalized_weight=" + capacitiesArr[9] + "w}");
+ .append("normalized_weight=" + capacitiesArr[10] + "w}");
return sb.toString();
}
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
index 1ef3a29..300993b 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
@@ -907,7 +907,12 @@ public class TestCapacitySchedulerAutoQueueCreation
@Test
public void testDynamicAutoQueueCreationWithTags()
throws Exception {
- MockRM rm = null;
+ // This test we will reinitialize mockRM, so stop the previous initialized
+ // mockRM to avoid issues like MetricsSystem
+ if (mockRM != null) {
+ mockRM.stop();
+ }
+ mockRM = null;
try {
CapacitySchedulerConfiguration csConf
= new CapacitySchedulerConfiguration();
@@ -929,35 +934,35 @@ public class TestCapacitySchedulerAutoQueueCreation
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(csConf);
- rm = new MockRM(csConf) {
+ mockRM = new MockRM(csConf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
- rm.start();
- MockNM nm = rm.registerNode("127.0.0.1:1234", 16 * GB);
+ mockRM.start();
+ MockNM nm = mockRM.registerNode("127.0.0.1:1234", 16 * GB);
MockRMAppSubmissionData data =
- MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+ MockRMAppSubmissionData.Builder.createWithMemory(GB, mockRM)
.withAppName("apptodynamicqueue")
.withUser("hadoop")
.withAcls(null)
.withUnmanagedAM(false)
.withApplicationTags(Sets.newHashSet("userid=testuser"))
.build();
- RMApp app = MockRMAppSubmitter.submit(rm, data);
- MockRM.launchAndRegisterAM(app, rm, nm);
+ RMApp app = MockRMAppSubmitter.submit(mockRM, data);
+ MockRM.launchAndRegisterAM(app, mockRM, nm);
nm.nodeHeartbeat(true);
- CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+ CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
CSQueue queue = cs.getQueue("root.a.testuser");
assertNotNull("Leaf queue has not been auto-created", queue);
assertEquals("Number of running applications", 1,
queue.getNumApplications());
} finally {
- if (rm != null) {
- rm.close();
+ if (mockRM != null) {
+ mockRM.close();
}
}
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java
new file mode 100644
index 0000000..25b2f4d
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests dynamic queue creation through
+ * {@link CapacitySchedulerAutoQueueHandler} on a weight-mode queue hierarchy:
+ * creation under static and dynamic parents, rejection cases, behaviour
+ * across scheduler reinitialization and conversion of dynamic queues into
+ * static ones.
+ */
+public class TestCapacitySchedulerNewQueueAutoCreation
+    extends TestCapacitySchedulerAutoCreatedQueueBase {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestCapacitySchedulerNewQueueAutoCreation.class);
+  public static final int GB = 1024;
+  // Memory of the single registered node, in GB
+  private static final int MAX_MEMORY = 1200;
+  private MockRM mockRM = null;
+  private CapacityScheduler cs;
+  private CapacitySchedulerConfiguration csConf;
+  private CapacitySchedulerAutoQueueHandler autoQueueHandler;
+
+  /*
+  Create the following structure:
+           root
+        /       \
+      a          b
+    /
+  a1
+   */
+  @Before
+  public void setUp() throws Exception {
+    csConf = new CapacitySchedulerConfiguration();
+    csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+
+    // By default, set 3 queues, a/b, and a.a1
+    csConf.setQueues("root", new String[]{"a", "b"});
+    csConf.setNonLabeledQueueWeight("root", 1f);
+    csConf.setNonLabeledQueueWeight("root.a", 1f);
+    csConf.setNonLabeledQueueWeight("root.b", 1f);
+    csConf.setQueues("root.a", new String[]{"a1"});
+    csConf.setNonLabeledQueueWeight("root.a.a1", 1f);
+    csConf.setAutoQueueCreationV2Enabled("root", true);
+    csConf.setAutoQueueCreationV2Enabled("root.a", true);
+    csConf.setAutoQueueCreationV2Enabled("root.e", true);
+  }
+
+  /**
+   * Starts a MockRM with the current csConf, creates the auto queue handler
+   * and registers one node with MAX_MEMORY GB of memory.
+   */
+  private void startScheduler() throws Exception {
+    RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(csConf);
+    mockRM = new MockRM(csConf) {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    cs = (CapacityScheduler) mockRM.getResourceScheduler();
+    cs.updatePlacementRules();
+    mockRM.start();
+    cs.start();
+    autoQueueHandler = new CapacitySchedulerAutoQueueHandler(
+        cs.getCapacitySchedulerQueueManager(), csConf);
+    mockRM.registerNode("h1:1234", MAX_MEMORY * GB);
+  }
+
+  /*
+  Create and validate the following structure:
+
+                      root
+    ┌─────┬────────┬─────┴─────┬─────────┐
+    a     b      c-auto     e-auto     d-auto
+    |                          |
+    a1                      e1-auto
+   */
+  private void createBasicQueueStructureAndValidate() throws Exception {
+    // queue's weights are 1
+    // root
+    // - a (w=1)
+    // - b (w=1)
+    // - c-auto (w=1)
+    // - d-auto (w=1)
+    // - e-auto (w=1)
+    //   - e1-auto (w=1)
+    MockNM nm1 = mockRM.registerNode("h1:1234", 1200 * GB);
+
+    createQueue("root.c-auto");
+
+    // Check if queue c-auto got created
+    CSQueue c = cs.getQueue("root.c-auto");
+    Assert.assertEquals(1 / 3f, c.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(1f, c.getQueueCapacities().getWeight(), 1e-6);
+    Assert.assertEquals(400 * GB,
+        c.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
+
+    // Now add another queue-d, in the same hierarchy
+    createQueue("root.d-auto");
+
+    // Because queue-d has the same weight of other sibling queue, its abs cap
+    // become 1/4
+    CSQueue d = cs.getQueue("root.d-auto");
+    Assert.assertEquals(1 / 4f, d.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(1f, d.getQueueCapacities().getWeight(), 1e-6);
+    Assert.assertEquals(300 * GB,
+        d.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
+
+    // Now we check queue c again, it should also become 1/4 capacity
+    Assert.assertEquals(1 / 4f, c.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(1f, c.getQueueCapacities().getWeight(), 1e-6);
+    Assert.assertEquals(300 * GB,
+        c.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
+
+    // Now we add a two-level queue, create leaf only
+    // Now add another queue a2-auto, under root.a
+    createQueue("root.a.a2-auto");
+
+    // root.a has 1/4 abs resource, a2/a1 has the same weight, so a2 has 1/8
+    // abs capacity
+    CSQueue a2 = cs.getQueue("root.a.a2-auto");
+    Assert.assertEquals(1 / 8f, a2.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(1f, a2.getQueueCapacities().getWeight(), 1e-6);
+    Assert.assertEquals(150 * GB,
+        a2.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
+
+    // try, create leaf + parent, will success
+    createQueue("root.e-auto.e1-auto");
+
+    // Now check capacity of e and e1 (under root we have 5 queues, so e gets
+    // 1/5 capacity)
+    CSQueue e = cs.getQueue("root.e-auto");
+    Assert.assertEquals(1 / 5f, e.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(1f, e.getQueueCapacities().getWeight(), 1e-6);
+    Assert.assertEquals(240 * GB,
+        e.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
+
+    // Under e, there's only one queue, so e1/e have same capacity
+    CSQueue e1 = cs.getQueue("root.e-auto.e1-auto");
+    Assert.assertEquals(1 / 5f, e1.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(1f, e1.getQueueCapacities().getWeight(), 1e-6);
+    Assert.assertEquals(240 * GB,
+        e1.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
+  }
+
+  /*
+  Create and validate the structure:
+                  root
+    ┌─────┬────────┬─────┴───────┐
+    a     b      c-auto       d-auto
+    |
+    a1
+   */
+  @Test
+  public void testAutoCreateQueueWithSiblingsUnderRoot() throws Exception {
+    startScheduler();
+
+    createQueue("root.c-auto");
+
+    // Check if queue c-auto got created
+    CSQueue c = cs.getQueue("root.c-auto");
+    Assert.assertEquals(1 / 3f, c.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(1f, c.getQueueCapacities().getWeight(), 1e-6);
+    Assert.assertEquals(400 * GB,
+        c.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
+
+    // Now add another queue-d, in the same hierarchy
+    createQueue("root.d-auto");
+
+    // Because queue-d has the same weight of other sibling queue, its abs cap
+    // become 1/4
+    CSQueue d = cs.getQueue("root.d-auto");
+    Assert.assertEquals(1 / 4f, d.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(1f, d.getQueueCapacities().getWeight(), 1e-6);
+    Assert.assertEquals(300 * GB,
+        d.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
+
+    // Now we check queue c again, it should also become 1/4 capacity
+    Assert.assertEquals(1 / 4f, c.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(1f, c.getQueueCapacities().getWeight(), 1e-6);
+    Assert.assertEquals(300 * GB,
+        c.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
+  }
+
+  /*
+  Create and validate the structure:
+          root
+     ┌─────┴─────┐
+     b           a
+               /  \
+              a1  a2-auto
+   */
+  @Test
+  public void testAutoCreateQueueStaticParentOneLevel() throws Exception {
+    startScheduler();
+    // Now we add a two-level queue, create leaf only
+    // Now add another queue a2-auto, under root.a
+    createQueue("root.a.a2-auto");
+
+    // root.a has 1/2 abs resource, a2/a1 has the same weight, so a2 has 1/4
+    // abs capacity
+    CSQueue a2 = cs.getQueue("root.a.a2-auto");
+    Assert.assertEquals(1 / 4f, a2.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(1f, a2.getQueueCapacities().getWeight(), 1e-6);
+    Assert.assertEquals(MAX_MEMORY * (1 / 4f) * GB,
+        a2.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize(),
+        1e-6);
+
+  }
+
+  /*
+  Create and validate the structure:
+          root
+     ┌─────┴─────┐
+     b           a
+                 |  \
+                a1   a2-auto
+                     |     \
+                 a3-auto   a4-auto
+   */
+  @Test
+  public void testAutoCreateQueueAutoParentTwoLevelsWithSiblings()
+      throws Exception {
+    startScheduler();
+    csConf.setAutoQueueCreationV2Enabled("root.a.a2-auto", true);
+
+    // root.a has 1/2 abs resource -> a1 and a2-auto have the same weight,
+    // so each gets 1/4 -> a3-auto is alone under a2-auto and gets 1/4
+    createQueue("root.a.a2-auto.a3-auto");
+    CSQueue a3 = cs.getQueue("root.a.a2-auto.a3-auto");
+    Assert.assertEquals(1 / 4f, a3.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(1f, a3.getQueueCapacities().getWeight(), 1e-6);
+    Assert.assertEquals(MAX_MEMORY * (1 / 4f) * GB,
+        a3.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize(),
+        1e-6);
+
+    // root.a has 1/2 abs resource -> a1 and a2-auto have the same weight,
+    // so each gets 1/4 -> a3-auto and a4-auto have the same weight, so each
+    // gets 1/8
+    createQueue("root.a.a2-auto.a4-auto");
+    CSQueue a4 = cs.getQueue("root.a.a2-auto.a4-auto");
+    // The existing sibling shrinks to 1/8 ...
+    Assert.assertEquals(1 / 8f, a3.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(1f, a3.getQueueCapacities().getWeight(), 1e-6);
+    // ... and the new queue gets the same share
+    Assert.assertEquals(1 / 8f, a4.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(1f, a4.getQueueCapacities().getWeight(), 1e-6);
+    Assert.assertEquals(MAX_MEMORY * (1 / 8f) * GB,
+        a4.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize(),
+        1e-6);
+  }
+
+  // root.a.a1 is a static leaf queue, so nothing can be created below it
+  @Test(expected = SchedulerDynamicEditException.class)
+  public void testAutoCreateQueueShouldFailWhenNonParentQueue()
+      throws Exception {
+    startScheduler();
+    createQueue("root.a.a1.a2-auto");
+  }
+
+  // Creation must be rejected once the existing queues use percentage
+  // capacities instead of weights
+  @Test(expected = SchedulerDynamicEditException.class)
+  public void testAutoCreateQueueWhenSiblingsNotInWeightMode()
+      throws Exception {
+    startScheduler();
+    csConf.setCapacity("root.a", 50f);
+    csConf.setCapacity("root.b", 50f);
+    csConf.setCapacity("root.a.a1", 100f);
+    cs.reinitialize(csConf, mockRM.getRMContext());
+    createQueue("root.a.a2-auto");
+  }
+
+  @Test(expected = SchedulerDynamicEditException.class)
+  public void testAutoCreateQueueShouldFailIfDepthIsAboveLimit()
+      throws Exception {
+    startScheduler();
+    createQueue("root.a.a3-auto.a4-auto.a5-auto");
+  }
+
+  @Test(expected = SchedulerDynamicEditException.class)
+  public void testAutoCreateQueueShouldFailIfNotEnabledForParent()
+      throws Exception {
+    startScheduler();
+    csConf.setAutoQueueCreationV2Enabled("root", false);
+    cs.reinitialize(csConf, mockRM.getRMContext());
+    createQueue("root.c-auto");
+  }
+
+  @Test
+  public void testAutoCreateQueueRefresh() throws Exception {
+    startScheduler();
+
+    createBasicQueueStructureAndValidate();
+
+    // Refresh the queue to make sure all queues are still exist.
+    // (Basically, dynamic queues should not disappear after refresh).
+    cs.reinitialize(csConf, mockRM.getRMContext());
+
+    // Double confirm, after refresh, we should still see root queue has 5
+    // children.
+    Assert.assertEquals(5, cs.getQueue("root").getChildQueues().size());
+    Assert.assertNotNull(cs.getQueue("root.c-auto"));
+  }
+
+  @Test
+  public void testConvertDynamicToStaticQueue() throws Exception {
+    startScheduler();
+
+    createBasicQueueStructureAndValidate();
+
+    // Now, update root.a's weight to 6
+    csConf.setNonLabeledQueueWeight("root.a", 6f);
+    cs.reinitialize(csConf, mockRM.getRMContext());
+
+    // Double confirm, after refresh, we should still see root queue has 5
+    // children.
+    Assert.assertEquals(5, cs.getQueue("root").getChildQueues().size());
+
+    // Get queue a
+    CSQueue a = cs.getQueue("root.a");
+
+    // a's abs resource should be 6/10, (since a.weight=6, all other 4 peers
+    // have weight=1).
+    Assert.assertEquals(6 / 10f, a.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(720 * GB,
+        a.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
+    Assert.assertEquals(6f, a.getQueueCapacities().getWeight(), 1e-6);
+
+    // Set queue c-auto's weight to 6, and mark c-auto to be static queue
+    csConf.setQueues("root", new String[]{"a", "b", "c-auto"});
+    csConf.setNonLabeledQueueWeight("root.c-auto", 6f);
+    cs.reinitialize(csConf, mockRM.getRMContext());
+
+    // Get queue c
+    CSQueue c = cs.getQueue("root.c-auto");
+
+    // c's abs resource should be 6/15, (since a/c.weight=6, all other 3 peers
+    // have weight=1).
+    Assert.assertEquals(6 / 15f, c.getAbsoluteCapacity(), 1e-6);
+    Assert.assertEquals(480 * GB,
+        c.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
+    Assert.assertEquals(6f, c.getQueueCapacities().getWeight(), 1e-6);
+
+    // First, create e2-auto queue
+    createQueue("root.e-auto.e2-auto");
+
+    // Do change 2nd level queue from dynamic to static
+    csConf.setQueues("root", new String[]{"a", "b", "c-auto", "e-auto"});
+    csConf.setNonLabeledQueueWeight("root.e-auto", 6f);
+    csConf.setQueues("root.e-auto", new String[]{"e1-auto"});
+    csConf.setNonLabeledQueueWeight("root.e-auto.e1-auto", 6f);
+    cs.reinitialize(csConf, mockRM.getRMContext());
+
+    // Get queue e1
+    CSQueue e1 = cs.getQueue("root.e-auto.e1-auto");
+
+    // e1's abs capacity should be (6/20) * (6/7),
+    // (since a/c/e.weight=6, all other 2 peers
+    // have weight=1, and e1's weight is 6, e2's weight is 1).
+    float e1NormalizedWeight = (6 / 20f) * (6 / 7f);
+    Assert.assertEquals(e1NormalizedWeight, e1.getAbsoluteCapacity(), 1e-6);
+    assertQueueMinResource(e1, MAX_MEMORY * e1NormalizedWeight);
+    Assert.assertEquals(6f, e1.getQueueCapacities().getWeight(), 1e-6);
+  }
+
+  /*
+  Create the structure and convert d-auto to static and leave d1-auto as
+  dynamic
+                root
+    ┌─────┬──────┴──────┐
+    a     b           d-auto
+    |                   |
+    a1               d1-auto
+   */
+  @Test
+  public void testConvertDynamicParentToStaticParent() throws Exception {
+    startScheduler();
+    createQueue("root.d-auto.d1-auto");
+    csConf.setQueues("root", new String[]{"a", "b", "d-auto"});
+    csConf.setNonLabeledQueueWeight("root.a", 6f);
+    csConf.setNonLabeledQueueWeight("root.d-auto", 1f);
+    cs.reinitialize(csConf, mockRM.getRMContext());
+
+    CSQueue d = cs.getQueue("root.d-auto");
+
+    Assert.assertEquals(1 / 8f, d.getAbsoluteCapacity(), 1e-6);
+    assertQueueMinResource(d, MAX_MEMORY * (1 / 8f));
+    Assert.assertEquals(1f, d.getQueueCapacities().getWeight(), 1e-6);
+
+    CSQueue d1 = cs.getQueue("root.d-auto.d1-auto");
+    Assert.assertEquals(1 / 8f, d1.getAbsoluteCapacity(), 1e-6);
+    assertQueueMinResource(d1, MAX_MEMORY * (1 / 8f));
+    Assert.assertEquals(1f, d1.getQueueCapacities().getWeight(), 1e-6);
+  }
+
+  @Test
+  public void testAutoQueueCreationOnAppSubmission() throws Exception {
+    startScheduler();
+    createBasicQueueStructureAndValidate();
+
+    submitApp(cs, USER0, USER0, "root.e-auto");
+
+    AbstractCSQueue e = (AbstractCSQueue) cs.getQueue("root.e-auto");
+    Assert.assertNotNull(e);
+    Assert.assertTrue(e.isDynamicQueue());
+
+    AbstractCSQueue user0 = (AbstractCSQueue) cs.getQueue(
+        "root.e-auto." + USER0);
+    Assert.assertNotNull(user0);
+    Assert.assertTrue(user0.isDynamicQueue());
+  }
+
+  /**
+   * Asks the auto queue handler to create the queue with the given path.
+   */
+  private LeafQueue createQueue(String queuePath) throws YarnException {
+    return autoQueueHandler.autoCreateQueue(
+        CSQueueUtils.extractQueuePath(queuePath));
+  }
+
+  /**
+   * Asserts the effective minimum memory of a queue.
+   *
+   * @param queue queue to check
+   * @param expected expected memory in GB; converted to MB and rounded
+   */
+  private void assertQueueMinResource(CSQueue queue, float expected) {
+    Assert.assertEquals(Math.round(expected * GB),
+        queue.getQueueResourceQuotas().getEffectiveMinResource()
+            .getMemorySize(), 1e-6);
+  }
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 3a6fe2a..0c9799d 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -3291,7 +3291,11 @@ public class TestLeafQueue {
newQueues, queues,
TestUtils.spyHook);
queues = newQueues;
+ // This will not update active apps
root.reinitialize(newRoot, csContext.getClusterResource());
+ // Cause this to update active apps
+ root.updateClusterResource(csContext.getClusterResource(),
+ new ResourceLimits(csContext.getClusterResource()));
// after reinitialization
assertEquals(3, e.getNumActiveApplications());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]