http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java index ff795e4..cbdb21d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java @@ -17,13 +17,23 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler .SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica + .FiCaSchedulerApp; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; /** * Auto Creation enabled Parent queue. 
This queue initially does not have any @@ -44,54 +54,125 @@ public class ManagedParentQueue extends AbstractManagedParentQueue { final String queueName, final CSQueue parent, final CSQueue old) throws IOException { super(cs, queueName, parent, old); - String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix( - csContext.getConfiguration()); - this.leafQueueTemplate = initializeLeafQueueConfigs( - leafQueueTemplateConfPrefix).build(); + + shouldFailAutoCreationWhenGuaranteedCapacityExceeded = + csContext.getConfiguration() + .getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded( + getQueuePath()); + + leafQueueTemplate = initializeLeafQueueConfigs().build(); StringBuffer queueInfo = new StringBuffer(); queueInfo.append("Created Managed Parent Queue: ").append(queueName).append( "]\nwith capacity: [").append(super.getCapacity()).append( "]\nwith max capacity: [").append(super.getMaximumCapacity()).append( - "\nwith max apps: [").append(leafQueueTemplate.getMaxApps()).append( - "]\nwith max apps per user: [").append( - leafQueueTemplate.getMaxAppsPerUser()).append("]\nwith user limit: [") - .append(leafQueueTemplate.getUserLimit()).append( - "]\nwith user limit factor: [").append( - leafQueueTemplate.getUserLimitFactor()).append("]."); + "]."); LOG.info(queueInfo.toString()); + + initializeQueueManagementPolicy(); } @Override public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { - validate(newlyParsedQueue); - super.reinitialize(newlyParsedQueue, clusterResource); - String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix( - csContext.getConfiguration()); - this.leafQueueTemplate = initializeLeafQueueConfigs( - leafQueueTemplateConfPrefix).build(); + + try { + writeLock.lock(); + validate(newlyParsedQueue); + + shouldFailAutoCreationWhenGuaranteedCapacityExceeded = + csContext.getConfiguration() + .getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded( + getQueuePath()); + + //validate if capacity is 
exceeded for child queues + if (shouldFailAutoCreationWhenGuaranteedCapacityExceeded) { + float childCap = sumOfChildCapacities(); + if (getCapacity() < childCap) { + throw new IOException( + "Total of Auto Created leaf queues guaranteed capacity : " + + childCap + " exceeds Parent queue's " + getQueuePath() + + " guaranteed capacity " + getCapacity() + "" + + ".Cannot enforce policy to auto" + + " create queues beyond parent queue's capacity"); + } + } + + leafQueueTemplate = initializeLeafQueueConfigs().build(); + + super.reinitialize(newlyParsedQueue, clusterResource); + + // run reinitialize on each existing queue, to trigger absolute cap + // recomputations + for (CSQueue res : this.getChildQueues()) { + res.reinitialize(res, clusterResource); + } + + //clear state in policy + reinitializeQueueManagementPolicy(); + + //reassign capacities according to policy + final List<QueueManagementChange> queueManagementChanges = + queueManagementPolicy.computeQueueManagementChanges(); + + validateAndApplyQueueManagementChanges(queueManagementChanges); + + StringBuffer queueInfo = new StringBuffer(); + queueInfo.append("Reinitialized Managed Parent Queue: ").append(queueName) + .append("]\nwith capacity: [").append(super.getCapacity()).append( + "]\nwith max capacity: [").append(super.getMaximumCapacity()).append( + "]."); + LOG.info(queueInfo.toString()); + } catch (YarnException ye) { + LOG.error("Exception while computing policy changes for leaf queue : " + + getQueueName(), ye); + throw new IOException(ye); + } finally { + writeLock.unlock(); + } } - @Override - protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs( - String queuePath) { + private void initializeQueueManagementPolicy() { + queueManagementPolicy = + csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass( + getQueuePath()); + + queueManagementPolicy.init(csContext, this); + } + + private void reinitializeQueueManagementPolicy() { + AutoCreatedQueueManagementPolicy 
managementPolicy = + csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass( + getQueuePath()); + + if (!(managementPolicy.getClass().equals( + this.queueManagementPolicy.getClass()))) { + queueManagementPolicy = managementPolicy; + queueManagementPolicy.init(csContext, this); + } else{ + queueManagementPolicy.reinitialize(csContext, this); + } + } + + protected AutoCreatedLeafQueueConfig.Builder initializeLeafQueueConfigs() { - AutoCreatedLeafQueueTemplate.Builder leafQueueTemplate = - super.initializeLeafQueueConfigs(queuePath); + AutoCreatedLeafQueueConfig.Builder builder = + new AutoCreatedLeafQueueConfig.Builder(); - CapacitySchedulerConfiguration conf = csContext.getConfiguration(); - String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(conf); + String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix( + csContext.getConfiguration()); + //Load template configuration + builder.configuration( + super.initializeLeafQueueConfigs(leafQueueTemplateConfPrefix)); + + //Load template capacities QueueCapacities queueCapacities = new QueueCapacities(false); - CSQueueUtils.loadUpdateAndCheckCapacities(leafQueueTemplateConfPrefix, + CSQueueUtils.loadUpdateAndCheckCapacities(csContext.getConfiguration() + .getAutoCreatedQueueTemplateConfPrefix(getQueuePath()), csContext.getConfiguration(), queueCapacities, getQueueCapacities()); - leafQueueTemplate.capacities(queueCapacities); - - shouldFailAutoCreationWhenGuaranteedCapacityExceeded = - conf.getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded( - getQueuePath()); + builder.capacities(queueCapacities); - return leafQueueTemplate; + return builder; } protected void validate(final CSQueue newlyParsedQueue) throws IOException { @@ -106,7 +187,7 @@ public class ManagedParentQueue extends AbstractManagedParentQueue { @Override public void addChildQueue(CSQueue childQueue) - throws SchedulerDynamicEditException { + throws SchedulerDynamicEditException, IOException { try { writeLock.lock(); @@ 
-138,21 +219,164 @@ public class ManagedParentQueue extends AbstractManagedParentQueue { throw new SchedulerDynamicEditException( "Cannot auto create leaf queue " + leafQueueName + ". Child " + "queues capacities have reached parent queue : " - + parentQueue.getQueuePath() + " guaranteed capacity"); + + parentQueue.getQueuePath() + "'s guaranteed capacity"); } } AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue; super.addChildQueue(leafQueue); - //TODO - refresh policy queue after capacity management is added + final AutoCreatedLeafQueueConfig initialLeafQueueTemplate = + queueManagementPolicy.getInitialLeafQueueConfiguration(leafQueue); + leafQueue.reinitializeFromTemplate(initialLeafQueueTemplate); } finally { writeLock.unlock(); } } - private String getLeafQueueConfigPrefix(CapacitySchedulerConfiguration conf) { - return conf.getAutoCreatedQueueTemplateConfPrefix(getQueuePath()); + public List<FiCaSchedulerApp> getScheduleableApplications() { + try { + readLock.lock(); + List<FiCaSchedulerApp> apps = new ArrayList<>(); + for (CSQueue childQueue : getChildQueues()) { + apps.addAll(((LeafQueue) childQueue).getApplications()); + } + return Collections.unmodifiableList(apps); + } finally { + readLock.unlock(); + } } + public List<FiCaSchedulerApp> getPendingApplications() { + try { + readLock.lock(); + List<FiCaSchedulerApp> apps = new ArrayList<>(); + for (CSQueue childQueue : getChildQueues()) { + apps.addAll(((LeafQueue) childQueue).getPendingApplications()); + } + return Collections.unmodifiableList(apps); + } finally { + readLock.unlock(); + } + } + + public List<FiCaSchedulerApp> getAllApplications() { + try { + readLock.lock(); + List<FiCaSchedulerApp> apps = new ArrayList<>(); + for (CSQueue childQueue : getChildQueues()) { + apps.addAll(((LeafQueue) childQueue).getAllApplications()); + } + return Collections.unmodifiableList(apps); + } finally { + readLock.unlock(); + } + } + + public String 
getLeafQueueConfigPrefix(CapacitySchedulerConfiguration conf) { + return CapacitySchedulerConfiguration.PREFIX + conf + .getAutoCreatedQueueTemplateConfPrefix(getQueuePath()); + } + + public boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded() { + return shouldFailAutoCreationWhenGuaranteedCapacityExceeded; + } + + /** + * Asynchronously called from scheduler to apply queue management changes + * + * @param queueManagementChanges + */ + public void validateAndApplyQueueManagementChanges( + List<QueueManagementChange> queueManagementChanges) + throws IOException, SchedulerDynamicEditException { + try { + writeLock.lock(); + + validateQueueManagementChanges(queueManagementChanges); + + applyQueueManagementChanges(queueManagementChanges); + + AutoCreatedQueueManagementPolicy policy = + getAutoCreatedQueueManagementPolicy(); + + //acquires write lock on policy + policy.commitQueueManagementChanges(queueManagementChanges); + + } finally { + writeLock.unlock(); + } + } + + public void validateQueueManagementChanges( + List<QueueManagementChange> queueManagementChanges) + throws SchedulerDynamicEditException { + + for (QueueManagementChange queueManagementChange : queueManagementChanges) { + + CSQueue childQueue = queueManagementChange.getQueue(); + + if (!(childQueue instanceof AutoCreatedLeafQueue)) { + throw new SchedulerDynamicEditException( + "queue should be " + "AutoCreatedLeafQueue. Found " + childQueue + .getClass()); + } + + if (!(AbstractManagedParentQueue.class. + isAssignableFrom(childQueue.getParent().getClass()))) { + LOG.error("Queue " + getQueueName() + + " is not an instance of PlanQueue or ManagedParentQueue." + " " + + "Ignoring update " + queueManagementChanges); + throw new SchedulerDynamicEditException( + "Queue " + getQueueName() + " is not a AutoEnabledParentQueue." 
+ + " Ignoring update " + queueManagementChanges); + } + + switch (queueManagementChange.getQueueAction()){ + case UPDATE_QUEUE: + AutoCreatedLeafQueueConfig template = + queueManagementChange.getUpdatedQueueTemplate(); + ((AutoCreatedLeafQueue) childQueue).validateConfigurations(template); + break; + } + } + } + + private void applyQueueManagementChanges( + List<QueueManagementChange> queueManagementChanges) + throws SchedulerDynamicEditException, IOException { + for (QueueManagementChange queueManagementChange : queueManagementChanges) { + switch (queueManagementChange.getQueueAction()){ + case UPDATE_QUEUE: + AutoCreatedLeafQueue childQueueToBeUpdated = + (AutoCreatedLeafQueue) queueManagementChange.getQueue(); + //acquires write lock on leaf queue + childQueueToBeUpdated.reinitializeFromTemplate( + queueManagementChange.getUpdatedQueueTemplate()); + break; + } + } + } + + public CapacitySchedulerConfiguration getLeafQueueConfigs( + String leafQueueName) { + return getLeafQueueConfigs(getLeafQueueTemplate().getLeafQueueConfigs(), + leafQueueName); + } + + public CapacitySchedulerConfiguration getLeafQueueConfigs( + CapacitySchedulerConfiguration templateConfig, String leafQueueName) { + CapacitySchedulerConfiguration leafQueueConfigTemplate = new + CapacitySchedulerConfiguration(new Configuration(false), false); + for (final Iterator<Map.Entry<String, String>> iterator = + templateConfig.iterator(); iterator.hasNext(); ) { + Map.Entry<String, String> confKeyValuePair = iterator.next(); + final String name = confKeyValuePair.getKey().replaceFirst( + CapacitySchedulerConfiguration + .AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX, + leafQueueName); + leafQueueConfigTemplate.set(name, confKeyValuePair.getValue()); + } + return leafQueueConfigTemplate; + } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index b7f8aa6..757002f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,61 +36,132 @@ public class PlanQueue extends AbstractManagedParentQueue { private static final Logger LOG = LoggerFactory.getLogger(PlanQueue.class); + private int maxAppsForReservation; + private int maxAppsPerUserForReservation; + private int userLimit; + private float userLimitFactor; + protected CapacitySchedulerContext schedulerContext; private boolean showReservationsAsQueues; public PlanQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); - this.leafQueueTemplate = initializeLeafQueueConfigs(getQueuePath()).build(); + + this.schedulerContext = cs; + // Set the 
reservation queue attributes for the Plan + CapacitySchedulerConfiguration conf = cs.getConfiguration(); + String queuePath = super.getQueuePath(); + int maxAppsForReservation = conf.getMaximumApplicationsPerQueue(queuePath); + showReservationsAsQueues = conf.getShowReservationAsQueues(queuePath); + if (maxAppsForReservation < 0) { + maxAppsForReservation = + (int) (CapacitySchedulerConfiguration. + DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * super + .getAbsoluteCapacity()); + } + int userLimit = conf.getUserLimit(queuePath); + float userLimitFactor = conf.getUserLimitFactor(queuePath); + int maxAppsPerUserForReservation = + (int) (maxAppsForReservation * (userLimit / 100.0f) * userLimitFactor); + updateQuotas(userLimit, userLimitFactor, maxAppsForReservation, + maxAppsPerUserForReservation); StringBuffer queueInfo = new StringBuffer(); - queueInfo.append("Created Plan Queue: ").append(queueName).append( - "]\nwith capacity: [").append(super.getCapacity()).append( - "]\nwith max capacity: [").append(super.getMaximumCapacity()).append( - "\nwith max apps: [").append(leafQueueTemplate.getMaxApps()).append( - "]\nwith max apps per user: [").append( - leafQueueTemplate.getMaxAppsPerUser()).append("]\nwith user limit: [") - .append(leafQueueTemplate.getUserLimit()).append( - "]\nwith user limit factor: [").append( - leafQueueTemplate.getUserLimitFactor()).append("]."); + queueInfo.append("Created Plan Queue: ").append(queueName) + .append("\nwith capacity: [").append(super.getCapacity()) + .append("]\nwith max capacity: [").append(super.getMaximumCapacity()) + .append("\nwith max reservation apps: [").append(maxAppsForReservation) + .append("]\nwith max reservation apps per user: [") + .append(maxAppsPerUserForReservation).append("]\nwith user limit: [") + .append(userLimit).append("]\nwith user limit factor: [") + .append(userLimitFactor).append("]."); LOG.info(queueInfo.toString()); } @Override - public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) 
- throws IOException { - validate(newlyParsedQueue); - super.reinitialize(newlyParsedQueue, clusterResource); - this.leafQueueTemplate = initializeLeafQueueConfigs(getQueuePath()).build(); + public void reinitialize(CSQueue newlyParsedQueue, + Resource clusterResource) throws IOException { + try { + writeLock.lock(); + // Sanity check + if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue + .getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Trying to reinitialize " + getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); + } + + PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue; + + if (newlyParsedParentQueue.getChildQueues().size() != 1) { + throw new IOException( + "Reservable Queue should not have sub-queues in the" + + "configuration expect the default reservation queue"); + } + + // Set new configs + setupQueueConfigs(clusterResource); + + updateQuotas(newlyParsedParentQueue.userLimit, + newlyParsedParentQueue.userLimitFactor, + newlyParsedParentQueue.maxAppsForReservation, + newlyParsedParentQueue.maxAppsPerUserForReservation); + + // run reinitialize on each existing queue, to trigger absolute cap + // recomputations + for (CSQueue res : this.getChildQueues()) { + res.reinitialize(res, clusterResource); + } + showReservationsAsQueues = + newlyParsedParentQueue.showReservationsAsQueues; + } finally { + writeLock.unlock(); + } } - @Override - protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs - (String queuePath) { - AutoCreatedLeafQueueTemplate.Builder leafQueueTemplate = super - .initializeLeafQueueConfigs - (queuePath); - showReservationsAsQueues = csContext.getConfiguration() - .getShowReservationAsQueues(queuePath); - return leafQueueTemplate; + private void updateQuotas(int userLimit, float userLimitFactor, + int maxAppsForReservation, int maxAppsPerUserForReservation) { + this.userLimit = userLimit; + this.userLimitFactor = userLimitFactor; + this.maxAppsForReservation = 
maxAppsForReservation; + this.maxAppsPerUserForReservation = maxAppsPerUserForReservation; } - protected void validate(final CSQueue newlyParsedQueue) throws IOException { - // Sanity check - if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue - .getQueuePath().equals(getQueuePath())) { - throw new IOException( - "Trying to reinitialize " + getQueuePath() + " from " - + newlyParsedQueue.getQueuePath()); - } + /** + * Number of maximum applications for each of the reservations in this Plan. + * + * @return maxAppsForreservation + */ + public int getMaxApplicationsForReservations() { + return maxAppsForReservation; + } - PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue; + /** + * Number of maximum applications per user for each of the reservations in + * this Plan. + * + * @return maxAppsPerUserForreservation + */ + public int getMaxApplicationsPerUserForReservation() { + return maxAppsPerUserForReservation; + } - if (newlyParsedParentQueue.getChildQueues().size() != 1) { - throw new IOException( - "Reservable Queue should not have sub-queues in the" - + "configuration expect the default reservation queue"); - } + /** + * User limit value for each of the reservations in this Plan. + * + * @return userLimit + */ + public int getUserLimitForReservation() { + return userLimit; + } + + /** + * User limit factor value for each of the reservations in this Plan. 
+ * + * @return userLimitFactor + */ + public float getUserLimitFactor() { + return userLimitFactor; } /** @@ -98,4 +170,4 @@ public class PlanQueue extends AbstractManagedParentQueue { public boolean showReservationsAsQueues() { return showReservationsAsQueues; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementChange.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementChange.java new file mode 100644 index 0000000..74d9b23 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementChange.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.QueueState; + +/** + * Encapsulates Queue entitlement and state updates needed + * for adjusting capacity dynamically + * + */ +@Private +@Unstable +public abstract class QueueManagementChange { + + private final CSQueue queue; + + /** + * Updating the queue may involve entitlement updates + * and/or QueueState changes + * + * QueueAction can potentially be enhanced + * for adding, removing queues for queue management + */ + public enum QueueAction { + UPDATE_QUEUE + } + + private AutoCreatedLeafQueueConfig + queueTemplateUpdate; + + private final QueueAction queueAction; + /** + * Updated Queue state with the new entitlement + */ + private QueueState transitionToQueueState; + + public QueueManagementChange(final CSQueue queue, + final QueueAction queueAction) { + this.queue = queue; + this.queueAction = queueAction; + } + + public QueueManagementChange(final CSQueue queue, + final QueueAction queueAction, QueueState targetQueueState, + final AutoCreatedLeafQueueConfig + queueTemplateUpdates) { + this(queue, queueAction, queueTemplateUpdates); + this.transitionToQueueState = targetQueueState; + } + + public QueueManagementChange(final CSQueue queue, + final QueueAction queueAction, + final AutoCreatedLeafQueueConfig + queueTemplateUpdates) { + this(queue, queueAction); + this.queueTemplateUpdate = queueTemplateUpdates; + } + + public QueueState getTransitionToQueueState() { + return transitionToQueueState; + } + + public CSQueue getQueue() { + return queue; + } + + public AutoCreatedLeafQueueConfig getUpdatedQueueTemplate() { + return queueTemplateUpdate; + } + + public QueueAction getQueueAction() { + 
return queueAction; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (!(o instanceof QueueManagementChange)) + return false; + + QueueManagementChange that = (QueueManagementChange) o; + + if (queue != null ? !queue.equals(that.queue) : that.queue != null) + return false; + if (queueTemplateUpdate != null ? !queueTemplateUpdate.equals( + that.queueTemplateUpdate) : that.queueTemplateUpdate != null) + return false; + if (queueAction != that.queueAction) + return false; + return transitionToQueueState == that.transitionToQueueState; + } + + @Override + public int hashCode() { + int result = queue != null ? queue.hashCode() : 0; + result = 31 * result + (queueTemplateUpdate != null ? + queueTemplateUpdate.hashCode() : + 0); + result = 31 * result + (queueAction != null ? queueAction.hashCode() : 0); + result = 31 * result + (transitionToQueueState != null ? + transitionToQueueState.hashCode() : + 0); + return result; + } + + @Override + public String toString() { + return "QueueManagementChange{" + "queue=" + queue + + ", updatedEntitlementsByPartition=" + queueTemplateUpdate + + ", queueAction=" + queueAction + ", transitionToQueueState=" + + transitionToQueueState + '}'; + } + + public static class UpdateQueue extends QueueManagementChange { + + public UpdateQueue(final CSQueue queue, QueueState targetQueueState, + final AutoCreatedLeafQueueConfig + queueTemplateUpdate) { + super(queue, QueueAction.UPDATE_QUEUE, targetQueueState, + queueTemplateUpdate); + } + + public UpdateQueue(final CSQueue queue, + final AutoCreatedLeafQueueConfig + queueTemplateUpdate) { + super(queue, QueueAction.UPDATE_QUEUE, queueTemplateUpdate); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java new file mode 100644 index 0000000..9b0cf7b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .QueueManagementChangeEvent; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Queue Management scheduling policy for managed parent queues which enable + * auto child queue creation + */ +public class QueueManagementDynamicEditPolicy implements SchedulingEditPolicy { + + private static final Log LOG = + LogFactory.getLog(QueueManagementDynamicEditPolicy.class); + + private Clock clock; + + // Pointer to other RM components + private RMContext rmContext; + private ResourceCalculator rc; + private CapacityScheduler scheduler; + private RMNodeLabelsManager nlm; + + private long monitoringInterval; + + private Set<String> managedParentQueues = new HashSet<>(); + + /** + * Instantiated by CapacitySchedulerConfiguration + */ + public QueueManagementDynamicEditPolicy() { + clock = SystemClock.getInstance(); + } + + @SuppressWarnings("unchecked") + @VisibleForTesting + public 
QueueManagementDynamicEditPolicy(RMContext context, + CapacityScheduler scheduler) { + init(context.getYarnConfiguration(), context, scheduler); + } + + @SuppressWarnings("unchecked") + @VisibleForTesting + public QueueManagementDynamicEditPolicy(RMContext context, + CapacityScheduler scheduler, Clock clock) { + init(context.getYarnConfiguration(), context, scheduler); + this.clock = clock; + } + + @Override + public void init(final Configuration config, final RMContext context, + final ResourceScheduler sched) { + LOG.info("Queue Management Policy monitor:" + this. + getClass().getCanonicalName()); + assert null == scheduler : "Unexpected duplicate call to init"; + if (!(sched instanceof CapacityScheduler)) { + throw new YarnRuntimeException("Class " + + sched.getClass().getCanonicalName() + " not instance of " + + CapacityScheduler.class.getCanonicalName()); + } + rmContext = context; + scheduler = (CapacityScheduler) sched; + clock = scheduler.getClock(); + + rc = scheduler.getResourceCalculator(); + nlm = scheduler.getRMContext().getNodeLabelManager(); + + CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration(); + + monitoringInterval = csConfig.getLong( + CapacitySchedulerConfiguration.QUEUE_MANAGEMENT_MONITORING_INTERVAL, + CapacitySchedulerConfiguration. + DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL); + + initQueues(); + } + + /** + * Reinitializes queues(Called on scheduler.reinitialize) + * @param config Configuration + * @param context The resourceManager's context + * @param sched The scheduler + */ + public void reinitialize(final Configuration config, final RMContext context, + final ResourceScheduler sched) { + //TODO - Wire with scheduler reinitialize and remove initQueues below? 
+ initQueues(); + } + + /** + * Rebuilds managedParentQueues from scratch with the name of every queue + * known to the scheduler's queue manager that is a ManagedParentQueue. + */ + private void initQueues() { + managedParentQueues.clear(); + for (Map.Entry<String, CSQueue> queues : scheduler + .getCapacitySchedulerQueueManager() + .getQueues().entrySet()) { + + String queueName = queues.getKey(); + CSQueue queue = queues.getValue(); + + if ( queue instanceof ManagedParentQueue) { + managedParentQueues.add(queueName); + } + } + } + + /** + * One monitoring pass: refreshes the set of managed parent queues, then + * computes and dispatches their queue management changes. + */ + @Override + public void editSchedule() { + long startTs = clock.getTime(); + + initQueues(); + manageAutoCreatedLeafQueues(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms."); + } + } + + /** + * Computes queue management changes for every managed parent queue. + * + * @return the changes of all managed parent queues, concatenated + */ + @VisibleForTesting + List<QueueManagementChange> manageAutoCreatedLeafQueues() + { + + List<QueueManagementChange> queueManagementChanges = new ArrayList<>(); + // All partitions to look at + + //Proceed only if there are queues to process + if (managedParentQueues.size() > 0) { + for (String parentQueueName : managedParentQueues) { + ManagedParentQueue parentQueue = + (ManagedParentQueue) scheduler.getCapacitySchedulerQueueManager().
+ getQueue(parentQueueName); + + queueManagementChanges.addAll( + computeQueueManagementChanges + (parentQueue)); + } + } + return queueManagementChanges; + } + + + @VisibleForTesting + List<QueueManagementChange> computeQueueManagementChanges + (ManagedParentQueue parentQueue) { + + List<QueueManagementChange> queueManagementChanges = + Collections.emptyList(); + if (!parentQueue.shouldFailAutoCreationWhenGuaranteedCapacityExceeded()) { + + AutoCreatedQueueManagementPolicy policyClazz = + parentQueue.getAutoCreatedQueueManagementPolicy(); + long startTime = 0; + try { + if (LOG.isDebugEnabled()) { + LOG.debug(MessageFormat + .format("Trying to use {0} to compute preemption " + + "candidates", + policyClazz.getClass().getName())); + startTime = clock.getTime(); + } + + queueManagementChanges = policyClazz.computeQueueManagementChanges(); + + //Scheduler update is asynchronous + if (queueManagementChanges.size() > 0) { + QueueManagementChangeEvent queueManagementChangeEvent = + new QueueManagementChangeEvent(parentQueue, + queueManagementChanges); + scheduler.getRMContext().getDispatcher().getEventHandler().handle( + queueManagementChangeEvent); + } + + if (LOG.isDebugEnabled()) { + LOG.debug(MessageFormat.format("{0} uses {1} millisecond" + + " to run", + policyClazz.getClass().getName(), clock.getTime() + - startTime)); + if (queueManagementChanges.size() > 0) { + LOG.debug(" Updated queue management updates for parent queue" + + " [" + + parentQueue.getQueueName() + ": [\n" + queueManagementChanges + .toString() + "\n]"); + } + } + } catch (YarnException e) { + LOG.error( + "Could not compute child queue management updates for parent " + + "queue " + + parentQueue.getQueueName(), e); + } + } else{ + if (LOG.isDebugEnabled()) { + LOG.debug( + "Skipping queue management updates for parent queue " + + parentQueue + .getQueuePath() + " " + + "since configuration for auto creating queue's beyond " + + "parent's " + + "guaranteed capacity is disabled"); + } + } + 
return queueManagementChanges; + } + + @Override + public long getMonitoringInterval() { + return monitoringInterval; + } + + @Override + public String getPolicyName() { + return "QueueManagementDynamicEditPolicy"; + } + + public ResourceCalculator getResourceCalculator() { + return rc; + } + + public RMContext getRmContext() { + return rmContext; + } + + // Returns the same field as getResourceCalculator() + public ResourceCalculator getRC() { + return rc; + } + + public CapacityScheduler getScheduler() { + return scheduler; + } + + // Returns the internal set itself, not a copy + public Set<String> getManagedParentQueues() { + return managedParentQueues; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java new file mode 100644 index 0000000..34f4aa1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This represents a dynamic {@link LeafQueue} managed by the + * {@link ReservationSystem} + * + */ +public class ReservationQueue extends AbstractAutoCreatedLeafQueue { + + private static final Logger LOG = LoggerFactory + .getLogger(ReservationQueue.class); + + // Plan queue whose reservation limits are copied into this queue + private PlanQueue parent; + + /** + * Creates the queue under the given plan queue and copies the plan's + * reservation quotas (user limit, user limit factor, max applications). + * + * @param cs scheduler context + * @param queueName name of this queue + * @param parent plan queue this reservation belongs to + * @throws IOException if the superclass constructor fails + */ + public ReservationQueue(CapacitySchedulerContext cs, String queueName, + PlanQueue parent) throws IOException { + super(cs, queueName, parent, null); + // the following parameters are common to all reservations in the plan + updateQuotas(parent.getUserLimitForReservation(), + parent.getUserLimitFactor(), + parent.getMaxApplicationsForReservations(), + parent.getMaxApplicationsPerUserForReservation()); + this.parent = parent; + } + + /** + * Reinitializes this queue from a newly parsed queue and re-applies the + * quotas of the parent plan queue, all under the write lock. + * + * @throws IOException if newlyParsedQueue is not a ReservationQueue or has + * a different queue path + */ + @Override + public void reinitialize(CSQueue newlyParsedQueue, + Resource clusterResource) throws IOException { + try { + writeLock.lock(); + // Sanity check + if (!(newlyParsedQueue instanceof ReservationQueue) || !newlyParsedQueue + .getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Trying to reinitialize " +
getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); + } + super.reinitialize(newlyParsedQueue, clusterResource); + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + this, labelManager, null); + + updateQuotas(parent.getUserLimitForReservation(), + parent.getUserLimitFactor(), + parent.getMaxApplicationsForReservations(), + parent.getMaxApplicationsPerUserForReservation()); + } finally { + writeLock.unlock(); + } + } + + /** + * Applies the given limits to this queue. + */ + private void updateQuotas(int userLimit, float userLimitFactor, + int maxAppsForReservation, int maxAppsPerUserForReservation) { + setUserLimit(userLimit); + setUserLimitFactor(userLimitFactor); + setMaxApplications(maxAppsForReservation); + maxApplicationsPerUser = maxAppsPerUserForReservation; + } + + // NOTE(review): the configuration argument is not used; the call below + // passes the queueCapacities field - confirm it resolves to the intended + // superclass overload + @Override + protected void setupConfigurableCapacities(CapacitySchedulerConfiguration + configuration) { + super.setupConfigurableCapacities(queueCapacities); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java new file mode 100644 index 0000000..aee6405 --- /dev/null +++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java @@ -0,0 +1,745 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .queuemanagement; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .AbstractAutoCreatedLeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .AutoCreatedLeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .AutoCreatedLeafQueueConfig; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .AutoCreatedQueueManagementPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .CapacitySchedulerContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .ManagedParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .ParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .QueueCapacities; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .QueueManagementChange; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica + .FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.MonotonicClock; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; 
+import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager + .NO_LABEL; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.CSQueueUtils.EPSILON; + +/** + * Capacity Management policy for auto created leaf queues + * <p> + * Assigns capacity if available to leaf queues based on application + * submission order i.e leaf queues are assigned capacity in FCFS order based + * on application submission time. Updates leaf queue capacities to 0 when + * there are no pending or running apps under that queue. + */ +public class GuaranteedOrZeroCapacityOverTimePolicy + implements AutoCreatedQueueManagementPolicy { + + private CapacitySchedulerContext scheduler; + private ManagedParentQueue managedParentQueue; + + private static final Log LOG = LogFactory.getLog( + GuaranteedOrZeroCapacityOverTimePolicy.class); + + // Template with capacity 0 used to deactivate a leaf queue; built in + // initializeLeafQueueTemplate(), so not a compile-time constant + private AutoCreatedLeafQueueConfig ZERO_CAPACITY_ENTITLEMENT; + + // Both locks come from one ReentrantReadWriteLock created in init() + private ReentrantReadWriteLock.WriteLock writeLock; + + private ReentrantReadWriteLock.ReadLock readLock; + + // Per-label total of absolute capacity handed to activated leaf queues + private ParentQueueState parentQueueState = new ParentQueueState(); + + private AutoCreatedLeafQueueConfig leafQueueTemplate; + + private QueueCapacities leafQueueTemplateCapacities; + + // Activation state per leaf queue, keyed by queue name + private Map<String, LeafQueueState> leafQueueStateMap = new HashMap<>(); + + private Clock clock = new MonotonicClock(); + + /** + * Activation flag of one leaf queue plus the times of its most recent + * activate/deactivate calls. + */ + private class LeafQueueState { + + private AtomicBoolean isActive = new AtomicBoolean(false); + + private long mostRecentActivationTime; + + private long mostRecentDeactivationTime; + + public long getMostRecentActivationTime() { + return mostRecentActivationTime; + } + + public long getMostRecentDeactivationTime() { + return mostRecentDeactivationTime; + } + + /** + * Is the queue currently active or deactivated?
+ * + * @return true if Active else false + */ + public boolean isActive() { + return isActive.get(); + } + + /** + * Marks the queue active. + * + * @return true if the flag changed from inactive to active; the activation + * time is refreshed even when it did not + */ + private boolean activate() { + boolean ret = isActive.compareAndSet(false, true); + mostRecentActivationTime = clock.getTime(); + return ret; + } + + /** + * Marks the queue inactive. + * + * @return true if the flag changed from active to inactive; the + * deactivation time is refreshed even when it did not + */ + private boolean deactivate() { + boolean ret = isActive.compareAndSet(true, false); + mostRecentDeactivationTime = clock.getTime(); + return ret; + } + } + + private boolean containsLeafQueue(String leafQueueName) { + return leafQueueStateMap.containsKey(leafQueueName); + } + + /** + * Registers the given state under the queue name unless one is present. + * + * @return true if the state was added, false if the name was already known + */ + private boolean addLeafQueueStateIfNotExists(String leafQueueName, + LeafQueueState leafQueueState) { + if (!containsLeafQueue(leafQueueName)) { + leafQueueStateMap.put(leafQueueName, leafQueueState); + return true; + } + return false; + } + + // Registers a fresh (inactive) state for the queue if it has none yet + private boolean addLeafQueueStateIfNotExists(LeafQueue leafQueue) { + return addLeafQueueStateIfNotExists(leafQueue.getQueueName(), + new LeafQueueState()); + } + + private void clearLeafQueueState() { + leafQueueStateMap.clear(); + } + + /** + * Tracks, per node label, the sum of absolute capacity of activated child + * queues. + */ + private class ParentQueueState { + + private Map<String, Float> totalAbsoluteActivatedChildQueueCapacityByLabel = + new HashMap<String, Float>(); + + // Activated capacity recorded for NO_LABEL + private float getAbsoluteActivatedChildQueueCapacity() { + return getAbsoluteActivatedChildQueueCapacity(NO_LABEL); + } + + // Returns 0 when nothing has been recorded for the label + private float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) { + try { + readLock.lock(); + Float totalActivatedCapacity = getByLabel(nodeLabel); + if (totalActivatedCapacity != null) { + return totalActivatedCapacity; + } else{ + return 0; + } + } finally { + readLock.unlock(); + } + } + + // Adds childQueueCapacity to the label's total, starting from 0 when the + // label has no entry yet + private void incAbsoluteActivatedChildCapacity(String nodeLabel, + float childQueueCapacity) { + try { + writeLock.lock(); + Float activatedChildCapacity = getByLabel(nodeLabel); + if (activatedChildCapacity != null) { + setByLabel(nodeLabel, activatedChildCapacity + childQueueCapacity); + } else{ + setByLabel(nodeLabel, childQueueCapacity); + } + } finally { + writeLock.unlock(); + } + } + +
/** + * Subtracts childQueueCapacity from the activated total recorded for the + * label. A label without a recorded total counts as 0, and the stored + * value is never allowed to drop below 0. + * + * @param nodeLabel label whose total is reduced + * @param childQueueCapacity absolute capacity being released + */ + private void decAbsoluteActivatedChildCapacity(String nodeLabel, + float childQueueCapacity) { + try { + writeLock.lock(); + Float activatedChildCapacity = getByLabel(nodeLabel); + // Nothing recorded means nothing was activated: releasing capacity must + // not turn into a positive (or negative) activated total. + float current = + activatedChildCapacity != null ? activatedChildCapacity : 0f; + setByLabel(nodeLabel, Math.max(0f, current - childQueueCapacity)); + } finally { + writeLock.unlock(); + } + } + + Float getByLabel(String label) { + return totalAbsoluteActivatedChildQueueCapacityByLabel.get(label); + } + + Float setByLabel(String label, float val) { + return totalAbsoluteActivatedChildQueueCapacityByLabel.put(label, val); + } + + void clear() { + totalAbsoluteActivatedChildQueueCapacityByLabel.clear(); + } + } + + /** + * Comparator that orders applications by their submit time. Applications + * whose RMApp cannot be found in the RM context sort after those that can. + */ + private class PendingApplicationComparator + implements Comparator<FiCaSchedulerApp> { + + @Override + public int compare(FiCaSchedulerApp app1, FiCaSchedulerApp app2) { + RMApp rmApp1 = scheduler.getRMContext().getRMApps().get( + app1.getApplicationId()); + RMApp rmApp2 = scheduler.getRMContext().getRMApps().get( + app2.getApplicationId()); + if (rmApp1 != null && rmApp2 != null) { + return Long.compare(rmApp1.getSubmitTime(), rmApp2.getSubmitTime()); + } else if (rmApp1 != null) { + return -1; + } else if (rmApp2 != null) { + return 1; + } else{ + return 0; + } + } + } + + private PendingApplicationComparator applicationComparator = + new PendingApplicationComparator(); + + @Override + public void init(final CapacitySchedulerContext schedulerContext, + final ParentQueue parentQueue) { + this.scheduler = schedulerContext; + + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + + if (!(parentQueue instanceof ManagedParentQueue)) { + throw new IllegalArgumentException( + "Expected instance of type " + ManagedParentQueue.class); + } + + this.managedParentQueue = (ManagedParentQueue) parentQueue; + +
initializeLeafQueueTemplate(this.managedParentQueue); + + LOG.info( + "Initialized queue management policy for parent queue " + parentQueue + .getQueueName() + " with leaf queue template capacities : [" + + leafQueueTemplate.getQueueCapacities() + "]"); + } + + /** + * Caches the parent's leaf queue template and its capacities, and builds + * the zero-capacity template that keeps the template's maximum capacity. + */ + private void initializeLeafQueueTemplate(ManagedParentQueue parentQueue) { + leafQueueTemplate = parentQueue.getLeafQueueTemplate(); + + leafQueueTemplateCapacities = leafQueueTemplate.getQueueCapacities(); + + ZERO_CAPACITY_ENTITLEMENT = buildTemplate(0.0f, + leafQueueTemplateCapacities.getMaximumCapacity()); + } + + /** + * Computes the leaf queues to deactivate (active but without applications) + * and, if enough capacity is available, the leaf queues to activate in + * application submit-time order. + * + * @return the list of queue management changes; nothing is applied here + * @throws SchedulerDynamicEditException if a leaf queue has no tracked state + * or the template's absolute capacity is not positive + */ + @Override + public List<QueueManagementChange> computeQueueManagementChanges() + throws SchedulerDynamicEditException { + + //TODO : Add support for node labels on leaf queue template configurations + //synch/add missing leaf queue(s) if any to state + updateLeafQueueState(); + + try { + readLock.lock(); + List<QueueManagementChange> queueManagementChanges = new ArrayList<>(); + + // check if any leaf queues need to be deactivated based on pending + // applications, and how much capacity that would free up + float parentAbsoluteCapacity = + managedParentQueue.getQueueCapacities().getAbsoluteCapacity(); + + float leafQueueTemplateAbsoluteCapacity = + leafQueueTemplateCapacities.getAbsoluteCapacity(); + Map<String, QueueCapacities> deactivatedLeafQueues = + deactivateLeafQueuesIfInActive(managedParentQueue, queueManagementChanges); + + float deactivatedCapacity = getTotalDeactivatedCapacity( + deactivatedLeafQueues); + + float sumOfChildQueueActivatedCapacity = parentQueueState. + getAbsoluteActivatedChildQueueCapacity(); + + //Check if we need to activate anything at all?
+ float availableCapacity = getAvailableCapacity(parentAbsoluteCapacity, + deactivatedCapacity, sumOfChildQueueActivatedCapacity); + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Parent queue : " + managedParentQueue.getQueueName() + " absCapacity = " + + parentAbsoluteCapacity + ", leafQueueAbsoluteCapacity = " + + leafQueueTemplateAbsoluteCapacity + ", deactivatedCapacity = " + + deactivatedCapacity + " , absChildActivatedCapacity = " + + sumOfChildQueueActivatedCapacity + ", availableCapacity = " + + availableCapacity); + } + + // Only look for queues to activate when at least one template-sized + // share of capacity is available + if (availableCapacity >= leafQueueTemplateAbsoluteCapacity) { + //sort applications across leaf queues by submit time + List<FiCaSchedulerApp> pendingApps = getSortedPendingApplications(); + + if (pendingApps.size() > 0) { + int maxLeafQueuesTobeActivated = getMaxLeavesToBeActivated( + availableCapacity, leafQueueTemplateAbsoluteCapacity, + pendingApps.size()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Found " + maxLeafQueuesTobeActivated + + " leaf queues to be activated with " + pendingApps.size() + + " apps "); + } + + // Queues deactivated in this pass are excluded from activation + LinkedHashSet<String> leafQueuesToBeActivated = getSortedLeafQueues( + pendingApps, maxLeafQueuesTobeActivated, + deactivatedLeafQueues.keySet()); + + //Compute entitlement changes for the identified leaf queues + // which is appended to the List of queueManagementChanges + computeQueueManagementChanges(leafQueuesToBeActivated, + queueManagementChanges, availableCapacity, + leafQueueTemplateAbsoluteCapacity); + + if (LOG.isDebugEnabled()) { + if (leafQueuesToBeActivated.size() > 0) { + LOG.debug( + "Activated leaf queues : [" + leafQueuesToBeActivated + "]"); + } + } + } + } + return queueManagementChanges; + } finally { + readLock.unlock(); + } + } + + /** + * Sums the absolute capacity of every entry in the given map. + */ + private float getTotalDeactivatedCapacity( + Map<String, QueueCapacities> deactivatedLeafQueues) { + float deactivatedCapacity = 0; + for (Iterator<Map.Entry<String, QueueCapacities>> iterator = + deactivatedLeafQueues.entrySet().iterator(); iterator.hasNext(); ) { +
Map.Entry<String, QueueCapacities> deactivatedQueueCapacity = + iterator.next(); + deactivatedCapacity += + deactivatedQueueCapacity.getValue().getAbsoluteCapacity(); + } + return deactivatedCapacity; + } + + /** + * Synchronizes the tracked leaf queue state with the parent's current + * children: adds state for leaf queues not tracked yet and drops state for + * names that are no longer children. Runs under the write lock. + */ + @VisibleForTesting + void updateLeafQueueState() { + try { + writeLock.lock(); + Set<String> newQueues = new HashSet<>(); + for (CSQueue newQueue : managedParentQueue.getChildQueues()) { + if (newQueue instanceof LeafQueue) { + addLeafQueueStateIfNotExists((LeafQueue) newQueue); + newQueues.add(newQueue.getQueueName()); + } + } + + for (Iterator<Map.Entry<String, LeafQueueState>> itr = + leafQueueStateMap.entrySet().iterator(); itr.hasNext(); ) { + Map.Entry<String, LeafQueueState> e = itr.next(); + String queueName = e.getKey(); + if (!newQueues.contains(queueName)) { + itr.remove(); + } + } + } finally { + writeLock.unlock(); + } + } + + /** + * Walks the given apps in order and collects, in encounter order, up to + * leafQueuesNeeded distinct names of leaf queues that are neither active + * nor contained in deactivatedQueues. + * + * @throws SchedulerDynamicEditException if a leaf queue has no tracked state + */ + private LinkedHashSet<String> getSortedLeafQueues( + final List<FiCaSchedulerApp> pendingApps, int leafQueuesNeeded, + Set<String> deactivatedQueues) throws SchedulerDynamicEditException { + + LinkedHashSet<String> leafQueues = new LinkedHashSet<>(leafQueuesNeeded); + int ctr = 0; + for (FiCaSchedulerApp app : pendingApps) { + + AutoCreatedLeafQueue leafQueue = + (AutoCreatedLeafQueue) app.getCSLeafQueue(); + String leafQueueName = leafQueue.getQueueName(); + + //Check if leafQueue is not active already and has any pending apps + if (ctr < leafQueuesNeeded) { + + if (!isActive(leafQueue)) { + if (!deactivatedQueues.contains(leafQueueName)) { + if (addLeafQueueIfNotExists(leafQueues, leafQueueName)) { + ctr++; + } + } + } + } else{ + break; + } + } + return leafQueues; + } + + // Returns true only when the name was not in the set and has been added + private boolean addLeafQueueIfNotExists(Set<String> leafQueues, + String leafQueueName) { + boolean ret = false; + if (!leafQueues.contains(leafQueueName)) { + ret = leafQueues.add(leafQueueName); + } + return ret; + } + + /** + * Tells whether the given leaf queue is currently tracked as active. + * + * @throws SchedulerDynamicEditException if no state is tracked for the queue + */ + @VisibleForTesting + public boolean isActive(final AutoCreatedLeafQueue leafQueue) + throws SchedulerDynamicEditException { + try {
+ readLock.lock(); + LeafQueueState leafQueueStatus = getLeafQueueState(leafQueue); + return leafQueueStatus.isActive(); + } finally { + readLock.unlock(); + } + } + + /** + * For every child that is active but has no applications, appends an + * UpdateQueue change carrying the zero-capacity template to + * queueManagementChanges. + * + * @return map from the name of each such queue to the leaf queue template + * capacities + * @throws SchedulerDynamicEditException if a child has no tracked state + */ + private Map<String, QueueCapacities> deactivateLeafQueuesIfInActive( + ParentQueue parentQueue, + List<QueueManagementChange> queueManagementChanges) + throws SchedulerDynamicEditException { + Map<String, QueueCapacities> deactivatedQueues = new HashMap<>(); + + for (CSQueue childQueue : parentQueue.getChildQueues()) { + // Unchecked cast: presumably every child of a managed parent is an + // AutoCreatedLeafQueue - TODO confirm + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue; + + if (isActive(leafQueue) && !hasPendingApps(leafQueue)) { + queueManagementChanges.add( + new QueueManagementChange.UpdateQueue(leafQueue, + ZERO_CAPACITY_ENTITLEMENT)); + deactivatedQueues.put(leafQueue.getQueueName(), + leafQueueTemplateCapacities); + } else{ + // NOTE(review): this branch is also taken for queues that are not + // active, so the message below can be misleading + if (LOG.isDebugEnabled()) { + LOG.debug(" Leaf queue has pending applications : " + leafQueue + .getNumApplications() + ".Skipping deactivation for " + + leafQueue); + } + } + } + + if (LOG.isDebugEnabled()) { + if (deactivatedQueues.size() > 0) { + LOG.debug("Deactivated leaf queues : " + deactivatedQueues); + } + } + return deactivatedQueues; + } + + /** + * Appends an UpdateQueue change with the template capacities for each of + * the given leaf queues, for as long as the remaining available capacity + * covers one more template-sized share. + */ + private void computeQueueManagementChanges( + Set<String> leafQueuesToBeActivated, + List<QueueManagementChange> queueManagementChanges, + final float availableCapacity, + final float leafQueueTemplateAbsoluteCapacity) { + + float curAvailableCapacity = availableCapacity; + + for (String curLeafQueue : leafQueuesToBeActivated) { + // Activate queues if capacity is available + if (curAvailableCapacity >= leafQueueTemplateAbsoluteCapacity) { + AutoCreatedLeafQueue leafQueue = + (AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager() + .getQueue(curLeafQueue); + if (leafQueue != null) { + AutoCreatedLeafQueueConfig newTemplate = buildTemplate( + leafQueueTemplateCapacities.getCapacity(), + leafQueueTemplateCapacities.getMaximumCapacity()); + queueManagementChanges.add( + new
QueueManagementChange.UpdateQueue(leafQueue, newTemplate)); + curAvailableCapacity -= leafQueueTemplateAbsoluteCapacity; + } else{ + // This is the activation path, so report it as such + LOG.warn( + "Could not find queue in scheduler while trying to activate " + + curLeafQueue); + } + } + } + } + + /** + * Computes how many leaf queues can be activated. + * + * @param availableCapacity absolute capacity available for activation + * @param childQueueAbsoluteCapacity absolute capacity one leaf queue needs + * @param numPendingApps number of pending applications + * @return the smaller of floor(availableCapacity / + * childQueueAbsoluteCapacity) and numPendingApps + * @throws SchedulerDynamicEditException if childQueueAbsoluteCapacity is not + * greater than 0 + */ + @VisibleForTesting + public int getMaxLeavesToBeActivated(float availableCapacity, + float childQueueAbsoluteCapacity, int numPendingApps) + throws SchedulerDynamicEditException { + + if (childQueueAbsoluteCapacity > 0) { + int numLeafQueuesNeeded = (int) Math.floor( + availableCapacity / childQueueAbsoluteCapacity); + + return Math.min(numLeafQueuesNeeded, numPendingApps); + } else{ + throw new SchedulerDynamicEditException("Child queue absolute capacity " + + "is initialized to 0. Check parent queue's " + managedParentQueue + .getQueueName() + " leaf queue template configuration"); + } + } + + // Parent capacity minus what is activated, plus what is being deactivated + // in this pass; EPSILON is added so the >= comparisons made by callers + // tolerate float rounding + private float getAvailableCapacity(float parentAbsCapacity, + float deactivatedAbsCapacity, float totalChildQueueActivatedCapacity) { + return parentAbsCapacity - totalChildQueueActivatedCapacity + + deactivatedAbsCapacity + EPSILON; + } + + /** + * Commit queue management changes - which involves updating required state + * on parent/underlying leaf queues + * + * @param queueManagementChanges Queue Management changes to commit + * @throws SchedulerDynamicEditException when validation fails + */ + @Override + public void commitQueueManagementChanges( + List<QueueManagementChange> queueManagementChanges) + throws SchedulerDynamicEditException { + try { + writeLock.lock(); + for (QueueManagementChange queueManagementChange : + queueManagementChanges) { + AutoCreatedLeafQueueConfig updatedQueueTemplate = + queueManagementChange.getUpdatedQueueTemplate(); + CSQueue queue = queueManagementChange.getQueue(); + if (!(queue instanceof AutoCreatedLeafQueue)) { + throw new SchedulerDynamicEditException( + "Expected queue management change for AutoCreatedLeafQueue. 
" + + "Found " + queue.getClass().getName()); + } + + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) queue; + + if (updatedQueueTemplate.getQueueCapacities().getCapacity() > 0) { + if (isActive(leafQueue)) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Queue is already active. Skipping activation : " + queue + .getQueuePath()); + } + } else{ + activate(leafQueue); + } + } else{ + if (!isActive(leafQueue)) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Queue is already de-activated. " + "Skipping de-activation " + + ": " + leafQueue.getQueuePath()); + } + } else{ + deactivate(leafQueue); + } + } + } + } finally { + writeLock.unlock(); + } + } + + private void activate(final AutoCreatedLeafQueue leafQueue) + throws SchedulerDynamicEditException { + try { + writeLock.lock(); + getLeafQueueState(leafQueue).activate(); + + parentQueueState.incAbsoluteActivatedChildCapacity(NO_LABEL, + leafQueueTemplateCapacities.getAbsoluteCapacity()); + } finally { + writeLock.unlock(); + } + } + + private void deactivate(final AutoCreatedLeafQueue leafQueue) + throws SchedulerDynamicEditException { + try { + writeLock.lock(); + getLeafQueueState(leafQueue).deactivate(); + + for (String nodeLabel : managedParentQueue.getQueueCapacities() + .getExistingNodeLabels()) { + parentQueueState.decAbsoluteActivatedChildCapacity(nodeLabel, + leafQueueTemplateCapacities.getAbsoluteCapacity()); + } + } finally { + writeLock.unlock(); + } + } + + public boolean hasPendingApps(final AutoCreatedLeafQueue leafQueue) { + return leafQueue.getNumApplications() > 0; + } + + @Override + public void reinitialize(CapacitySchedulerContext schedulerContext, + final ParentQueue parentQueue) { + if (!(parentQueue instanceof ManagedParentQueue)) { + throw new IllegalStateException( + "Expected instance of type " + ManagedParentQueue.class + " found " + + " : " + parentQueue.getClass()); + } + + if (this.managedParentQueue != null && !parentQueue.getQueuePath().equals( + 
this.managedParentQueue.getQueuePath())) { + throw new IllegalStateException( + "Expected parent queue path to match " + this.managedParentQueue + .getQueuePath() + " found : " + parentQueue.getQueuePath()); + } + + this.managedParentQueue = (ManagedParentQueue) parentQueue; + + initializeLeafQueueTemplate(this.managedParentQueue); + + //clear state + parentQueueState.clear(); + clearLeafQueueState(); + + LOG.info( + "Reinitialized queue management policy for parent queue " + + parentQueue.getQueueName() +" with leaf queue template " + + "capacities : [" + + leafQueueTemplate.getQueueCapacities() + "]"); + } + + /** + * Registers state for a newly created leaf queue and decides its initial + * configuration: the template capacities (and immediate activation) when + * the parent still has a template-sized share available, otherwise the + * zero-capacity template. + * + * @param leafQueue the new leaf queue; must be an AutoCreatedLeafQueue + * @return the configuration to apply to the new leaf queue + * @throws SchedulerDynamicEditException if leafQueue is of another type or + * state is already tracked for it + */ + @Override + public AutoCreatedLeafQueueConfig getInitialLeafQueueConfiguration( + AbstractAutoCreatedLeafQueue leafQueue) + throws SchedulerDynamicEditException { + + if ( !(leafQueue instanceof AutoCreatedLeafQueue)) { + throw new SchedulerDynamicEditException("Not an instance of " + + "AutoCreatedLeafQueue : " + leafQueue.getClass()); + } + + AutoCreatedLeafQueue autoCreatedLeafQueue = + (AutoCreatedLeafQueue) leafQueue; + AutoCreatedLeafQueueConfig template = ZERO_CAPACITY_ENTITLEMENT; + try { + writeLock.lock(); + if (!addLeafQueueStateIfNotExists(leafQueue)) { + LOG.error("Leaf queue already exists in state : " + getLeafQueueState( + leafQueue)); + throw new SchedulerDynamicEditException( + "Leaf queue already exists in state : " + getLeafQueueState( + leafQueue)); + } + + // Nothing is being deactivated here, hence 0 for the deactivated share + float availableCapacity = getAvailableCapacity( + managedParentQueue.getQueueCapacities().getAbsoluteCapacity(), 0, + parentQueueState.getAbsoluteActivatedChildQueueCapacity()); + + if (availableCapacity >= leafQueueTemplateCapacities + .getAbsoluteCapacity()) { + activate(autoCreatedLeafQueue); + template = buildTemplate(leafQueueTemplateCapacities.getCapacity(), + leafQueueTemplateCapacities.getMaximumCapacity()); + } + } finally { + writeLock.unlock(); + } + return template; + } + + /** + * Looks up the tracked state of the given leaf queue by queue name. + * + * @throws SchedulerDynamicEditException if no state is tracked for the queue + */ + @VisibleForTesting + LeafQueueState getLeafQueueState(LeafQueue queue) + throws
SchedulerDynamicEditException { + try { + readLock.lock(); + String queueName = queue.getQueueName(); + if (!containsLeafQueue(queueName)) { + throw new SchedulerDynamicEditException( + "Could not find leaf queue in " + "state " + queueName); + } else{ + return leafQueueStateMap.get(queueName); + } + } finally { + readLock.unlock(); + } + } + + // Activated capacity currently recorded for NO_LABEL + @VisibleForTesting + public float getAbsoluteActivatedChildQueueCapacity() { + return parentQueueState.getAbsoluteActivatedChildQueueCapacity(); + } + + /** + * Returns a copy of all applications under the managed parent queue, + * sorted with applicationComparator (by submit time). + */ + private List<FiCaSchedulerApp> getSortedPendingApplications() { + List<FiCaSchedulerApp> apps = new ArrayList<>( + managedParentQueue.getAllApplications()); + Collections.sort(apps, applicationComparator); + return apps; + } + + /** + * Builds a leaf queue config whose capacity and maximum capacity are set to + * the given values for every node label existing on the managed parent + * queue. + */ + private AutoCreatedLeafQueueConfig buildTemplate(float capacity, + float maxCapacity) { + AutoCreatedLeafQueueConfig.Builder templateBuilder = + new AutoCreatedLeafQueueConfig.Builder(); + + QueueCapacities capacities = new QueueCapacities(false); + templateBuilder.capacities(capacities); + + // The builder holds a reference to capacities, so filling it in after + // the capacities(...) call still takes effect - TODO confirm the builder + // does not copy + for (String nodeLabel : managedParentQueue.getQueueCapacities() + .getExistingNodeLabels()) { + capacities.setCapacity(nodeLabel, capacity); + capacities.setMaximumCapacity(nodeLabel, maxCapacity); + } + + return new AutoCreatedLeafQueueConfig(templateBuilder); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java index
2a751e3..f4182f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java @@ -43,4 +43,26 @@ public class QueueEntitlement { public void setCapacity(float capacity) { this.capacity = capacity; } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (!(o instanceof QueueEntitlement)) + return false; + + QueueEntitlement that = (QueueEntitlement) o; + + if (Float.compare(that.capacity, capacity) != 0) + return false; + return Float.compare(that.maxCapacity, maxCapacity) == 0; + } + + @Override + public int hashCode() { + int result = (capacity != +0.0f ? Float.floatToIntBits(capacity) : 0); + result = 31 * result + (maxCapacity != +0.0f ? 
Float.floatToIntBits( + maxCapacity) : 0); + return result; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java new file mode 100644 index 0000000..926e1be --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .ParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .QueueManagementChange; + +import java.util.List; + +/** + * Event to update scheduler of any queue management changes + */ +public class QueueManagementChangeEvent extends SchedulerEvent { + + private ParentQueue parentQueue; + private List<QueueManagementChange> queueManagementChanges; + + public QueueManagementChangeEvent(ParentQueue parentQueue, + List<QueueManagementChange> queueManagementChanges) { + super(SchedulerEventType.MANAGE_QUEUE); + this.parentQueue = parentQueue; + this.queueManagementChanges = queueManagementChanges; + } + + public ParentQueue getParentQueue() { + return parentQueue; + } + + public List<QueueManagementChange> getQueueManagementChanges() { + return queueManagementChanges; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 229e0bb..b107cf4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -51,5 +51,8 @@ public enum SchedulerEventType { MARK_CONTAINER_FOR_KILLABLE, // Cancel a killable container - MARK_CONTAINER_FOR_NONKILLABLE + MARK_CONTAINER_FOR_NONKILLABLE, + + //Queue Management Change + MANAGE_QUEUE } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedLeafQueue.java deleted file mode 100644 index b403e72..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedLeafQueue.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.IOException; - -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; -import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.junit.Before; -import org.junit.Test; - -/** - * Test class for dynamic auto created leaf queues. 
- * @see AutoCreatedLeafQueue - */ -public class TestAutoCreatedLeafQueue { - - private CapacitySchedulerConfiguration csConf; - private CapacitySchedulerContext csContext; - final static int DEF_MAX_APPS = 10000; - final static int GB = 1024; - private final ResourceCalculator resourceCalculator = - new DefaultResourceCalculator(); - private AutoCreatedLeafQueue autoCreatedLeafQueue; - - @Before - public void setup() throws IOException { - // setup a context / conf - csConf = new CapacitySchedulerConfiguration(); - YarnConfiguration conf = new YarnConfiguration(); - csContext = mock(CapacitySchedulerContext.class); - when(csContext.getConfiguration()).thenReturn(csConf); - when(csContext.getConf()).thenReturn(conf); - when(csContext.getMinimumResourceCapability()).thenReturn( - Resources.createResource(GB, 1)); - when(csContext.getMaximumResourceCapability()).thenReturn( - Resources.createResource(16 * GB, 32)); - when(csContext.getClusterResource()).thenReturn( - Resources.createResource(100 * 16 * GB, 100 * 32)); - when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); - RMContext mockRMContext = TestUtils.getMockRMContext(); - when(csContext.getRMContext()).thenReturn(mockRMContext); - - // create a queue - PlanQueue pq = new PlanQueue(csContext, "root", null, null); - autoCreatedLeafQueue = new AutoCreatedLeafQueue(csContext, "a", pq); - } - - private void validateAutoCreatedLeafQueue(double capacity) { - assertTrue(" actual capacity: " + autoCreatedLeafQueue.getCapacity(), - autoCreatedLeafQueue.getCapacity() - capacity < CSQueueUtils.EPSILON); - assertEquals(autoCreatedLeafQueue.maxApplications, DEF_MAX_APPS); - assertEquals(autoCreatedLeafQueue.maxApplicationsPerUser, DEF_MAX_APPS); - } - - @Test - public void testAddSubtractCapacity() throws Exception { - - // verify that setting, adding, subtracting capacity works - autoCreatedLeafQueue.setCapacity(1.0F); - validateAutoCreatedLeafQueue(1); - autoCreatedLeafQueue.setEntitlement(new 
QueueEntitlement(0.9f, 1f)); - validateAutoCreatedLeafQueue(0.9); - autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(1f, 1f)); - validateAutoCreatedLeafQueue(1); - autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(0f, 1f)); - validateAutoCreatedLeafQueue(0); - - try { - autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(1.1f, 1f)); - fail(); - } catch (SchedulerDynamicEditException iae) { - // expected - validateAutoCreatedLeafQueue(1); - } - - try { - autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(-0.1f, 1f)); - fail(); - } catch (SchedulerDynamicEditException iae) { - // expected - validateAutoCreatedLeafQueue(1); - } - - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
