This is an automated email from the ASF dual-hosted git repository. taoyang pushed a commit to branch YARN-11781 in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 7901e3931dda92ab32c5231fc2aa84fe1e6bcf4c Author: Tao Yang <taoy...@apache.org> AuthorDate: Fri Feb 28 12:07:35 2025 +0800 YARN-11781. Implement dynamic requests handling in CapacityScheduler. --- .../scheduler/capacity/CapacityScheduler.java | 63 ++ .../capacity/CapacitySchedulerConfiguration.java | 11 + .../capacity/CapacitySchedulerMetrics.java | 11 + .../scheduler/capacity/RequestsHandleResponse.java | 58 ++ .../scheduler/capacity/RequestsHandler.java | 868 +++++++++++++++++++++ .../TestCapacitySchedulerMetrics.java | 4 + .../TestCapacitySchedulerRequestsHandler.java | 118 +++ .../scheduler/capacity/TestRequestsHandler.java | 322 ++++++++ 8 files changed, 1455 insertions(+) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 02ffe83a6df..8a716b6a013 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -34,8 +34,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler; import 
org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule; @@ -240,6 +243,8 @@ public Configuration getConf() { private CSMaxRunningAppsEnforcer maxRunningEnforcer; + private RequestsHandler requestsHandler; + public CapacityScheduler() { super(CapacityScheduler.class.getName()); this.maxRunningEnforcer = new CSMaxRunningAppsEnforcer(this); @@ -328,6 +333,7 @@ void initScheduler(Configuration configuration) throws offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit(); initMultiNodePlacement(); + initRequestsHandler(); printSchedulerInitialized(); } finally { writeLock.unlock(); @@ -455,8 +461,15 @@ public void reinitialize(Configuration newConf, RMContext rmContext, refreshMaximumAllocation( ResourceUtils.fetchMaximumAllocationFromConfig(this.conf)); reinitializeQueues(this.conf); + reinitRequestsHandler(this.conf); } catch (Throwable t) { this.conf = oldConf; + try { + reinitRequestsHandler(this.conf); + } catch (Throwable innerT) { + LOG.error("Failed to re-init requests handler : {}", + innerT.getMessage(), innerT); + } reinitializeQueues(this.conf); refreshMaximumAllocation( ResourceUtils.fetchMaximumAllocationFromConfig(this.conf)); @@ -1337,6 +1350,26 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, return EMPTY_ALLOCATION; } + // Handle requests + long requestsHandleStartTime = System.currentTimeMillis(); + RequestsHandleResponse handleResponse = + handleRequests(applicationAttemptId, ask, schedulingRequests); + long requestsHandleElapsedMs = + System.currentTimeMillis() - requestsHandleStartTime; + CapacitySchedulerMetrics.getMetrics().addRequestsHandle( + requestsHandleElapsedMs); + if (handleResponse != null && handleResponse.isUpdated()) { + LOG.info("Updated requests: appId={}, elapsedMs={}; " + + "ResourceRequests: origin={}, updated={}; " + + "SchedulingRequests: origin={}, updated={}", + 
applicationAttemptId.getApplicationId(), + requestsHandleElapsedMs, + ask, handleResponse.getResourceRequests(), + schedulingRequests, handleResponse.getSchedulingRequests()); + ask = handleResponse.getResourceRequests(); + schedulingRequests = handleResponse.getSchedulingRequests(); + } + // Handle all container updates handleContainerUpdates(application, updateRequests); @@ -3642,4 +3675,34 @@ public int compare(FiCaSchedulerApp app1, FiCaSchedulerApp app2) { } } } + + /** + * initialize / reinitialize / handleRequests methods for RequestsHandler. + */ + private void initRequestsHandler() throws IOException, YarnException { + Function<ApplicationAttemptId, Pair<FiCaSchedulerApp, RMApp>> appProvider = + appAttemptId -> { + FiCaSchedulerApp app = getApplicationAttempt(appAttemptId); + RMApp rmApp = + rmContext.getRMApps().get(appAttemptId.getApplicationId()); + if (app == null || rmApp == null) { + return null; + } + return ImmutablePair.of(app, rmApp); + }; + requestsHandler = new RequestsHandler(appProvider); + reinitRequestsHandler(this.conf); + } + + private void reinitRequestsHandler(Configuration newConf) + throws IOException, YarnException { + requestsHandler.initialize(newConf); + } + + protected RequestsHandleResponse handleRequests( + ApplicationAttemptId appAttemptId, + List<ResourceRequest> resourceRequests, + List<SchedulingRequest> schedulingRequests) { + return requestsHandler.handle(appAttemptId, resourceRequests, schedulingRequests); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index ea5c892ce3e..c9442f37afd 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -430,6 +430,17 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur public static final String MAPPING_RULE_FORMAT_DEFAULT = MAPPING_RULE_FORMAT_LEGACY; + public static final String REQUEST_HANDLER_PREFIX = + PREFIX + "request-handler."; + + public static final String REQUEST_HANDLER_ENABLED = + REQUEST_HANDLER_PREFIX + "enabled"; + + public static final boolean DEFAULT_REQUEST_HANDLER_ENABLED = false; + + public static final String REQUEST_HANDLER_UPDATES = + REQUEST_HANDLER_PREFIX + "updates"; + private static final QueueCapacityConfigParser queueCapacityConfigParser = new QueueCapacityConfigParser(); private static final String LEGACY_QUEUE_MODE_ENABLED = PREFIX + "legacy-queue-mode.enabled"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java index 6277290246b..d500e2739c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java @@ 
-52,6 +52,8 @@ public class CapacitySchedulerMetrics { @Metric("Scheduler node update") MutableRate nodeUpdate; @Metric("Scheduler node heartbeat interval") MutableQuantiles schedulerNodeHBInterval; + @Metric("Requests handle") + private MutableRate requestsHandle; private static volatile CapacitySchedulerMetrics INSTANCE = null; private static MetricsRegistry registry; @@ -128,4 +130,13 @@ public void addSchedulerNodeHBInterval(long heartbeatInterval) { public long getNumOfSchedulerNodeHBInterval() { return this.schedulerNodeHBInterval.getEstimator().getCount(); } + + public void addRequestsHandle(long latency) { + this.requestsHandle.add(latency); + } + + @VisibleForTesting + public MutableRate getRequestsHandle() { + return requestsHandle; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandleResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandleResponse.java new file mode 100644 index 00000000000..216e0abc3e7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandleResponse.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; + +import java.util.ArrayList; +import java.util.List; + +/** + * Response of the handle method in {@link RequestsHandler}. + */ +public class RequestsHandleResponse { + + private final boolean isUpdated; + private final List<ResourceRequest> resourceRequests; + private final List<SchedulingRequest> schedulingRequests; + + public RequestsHandleResponse(boolean isUpdated, + List<ResourceRequest> resourceRequests, + List<SchedulingRequest> schedulingRequests) { + this.isUpdated = isUpdated; + this.resourceRequests = resourceRequests; + this.schedulingRequests = schedulingRequests; + } + + public List<ResourceRequest> getResourceRequests() { + if (resourceRequests == null) { + return new ArrayList<>(); + } + return resourceRequests; + } + + public List<SchedulingRequest> getSchedulingRequests() { + return schedulingRequests; + } + + public boolean isUpdated() { + return isUpdated; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandler.java new file mode 100644 index 00000000000..500de30c3cc --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandler.java @@ -0,0 +1,868 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; + +import javax.script.Bindings; +import javax.script.Compilable; +import javax.script.CompiledScript; +import javax.script.ScriptEngine; +import javax.script.ScriptEngineManager; +import javax.script.ScriptException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.text.StringSubstitutor; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader; +import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder; +import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; +import org.apache.hadoop.util.Sets; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import 
org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException; +import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE; + +/** + * RequestsHandler is used to handle requests from applications. + * It handles requests at the beginning of CapacityScheduler#allocate, + * and manages multiple update items which define which requests + * should be chosen and how to update them, based on the capacity-scheduler + * configuration, which can be updated dynamically without restarting the RM. + */ +public class RequestsHandler { + + protected static final Logger LOG = + LoggerFactory.getLogger(RequestsHandler.class); + + private static final Pattern PLACEHOLDER_PATTERN = + Pattern.compile("\\$\\{[^}]+\\}"); + + private static final String APP_INFO_KEY_QUEUE = "queue"; + private static final String APP_INFO_KEY_USER = "user"; + private static final String APP_INFO_KEY_PRIORITY = "priority"; + private static final String APP_INFO_KEY_ID = "id"; + private static final String APP_INFO_KEY_NAME = "name"; + private static final String APP_INFO_KEY_TYPE = "type"; + private static final String APP_INFO_KEY_TAGS = "tags"; + private static final String REQUEST_INFO_KEY_PRIORITY = "priority"; + private static final String REQUEST_INFO_KEY_RESOURCE_NAME = "resourceName"; + private static final String REQUEST_INFO_KEY_RELAX_LOCALITY = "relaxLocality"; + private static final String REQUEST_INFO_KEY_EXECUTION_TYPE = "executionType"; + private static final String REQUEST_INFO_KEY_ALLOCATION_TAGS = + "allocationTags"; + private static final String
REQUEST_INFO_KEY_IS_AM = "isAM"; + + private static final Set<String> VALID_PLACEHOLDER_KEYS = + Sets.newHashSet(APP_INFO_KEY_QUEUE, APP_INFO_KEY_USER, + APP_INFO_KEY_PRIORITY, APP_INFO_KEY_ID, APP_INFO_KEY_NAME, + APP_INFO_KEY_TYPE, APP_INFO_KEY_TAGS); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final ScriptEngine SCRIPT_ENGINE = + new ScriptEngineManager().getEngineByName("JavaScript"); + + private final Function<ApplicationAttemptId, Pair<FiCaSchedulerApp, RMApp>> + appProvider; + private final ReentrantReadWriteLock.WriteLock writeLock; + private final ReentrantReadWriteLock.ReadLock readLock; + + private boolean enabled = false; + + private List<UpdateItem> updateItems; + + // current updates conf value for comparing + private String updatesConfV; + + public RequestsHandler(Function<ApplicationAttemptId, + Pair<FiCaSchedulerApp, RMApp>> appProvider) { + this.appProvider = appProvider; + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + writeLock = lock.writeLock(); + readLock = lock.readLock(); + } + + public void initialize(Configuration conf) + throws IOException, YarnException { + if (SCRIPT_ENGINE == null) { + // disabled if script engine is not found + LOG.warn("Disabled RequestsHandler since script engine not found"); + return; + } + boolean newEnabled = + conf.getBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED, + CapacitySchedulerConfiguration.DEFAULT_REQUEST_HANDLER_ENABLED); + List<UpdateItem> newUpdateItems = null; + String newUpdatesConfV = null; + if (newEnabled) { + newUpdatesConfV = + conf.get(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES); + UpdatesConf newUpdatesConf = null; + if (newUpdatesConfV != null && !newUpdatesConfV.isEmpty()) { + newUpdatesConf = + OBJECT_MAPPER.readValue(newUpdatesConfV, UpdatesConf.class); + } + if (newUpdatesConf != null && newUpdatesConf.getItems() != null && + !newUpdatesConf.getItems().isEmpty()) { + newUpdateItems = new 
ArrayList<>(); + for (int i = 0; i < newUpdatesConf.getItems().size(); i++) { + UpdateItemConf updateItemConf = newUpdatesConf.getItems().get(i); + newUpdateItems.add(new UpdateItem(i, updateItemConf, this.appProvider)); + } + } + } + // update + writeLock.lock(); + try{ + if (enabled == newEnabled && + StringUtils.equals(newUpdatesConfV, updatesConfV)) { + LOG.info("No changes detected in RequestsHandler configuration," + + " enabled={}, updatesConf={}", enabled, updatesConfV); + return; + } + enabled = newEnabled; + updateItems = newUpdateItems; + updatesConfV = newUpdatesConfV; + LOG.info("Initialized request updater, enabled={}, updatesConf={}", + enabled, updatesConfV); + } finally { + writeLock.unlock(); + } + } + + public RequestsHandleResponse handle(ApplicationAttemptId appAttemptId, + List<ResourceRequest> resourceRequests, + List<SchedulingRequest> schedulingRequests) { + readLock.lock(); + try { + if (!enabled || updateItems == null || updateItems.isEmpty()) { + return null; + } + return updateRequests(appAttemptId, resourceRequests, schedulingRequests); + } finally { + readLock.unlock(); + } + } + + /** + * UpdatesConf is the root object of the configuration. 
+ */ + public static class UpdatesConf { + + @JsonProperty("items") + private List<UpdateItemConf> items; + + public List<UpdateItemConf> getItems() { + return items; + } + + public void setItems(List<UpdateItemConf> items) { + this.items = items; + } + } + + public static class UpdateItemConf { + + @JsonProperty("appMatchExpr") + private String appMatchExpr; + + @JsonProperty("requestMatchExpr") + private String requestMatchExpr; + + // whether to convert ResourceRequest to SchedulingRequest + @JsonProperty("isRRToSR") + private boolean isRRToSR; + + @JsonProperty("partition") + private String partition; + + @JsonProperty("executionType") + private String executionType; + + @JsonProperty("allocationTags") + private Set<String> allocationTags; + + @JsonProperty("placementConstraint") + private String placementConstraint; + + public String getAppMatchExpr() { + return appMatchExpr; + } + + public void setAppMatchExpr(String appMatchExpr) { + this.appMatchExpr = appMatchExpr; + } + + public String getRequestMatchExpr() { + return requestMatchExpr; + } + + public void setRequestMatchExpr(String requestMatchExpr) { + this.requestMatchExpr = requestMatchExpr; + } + + public boolean isRRToSR() { + return isRRToSR; + } + + public void setIsRRToSR(boolean isRRToSR) { + this.isRRToSR = isRRToSR; + } + + public String getPartition() { + return partition; + } + + public void setPartition(String partition) { + this.partition = partition; + } + + public Set<String> getAllocationTags() { + return allocationTags; + } + + public void setAllocationTags(Set<String> allocationTags) { + this.allocationTags = allocationTags; + } + + public String getPlacementConstraint() { + return placementConstraint; + } + + public void setPlacementConstraint(String placementConstraint) { + this.placementConstraint = placementConstraint; + } + + public String getExecutionType() { + return executionType; + } + + public void setExecutionType(String executionType) { + this.executionType = executionType; 
+ } + + public String toString() { + return "{" + + "appMatchExpr='" + appMatchExpr + '\'' + + ", requestMatchExpr='" + requestMatchExpr + '\'' + + ", isRRToSR=" + isRRToSR + + ", partition='" + partition + '\'' + + ", executionType='" + executionType + '\'' + + ", allocationTags=" + allocationTags + + ", placementConstraint='" + placementConstraint + '\'' + + '}'; + } + } + + public RequestsHandleResponse updateRequests( + ApplicationAttemptId appAttemptId, + List<ResourceRequest> resourceRequests, + List<SchedulingRequest> schedulingRequests) { + boolean isUpdated = false; + // update requests + for (UpdateItem updateItem : updateItems) { + DynamicAppInfo dynamicAppInfo = + updateItem.getDynamicAppInfo(appAttemptId); + if (dynamicAppInfo == null) { + break; + } + if (!dynamicAppInfo.isMatched) { + continue; + } + + RequestsHandleResponse resp = + updateItem.updateRequests(appAttemptId.getApplicationId(), + resourceRequests, schedulingRequests, dynamicAppInfo); + if (resp.isUpdated()) { + isUpdated = true; + if (LOG.isDebugEnabled()) { + LOG.debug( + "Updated requests: appId={}, updateItemConf={}, RR={}, SR={}", + appAttemptId.getApplicationId(), updateItem.updateItemConf.toString(), + resp.getResourceRequests(), resp.getSchedulingRequests()); + } + } + resourceRequests = resp.getResourceRequests(); + schedulingRequests = resp.getSchedulingRequests(); + } + return new RequestsHandleResponse(isUpdated, resourceRequests, + schedulingRequests); + } + + @VisibleForTesting + public boolean isEnabled() { + return enabled; + } + + @VisibleForTesting + public List<UpdateItem> getUpdateItems() { + return updateItems; + } + + /** + * UpdateItem is responsible for applying configured request updates based on + * matching rules. + * Variable substitution allows using placeholders like ${queue}, ${user}, + * etc. in placement constraints and allocation tags. + * These variables are replaced with actual application properties at runtime. 
+ */ + public static class UpdateItem { + + Function<ApplicationAttemptId, Pair<FiCaSchedulerApp, RMApp>> appProvider; + + private final int confIndex; + + /* + * Precompiled or preprocessed fields designed to enhance the performance + * of the update process. + */ + private CompiledScript appMatchScript; + private CompiledScript requestMatchScript; + private PlacementConstraint placementConstraint; + private ExecutionType executionType; + private boolean hasPlaceholderForPC; + private boolean hasPlaceholderForAllocTags; + + // Configuration for this update item + private final UpdateItemConf updateItemConf; + + // Loader and Cache for dynamic app info + private final CacheLoader<ApplicationAttemptId, Optional<DynamicAppInfo>> + dynamicAppInfoLoader = + new CacheLoader<ApplicationAttemptId, Optional<DynamicAppInfo>>() { + @Override + public Optional<DynamicAppInfo> load(ApplicationAttemptId appAttemptId) { + Pair<FiCaSchedulerApp, RMApp> appPair = + appProvider.apply(appAttemptId); + if (appPair == null || appPair.getLeft() == null || + appPair.getRight() == null) { + return Optional.empty(); + } + FiCaSchedulerApp app = appPair.getLeft(); + RMApp rmApp = appPair.getRight(); + Map<String, Object> appInfo = convertToAppInfo(app, rmApp); + Map<String, String> appStrInfo = appInfo.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, + e -> e.getValue() != null ? 
e.getValue().toString() : "")); + boolean isMatched = + isAppMatch(appAttemptId.getApplicationId(), appInfo); + PlacementConstraint runtimePC = + getRuntimePlacementConstraint(appStrInfo); + Set<String> allocationTags = getRuntimeAllocationTags(appStrInfo); + if (LOG.isDebugEnabled()) { + LOG.debug("Loaded dynamic app info: confIndex={}, appId={}, " + + "isMatched={}, placementConstraint={}, allocationTags={}", + confIndex, appAttemptId.getApplicationId(), isMatched, + runtimePC, allocationTags); + } + return Optional.of(new DynamicAppInfo(app, isMatched, runtimePC, + allocationTags)); + } + }; + + private final LoadingCache<ApplicationAttemptId, Optional<DynamicAppInfo>> + dynamicAppInfoCache = + CacheBuilder.newBuilder().expireAfterWrite(30, TimeUnit.SECONDS) + .build(dynamicAppInfoLoader); + + /** + * Constructs an UpdateItem with the specified configuration. + * Compiles scripts for application and request matching. + * Parses execution types and placement constraints from configuration. 
+ * + * @param confIndex the index of this update item in the configuration + * @param updateItemConf the configuration for this update item + * @param appProvider a function to provide application information + * @throws YarnException if the scripts, execution type, or + * placement constraint cannot be parsed + */ + public UpdateItem(int confIndex, UpdateItemConf updateItemConf, + Function<ApplicationAttemptId, Pair<FiCaSchedulerApp, RMApp>> + appProvider) + throws YarnException { + this.confIndex = confIndex; + this.appProvider = appProvider; + // compile app/request match-scripts + if (updateItemConf.getAppMatchExpr() != null) { + try { + appMatchScript = ((Compilable) SCRIPT_ENGINE).compile( + updateItemConf.getAppMatchExpr()); + } catch (ScriptException e) { + throw new YarnException("Failed to compile app match expression: " + + updateItemConf.getAppMatchExpr(), e); + } + } + if (updateItemConf.getRequestMatchExpr() != null) { + try { + requestMatchScript = ((Compilable) SCRIPT_ENGINE).compile( + updateItemConf.getRequestMatchExpr()); + } catch (ScriptException e) { + throw new YarnException("Failed to compile request match expression: " + + updateItemConf.getRequestMatchExpr(), e); + } + } + // parse execution type + if (updateItemConf.getExecutionType() != null) { + try{ + executionType = ExecutionType.valueOf(updateItemConf.getExecutionType()); + } catch (IllegalArgumentException e) { + throw new YarnException("Failed to parse execution-type: " + + updateItemConf.getExecutionType(), e); + } + } + // determine if placement constraint contains placeholders + // and parse it if static + if (updateItemConf.getPlacementConstraint() != null) { + // parse placement constraint + try { + PlacementConstraint.AbstractConstraint absConstraint = + PlacementConstraintParser.parseExpression( + updateItemConf.getPlacementConstraint()); + placementConstraint = new PlacementConstraint(absConstraint); + } catch (PlacementConstraintParseException e) { + throw new 
YarnException("Failed to parse placement-constraint: " + + updateItemConf.getPlacementConstraint(), e); + } + // mark hasPlaceholder flag for placement constraint + if (hasPlaceholder(updateItemConf.getPlacementConstraint())) { + hasPlaceholderForPC = true; + } + } + // mark hasPlaceholder flag for allocation tags + if (updateItemConf.getAllocationTags() != null) { + for (String tag : updateItemConf.getAllocationTags()) { + if (hasPlaceholder(tag)) { + hasPlaceholderForAllocTags = true; + } + } + } + + // include updateItemConf + this.updateItemConf = updateItemConf; + } + + public DynamicAppInfo getDynamicAppInfo(ApplicationAttemptId appAttemptId) { + try { + Optional<DynamicAppInfo> opt = dynamicAppInfoCache.get(appAttemptId); + if (opt.isPresent()) { + return opt.get(); + } + } catch (Exception e) { + LOG.error("Failed to get dynamic app info for appId: {}", + appAttemptId.getApplicationId(), e); + } + return null; + } + + /** + * Checks if an application matches this update item's criteria. + * Uses JavaScript evaluation of the appMatchExpr against application properties. + * + * @param appId the application ID + * @param appInfo map of application information + * @return true if the application matches, false otherwise + */ + public boolean isAppMatch(ApplicationId appId, + Map<String, Object> appInfo) { + if (appMatchScript == null) { + return true; + } + try { + Bindings bindings = SCRIPT_ENGINE.createBindings(); + bindings.putAll(appInfo); + Boolean isMatched = (Boolean) appMatchScript.eval(bindings); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Check app: appId={}, isMatched={}, appInfo={}, appMatchExpr={}", + appId, isMatched, appInfo, updateItemConf.getAppMatchExpr()); + } + return isMatched; + } catch (Exception e) { + LOG.error( + "Failed to evaluate app-match-expr: appId={}, appMatchExpr={}", + appId, updateItemConf.getAppMatchExpr(), e); + return false; + } + } + + /** + * Checks if a request matches this update item's criteria. 
+     * Uses JavaScript evaluation of the requestMatchExpr against request
+     * properties. Any failure while building the request information or while
+     * evaluating the expression is logged and treated as "not matched", so a
+     * malformed request or expression cannot make the caller fail.
+     *
+     * @param appId the application ID
+     * @param infoSupplier supplier for request information; only invoked when
+     *     a request-match expression is configured
+     * @return true if no request-match expression is configured or the
+     *     expression evaluates to {@code true}, false otherwise
+     */
+    public boolean isRequestMatch(ApplicationId appId,
+        Supplier<Map<String, Object>> infoSupplier) {
+      if (requestMatchScript == null) {
+        return true;
+      }
+      try {
+        // Build the request info inside the try block so that an exception
+        // while collecting request properties is treated as a mismatch
+        // instead of escaping to the caller.
+        Map<String, Object> info = infoSupplier.get();
+        Bindings bindings = SCRIPT_ENGINE.createBindings();
+        bindings.putAll(info);
+        Object result = requestMatchScript.eval(bindings);
+        // A null or non-boolean result is a configuration problem: report it
+        // explicitly rather than relying on a ClassCastException or an
+        // unboxing NullPointerException.
+        if (!(result instanceof Boolean)) {
+          LOG.warn("Non-boolean result of request-filter-expression: "
+              + "appId={}, result={}, reqInfo={}, requestMatchExpr={}",
+              appId, result, info, updateItemConf.getRequestMatchExpr());
+          return false;
+        }
+        boolean isMatched = (Boolean) result;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Check request: appId={}, isMatched={}, reqInfo={}, requestMatchExpr={}",
+              appId, isMatched, info, updateItemConf.getRequestMatchExpr());
+        }
+        return isMatched;
+      } catch (Exception e) {
+        LOG.error("Failed to evaluate request-filter-expression: {}",
+            updateItemConf.getRequestMatchExpr(), e);
+        return false;
+      }
+    }
+
+    /**
+     * Updates resource requests and/or scheduling requests based on this
+     * update item's configuration.
+     * <p>
+     * Matching resource requests get the configured partition and execution
+     * type. If at least one resource request matched and isRRToSR is
+     * configured, all resource requests are converted to scheduling requests.
+     * The converted requests are appended to the scheduling requests the
+     * application sent; those are kept, not replaced. Matching scheduling
+     * requests are then updated with the execution type, placement constraint
+     * and allocation tags of this item.
+     *
+     * @param appId the application ID
+     * @param resourceRequests list of resource requests to process, may be
+     *     null
+     * @param schedulingRequests list of scheduling requests to process, may
+     *     be null
+     * @param dynamicAppInfo dynamic application information
+     * @return response containing updated requests and update status; after
+     *     an RR-to-SR conversion the resource-request list is empty
+     */
+    private RequestsHandleResponse updateRequests(
+        ApplicationId appId,
+        List<ResourceRequest> resourceRequests,
+        List<SchedulingRequest> schedulingRequests,
+        DynamicAppInfo dynamicAppInfo) {
+      boolean isUpdated = false;
+      // update resource requests
+      if (resourceRequests != null) {
+        for (ResourceRequest rr : resourceRequests) {
+          if (!isRequestMatch(appId,
+              () -> convertToRequestInfo(dynamicAppInfo, rr))) {
+            continue;
+          }
+          updateResourceRequest(appId, rr);
+          isUpdated = true;
+        }
+      }
+      // when both isUpdated and isRRToSR are true, convert to SR at first
+      if (resourceRequests != null && !resourceRequests.isEmpty() &&
+          isUpdated && updateItemConf.isRRToSR) {
+        List<SchedulingRequest> converted = resourceRequests.stream()
+            .map(UpdateItem::convertToSchedulingRequest)
+            .collect(Collectors.toList());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Converted to scheduling requests: appId={}, sr={}",
+              appId, converted);
+        }
+        // Keep the scheduling requests sent by the application; they must not
+        // be replaced by the converted ones.
+        if (schedulingRequests != null && !schedulingRequests.isEmpty()) {
+          List<SchedulingRequest> merged = new java.util.ArrayList<>(
+              schedulingRequests.size() + converted.size());
+          merged.addAll(schedulingRequests);
+          merged.addAll(converted);
+          schedulingRequests = merged;
+        } else {
+          schedulingRequests = converted;
+        }
+        // Empty instead of null so that callers can always iterate the list.
+        resourceRequests = new java.util.ArrayList<>();
+      }
+      // update scheduling requests
+      if (schedulingRequests != null) {
+        for (SchedulingRequest sr : schedulingRequests) {
+          if (!isRequestMatch(appId,
+              () -> convertToRequestInfo(dynamicAppInfo, sr))) {
+            continue;
+          }
+          updateSchedulingRequest(appId, sr, dynamicAppInfo);
+          isUpdated = true;
+        }
+      }
+      return new RequestsHandleResponse(isUpdated, resourceRequests,
+          schedulingRequests);
+    }
+
+    /**
+     * Converts a ResourceRequest to a SchedulingRequest.
+     * Preserves allocation request ID, priority, execution type and resource
+     * sizing (number of containers and capability). A node label expression
+     * is mapped to a node-partition placement constraint. The resource name
+     * (locality) and relax-locality flag are not carried over.
+     *
+     * @param resourceRequest the resource request to convert
+     * @return a new scheduling request with equivalent properties, or an empty
+     *     scheduling request when {@code resourceRequest} is null
+     */
+    public static SchedulingRequest convertToSchedulingRequest(
+        ResourceRequest resourceRequest) {
+      if (resourceRequest == null) {
+        return SchedulingRequest.newBuilder().build();
+      }
+      // Compatible with Hadoop2.x
+      // whose default value of execution-type-request is null
+      ExecutionTypeRequest executionTypeRequest =
+          resourceRequest.getExecutionTypeRequest();
+      if (executionTypeRequest == null) {
+        executionTypeRequest = ExecutionTypeRequest.newInstance();
+      }
+      SchedulingRequest sr = SchedulingRequest.newBuilder()
+          .executionType(executionTypeRequest)
+          .allocationRequestId(resourceRequest.getAllocationRequestId())
+          .priority(resourceRequest.getPriority())
+          .resourceSizing(ResourceSizing.newInstance(
+              resourceRequest.getNumContainers(),
+              resourceRequest.getCapability())).build();
+      if (resourceRequest.getNodeLabelExpression() != null) {
+        PlacementConstraint constraint =
+            PlacementConstraints.targetNodeAttribute(NODE,
+                NodeAttributeOpCode.EQ,
+                PlacementConstraints.PlacementTargets.nodePartition(
+                    resourceRequest.getNodeLabelExpression())).build();
+        sr.setPlacementConstraint(constraint);
+      }
+      return sr;
+    }
+
+    /**
+     * Updates a ResourceRequest in place with configuration from this update
+     * item. Can modify node label expression (partition) and execution type;
+     * unset settings leave the request untouched.
+     *
+     * @param appId application ID for logging
+     * @param rr resource request to update
+     */
+    private void updateResourceRequest(ApplicationId appId,
+        ResourceRequest rr) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Before updating resource request, appId={}, RR={}, conf={}",
+            appId, rr, updateItemConf.toString());
+      }
+      if (updateItemConf.partition != null) {
+        rr.setNodeLabelExpression(updateItemConf.partition);
+      }
+      if (executionType != null) {
+        rr.setExecutionTypeRequest(
+            ExecutionTypeRequest.newInstance(executionType));
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Done updating resource request, appId={}, RR={}", appId, rr);
+      }
+    }
+
+    /**
+     * Updates a SchedulingRequest in place with configuration from this update
+     * item. Can modify execution type, placement constraint and allocation
+     * tags; the latter two are taken from the per-application values resolved
+     * in {@code dynamicAppInfo}.
+     *
+     * @param appId application ID for logging
+     * @param sr scheduling request to update
+     * @param dynamicAppInfo dynamic application information
+     */
+    private void updateSchedulingRequest(ApplicationId appId,
+        SchedulingRequest sr, DynamicAppInfo dynamicAppInfo) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Before updating scheduling request, appId={}, SR={}, conf={}",
+            appId, sr, updateItemConf.toString());
+      }
+      if (executionType != null) {
+        sr.setExecutionType(ExecutionTypeRequest.newInstance(executionType));
+      }
+      if (dynamicAppInfo.dynamicPlacementConstraint != null) {
+        sr.setPlacementConstraint(dynamicAppInfo.dynamicPlacementConstraint);
+      }
+      if (dynamicAppInfo.dynamicAllocationTags != null) {
+        sr.setAllocationTags(dynamicAppInfo.dynamicAllocationTags);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Done updating scheduling request, appId={}, SR={}", appId,
+            sr);
+      }
+    }
+
+    /**
+     * Resolves the placement constraint to apply for an application.
+     *
+     * @param appInfo application info used to substitute ${...} placeholders
+     * @return null if no placement constraint is configured or the
+     *     substituted expression cannot be parsed; the static constraint held
+     *     by this item if the expression has no placeholders; otherwise the
+     *     constraint parsed from the substituted expression
+     */
+    private PlacementConstraint getRuntimePlacementConstraint(
+        Map<String, String> appInfo) {
+      if (placementConstraint == null) {
+        return null;
+      }
+      if (!hasPlaceholderForPC) {
+        // return static placement constraint
+        return placementConstraint;
+      }
+      try {
+        // for dynamic placement constraint
+        StringSubstitutor substitutor = new StringSubstitutor(appInfo);
+        String substitutedPCExpression =
+            substitutor.replace(updateItemConf.getPlacementConstraint());
+        PlacementConstraint.AbstractConstraint absConstraint =
+            PlacementConstraintParser.parseExpression(substitutedPCExpression);
+        return new PlacementConstraint(absConstraint);
+      } catch (PlacementConstraintParseException e) {
+        LOG.warn("Failed to apply variable substitution to placement constraint. "
+            + "Skip setting placement constraint.", e);
+      }
+      return null;
+    }
+
+    /**
+     * Resolves the allocation tags to apply for an application.
+     *
+     * @param appInfo application info used to substitute ${...} placeholders
+     * @return null if no allocation tags are configured; the configured tags
+     *     if they contain no placeholders; otherwise the tags with
+     *     placeholders substituted
+     */
+    private Set<String> getRuntimeAllocationTags(Map<String, String> appInfo) {
+      if (updateItemConf.getAllocationTags() == null ||
+          updateItemConf.getAllocationTags().isEmpty()) {
+        return null;
+      }
+      if (!hasPlaceholderForAllocTags) {
+        // return static allocation tags
+        return updateItemConf.getAllocationTags();
+      }
+      // for dynamic allocation tags
+      StringSubstitutor substitutor = new StringSubstitutor(appInfo);
+      return updateItemConf.getAllocationTags().stream()
+          .map(substitutor::replace).collect(Collectors.toSet());
+    }
+
+    @VisibleForTesting
+    public UpdateItemConf getUpdateItemConf() {
+      return updateItemConf;
+    }
+
+    @VisibleForTesting
+    public boolean hasPlaceholderForPC() {
+      return hasPlaceholderForPC;
+    }
+
+    @VisibleForTesting
+    public boolean hasPlaceholderForAllocTags() {
+      return hasPlaceholderForAllocTags;
+    }
+
+    @VisibleForTesting
+    public CompiledScript getAppMatchScript() {
+      return appMatchScript;
+    }
+
+    @VisibleForTesting
+    public CompiledScript getRequestMatchScript() {
+      return requestMatchScript;
+    }
+
+    @VisibleForTesting
+    public PlacementConstraint getPlacementConstraint() {
+      return placementConstraint;
+    }
+
+    @VisibleForTesting
+    public ExecutionType getExecutionType() {
+      return executionType;
+    }
+  }
+
+  /**
+   * Converts application info values to strings for placeholder
+   * substitution; null values become empty strings.
+   */
+  private static Map<String, String> convertToStringAppInfo(
+      Map<String, Object> appInfo) {
+    return appInfo.entrySet().stream().collect(
+        Collectors.toMap(Map.Entry::getKey,
+            e -> e.getValue() != null ? e.getValue().toString() : ""));
+  }
+
+  /**
+   * Collects the application properties exposed to app-match expressions and
+   * placeholders. Values may be null (e.g. an application without a name):
+   * an ImmutableMap would reject them with a NullPointerException, so an
+   * unmodifiable LinkedHashMap is used instead.
+   */
+  private static Map<String, Object> convertToAppInfo(FiCaSchedulerApp app,
+      RMApp rmApp) {
+    Map<String, Object> info = new java.util.LinkedHashMap<>();
+    info.put(APP_INFO_KEY_QUEUE, app.getQueueName());
+    info.put(APP_INFO_KEY_USER, app.getUser());
+    info.put(APP_INFO_KEY_PRIORITY,
+        app.getPriority() == null ? 0 : app.getPriority().getPriority());
+    info.put(APP_INFO_KEY_ID, rmApp.getApplicationId().toString());
+    info.put(APP_INFO_KEY_NAME, rmApp.getName());
+    info.put(APP_INFO_KEY_TYPE, rmApp.getApplicationType());
+    info.put(APP_INFO_KEY_TAGS, rmApp.getApplicationTags());
+    return java.util.Collections.unmodifiableMap(info);
+  }
+
+  /**
+   * Collects the resource-request properties exposed to request-match
+   * expressions. Null values (e.g. an unset resource name) are allowed.
+   */
+  private static Map<String, Object> convertToRequestInfo(
+      DynamicAppInfo dynamicAppInfo, ResourceRequest rr) {
+    Map<String, Object> info = new java.util.LinkedHashMap<>();
+    info.put(REQUEST_INFO_KEY_PRIORITY,
+        rr.getPriority() == null ? 0 : rr.getPriority().getPriority());
+    info.put(REQUEST_INFO_KEY_RESOURCE_NAME, rr.getResourceName());
+    info.put(REQUEST_INFO_KEY_RELAX_LOCALITY, rr.getRelaxLocality());
+    info.put(REQUEST_INFO_KEY_IS_AM,
+        dynamicAppInfo.app.isWaitingForAMContainer());
+    return java.util.Collections.unmodifiableMap(info);
+  }
+
+  /**
+   * Collects the scheduling-request properties exposed to request-match
+   * expressions. A missing execution type is reported as "".
+   */
+  private static Map<String, Object> convertToRequestInfo(
+      DynamicAppInfo dynamicAppInfo, SchedulingRequest sr) {
+    ExecutionTypeRequest executionTypeRequest = sr.getExecutionType();
+    String executionTypeName = executionTypeRequest == null
+        || executionTypeRequest.getExecutionType() == null ?
+        "" : executionTypeRequest.getExecutionType().name();
+    Map<String, Object> info = new java.util.LinkedHashMap<>();
+    info.put(REQUEST_INFO_KEY_PRIORITY,
+        sr.getPriority() == null ? 0 : sr.getPriority().getPriority());
+    info.put(REQUEST_INFO_KEY_EXECUTION_TYPE, executionTypeName);
+    info.put(REQUEST_INFO_KEY_ALLOCATION_TAGS, sr.getAllocationTags());
+    info.put(REQUEST_INFO_KEY_IS_AM,
+        dynamicAppInfo.app.isWaitingForAMContainer());
+    return java.util.Collections.unmodifiableMap(info);
+  }
+
+  /**
+   * Checks whether the text contains placeholders and validates every one of
+   * them against VALID_PLACEHOLDER_KEYS.
+   *
+   * @param text text to scan, may be null
+   * @return true if at least one placeholder was found
+   * @throws YarnException if a placeholder key is not a valid key
+   */
+  public static boolean hasPlaceholder(String text) throws YarnException {
+    if (text == null) {
+      return false;
+    }
+    // find out all placeholder
+    Matcher matcher = PLACEHOLDER_PATTERN.matcher(text);
+    int placeholderCount = 0;
+    while (matcher.find()) {
+      String placeholder = matcher.group();
+      // NOTE(review): assumes PLACEHOLDER_PATTERN matches the whole "${key}"
+      // token, so the key is what remains after stripping "${" and "}".
+      String placeholderKey = placeholder.substring(2, placeholder.length() - 1);
+      if (!VALID_PLACEHOLDER_KEYS.contains(placeholderKey)) {
+        throw new YarnException("Invalid placeholder: " + placeholder);
+      }
+      placeholderCount++;
+    }
+    return placeholderCount > 0;
+  }
+
+  /**
+   * Per-application values resolved for one update item: the scheduler app,
+   * whether the app matched, and the placement constraint and allocation tags
+   * after placeholder substitution (null when not applicable).
+   */
+  public static class DynamicAppInfo {
+    private final FiCaSchedulerApp app;
+    private final boolean isMatched;
+    // dynamic placement constraint
+    private final PlacementConstraint dynamicPlacementConstraint;
+    // dynamic allocation tags
+    private final Set<String> dynamicAllocationTags;
+
+    public DynamicAppInfo(FiCaSchedulerApp app, boolean isMatched,
+        PlacementConstraint dynamicPlacementConstraint,
+        Set<String> dynamicAllocationTags) {
+      this.app = app;
+      this.isMatched = isMatched;
+      this.dynamicPlacementConstraint = dynamicPlacementConstraint;
+      this.dynamicAllocationTags = dynamicAllocationTags;
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java
index 894e8fd2f5a..bc0ee8162c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java @@ -84,6 +84,7 @@ public RMNodeLabelsManager createNodeLabelManager() { assertEquals(0, csMetrics.getNumOfAllocates()); assertEquals(0, csMetrics.getNumOfCommitSuccess()); + assertEquals(0, csMetrics.getRequestsHandle().lastStat().numSamples()); RMApp rmApp = MockRMAppSubmitter.submit(rm, MockRMAppSubmissionData.Builder.createWithMemory(1024, rm) @@ -101,6 +102,9 @@ public RMNodeLabelsManager createNodeLabelManager() { am.registerAppAttempt(); am.allocate("*", 1024, 1, new ArrayList<>()); + assertTrue( + csMetrics.getRequestsHandle().lastStat().numSamples() > 0); + nm1.nodeHeartbeat(true); nm2.nodeHeartbeat(true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerRequestsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerRequestsHandler.java new file mode 100644 index 00000000000..ca78905417f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerRequestsHandler.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.junit.Test;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.GB;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.toSet;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.getConfigurationWithQueueLabels;
+
+public class TestCapacitySchedulerRequestsHandler {
+
+  /**
+   * Simple e2e verification for requests-handler.
+   * - requests-handler can be enabled dynamically via reinitializing scheduler
+   * - partition will be updated for the matched app and request
+   */
+  @Test
+  public void testRequestsHandlerSimpleCase() throws Exception {
+    Configuration conf = new Configuration(false);
+    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    conf = getConfigurationWithQueueLabels(conf);
+
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(ImmutableMap.of(
+        NodeId.newInstance("h1", 0), toSet("x")));
+
+    MockRM rm = new MockRM(conf) {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm.start();
+    // Stop the RM even when a wait or assertion below fails, so that a
+    // failing run does not leak a running MockRM into later tests.
+    try {
+      MockNM nm1 = // label = x
+          new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
+      nm1.registerNode();
+      MockNM nm2 = // label = ""
+          new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService());
+      nm2.registerNode();
+
+      // Launch app1 in queue=a1
+      MockRMAppSubmissionData data1 =
+          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+              .withAppName("app1").withUser("root").withAcls(null)
+              .withQueue("a1").withUnmanagedAM(false).build();
+      RMApp app1 = MockRMAppSubmitter.submit(rm, data1);
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+
+      // am1 asks for a container with no label which will be allocated on nm2
+      am1.allocate("*", GB, 1, 10, new ArrayList<>(), "");
+      ContainerId containerId =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+      rm.waitForState(nm2, containerId, RMContainerState.ALLOCATED);
+
+      // refresh conf with requests-handler enabled
+      // partition will be updated to 'x' for apps in a1 queue
+      conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+          true);
+      conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+          "{\"items\":[{\"appMatchExpr\":\"queue=='a1'\","
+              + "\"requestMatchExpr\":\"priority>5\","
+              + " \"partition\":\"x\"}]}");
+      rm.getResourceScheduler().reinitialize(conf, rm.getRMContext());
+
+      // am1 asks for another container with no label
+      // request matched (priority 10 > 5), partition will be updated to 'x'
+      // for this request so that it will be allocated on nm1
+      am1.allocate("*", GB, 1, 10, new ArrayList<>(), "");
+      containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+      rm.waitForState(nm1, containerId, RMContainerState.ALLOCATED);
+
+      // am1 asks for another container with no label
+      // request not matched, partition won't be updated to 'x' for this
+      // request so that it will be allocated on nm2
+      am1.allocate("*", GB, 1, new ArrayList<>(), "");
+      containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 4);
+      rm.waitForState(nm2, containerId, RMContainerState.ALLOCATED);
+    } finally {
+      rm.stop();
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestRequestsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestRequestsHandler.java
new file mode 100644
index 00000000000..bdd6328a271
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestRequestsHandler.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.util.Lists;
+import org.apache.hadoop.util.Sets;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class TestRequestsHandler {
+
+  private Configuration conf;
+  private RMContext rmContext;
+  private RequestsHandler requestsHandler;
+  // Level of the RequestsHandler logger before setUp() raised it to DEBUG;
+  // may be null when the logger inherits its level.
+  private Level originalLogLevel;
+
+  @Before
+  public void setUp() {
+    rmContext = mock(RMContext.class);
+    CapacityScheduler scheduler = mock(CapacityScheduler.class);
+    when(rmContext.getScheduler()).thenReturn(scheduler);
+    ConcurrentHashMap<ApplicationId, RMApp> rmApps = new ConcurrentHashMap<>();
+    when(rmContext.getRMApps()).thenReturn(rmApps);
+
+    Function<ApplicationAttemptId, Pair<FiCaSchedulerApp, RMApp>> appProvider =
+        appAttemptId -> {
+          FiCaSchedulerApp app = scheduler.getApplicationAttempt(appAttemptId);
+          RMApp rmApp =
+              rmContext.getRMApps().get(appAttemptId.getApplicationId());
+          if (app == null || rmApp == null) {
+            return null;
+          }
+          return ImmutablePair.of(app, rmApp);
+        };
+    requestsHandler = new RequestsHandler(appProvider);
+    conf = new Configuration();
+    // Enable DEBUG so that the debug-logging branches are exercised as well;
+    // the previous level is restored in tearDown().
+    originalLogLevel =
+        LogManager.getLogger(RequestsHandler.LOG.getName()).getLevel();
+    LogManager.getLogger(RequestsHandler.LOG.getName()).setLevel(Level.DEBUG);
+  }
+
+  @After
+  public void tearDown() {
+    // Do not leak DEBUG logging into other tests running in the same JVM.
+    LogManager.getLogger(RequestsHandler.LOG.getName())
+        .setLevel(originalLogLevel);
+  }
+
+  @Test
+  public void testInitialize() throws IOException, YarnException {
+    // invalid conf
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        true);
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES, "{");
+    assertThrows(IOException.class, () -> requestsHandler.initialize(conf));
+
+    // invalid allocation tags
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        true);
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[{\"allocationTags\":\"xxx\"}]}");
+    assertThrows(IOException.class, () -> requestsHandler.initialize(conf));
+
+    // invalid placement constraint
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        true);
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[{\"placementConstraint\":\"and\"}]}");
+    YarnException invalidPCException = assertThrows(YarnException.class,
+        () -> requestsHandler.initialize(conf));
+    assertTrue(invalidPCException.getMessage()
+        .contains("Failed to parse placement-constraint"));
+
+    // invalid placeholder for allocation tags
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        true);
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[{\"allocationTags\":[\"xxx\",\"${invalid}\"]}]}");
+    YarnException invalidTagsPlaceholderException = assertThrows(
+        YarnException.class, () -> requestsHandler.initialize(conf));
+    assertTrue(invalidTagsPlaceholderException.getMessage()
+        .contains("Invalid placeholder"));
+
+    // invalid placeholder for placement constraints
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        true);
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[{\"placementConstraint\":"
+            + "\"and(in,rack,${name}:notin,node,${invalid})\"}]}");
+    YarnException invalidPCPlaceholderException = assertThrows(
+        YarnException.class, () -> requestsHandler.initialize(conf));
+    assertTrue(invalidPCPlaceholderException.getMessage()
+        .contains("Invalid placeholder"));
+
+    // disabled
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        false);
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[]}");
+    requestsHandler.initialize(conf);
+    assertFalse(requestsHandler.isEnabled());
+    assertNull(requestsHandler.getUpdateItems());
+
+    // enabled without items
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        true);
+    conf.unset(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES);
+    requestsHandler.initialize(conf);
+    assertTrue(requestsHandler.isEnabled());
+    assertNull(requestsHandler.getUpdateItems());
+
+    // enabled with 1 item
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        true);
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[{\"appMatchExpr\":\"queue=='test'\", "
+            + "\"requestMatchExpr\":\"priority>10\", \"isRRToSR\":true,"
+            + " \"placementConstraint\":\"and(in,rack,tag_${id}:notin,node,zk)\","
+            + " \"executionType\":\"OPPORTUNISTIC\","
+            + " \"allocationTags\":[\"tag1\", \"tag_${id}\"]}]}");
+    requestsHandler.initialize(conf);
+    assertTrue(requestsHandler.isEnabled());
+    assertNotNull(requestsHandler.getUpdateItems());
+    List<RequestsHandler.UpdateItem> items =
+        requestsHandler.getUpdateItems();
+    assertEquals(1, items.size());
+
+    RequestsHandler.UpdateItem item = items.get(0);
+    assertEquals("and(in,rack,tag_${id}:notin,node,zk)",
+        item.getPlacementConstraint().toString());
+    assertTrue(item.hasPlaceholderForPC());
+    assertTrue(item.hasPlaceholderForAllocTags());
+    assertEquals(ExecutionType.OPPORTUNISTIC, item.getExecutionType());
+    assertNotNull(item.getAppMatchScript());
+    assertNotNull(item.getRequestMatchScript());
+
+    RequestsHandler.UpdateItemConf itemConf = items.get(0).getUpdateItemConf();
+    assertEquals("queue=='test'", itemConf.getAppMatchExpr());
+    assertEquals("priority>10", itemConf.getRequestMatchExpr());
+    assertEquals("and(in,rack,tag_${id}:notin,node,zk)",
+        itemConf.getPlacementConstraint());
+    assertEquals("OPPORTUNISTIC", itemConf.getExecutionType());
+    assertEquals(2, itemConf.getAllocationTags().size());
+    assertTrue(itemConf.getAllocationTags().contains("tag1"));
+    assertTrue(itemConf.getAllocationTags().contains("tag_${id}"));
+
+    // turned to disabled
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        false);
+    requestsHandler.initialize(conf);
+    assertFalse(requestsHandler.isEnabled());
+    assertNull(requestsHandler.getUpdateItems());
+
+    // turned to enabled
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        true);
+    requestsHandler.initialize(conf);
+    assertTrue(requestsHandler.isEnabled());
+    assertNotNull(requestsHandler.getUpdateItems());
+    assertEquals(1, requestsHandler.getUpdateItems().size());
+  }
+
+  /**
+   * Creates a mocked scheduler app waiting for its AM container and registers
+   * it (and a matching mocked RMApp) with the mocked scheduler and RM context.
+   */
+  private FiCaSchedulerApp mockApp(int id, int priority,
+      String queueName, String user, String appName, String appType,
+      List<String> tags) {
+    ApplicationId appId = ApplicationId.newInstance(1, id);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 1);
+    FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
+    when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
+    when(app.getApplicationId()).thenReturn(appId);
+    when(app.isWaitingForAMContainer()).thenReturn(true);
+    when(app.getQueueName()).thenReturn(queueName);
+    when(app.getUser()).thenReturn(user);
+    when(app.getPriority()).thenReturn(Priority.newInstance(priority));
+    CapacityScheduler scheduler = (CapacityScheduler) rmContext.getScheduler();
+    when(scheduler.getApplicationAttempt(appAttemptId)).thenReturn(app);
+
+    RMApp rmApp = mock(RMApp.class);
+    when(rmApp.getApplicationId()).thenReturn(appId);
+    when(rmApp.getName()).thenReturn(appName);
+    when(rmApp.getApplicationType()).thenReturn(appType);
+    when(rmApp.getApplicationTags())
+        .thenReturn(ImmutableSet.copyOf(tags));
+
+    ConcurrentMap<ApplicationId, RMApp> rmApps =
+        rmContext.getRMApps();
+    rmApps.put(appId, rmApp);
+    return app;
+  }
+
+  @Test
+  public void testHandleRequests() throws IOException, YarnException {
+    // mock apps and request
+    FiCaSchedulerApp app1 = mockApp(1, 0, "test1", "user1", "app1",
+        "MapReduce", Lists.newArrayList("tag1", "tag2"));
+    FiCaSchedulerApp app2 = mockApp(2, 1, "test", "user2", "app2",
+        "MapReduce", Lists.newArrayList("tag1", "tag2"));
+
+    ResourceRequest rr1 = ResourceRequest.newInstance(Priority.newInstance(3),
+        "*", Resource.newInstance(4096, 2), 1, true);
+    rr1.setExecutionTypeRequest(
+        ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
+    rr1.setNodeLabelExpression("x");
+
+    /*
+     * check choosing by app, converting RR to SR
+     */
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        true);
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[{\"appMatchExpr\":\"queue=='test'\", \"isRRToSR\":true}]}");
+    requestsHandler.initialize(conf);
+
+    // app1 not-matched, won't convert to scheduling request
+    RequestsHandleResponse response =
+        requestsHandler.handle(app1.getApplicationAttemptId(),
+            Lists.newArrayList(rr1), null);
+    assertFalse(response.isUpdated());
+    assertEquals(1, response.getResourceRequests().size());
+    assertNull(response.getSchedulingRequests());
+
+    // app2 matched, will be converted to scheduling request
+    response = requestsHandler.handle(app2.getApplicationAttemptId(),
+        Lists.newArrayList(rr1), null);
+    assertTrue(response.isUpdated());
+    assertEquals(1, response.getSchedulingRequests().size());
+    assertEquals(0, response.getResourceRequests().size());
+    SchedulingRequest gotSR1 = response.getSchedulingRequests().get(0);
+    assertEquals(rr1.getPriority(), gotSR1.getPriority());
+    assertEquals(rr1.getCapability(),
+        gotSR1.getResourceSizing().getResources());
+    assertEquals(rr1.getNumContainers(),
+        gotSR1.getResourceSizing().getNumAllocations());
+    assertEquals(rr1.getExecutionTypeRequest(), gotSR1.getExecutionType());
+    assertEquals("node,EQ,yarn_node_partition/=[x]",
+        gotSR1.getPlacementConstraint().toString());
+
+    /*
+     * check choosing by app and request, converting RR to SR,
+     * then updating placement-constraint, execution-type and allocation-tags
+     */
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[{\"appMatchExpr\":\"queue=='test'\", "
+            + "\"requestMatchExpr\":\"priority>10\", \"isRRToSR\":true,"
+            + " \"placementConstraint\":\"and(in,rack,${id}:notin,node,zk)\","
+            + " \"executionType\":\"OPPORTUNISTIC\","
+            + " \"allocationTags\":[\"tag1\", \"${id}\"]}]}");
+    requestsHandler.initialize(conf);
+
+    ResourceRequest rr2 =
+        ResourceRequest.newInstance(Priority.newInstance(20), "*",
+            Resource.newInstance(1024, 1), 5, true);
+    response = requestsHandler.handle(app2.getApplicationAttemptId(),
+        Lists.newArrayList(rr1, rr2), null);
+    assertTrue(response.isUpdated());
+    assertEquals(2, response.getSchedulingRequests().size());
+
+    // both rr1 and rr2 should be converted to scheduling requests
+    // rr1 not matched (priority 3 is not > 10), should be left as converted
+    gotSR1 = response.getSchedulingRequests().get(0);
+    assertEquals(rr1.getPriority(), gotSR1.getPriority());
+    assertEquals(rr1.getCapability(),
+        gotSR1.getResourceSizing().getResources());
+    assertEquals(rr1.getNumContainers(),
+        gotSR1.getResourceSizing().getNumAllocations());
+    assertEquals(rr1.getExecutionTypeRequest(), gotSR1.getExecutionType());
+    assertEquals("node,EQ,yarn_node_partition/=[x]",
+        gotSR1.getPlacementConstraint().toString());
+
+    // rr2 matched, should be updated
+    SchedulingRequest gotSR2 = response.getSchedulingRequests().get(1);
+    assertEquals(rr2.getCapability(),
+        gotSR2.getResourceSizing().getResources());
+    assertEquals(rr2.getNumContainers(),
+        gotSR2.getResourceSizing().getNumAllocations());
+    assertEquals(ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC),
+        gotSR2.getExecutionType());
+    assertEquals(Sets.newHashSet("tag1", app2.getApplicationId().toString()),
+        gotSR2.getAllocationTags());
+    assertEquals(String.format("and(in,rack,%s:notin,node,zk)",
+        app2.getApplicationId().toString()),
+        gotSR2.getPlacementConstraint().toString());
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org