This is an automated email from the ASF dual-hosted git repository.

taoyang pushed a commit to branch YARN-11781
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 7901e3931dda92ab32c5231fc2aa84fe1e6bcf4c
Author: Tao Yang <taoy...@apache.org>
AuthorDate: Fri Feb 28 12:07:35 2025 +0800

    YARN-11781. Implement dynamic requests handling in CapacityScheduler.
---
 .../scheduler/capacity/CapacityScheduler.java      |  63 ++
 .../capacity/CapacitySchedulerConfiguration.java   |  11 +
 .../capacity/CapacitySchedulerMetrics.java         |  11 +
 .../scheduler/capacity/RequestsHandleResponse.java |  58 ++
 .../scheduler/capacity/RequestsHandler.java        | 868 +++++++++++++++++++++
 .../TestCapacitySchedulerMetrics.java              |   4 +
 .../TestCapacitySchedulerRequestsHandler.java      | 118 +++
 .../scheduler/capacity/TestRequestsHandler.java    | 322 ++++++++
 8 files changed, 1455 insertions(+)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 02ffe83a6df..8a716b6a013 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -34,8 +34,11 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
 import 
org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule;
@@ -240,6 +243,8 @@ public Configuration getConf() {
 
   private CSMaxRunningAppsEnforcer maxRunningEnforcer;
 
+  private RequestsHandler requestsHandler;
+
   public CapacityScheduler() {
     super(CapacityScheduler.class.getName());
     this.maxRunningEnforcer = new CSMaxRunningAppsEnforcer(this);
@@ -328,6 +333,7 @@ void initScheduler(Configuration configuration) throws
       offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
 
       initMultiNodePlacement();
+      initRequestsHandler();
       printSchedulerInitialized();
     } finally {
       writeLock.unlock();
@@ -455,8 +461,15 @@ public void reinitialize(Configuration newConf, RMContext 
rmContext,
         refreshMaximumAllocation(
             ResourceUtils.fetchMaximumAllocationFromConfig(this.conf));
         reinitializeQueues(this.conf);
+        reinitRequestsHandler(this.conf);
       } catch (Throwable t) {
         this.conf = oldConf;
+        try {
+          reinitRequestsHandler(this.conf);
+        } catch (Throwable innerT) {
+          LOG.error("Failed to re-init requests handler : {}",
+              innerT.getMessage(), innerT);
+        }
         reinitializeQueues(this.conf);
         refreshMaximumAllocation(
             ResourceUtils.fetchMaximumAllocationFromConfig(this.conf));
@@ -1337,6 +1350,26 @@ public Allocation allocate(ApplicationAttemptId 
applicationAttemptId,
       return EMPTY_ALLOCATION;
     }
 
+    // Handle requests
+    long requestsHandleStartTime = System.currentTimeMillis();
+    RequestsHandleResponse handleResponse =
+        handleRequests(applicationAttemptId, ask, schedulingRequests);
+    long requestsHandleElapsedMs =
+        System.currentTimeMillis() - requestsHandleStartTime;
+    CapacitySchedulerMetrics.getMetrics().addRequestsHandle(
+        requestsHandleElapsedMs);
+    if (handleResponse != null && handleResponse.isUpdated()) {
+      LOG.info("Updated requests: appId={}, elapsedMs={}; " +
+          "ResourceRequests: origin={}, updated={}; " +
+          "SchedulingRequests: origin={}, updated={}",
+          applicationAttemptId.getApplicationId(),
+          requestsHandleElapsedMs,
+          ask, handleResponse.getResourceRequests(),
+          schedulingRequests, handleResponse.getSchedulingRequests());
+      ask = handleResponse.getResourceRequests();
+      schedulingRequests = handleResponse.getSchedulingRequests();
+    }
+
     // Handle all container updates
     handleContainerUpdates(application, updateRequests);
 
@@ -3642,4 +3675,34 @@ public int compare(FiCaSchedulerApp app1, 
FiCaSchedulerApp app2) {
       }
     }
   }
+
+  /**
+   * initialize / reinitialize / handleRequests methods for RequestsHandler.
+   */
+  private void initRequestsHandler() throws IOException, YarnException {
+    Function<ApplicationAttemptId, Pair<FiCaSchedulerApp, RMApp>> appProvider =
+        appAttemptId -> {
+          FiCaSchedulerApp app = getApplicationAttempt(appAttemptId);
+          RMApp rmApp =
+              rmContext.getRMApps().get(appAttemptId.getApplicationId());
+          if (app == null || rmApp == null) {
+            return null;
+          }
+          return ImmutablePair.of(app, rmApp);
+        };
+    requestsHandler = new RequestsHandler(appProvider);
+    reinitRequestsHandler(this.conf);
+  }
+
+  private void reinitRequestsHandler(Configuration newConf)
+      throws IOException, YarnException {
+    requestsHandler.initialize(newConf);
+  }
+
+  protected RequestsHandleResponse handleRequests(
+      ApplicationAttemptId appAttemptId,
+      List<ResourceRequest> resourceRequests,
+      List<SchedulingRequest> schedulingRequests) {
+    return requestsHandler.handle(appAttemptId, resourceRequests, 
schedulingRequests);
+  }
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index ea5c892ce3e..c9442f37afd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -430,6 +430,17 @@ public class CapacitySchedulerConfiguration extends 
ReservationSchedulerConfigur
   public static final String MAPPING_RULE_FORMAT_DEFAULT =
       MAPPING_RULE_FORMAT_LEGACY;
 
+  public static final String REQUEST_HANDLER_PREFIX =
+      PREFIX + "request-handler.";
+
+  public static final String REQUEST_HANDLER_ENABLED =
+      REQUEST_HANDLER_PREFIX + "enabled";
+
+  public static final boolean DEFAULT_REQUEST_HANDLER_ENABLED = false;
+
+  public static final String REQUEST_HANDLER_UPDATES =
+      REQUEST_HANDLER_PREFIX + "updates";
+
   private static final QueueCapacityConfigParser queueCapacityConfigParser
       = new QueueCapacityConfigParser();
   private static final String LEGACY_QUEUE_MODE_ENABLED = PREFIX + 
"legacy-queue-mode.enabled";
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java
index 6277290246b..d500e2739c1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java
@@ -52,6 +52,8 @@ public class CapacitySchedulerMetrics {
   @Metric("Scheduler node update") MutableRate nodeUpdate;
   @Metric("Scheduler node heartbeat interval") MutableQuantiles
       schedulerNodeHBInterval;
+  @Metric("Requests handle")
+  private MutableRate requestsHandle;
 
   private static volatile CapacitySchedulerMetrics INSTANCE = null;
   private static MetricsRegistry registry;
@@ -128,4 +130,13 @@ public void addSchedulerNodeHBInterval(long 
heartbeatInterval) {
   public long getNumOfSchedulerNodeHBInterval() {
     return this.schedulerNodeHBInterval.getEstimator().getCount();
   }
+
+  public void addRequestsHandle(long latency) {
+    this.requestsHandle.add(latency);
+  }
+
+  @VisibleForTesting
+  public MutableRate getRequestsHandle() {
+    return requestsHandle;
+  }
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandleResponse.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandleResponse.java
new file mode 100644
index 00000000000..216e0abc3e7
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandleResponse.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Response of the handle method in {@link RequestsHandler}.
+ */
+public class RequestsHandleResponse {
+
+  private final boolean isUpdated;
+  private final List<ResourceRequest> resourceRequests;
+  private final List<SchedulingRequest> schedulingRequests;
+
+  public RequestsHandleResponse(boolean isUpdated,
+      List<ResourceRequest> resourceRequests,
+      List<SchedulingRequest> schedulingRequests) {
+    this.isUpdated = isUpdated;
+    this.resourceRequests = resourceRequests;
+    this.schedulingRequests = schedulingRequests;
+  }
+
+  public List<ResourceRequest> getResourceRequests() {
+    if (resourceRequests == null) {
+      return new ArrayList<>();
+    }
+    return resourceRequests;
+  }
+
+  public List<SchedulingRequest> getSchedulingRequests() {
+    return schedulingRequests;
+  }
+
+  public boolean isUpdated() {
+    return isUpdated;
+  }
+}
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandler.java
new file mode 100644
index 00000000000..500de30c3cc
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandler.java
@@ -0,0 +1,868 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import javax.script.Bindings;
+import javax.script.Compilable;
+import javax.script.CompiledScript;
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.text.StringSubstitutor;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.util.Sets;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import 
org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
+
+/**
+ * RequestHandler is used to handle requests from applications,
+ * It handles requests at the beginning of CapacityScheduler#allocate,
+ * and manages multiple update items which define which requests
+ * should be chosen and how to update them. based on the capacity-scheduler
+ * configuration and can be updated dynamically without restarting the RM.
+ */
+public class RequestsHandler {
+
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(RequestsHandler.class);
+
+  private static final Pattern PLACEHOLDER_PATTERN =
+      Pattern.compile("\\$\\{[^}]+\\}");
+
+  private static final String APP_INFO_KEY_QUEUE = "queue";
+  private static final String APP_INFO_KEY_USER = "user";
+  private static final String APP_INFO_KEY_PRIORITY = "priority";
+  private static final String APP_INFO_KEY_ID = "id";
+  private static final String APP_INFO_KEY_NAME = "name";
+  private static final String APP_INFO_KEY_TYPE = "type";
+  private static final String APP_INFO_KEY_TAGS = "tags";
+  private static final String REQUEST_INFO_KEY_PRIORITY = "priority";
+  private static final String REQUEST_INFO_KEY_RESOURCE_NAME = "resourceName";
+  private static final String REQUEST_INFO_KEY_RELAX_LOCALITY = 
"relaxLocality";
+  private static final String REQUEST_INFO_KEY_EXECUTION_TYPE = 
"executionType";
+  private static final String REQUEST_INFO_KEY_ALLOCATION_TAGS =
+      "allocationTags";
+  private static final String REQUEST_INFO_KEY_IS_AM = "isAM";
+
+  private static final Set<String> VALID_PLACEHOLDER_KEYS =
+      Sets.newHashSet(APP_INFO_KEY_QUEUE, APP_INFO_KEY_USER,
+          APP_INFO_KEY_PRIORITY, APP_INFO_KEY_ID, APP_INFO_KEY_NAME,
+          APP_INFO_KEY_TYPE, APP_INFO_KEY_TAGS);
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private static final ScriptEngine SCRIPT_ENGINE =
+      new ScriptEngineManager().getEngineByName("JavaScript");
+
+  private final Function<ApplicationAttemptId, Pair<FiCaSchedulerApp, RMApp>>
+      appProvider;
+  private final ReentrantReadWriteLock.WriteLock writeLock;
+  private final ReentrantReadWriteLock.ReadLock readLock;
+
+  private boolean enabled = false;
+
+  private List<UpdateItem> updateItems;
+
+  // current updates conf value for comparing
+  private String updatesConfV;
+
+  public RequestsHandler(Function<ApplicationAttemptId,
+          Pair<FiCaSchedulerApp, RMApp>> appProvider) {
+    this.appProvider = appProvider;
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    writeLock = lock.writeLock();
+    readLock = lock.readLock();
+  }
+
+  public void initialize(Configuration conf)
+      throws IOException, YarnException {
+    if (SCRIPT_ENGINE == null) {
+      // disabled if script engine is not found
+      LOG.warn("Disabled RequestsHandler since script engine not found");
+      return;
+    }
+    boolean newEnabled =
+        conf.getBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+            CapacitySchedulerConfiguration.DEFAULT_REQUEST_HANDLER_ENABLED);
+    List<UpdateItem> newUpdateItems = null;
+    String newUpdatesConfV = null;
+    if (newEnabled) {
+      newUpdatesConfV =
+          conf.get(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES);
+      UpdatesConf newUpdatesConf = null;
+      if (newUpdatesConfV != null && !newUpdatesConfV.isEmpty()) {
+        newUpdatesConf =
+            OBJECT_MAPPER.readValue(newUpdatesConfV, UpdatesConf.class);
+      }
+      if (newUpdatesConf != null && newUpdatesConf.getItems() != null &&
+          !newUpdatesConf.getItems().isEmpty()) {
+        newUpdateItems = new ArrayList<>();
+        for (int i = 0; i < newUpdatesConf.getItems().size(); i++) {
+          UpdateItemConf updateItemConf = newUpdatesConf.getItems().get(i);
+          newUpdateItems.add(new UpdateItem(i, updateItemConf, 
this.appProvider));
+        }
+      }
+    }
+    // update
+    writeLock.lock();
+    try{
+      if (enabled == newEnabled &&
+          StringUtils.equals(newUpdatesConfV, updatesConfV)) {
+        LOG.info("No changes detected in RequestsHandler configuration," +
+            " enabled={}, updatesConf={}", enabled, updatesConfV);
+        return;
+      }
+      enabled = newEnabled;
+      updateItems = newUpdateItems;
+      updatesConfV = newUpdatesConfV;
+      LOG.info("Initialized request updater, enabled={}, updatesConf={}",
+          enabled, updatesConfV);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public RequestsHandleResponse handle(ApplicationAttemptId appAttemptId,
+      List<ResourceRequest> resourceRequests,
+      List<SchedulingRequest> schedulingRequests) {
+    readLock.lock();
+    try {
+      if (!enabled || updateItems == null || updateItems.isEmpty()) {
+        return null;
+      }
+      return updateRequests(appAttemptId, resourceRequests, 
schedulingRequests);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * UpdatesConf is the root object of the configuration.
+   */
+  public static class UpdatesConf {
+
+    @JsonProperty("items")
+    private List<UpdateItemConf> items;
+
+    public List<UpdateItemConf> getItems() {
+      return items;
+    }
+
+    public void setItems(List<UpdateItemConf> items) {
+      this.items = items;
+    }
+  }
+
+  public static class UpdateItemConf {
+
+    @JsonProperty("appMatchExpr")
+    private String appMatchExpr;
+
+    @JsonProperty("requestMatchExpr")
+    private String requestMatchExpr;
+
+    // whether to convert ResourceRequest to SchedulingRequest
+    @JsonProperty("isRRToSR")
+    private boolean isRRToSR;
+
+    @JsonProperty("partition")
+    private String partition;
+
+    @JsonProperty("executionType")
+    private String executionType;
+
+    @JsonProperty("allocationTags")
+    private Set<String> allocationTags;
+
+    @JsonProperty("placementConstraint")
+    private String placementConstraint;
+
+    public String getAppMatchExpr() {
+      return appMatchExpr;
+    }
+
+    public void setAppMatchExpr(String appMatchExpr) {
+      this.appMatchExpr = appMatchExpr;
+    }
+
+    public String getRequestMatchExpr() {
+      return requestMatchExpr;
+    }
+
+    public void setRequestMatchExpr(String requestMatchExpr) {
+      this.requestMatchExpr = requestMatchExpr;
+    }
+
+    public boolean isRRToSR() {
+      return isRRToSR;
+    }
+
+    public void setIsRRToSR(boolean isRRToSR) {
+      this.isRRToSR = isRRToSR;
+    }
+
+    public String getPartition() {
+      return partition;
+    }
+
+    public void setPartition(String partition) {
+      this.partition = partition;
+    }
+
+    public Set<String> getAllocationTags() {
+      return allocationTags;
+    }
+
+    public void setAllocationTags(Set<String> allocationTags) {
+      this.allocationTags = allocationTags;
+    }
+
+    public String getPlacementConstraint() {
+      return placementConstraint;
+    }
+
+    public void setPlacementConstraint(String placementConstraint) {
+      this.placementConstraint = placementConstraint;
+    }
+
+    public String getExecutionType() {
+      return executionType;
+    }
+
+    public void setExecutionType(String executionType) {
+      this.executionType = executionType;
+    }
+
+    public String toString() {
+      return "{" +
+          "appMatchExpr='" + appMatchExpr + '\'' +
+          ", requestMatchExpr='" + requestMatchExpr + '\'' +
+          ", isRRToSR=" + isRRToSR +
+          ", partition='" + partition + '\'' +
+          ", executionType='" + executionType + '\'' +
+          ", allocationTags=" + allocationTags +
+          ", placementConstraint='" + placementConstraint + '\'' +
+          '}';
+    }
+  }
+
+  public RequestsHandleResponse updateRequests(
+      ApplicationAttemptId appAttemptId,
+      List<ResourceRequest> resourceRequests,
+      List<SchedulingRequest> schedulingRequests) {
+    boolean isUpdated = false;
+    // update requests
+    for (UpdateItem updateItem : updateItems) {
+      DynamicAppInfo dynamicAppInfo =
+          updateItem.getDynamicAppInfo(appAttemptId);
+      if (dynamicAppInfo == null) {
+        break;
+      }
+      if (!dynamicAppInfo.isMatched) {
+        continue;
+      }
+
+      RequestsHandleResponse resp =
+          updateItem.updateRequests(appAttemptId.getApplicationId(),
+              resourceRequests, schedulingRequests, dynamicAppInfo);
+      if (resp.isUpdated()) {
+        isUpdated = true;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Updated requests: appId={}, updateItemConf={}, RR={}, SR={}",
+              appAttemptId.getApplicationId(), 
updateItem.updateItemConf.toString(),
+              resp.getResourceRequests(), resp.getSchedulingRequests());
+        }
+      }
+      resourceRequests = resp.getResourceRequests();
+      schedulingRequests = resp.getSchedulingRequests();
+    }
+    return new RequestsHandleResponse(isUpdated, resourceRequests,
+        schedulingRequests);
+  }
+
+  @VisibleForTesting
+  public boolean isEnabled() {
+    return enabled;
+  }
+
+  @VisibleForTesting
+  public List<UpdateItem> getUpdateItems() {
+    return updateItems;
+  }
+
+  /**
+   * UpdateItem is responsible for applying configured request updates based on
+   * matching rules.
+   * Variable substitution allows using placeholders like ${queue}, ${user},
+   * etc. in placement constraints and allocation tags.
+   * These variables are replaced with actual application properties at 
runtime.
+   */
+  public static class UpdateItem {
+
+    Function<ApplicationAttemptId, Pair<FiCaSchedulerApp, RMApp>> appProvider;
+
+    private final int confIndex;
+
+    /*
+     * Precompiled or preprocessed fields designed to enhance the performance
+     * of the update process.
+     */
+    private CompiledScript appMatchScript;
+    private CompiledScript requestMatchScript;
+    private PlacementConstraint placementConstraint;
+    private ExecutionType executionType;
+    private boolean hasPlaceholderForPC;
+    private boolean hasPlaceholderForAllocTags;
+
+    // Configuration for this update item
+    private final UpdateItemConf updateItemConf;
+
+    // Loader and Cache for dynamic app info
+    private final CacheLoader<ApplicationAttemptId, Optional<DynamicAppInfo>>
+        dynamicAppInfoLoader =
+        new CacheLoader<ApplicationAttemptId, Optional<DynamicAppInfo>>() {
+          @Override
+          public Optional<DynamicAppInfo> load(ApplicationAttemptId 
appAttemptId) {
+            Pair<FiCaSchedulerApp, RMApp> appPair =
+                appProvider.apply(appAttemptId);
+            if (appPair == null || appPair.getLeft() == null ||
+                appPair.getRight() == null) {
+              return Optional.empty();
+            }
+            FiCaSchedulerApp app = appPair.getLeft();
+            RMApp rmApp = appPair.getRight();
+            Map<String, Object> appInfo = convertToAppInfo(app, rmApp);
+            Map<String, String> appStrInfo = appInfo.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey,
+                    e -> e.getValue() != null ? e.getValue().toString() : ""));
+            boolean isMatched =
+                isAppMatch(appAttemptId.getApplicationId(), appInfo);
+            PlacementConstraint runtimePC =
+                getRuntimePlacementConstraint(appStrInfo);
+            Set<String> allocationTags = getRuntimeAllocationTags(appStrInfo);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Loaded dynamic app info: confIndex={}, appId={}, " +
+                      "isMatched={}, placementConstraint={}, 
allocationTags={}",
+                  confIndex, appAttemptId.getApplicationId(), isMatched,
+                  runtimePC, allocationTags);
+            }
+            return Optional.of(new DynamicAppInfo(app, isMatched, runtimePC,
+                allocationTags));
+          }
+        };
+
+    private final LoadingCache<ApplicationAttemptId, Optional<DynamicAppInfo>>
+        dynamicAppInfoCache =
+        CacheBuilder.newBuilder().expireAfterWrite(30, TimeUnit.SECONDS)
+            .build(dynamicAppInfoLoader);
+
+    /**
+     * Constructs an UpdateItem with the specified configuration.
+     * Compiles scripts for application and request matching.
+     * Parses execution types and placement constraints from configuration.
+     *
+     * @param confIndex the index of this update item in the configuration
+     * @param updateItemConf the configuration for this update item
+     * @param appProvider a function to provide application information
+     * @throws YarnException if the scripts, execution type, or
+     *                        placement constraint cannot be parsed
+     */
+    public UpdateItem(int confIndex, UpdateItemConf updateItemConf,
+        Function<ApplicationAttemptId, Pair<FiCaSchedulerApp, RMApp>>
+            appProvider)
+        throws YarnException {
+      this.confIndex = confIndex;
+      this.appProvider = appProvider;
+      // compile app/request match-scripts
+      if (updateItemConf.getAppMatchExpr() != null) {
+        try {
+          appMatchScript = ((Compilable) SCRIPT_ENGINE).compile(
+              updateItemConf.getAppMatchExpr());
+        } catch (ScriptException e) {
+          throw new YarnException("Failed to compile app match expression: "
+              + updateItemConf.getAppMatchExpr(), e);
+        }
+      }
+      if (updateItemConf.getRequestMatchExpr() != null) {
+        try {
+          requestMatchScript = ((Compilable) SCRIPT_ENGINE).compile(
+              updateItemConf.getRequestMatchExpr());
+        } catch (ScriptException e) {
+          throw new YarnException("Failed to compile request match expression: 
"
+              + updateItemConf.getRequestMatchExpr(), e);
+        }
+      }
+      // parse execution type
+      if (updateItemConf.getExecutionType() != null) {
+        try{
+          executionType = 
ExecutionType.valueOf(updateItemConf.getExecutionType());
+        } catch (IllegalArgumentException e) {
+          throw new YarnException("Failed to parse execution-type: " +
+                  updateItemConf.getExecutionType(), e);
+        }
+      }
+      // determine if placement constraint contains placeholders
+      // and parse it if static
+      if (updateItemConf.getPlacementConstraint() != null) {
+        // parse placement constraint
+        try {
+          PlacementConstraint.AbstractConstraint absConstraint =
+              PlacementConstraintParser.parseExpression(
+                  updateItemConf.getPlacementConstraint());
+          placementConstraint = new PlacementConstraint(absConstraint);
+        } catch (PlacementConstraintParseException e) {
+          throw new YarnException("Failed to parse placement-constraint: " +
+              updateItemConf.getPlacementConstraint(), e);
+        }
+        // mark hasPlaceholder flag for placement constraint
+        if (hasPlaceholder(updateItemConf.getPlacementConstraint())) {
+          hasPlaceholderForPC = true;
+        }
+      }
+      // mark hasPlaceholder flag for allocation tags
+      if (updateItemConf.getAllocationTags() != null) {
+        for (String tag : updateItemConf.getAllocationTags()) {
+          if (hasPlaceholder(tag)) {
+            hasPlaceholderForAllocTags = true;
+          }
+        }
+      }
+
+      // include updateItemConf
+      this.updateItemConf = updateItemConf;
+    }
+
+    public DynamicAppInfo getDynamicAppInfo(ApplicationAttemptId appAttemptId) 
{
+      try {
+        Optional<DynamicAppInfo> opt = dynamicAppInfoCache.get(appAttemptId);
+        if (opt.isPresent()) {
+          return opt.get();
+        }
+      } catch (Exception e) {
+        LOG.error("Failed to get dynamic app info for appId: {}",
+            appAttemptId.getApplicationId(), e);
+      }
+      return null;
+    }
+
+    /**
+     * Checks if an application matches this update item's criteria.
+     * Uses JavaScript evaluation of the appMatchExpr against application 
properties.
+     *
+     * @param appId the application ID
+     * @param appInfo map of application information
+     * @return true if the application matches, false otherwise
+     */
+    public boolean isAppMatch(ApplicationId appId,
+        Map<String, Object> appInfo) {
+      if (appMatchScript == null) {
+        return true;
+      }
+      try {
+        Bindings bindings = SCRIPT_ENGINE.createBindings();
+        bindings.putAll(appInfo);
+        Boolean isMatched = (Boolean) appMatchScript.eval(bindings);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Check app: appId={}, isMatched={}, appInfo={}, appMatchExpr={}",
+              appId, isMatched, appInfo, updateItemConf.getAppMatchExpr());
+        }
+        return isMatched;
+      } catch (Exception e) {
+        LOG.error(
+            "Failed to evaluate app-match-expr: appId={}, appMatchExpr={}",
+            appId, updateItemConf.getAppMatchExpr(), e);
+        return false;
+      }
+    }
+
+    /**
+     * Checks if a request matches this update item's criteria.
+     * Uses JavaScript evaluation of the requestMatchExpr against request 
properties.
+     *
+     * @param appId the application ID
+     * @param infoSupplier supplier for request information
+     * @return true if the request matches, false otherwise
+     */
+    public boolean isRequestMatch(ApplicationId appId,
+        Supplier<Map<String, Object>> infoSupplier) {
+      if (requestMatchScript == null) {
+        return true;
+      }
+      Map<String, Object> info = infoSupplier.get();
+      try {
+        Bindings bindings = SCRIPT_ENGINE.createBindings();
+        bindings.putAll(info);
+        Boolean isMatched = (Boolean) requestMatchScript.eval(bindings);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Check request: appId={}, isMatched={}, reqInfo={}, 
requestMatchExpr={}",
+              appId, isMatched, info, updateItemConf.getRequestMatchExpr());
+        }
+        return isMatched;
+      } catch (Exception e) {
+        LOG.error("Failed to evaluate request-filter-expression: {}",
+            updateItemConf.getRequestMatchExpr(), e);
+        return false;
+      }
+    }
+
+    /**
+     * Updates resource requests and/or scheduling requests based on this 
update
+     * item's configuration. May convert resource requests to scheduling
+     * requests if isRRToSR is configured. Applies updates to each matching
+     * request, including execution type, placement constraints, and
+     * allocation tags with variable substitution.
+     *
+     * @param appId the application ID
+     * @param resourceRequests list of resource requests to process
+     * @param schedulingRequests list of scheduling requests to process
+     * @param dynamicAppInfo dynamic application information
+     * @return response containing updated requests and update status
+     */
+    private RequestsHandleResponse updateRequests(
+        ApplicationId appId,
+        List<ResourceRequest> resourceRequests,
+        List<SchedulingRequest> schedulingRequests,
+        DynamicAppInfo dynamicAppInfo) {
+      boolean isUpdated = false;
+      // update resource requests
+      if (resourceRequests != null) {
+        for (ResourceRequest rr: resourceRequests) {
+          if (!isRequestMatch(appId,
+              () -> convertToRequestInfo(dynamicAppInfo, rr))) {
+            continue;
+          }
+          updateResourceRequest(appId, rr);
+          isUpdated = true;
+        }
+      }
+      // when both isUpdated and isRRToSR are true, convert to SR at first
+      if (resourceRequests != null && !resourceRequests.isEmpty() &&
+          isUpdated && updateItemConf.isRRToSR) {
+        schedulingRequests = resourceRequests.stream()
+            .map(UpdateItem::convertToSchedulingRequest)
+            .collect(Collectors.toList());
+        resourceRequests = null;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Converted to scheduling requests: appId={}, sr={}",
+              appId, schedulingRequests);
+        }
+      }
+      // update scheduling requests
+      if (schedulingRequests != null) {
+        for (SchedulingRequest sr: schedulingRequests) {
+          if (!isRequestMatch(appId, () -> 
convertToRequestInfo(dynamicAppInfo, sr))) {
+            continue;
+          }
+          updateSchedulingRequest(appId, sr, dynamicAppInfo);
+          isUpdated = true;
+        }
+      }
+      return new RequestsHandleResponse(isUpdated, resourceRequests,
+          schedulingRequests);
+    }
+
+    /**
+     * Converts a ResourceRequest to a SchedulingRequest.
+     * Preserves allocation request ID, priority, resource sizing, and node 
label expression.
+     * Maps node label expressions to placement constraints.
+     *
+     * @param resourceRequest the resource request to convert
+     * @return a new scheduling request with equivalent properties
+     */
+    public static SchedulingRequest convertToSchedulingRequest(
+        ResourceRequest resourceRequest) {
+      if (resourceRequest == null) {
+        return SchedulingRequest.newBuilder().build();
+      }
+      // Compatible with Hadoop2.x
+      // whose default value of execution-type-request is null
+      ExecutionTypeRequest executionTypeRequest =
+          resourceRequest.getExecutionTypeRequest();
+      if (executionTypeRequest == null) {
+        executionTypeRequest = ExecutionTypeRequest.newInstance();
+      }
+      SchedulingRequest sr = SchedulingRequest.newBuilder()
+          .executionType(executionTypeRequest)
+          .allocationRequestId(resourceRequest.getAllocationRequestId())
+          .priority(resourceRequest.getPriority())
+          .resourceSizing(ResourceSizing.newInstance(
+              resourceRequest.getNumContainers(),
+              resourceRequest.getCapability())).build();
+      if (resourceRequest.getNodeLabelExpression() != null) {
+        PlacementConstraint constraint =
+            PlacementConstraints.targetNodeAttribute(NODE,
+                NodeAttributeOpCode.EQ,
+                PlacementConstraints.PlacementTargets.nodePartition(
+                    resourceRequest.getNodeLabelExpression())).build();
+        sr.setPlacementConstraint(constraint);
+      }
+      return sr;
+    }
+
+    /**
+     * Updates a ResourceRequest with configuration from this update item.
+     * Can modify node label expression (partition) and execution type.
+     *
+     * @param appId application ID for logging
+     * @param rr resource request to update
+     */
+    private void updateResourceRequest(ApplicationId appId,
+        ResourceRequest rr) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Before updating resource request, appId={}, RR={}, conf={}",
+            appId, rr, updateItemConf.toString());
+      }
+      if (updateItemConf.partition != null) {
+        rr.setNodeLabelExpression(updateItemConf.partition);
+      }
+      if (executionType != null) {
+        rr.setExecutionTypeRequest(
+            ExecutionTypeRequest.newInstance(executionType));
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Done updating resource request, appId={}, RR={}", appId, 
rr);
+      }
+    }
+
+    /**
+     * Updates a SchedulingRequest with configuration from this update item.
+     * Can modify execution type, placement constraint and allocation tags.
+     *
+     * @param appId application ID for logging
+     * @param sr scheduling request to update
+     * @param dynamicAppInfo dynamic application information
+     */
+    private void updateSchedulingRequest(ApplicationId appId,
+        SchedulingRequest sr, DynamicAppInfo dynamicAppInfo) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Before updating scheduling request, appId={}, SR={}, conf={}",
+            appId, sr, updateItemConf.toString());
+      }
+      if (executionType != null) {
+        sr.setExecutionType(ExecutionTypeRequest.newInstance(executionType));
+      }
+      if (dynamicAppInfo.dynamicPlacementConstraint != null) {
+        sr.setPlacementConstraint(dynamicAppInfo.dynamicPlacementConstraint);
+      }
+      if (dynamicAppInfo.dynamicAllocationTags != null) {
+        sr.setAllocationTags(dynamicAppInfo.dynamicAllocationTags);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Done updating scheduling request, appId={}, SR={}", appId,
+            sr);
+      }
+    }
+
+    private PlacementConstraint getRuntimePlacementConstraint(
+        Map<String, String> appInfo) {
+      if (placementConstraint == null) {
+        return null;
+      }
+      if (!hasPlaceholderForPC) {
+        // return static placement constraint
+        return placementConstraint;
+      }
+      try {
+        // for dynamic placement constraint
+        StringSubstitutor substitutor = new StringSubstitutor(appInfo);
+        String substitutedPCExpression =
+            substitutor.replace(updateItemConf.getPlacementConstraint());
+        PlacementConstraint.AbstractConstraint absConstraint =
+            PlacementConstraintParser.parseExpression(substitutedPCExpression);
+        return new PlacementConstraint(absConstraint);
+      } catch (PlacementConstraintParseException e) {
+        LOG.warn("Failed to apply variable substitution to placement 
constraint. " +
+            "Skip setting placement constraint.", e);
+      }
+      return null;
+    }
+
+    private Set<String> getRuntimeAllocationTags(Map<String, String> appInfo) {
+      if (updateItemConf.getAllocationTags() == null ||
+          updateItemConf.getAllocationTags().isEmpty()) {
+        return null;
+      }
+      if (!hasPlaceholderForAllocTags) {
+        // return static allocation tags
+        return updateItemConf.getAllocationTags();
+      }
+      // for dynamic allocation tags
+      StringSubstitutor substitutor = new StringSubstitutor(appInfo);
+      return updateItemConf.getAllocationTags().stream()
+          .map(substitutor::replace).collect(Collectors.toSet());
+    }
+
+    @VisibleForTesting
+    public UpdateItemConf getUpdateItemConf() {
+      return updateItemConf;
+    }
+
+    @VisibleForTesting
+    public boolean hasPlaceholderForPC() {
+      return hasPlaceholderForPC;
+    }
+
+    @VisibleForTesting
+    public boolean hasPlaceholderForAllocTags() {
+      return hasPlaceholderForAllocTags;
+    }
+
+    @VisibleForTesting
+    public CompiledScript getAppMatchScript() {
+      return appMatchScript;
+    }
+
+    @VisibleForTesting
+    public CompiledScript getRequestMatchScript() {
+      return requestMatchScript;
+    }
+
+    @VisibleForTesting
+    public PlacementConstraint getPlacementConstraint() {
+      return placementConstraint;
+    }
+
+    @VisibleForTesting
+    public ExecutionType getExecutionType() {
+      return executionType;
+    }
+  }
+
+  private static Map<String, String> convertToStringAppInfo(
+      Map<String, Object> appInfo) {
+    return appInfo.entrySet().stream().collect(
+        Collectors.toMap(Map.Entry::getKey,
+            e -> e.getValue() != null ? e.getValue().toString() : ""));
+  }
+
+  private static Map<String, Object> convertToAppInfo(FiCaSchedulerApp app,
+      RMApp rmApp) {
+    return ImmutableMap.of(APP_INFO_KEY_QUEUE, app.getQueueName(),
+        APP_INFO_KEY_USER, app.getUser(),
+        APP_INFO_KEY_PRIORITY, app.getPriority() == null ?
+            0 : app.getPriority().getPriority(),
+        APP_INFO_KEY_ID, rmApp.getApplicationId().toString(),
+        APP_INFO_KEY_NAME, rmApp.getName(),
+        APP_INFO_KEY_TYPE, rmApp.getApplicationType(),
+        APP_INFO_KEY_TAGS, rmApp.getApplicationTags());
+  }
+
+  private static Map<String, Object> convertToRequestInfo(
+      DynamicAppInfo dynamicAppInfo, ResourceRequest rr) {
+    return ImmutableMap.of(REQUEST_INFO_KEY_PRIORITY,
+        rr.getPriority() == null ? 0 : rr.getPriority().getPriority(),
+        REQUEST_INFO_KEY_RESOURCE_NAME, rr.getResourceName(),
+        REQUEST_INFO_KEY_RELAX_LOCALITY, rr.getRelaxLocality(),
+        REQUEST_INFO_KEY_IS_AM, dynamicAppInfo.app.isWaitingForAMContainer());
+  }
+
+  private static Map<String, Object> convertToRequestInfo(
+      DynamicAppInfo dynamicAppInfo, SchedulingRequest sr) {
+    return ImmutableMap.of(REQUEST_INFO_KEY_PRIORITY,
+        sr.getPriority() == null ? 0 : sr.getPriority().getPriority(),
+        REQUEST_INFO_KEY_EXECUTION_TYPE, sr.getExecutionType() == null ?
+            "" :
+            (sr.getExecutionType().getExecutionType() == null ?
+                "" :
+                sr.getExecutionType().getExecutionType().name()),
+        REQUEST_INFO_KEY_ALLOCATION_TAGS, sr.getAllocationTags(),
+        REQUEST_INFO_KEY_IS_AM, dynamicAppInfo.app.isWaitingForAMContainer());
+  }
+
+  public static boolean hasPlaceholder(String text) throws YarnException {
+    if (text == null) {
+      return false;
+    }
+    // find out all placeholder
+    Matcher matcher = PLACEHOLDER_PATTERN.matcher(text);
+    int placeholderCount = 0;
+    while (matcher.find()) {
+      String placeholder = matcher.group();
+      String placeholderKey = placeholder.substring(2, placeholder.length() - 
1);
+      if (!VALID_PLACEHOLDER_KEYS.contains(placeholderKey)) {
+        throw new YarnException("Invalid placeholder: " + placeholder);
+      }
+      placeholderCount++;
+    }
+    return placeholderCount > 0;
+  }
+
+  public static class DynamicAppInfo {
+    private final FiCaSchedulerApp app;
+    private final boolean isMatched;
+    // dynamic placement constraint
+    private final PlacementConstraint dynamicPlacementConstraint;
+    // dynamic allocation tags
+    private final Set<String> dynamicAllocationTags;
+
+    public DynamicAppInfo(FiCaSchedulerApp app, boolean isMatched,
+        PlacementConstraint dynamicPlacementConstraint,
+        Set<String> dynamicAllocationTags) {
+      this.app = app;
+      this.isMatched = isMatched;
+      this.dynamicPlacementConstraint = dynamicPlacementConstraint;
+      this.dynamicAllocationTags = dynamicAllocationTags;
+    }
+  }
+}
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java
index 894e8fd2f5a..bc0ee8162c6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java
@@ -84,6 +84,7 @@ public RMNodeLabelsManager createNodeLabelManager() {
 
     assertEquals(0, csMetrics.getNumOfAllocates());
     assertEquals(0, csMetrics.getNumOfCommitSuccess());
+    assertEquals(0, csMetrics.getRequestsHandle().lastStat().numSamples());
 
     RMApp rmApp = MockRMAppSubmitter.submit(rm,
         MockRMAppSubmissionData.Builder.createWithMemory(1024, rm)
@@ -101,6 +102,9 @@ public RMNodeLabelsManager createNodeLabelManager() {
     am.registerAppAttempt();
     am.allocate("*", 1024, 1, new ArrayList<>());
 
+    assertTrue(
+        csMetrics.getRequestsHandle().lastStat().numSamples() > 0);
+
     nm1.nodeHeartbeat(true);
     nm2.nodeHeartbeat(true);
 
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerRequestsHandler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerRequestsHandler.java
new file mode 100644
index 00000000000..ca78905417f
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerRequestsHandler.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.junit.Test;
+
+import static 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.GB;
+import static 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.toSet;
+import static 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.getConfigurationWithQueueLabels;
+
+public class TestCapacitySchedulerRequestsHandler {
+
+  /**
+   * Simple e2e verification for requests-handler.
+   * - requests-handler can be enabled dynamically via reinitializing scheduler
+   * - partition will be updated for the matched app and request
+   */
+  @Test
+  public void testRequestsHandlerSimpleCase() throws Exception {
+    Configuration conf = new Configuration(false);
+    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    conf = getConfigurationWithQueueLabels(conf);
+
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(ImmutableMap.of(
+        NodeId.newInstance("h1", 0), toSet("x")));
+
+    MockRM rm = new MockRM(conf) {
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm.start();
+
+    MockNM nm1 = // label = x
+        new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
+    nm1.registerNode();
+    MockNM nm2 = // label = ""
+        new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService());
+    nm2.registerNode();
+
+    // Launch app1 in queue=a1
+    MockRMAppSubmissionData data1 =
+        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
+            .withAppName("app1").withUser("root").withAcls(null)
+            .withQueue("a1").withUnmanagedAM(false).build();
+    RMApp app1 = MockRMAppSubmitter.submit(rm, data1);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+
+    // am1 asks for a container with no label which will be allocated on nm2
+    am1.allocate("*", GB, 1, 10, new ArrayList<>(), "");
+    ContainerId containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    rm.waitForState(nm2, containerId, RMContainerState.ALLOCATED);
+
+    // refresh conf with requests-handler enabled
+    // partition will be updated to 'x' for apps in a1 queue
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED, "true");
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[{\"appMatchExpr\":\"queue=='a1'\"," +
+            "\"requestMatchExpr\":\"priority>5\"," +
+            " \"partition\":\"x\"}]}");
+    rm.getResourceScheduler().reinitialize(conf, rm.getRMContext());
+
+    // am1 asks for another container with no label
+    // request matched, partition will be updated to 'x' for this request
+    // so that it will be allocated on nm1
+    am1.allocate("*", GB, 1, 10, new ArrayList<>(), "");
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+    rm.waitForState(nm1, containerId, RMContainerState.ALLOCATED);
+
+    // am1 asks for another container with no label
+    // request not matched, partition won't be updated to 'x' for this request
+    // so that it will be allocated on nm2
+    am1.allocate("*", GB, 1, new ArrayList<>(), "");
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 4);
+    rm.waitForState(nm2, containerId, RMContainerState.ALLOCATED);
+
+    rm.stop();
+  }
+}
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestRequestsHandler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestRequestsHandler.java
new file mode 100644
index 00000000000..bdd6328a271
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestRequestsHandler.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.util.Lists;
+import org.apache.hadoop.util.Sets;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class TestRequestsHandler {
+
+  private Configuration conf;
+  private RMContext rmContext;
+  private RequestsHandler requestsHandler;
+
+  @Before
+  public void setUp() {
+    rmContext = mock(RMContext.class);
+    CapacityScheduler scheduler = mock(CapacityScheduler.class);
+    when(rmContext.getScheduler()).thenReturn(scheduler);
+    ConcurrentHashMap<ApplicationId, RMApp> rmApps = new ConcurrentHashMap<>();
+    when(rmContext.getRMApps()).thenReturn(rmApps);
+
+    Function<ApplicationAttemptId, Pair<FiCaSchedulerApp, RMApp>> appProvider =
+        appAttemptId -> {
+          FiCaSchedulerApp app = scheduler.getApplicationAttempt(appAttemptId);
+          RMApp rmApp =
+              rmContext.getRMApps().get(appAttemptId.getApplicationId());
+          if (app == null || rmApp == null) {
+            return null;
+          }
+          return ImmutablePair.of(app, rmApp);
+        };
+    requestsHandler = new RequestsHandler(appProvider);
+    conf = new Configuration();
+    LogManager.getLogger(RequestsHandler.LOG.getName()).setLevel(Level.DEBUG);
+  }
+
+  @Test
+  public void testInitialize() throws IOException, YarnException {
+    // invalid conf
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        true);
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES, "{");
+    assertThrows(IOException.class, () -> requestsHandler.initialize(conf));
+
+    // invalid allocation tags
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        true);
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[{\"allocationTags\":\"xxx\"}]}");
+    assertThrows(IOException.class, () -> requestsHandler.initialize(conf));
+
+    // invalid placement constraint
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        true);
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[{\"placementConstraint\":\"and\"}]}");
+    assertThrows(YarnException.class, () -> requestsHandler.initialize(conf));
+    try {
+      requestsHandler.initialize(conf);
+    } catch (YarnException e) {
+      assertTrue(
+          e.getMessage().contains("Failed to parse placement-constraint"));
+    }
+
+    // invalid placeholder for allocation tags
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        true);
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[{\"allocationTags\":[\"xxx\",\"${invalid}\"]}]}");
+    assertThrows(YarnException.class, () -> requestsHandler.initialize(conf));
+    try {
+      requestsHandler.initialize(conf);
+    } catch (YarnException e) {
+      assertTrue(e.getMessage().contains("Invalid placeholder"));
+    }
+
+    // invalid placeholder for placement constraints
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        true);
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[{\"placementConstraint\":" +
+            "\"and(in,rack,${name}:notin,node,${invalid})\"}]}");
+    assertThrows(YarnException.class, () -> requestsHandler.initialize(conf));
+    try {
+      requestsHandler.initialize(conf);
+    } catch (YarnException e) {
+      assertTrue(e.getMessage().contains("Invalid placeholder"));
+    }
+
+    // disabled
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        false);
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[]}");
+    requestsHandler.initialize(conf);
+    assertFalse(requestsHandler.isEnabled());
+    assertNull(requestsHandler.getUpdateItems());
+
+    // enabled without items
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        true);
+    conf.unset(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES);
+    requestsHandler.initialize(conf);
+    assertTrue(requestsHandler.isEnabled());
+    assertNull(requestsHandler.getUpdateItems());
+
+    // enabled with 1 item
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        true);
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[{\"appMatchExpr\":\"queue=='test'\", " +
+            "\"requestMatchExpr\":\"priority>10\", \"isRRToSR\":true," +
+            " 
\"placementConstraint\":\"and(in,rack,tag_${id}:notin,node,zk)\"," +
+            " \"executionType\":\"OPPORTUNISTIC\"," +
+            " \"allocationTags\":[\"tag1\", \"tag_${id}\"]}]}");
+    requestsHandler.initialize(conf);
+    assertTrue(requestsHandler.isEnabled());
+    assertNotNull(requestsHandler.getUpdateItems());
+    List<RequestsHandler.UpdateItem> items =
+        requestsHandler.getUpdateItems();
+    assertEquals(1, items.size());
+
+    RequestsHandler.UpdateItem item = items.get(0);
+    assertEquals("and(in,rack,tag_${id}:notin,node,zk)",
+        item.getPlacementConstraint().toString());
+    assertTrue(item.hasPlaceholderForPC());
+    assertTrue(item.hasPlaceholderForAllocTags());
+    assertEquals(ExecutionType.OPPORTUNISTIC, item.getExecutionType());
+    assertNotNull(item.getAppMatchScript());
+    assertNotNull(item.getRequestMatchScript());
+
+    RequestsHandler.UpdateItemConf itemConf = items.get(0).getUpdateItemConf();
+    assertEquals("queue=='test'", itemConf.getAppMatchExpr());
+    assertEquals("priority>10", itemConf.getRequestMatchExpr());
+    assertEquals("and(in,rack,tag_${id}:notin,node,zk)",
+        itemConf.getPlacementConstraint());
+    assertEquals("OPPORTUNISTIC", itemConf.getExecutionType());
+    assertEquals(2, itemConf.getAllocationTags().size());
+    assertTrue(itemConf.getAllocationTags().contains("tag1"));
+    assertTrue(itemConf.getAllocationTags().contains("tag_${id}"));
+
+    // turned to disabled
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        false);
+    requestsHandler.initialize(conf);
+    assertFalse(requestsHandler.isEnabled());
+    assertNull(requestsHandler.getUpdateItems());
+
+    // turned to enabled
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        true);
+    requestsHandler.initialize(conf);
+    assertTrue(requestsHandler.isEnabled());
+    assertNotNull(requestsHandler.getUpdateItems());
+    assertEquals(1, requestsHandler.getUpdateItems().size());
+  }
+
+  private FiCaSchedulerApp mockApp(int id, int priority,
+      String queueName, String user, String appName, String appType,
+      List<String> tags) {
+    ApplicationId appId = ApplicationId.newInstance(1, id);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 1);
+    FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
+    when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
+    when(app.getApplicationId()).thenReturn(appId);
+    when(app.isWaitingForAMContainer()).thenReturn(true);
+    when(app.getQueueName()).thenReturn(queueName);
+    when(app.getUser()).thenReturn(user);
+    when(app.getPriority()).thenReturn(Priority.newInstance(priority));
+    CapacityScheduler scheduler = (CapacityScheduler) rmContext.getScheduler();
+    when(scheduler.getApplicationAttempt(
+        app.getApplicationAttemptId())).thenReturn(app);
+
+    RMApp rmApp = mock(RMApp.class);
+    when(rmApp.getApplicationId()).thenReturn(appId);
+    when(rmApp.getName()).thenReturn(appName);
+    when(rmApp.getApplicationType()).thenReturn(appType);
+    when(rmApp.getApplicationTags())
+        .thenReturn(ImmutableSet.copyOf(tags));
+
+    ConcurrentMap<ApplicationId, RMApp> rmApps =
+        rmContext.getRMApps();
+    rmApps.put(appId, rmApp);
+    return app;
+  }
+
+  @Test
+  public void testHandleRequests() throws IOException, YarnException {
+    // mock apps and request
+    FiCaSchedulerApp app1 = mockApp(1, 0, "test1", "user1", "app1",
+        "MapReduce", Lists.newArrayList("tag1", "tag2"));
+    FiCaSchedulerApp app2 = mockApp(2, 1, "test", "user2", "app2",
+        "MapReduce", Lists.newArrayList("tag1", "tag2"));
+
+    ResourceRequest rr1 = ResourceRequest.newInstance(Priority.newInstance(3),
+        "*", Resource.newInstance(4096, 2), 1, true);
+    
rr1.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
+    rr1.setNodeLabelExpression("x");
+
+    /*
+     * check choosing by app, converting RR to SR
+     */
+    conf.setBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+        true);
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[{\"appMatchExpr\":\"queue=='test'\", 
\"isRRToSR\":true}]}");
+    requestsHandler.initialize(conf);
+
+    // app1 not-matched, won't convert to scheduling request
+    RequestsHandleResponse response =
+        requestsHandler.handle(app1.getApplicationAttemptId(),
+            Lists.newArrayList(rr1), null);
+    assertFalse(response.isUpdated());
+    assertEquals(1, response.getResourceRequests().size());
+    assertNull(response.getSchedulingRequests());
+
+    // app2 matched, will be converted to scheduling request
+    response = requestsHandler.handle(app2.getApplicationAttemptId(),
+        Lists.newArrayList(rr1), null);
+    assertTrue(response.isUpdated());
+    assertEquals(1, response.getSchedulingRequests().size());
+    assertEquals(0, response.getResourceRequests().size());
+    SchedulingRequest gotSR1 = response.getSchedulingRequests().get(0);
+    assertEquals(rr1.getPriority(), gotSR1.getPriority());
+    assertEquals(rr1.getCapability(), 
gotSR1.getResourceSizing().getResources());
+    assertEquals(rr1.getNumContainers(), 
gotSR1.getResourceSizing().getNumAllocations());
+    assertEquals(rr1.getExecutionTypeRequest(), gotSR1.getExecutionType());
+    assertEquals("node,EQ,yarn_node_partition/=[x]", 
gotSR1.getPlacementConstraint().toString());
+
+    /*
+     * check choosing by app and request, converting RR to SR,
+     * then updating priority, execution-type, and allocation-tags
+     */
+    conf.set(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES,
+        "{\"items\":[{\"appMatchExpr\":\"queue=='test'\", " +
+            "\"requestMatchExpr\":\"priority>10\", \"isRRToSR\":true," +
+            " \"placementConstraint\":\"and(in,rack,${id}:notin,node,zk)\"," +
+            " \"executionType\":\"OPPORTUNISTIC\"," +
+            " \"allocationTags\":[\"tag1\", \"${id}\"]}]}");
+    requestsHandler.initialize(conf);
+
+    ResourceRequest rr2 =
+        ResourceRequest.newInstance(Priority.newInstance(20), "*",
+            Resource.newInstance(1024, 1), 5, true);
+    response = requestsHandler.handle(app2.getApplicationAttemptId(),
+        Lists.newArrayList(rr1, rr2), null);
+    assertTrue(response.isUpdated());
+    assertEquals(2, response.getSchedulingRequests().size());
+
+    // both rr1 and rr2 should be converted to scheduling requests
+    // rr1 not matched
+    gotSR1 = response.getSchedulingRequests().get(0);
+    assertEquals(rr1.getPriority(), gotSR1.getPriority());
+    assertEquals(rr1.getCapability(),
+        gotSR1.getResourceSizing().getResources());
+    assertEquals(rr1.getNumContainers(),
+        gotSR1.getResourceSizing().getNumAllocations());
+    assertEquals(rr1.getExecutionTypeRequest(), gotSR1.getExecutionType());
+    assertEquals("node,EQ,yarn_node_partition/=[x]",
+        gotSR1.getPlacementConstraint().toString());
+
+    // rr2 matched, should be updated
+    SchedulingRequest gotSR2 = response.getSchedulingRequests().get(1);
+    assertEquals(rr2.getCapability(),
+        gotSR2.getResourceSizing().getResources());
+    assertEquals(rr2.getNumContainers(),
+        gotSR2.getResourceSizing().getNumAllocations());
+    assertEquals(ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC),
+        gotSR2.getExecutionType());
+    assertEquals(Sets.newHashSet("tag1", app2.getApplicationId().toString()),
+        gotSR2.getAllocationTags());
+    assertEquals(String.format("and(in,rack,%s:notin,node,zk)",
+            app2.getApplicationId().toString()),
+        gotSR2.getPlacementConstraint().toString());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to