This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 17035da46ef YARN-11226. [Federation] Add createNewReservation, 
submitReservation, updateReservation, deleteReservation REST APIs for Router.  
(#5175)
17035da46ef is described below

commit 17035da46efa945f7eeff3e7f253a7031c6bcdd0
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Fri Dec 23 03:25:09 2022 +0800

    YARN-11226. [Federation] Add createNewReservation, submitReservation, 
updateReservation, deleteReservation REST APIs for Router.  (#5175)
---
 .../utils/FederationStateStoreFacade.java          |  89 ++++++++
 .../yarn/server/router/RouterServerUtil.java       | 110 ++++++++-
 .../webapp/AbstractRESTRequestInterceptor.java     |  34 +--
 .../router/webapp/FederationInterceptorREST.java   | 238 +++++++++++++++++++-
 .../yarn/server/router/TestRouterServerUtil.java   | 125 +++++++++++
 .../router/webapp/BaseRouterWebServicesTest.java   |  10 +-
 .../webapp/MockDefaultRequestInterceptorREST.java  | 249 +++++++++++++++++----
 .../webapp/TestFederationInterceptorREST.java      | 226 ++++++++++++++++++-
 .../webapp/TestableFederationInterceptorREST.java  |  68 +++++-
 9 files changed, 1052 insertions(+), 97 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index e1ebce82892..8c36fba1f1e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -1062,4 +1062,93 @@ public final class FederationStateStoreFacade {
       updateApplicationHomeSubCluster(subClusterId, applicationId, 
appHomeSubCluster);
     }
   }
+
+  /**
+   * Checks whether a home SubCluster mapping is recorded for a reservation.
+   *
+   * A lookup failure is logged as a warning and reported as "no mapping".
+   *
+   * @param reservationId reservation to look up.
+   * @return true if the lookup yields a non-null SubClusterId, false otherwise.
+   */
+  public boolean existsReservationHomeSubCluster(ReservationId reservationId) {
+    SubClusterId homeSubClusterId = null;
+    try {
+      homeSubClusterId = getReservationHomeSubCluster(reservationId);
+    } catch (YarnException ex) {
+      LOG.warn("get homeSubCluster by reservationId = {} error.", reservationId, ex);
+    }
+    return homeSubClusterId != null;
+  }
+
+  /**
+   * Persists the mapping between a reservation and its home SubCluster.
+   *
+   * @param reservationId reservation id, used only in the failure message.
+   * @param homeSubCluster mapping handed to the single-argument overload.
+   * @throws YarnException wrapping the original failure if the insert fails.
+   */
+  public void addReservationHomeSubCluster(ReservationId reservationId,
+      ReservationHomeSubCluster homeSubCluster) throws YarnException {
+    try {
+      // Delegate the actual write to the single-argument overload.
+      addReservationHomeSubCluster(homeSubCluster);
+    } catch (YarnException cause) {
+      // Re-throw with the reservation id so the failure can be traced.
+      throw new YarnException(String.format(
+          "Unable to insert the ReservationId %s into the FederationStateStore.",
+          reservationId), cause);
+    }
+  }
+
+  /**
+   * Update Reservation And HomeSubCluster Mapping.
+   *
+   * If the update fails but the state store already maps the reservation to
+   * the requested SubCluster, the failure is logged and ignored.
+   *
+   * @param subClusterId subClusterId expected to be the reservation's home.
+   * @param reservationId reservationId
+   * @param homeSubCluster new mapping to store.
+   * @throws YarnException if the update fails and the stored home differs,
+   *         or if the stored home cannot be read back.
+   */
+  public void updateReservationHomeSubCluster(SubClusterId subClusterId,
+      ReservationId reservationId, ReservationHomeSubCluster homeSubCluster)
+      throws YarnException {
+    try {
+      // update the mapping of reservationId and the home subClusterId to
+      // the new subClusterId we have selected
+      updateReservationHomeSubCluster(homeSubCluster);
+    } catch (YarnException e) {
+      SubClusterId subClusterIdInStateStore = getReservationHomeSubCluster(reservationId);
+      // Compare by value: the id read back from the store may be a distinct
+      // instance, so a reference comparison alone could miss a match.
+      boolean sameSubCluster = subClusterId == subClusterIdInStateStore
+          || (subClusterId != null && subClusterId.equals(subClusterIdInStateStore));
+      if (sameSubCluster) {
+        LOG.info("Reservation {} already submitted on SubCluster {}.",
+            reservationId, subClusterId);
+      } else {
+        String msg = String.format(
+            "Unable to update the ReservationId %s into the FederationStateStore.",
+            reservationId);
+        throw new YarnException(msg, e);
+      }
+    }
+  }
+
+  /**
+   * Add or Update ReservationHomeSubCluster.
+   *
+   * A new mapping is added when none exists yet or on the first attempt
+   * (retryCount == 0); otherwise the existing mapping is updated.
+   *
+   * @param reservationId reservationId.
+   * @param subClusterId homeSubClusterId, this is selected by strategy.
+   * @param retryCount number of retries.
+   * @throws YarnException yarn exception.
+   */
+  public void addOrUpdateReservationHomeSubCluster(ReservationId reservationId,
+      SubClusterId subClusterId, int retryCount) throws YarnException {
+    // Primitive boolean: the existence check returns a primitive, no boxing needed.
+    boolean exists = existsReservationHomeSubCluster(reservationId);
+    ReservationHomeSubCluster reservationHomeSubCluster =
+        ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+    if (!exists || retryCount == 0) {
+      // persist the mapping of reservationId and the subClusterId which has
+      // been selected as its home.
+      addReservationHomeSubCluster(reservationId, reservationHomeSubCluster);
+    } else {
+      // update the mapping of reservationId and the home subClusterId to
+      // the new subClusterId we have selected.
+      updateReservationHomeSubCluster(subClusterId, reservationId,
+          reservationHomeSubCluster);
+    }
+  }
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
index 8c880f25ddb..8fa6ca2f055 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
@@ -27,6 +27,16 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@@ -57,6 +67,8 @@ public final class RouterServerUtil {
 
   private static final String EPOCH_PREFIX = "e";
 
+  private static final String RESERVEIDSTR_PREFIX = "reservation_";
+
   /** Disable constructor. */
   private RouterServerUtil() {
   }
@@ -494,6 +506,15 @@ public final class RouterServerUtil {
         ? token.decodeIdentifier().getRenewer().toString() : 
user.getShortUserName();
   }
 
+  /**
+   * Set User information.
+   *
+   * If the username is empty, we will use the Yarn Router user directly.
+   * Do not create a proxy user if userName matches the userName on current 
UGI.
+   *
+   * @param userName userName.
+   * @return UserGroupInformation.
+   */
   public static UserGroupInformation setupUser(final String userName) {
     UserGroupInformation user = null;
     try {
@@ -513,7 +534,94 @@ public final class RouterServerUtil {
       return user;
     } catch (IOException e) {
       throw RouterServerUtil.logAndReturnYarnRunTimeException(e,
-          "Error while creating Router RMAdmin Service for user : %s.", user);
+          "Error while creating Router Service for user : %s.", user);
+    }
+  }
+
+  /**
+   * Check reservationId is accurate.
+   *
+   * We need to ensure that reservationId cannot be empty and
+   * can be converted to ReservationId object normally.
+   * The accepted form is the reservation prefix followed by two
+   * underscore-separated fields (cluster timestamp and id); both fields
+   * must consist of digits only and fit into a long.
+   *
+   * @param reservationId reservationId.
+   * @throws IllegalArgumentException If the format of the reservationId is not accurate,
+   * an IllegalArgumentException needs to be thrown.
+   */
+  @Public
+  @Unstable
+  public static void validateReservationId(String reservationId)
+      throws IllegalArgumentException {
+
+    if (reservationId == null || reservationId.isEmpty()) {
+      throw new IllegalArgumentException(
+          "Parameter error, the reservationId is empty or null.");
+    }
+
+    if (!reservationId.startsWith(RESERVEIDSTR_PREFIX)) {
+      throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
+    }
+
+    String[] resFields = reservationId.split("_");
+    if (resFields.length != 3) {
+      throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
+    }
+
+    String clusterTimestamp = resFields[1];
+    String id = resFields[2];
+    if (!NumberUtils.isDigits(id) || !NumberUtils.isDigits(clusterTimestamp)) {
+      throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
+    }
+
+    // isDigits accepts digit strings of any length. Also require that both
+    // fields fit into a long, so a later numeric conversion (presumably done
+    // by ReservationId.parseReservationId -- confirm) cannot fail on overflow.
+    try {
+      Long.parseLong(clusterTimestamp);
+      Long.parseLong(id);
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException("Invalid ReservationId: " + reservationId, e);
+    }
+  }
+
+  /**
+   * Convert ReservationDefinitionInfo to ReservationDefinition.
+   *
+   * @param definitionInfo ReservationDefinitionInfo Object.
+   * @return ReservationDefinition.
+   * @throws RuntimeException if definitionInfo, its reservation requests or
+   *         the request list is null, or the request list is empty.
+   * @throws IllegalArgumentException if the interpreter index carried by the
+   *         requests does not match any ReservationRequestInterpreter value.
+   */
+  public static ReservationDefinition convertReservationDefinition(
+      ReservationDefinitionInfo definitionInfo) {
+    if (definitionInfo == null || definitionInfo.getReservationRequests() == null
+        || definitionInfo.getReservationRequests().getReservationRequest() == null
+        || definitionInfo.getReservationRequests().getReservationRequest().isEmpty()) {
+      throw new RuntimeException("definitionInfo Or ReservationRequests is Null.");
     }
+
+    // basic variable
+    long arrival = definitionInfo.getArrival();
+    long deadline = definitionInfo.getDeadline();
+
+    // name, recurrence expression and priority of the reservation
+    String name = definitionInfo.getReservationName();
+    String recurrenceExpression = definitionInfo.getRecurrenceExpression();
+    Priority priority = Priority.newInstance(definitionInfo.getPriority());
+
+    // reservation requests info
+    List<ReservationRequest> reservationRequestList = new ArrayList<>();
+
+    ReservationRequestsInfo reservationRequestsInfo = definitionInfo.getReservationRequests();
+
+    List<ReservationRequestInfo> reservationRequestInfos =
+        reservationRequestsInfo.getReservationRequest();
+
+    for (ReservationRequestInfo resRequestInfo : reservationRequestInfos) {
+      ResourceInfo resourceInfo = resRequestInfo.getCapability();
+      Resource capability =
+          Resource.newInstance(resourceInfo.getMemorySize(), resourceInfo.getvCores());
+      ReservationRequest reservationRequest = ReservationRequest.newInstance(capability,
+          resRequestInfo.getNumContainers(), resRequestInfo.getMinConcurrency(),
+          resRequestInfo.getDuration());
+      reservationRequestList.add(reservationRequest);
+    }
+
+    // The interpreter is carried as a numeric index into the enum values;
+    // reject out-of-range values with a clear message instead of letting the
+    // array access fail.
+    ReservationRequestInterpreter[] values = ReservationRequestInterpreter.values();
+    int interpreterIndex = reservationRequestsInfo.getReservationRequestsInterpreter();
+    if (interpreterIndex < 0 || interpreterIndex >= values.length) {
+      throw new IllegalArgumentException(
+          "Invalid ReservationRequestInterpreter index: " + interpreterIndex);
+    }
+    ReservationRequestInterpreter reservationRequestInterpreter = values[interpreterIndex];
+    ReservationRequests reservationRequests = ReservationRequests.newInstance(
+        reservationRequestList, reservationRequestInterpreter);
+
+    ReservationDefinition definition = ReservationDefinition.newInstance(
+        arrival, deadline, reservationRequests, name, recurrenceExpression, priority);
+
+    return definition;
   }
 }
\ No newline at end of file
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java
index 66b2495e2c9..baf931ac46e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java
@@ -20,9 +20,7 @@ package org.apache.hadoop.yarn.server.router.webapp;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-
-import java.io.IOException;
+import org.apache.hadoop.yarn.server.router.RouterServerUtil;
 
 /**
  * Extends the RequestInterceptor class and provides common functionality which
@@ -68,7 +66,7 @@ public abstract class AbstractRESTRequestInterceptor
    */
   @Override
   public void init(String userName) {
-    setupUser(userName);
+    this.user = RouterServerUtil.setupUser(userName);
     if (this.nextInterceptor != null) {
       this.nextInterceptor.init(userName);
     }
@@ -92,34 +90,6 @@ public abstract class AbstractRESTRequestInterceptor
     return this.nextInterceptor;
   }
 
-  /**
-   * Set User information.
-   *
-   * If the username is empty, we will use the Yarn Router user directly.
-   * Do not create a proxy user if user name matches the user name on current 
UGI.
-   * @param userName userName.
-   */
-  private void setupUser(final String userName) {
-    try {
-      if (userName == null || userName.isEmpty()) {
-        user = UserGroupInformation.getCurrentUser();
-      } else if (UserGroupInformation.isSecurityEnabled()) {
-        user = UserGroupInformation.createProxyUser(userName, 
UserGroupInformation.getLoginUser());
-      } else if 
(userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName()))
 {
-        user = UserGroupInformation.getCurrentUser();
-      } else {
-        user = UserGroupInformation.createProxyUser(userName,
-            UserGroupInformation.getCurrentUser());
-      }
-    } catch (IOException e) {
-      String message = "Error while creating Router RMAdmin Service for user:";
-      if (user != null) {
-        message += ", user: " + user;
-      }
-      throw new YarnRuntimeException(message, e);
-    }
-  }
-
   public UserGroupInformation getUser() {
     return user;
   }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index 61edfb363d0..93e7a16cd91 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -51,11 +51,13 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Sets;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -101,6 +103,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionIn
 import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
 import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
 import org.apache.hadoop.yarn.server.router.RouterMetrics;
 import org.apache.hadoop.yarn.server.router.RouterServerUtil;
 import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
@@ -1588,28 +1591,239 @@ public class FederationInterceptorREST extends 
AbstractRESTRequestInterceptor {
   @Override
   public Response createNewReservation(HttpServletRequest hsr)
       throws AuthorizationException, IOException, InterruptedException {
-    throw new NotImplementedException("Code is not implemented");
+    long startTime = clock.getTime();
+    try {
+      Map<SubClusterId, SubClusterInfo> subClustersActive =
+          federationFacade.getSubClusters(true);
+      // We declare blackList and retries.
+      List<SubClusterId> blackList = new ArrayList<>();
+      int actualRetryNums = federationFacade.getRetryNumbers(numSubmitRetries);
+      Response response = ((FederationActionRetry<Response>) (retryCount) ->
+          invokeCreateNewReservation(subClustersActive, blackList, hsr, 
retryCount)).
+          runWithRetries(actualRetryNums, submitIntervalTime);
+      // If the response is not empty and the status is SC_OK,
+      // this request can be returned directly.
+      if (response != null && response.getStatus() == 
HttpServletResponse.SC_OK) {
+        long stopTime = clock.getTime();
+        routerMetrics.succeededGetNewReservationRetrieved(stopTime - 
startTime);
+        return response;
+      }
+    } catch (FederationPolicyException e) {
+      // If a FederationPolicyException is thrown, the service is unavailable.
+      routerMetrics.incrGetNewReservationFailedRetrieved();
+      return 
Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build();
+    } catch (Exception e) {
+      routerMetrics.incrGetNewReservationFailedRetrieved();
+      return 
Response.status(Status.INTERNAL_SERVER_ERROR).entity(e.getLocalizedMessage()).build();
+    }
+
+    // return error message directly.
+    String errMsg = "Fail to create a new reservation.";
+    LOG.error(errMsg);
+    routerMetrics.incrGetNewReservationFailedRetrieved();
+    return 
Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
+  }
+
+  /**
+   * Performs one attempt at creating a new reservation on a randomly chosen
+   * active SubCluster that is not in the blacklist.
+   *
+   * @param subClustersActive active SubClusters to choose from.
+   * @param blackList SubClusters to avoid; the chosen SubCluster is appended
+   *        when the call to it throws.
+   * @param hsr the servlet request.
+   * @param retryCount attempt number, used for logging only.
+   * @return the SubCluster response when its status is SC_OK.
+   * @throws YarnException if the SubCluster does not answer with SC_OK.
+   * @throws IOException declared for the retry callback.
+   * @throws InterruptedException declared for the retry callback.
+   */
+  private Response invokeCreateNewReservation(
+      Map<SubClusterId, SubClusterInfo> subClustersActive,
+      List<SubClusterId> blackList, HttpServletRequest hsr, int retryCount)
+      throws YarnException, IOException, InterruptedException {
+    SubClusterId targetId =
+        federationFacade.getRandomActiveSubCluster(subClustersActive, blackList);
+    LOG.info("createNewReservation try #{} on SubCluster {}.", retryCount, targetId);
+    SubClusterInfo targetInfo = subClustersActive.get(targetId);
+    DefaultRequestInterceptorREST subClusterInterceptor =
+        getOrCreateInterceptorForSubCluster(targetId, targetInfo.getRMWebServiceAddress());
+    try {
+      Response result = subClusterInterceptor.createNewReservation(hsr);
+      boolean succeeded =
+          result != null && result.getStatus() == HttpServletResponse.SC_OK;
+      if (succeeded) {
+        return result;
+      }
+    } catch (Exception ex) {
+      // Remember the failing SubCluster so the next attempt can avoid it.
+      blackList.add(targetId);
+      RouterServerUtil.logAndThrowException(ex.getMessage(), ex);
+    }
+    // No usable response was obtained: report the failure to the caller.
+    throw new YarnException(String.format(
+        "Unable to create a new ReservationId in SubCluster %s.", targetId.getId()));
   }
 
   @Override
   public Response submitReservation(ReservationSubmissionRequestInfo 
resContext,
-      HttpServletRequest hsr)
-      throws AuthorizationException, IOException, InterruptedException {
-    throw new NotImplementedException("Code is not implemented");
+      HttpServletRequest hsr) throws AuthorizationException, IOException, 
InterruptedException {
+    long startTime = clock.getTime();
+    if (resContext == null || resContext.getReservationId() == null
+        || resContext.getReservationDefinition() == null || 
resContext.getQueue() == null) {
+      routerMetrics.incrSubmitReservationFailedRetrieved();
+      String errMsg = "Missing submitReservation resContext or reservationId " 
+
+          "or reservation definition or queue.";
+      return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
+    }
+
+    // Check that the resId format is accurate
+    String resId = resContext.getReservationId();
+    try {
+      RouterServerUtil.validateReservationId(resId);
+    } catch (IllegalArgumentException e) {
+      routerMetrics.incrSubmitReservationFailedRetrieved();
+      throw e;
+    }
+
+    List<SubClusterId> blackList = new ArrayList<>();
+    try {
+      int activeSubClustersCount = 
federationFacade.getActiveSubClustersCount();
+      int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
+      Response response = ((FederationActionRetry<Response>) (retryCount) ->
+          invokeSubmitReservation(resContext, blackList, hsr, retryCount)).
+          runWithRetries(actualRetryNums, submitIntervalTime);
+      if (response != null) {
+        long stopTime = clock.getTime();
+        routerMetrics.succeededSubmitReservationRetrieved(stopTime - 
startTime);
+        return response;
+      }
+    } catch (Exception e) {
+      routerMetrics.incrSubmitReservationFailedRetrieved();
+      return 
Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build();
+    }
+
+    routerMetrics.incrSubmitReservationFailedRetrieved();
+    String msg = String.format("Reservation %s failed to be submitted.", 
resId);
+    return Response.status(Status.SERVICE_UNAVAILABLE).entity(msg).build();
+  }
+
+  /**
+   * Performs one attempt at submitting a reservation: asks the policy for a
+   * home SubCluster, records the reservation-to-SubCluster mapping in the
+   * state store and forwards the request to that SubCluster.
+   *
+   * @param requestContext the reservation submission request.
+   * @param blackList receives the chosen SubCluster when the attempt fails.
+   * @param hsr the servlet request; a clone of it is forwarded.
+   * @param retryCount attempt number, passed on to the state store update.
+   * @return the SubCluster response when its status is SC_ACCEPTED.
+   * @throws YarnException if the SubCluster does not accept the reservation.
+   * @throws IOException rethrown from the attempt.
+   * @throws InterruptedException rethrown from the attempt.
+   */
+  private Response invokeSubmitReservation(ReservationSubmissionRequestInfo requestContext,
+      List<SubClusterId> blackList, HttpServletRequest hsr, int retryCount)
+      throws YarnException, IOException, InterruptedException {
+    String resId = requestContext.getReservationId();
+    ReservationId reservationId = ReservationId.parseReservationId(resId);
+    ReservationDefinitionInfo definitionInfo = requestContext.getReservationDefinition();
+    ReservationDefinition definition =
+         RouterServerUtil.convertReservationDefinition(definitionInfo);
+
+    // Step1. Get SubClusterId according to specific strategy.
+    ReservationSubmissionRequest request = ReservationSubmissionRequest.newInstance(
+        definition, requestContext.getQueue(), reservationId);
+    SubClusterId subClusterId = null;
+
+    try {
+      // Get subClusterId from policy.
+      subClusterId = policyFacade.getReservationHomeSubCluster(request);
+
+      // Log this submitReservation attempt.
+      LOG.info("submitReservation ReservationId {} try #{} on SubCluster {}.", reservationId,
+          retryCount, subClusterId);
+
+      // Step2. We Store the mapping relationship
+      // between Reservation and HomeSubCluster in stateStore.
+      federationFacade.addOrUpdateReservationHomeSubCluster(reservationId,
+          subClusterId, retryCount);
+
+      // Step3. We get subClusterInfo based on subClusterId.
+      SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId);
+
+      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+          subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
+      HttpServletRequest hsrCopy = clone(hsr);
+      Response response = interceptor.submitReservation(requestContext, hsrCopy);
+      if (response != null && response.getStatus() == HttpServletResponse.SC_ACCEPTED) {
+        LOG.info("Reservation {} submitted on subCluster {}.", reservationId, subClusterId);
+        return response;
+      }
+      // The message describes a reservation, not an application.
+      String msg = String.format("Reservation %s failed to be submitted.", resId);
+      throw new YarnException(msg);
+    } catch (Exception e) {
+      LOG.warn("Unable to submit the reservation {} to SubCluster {}.", resId,
+          subClusterId, e);
+      // Only blacklist when the policy actually picked a SubCluster.
+      if (subClusterId != null) {
+        blackList.add(subClusterId);
+      }
+      throw e;
+    }
   }
 
   @Override
   public Response updateReservation(ReservationUpdateRequestInfo resContext,
-      HttpServletRequest hsr)
-      throws AuthorizationException, IOException, InterruptedException {
-    throw new NotImplementedException("Code is not implemented");
+      HttpServletRequest hsr) throws AuthorizationException, IOException, 
InterruptedException {
+
+    // parameter verification
+    if (resContext == null || resContext.getReservationId() == null
+        || resContext.getReservationDefinition() == null) {
+      routerMetrics.incrUpdateReservationFailedRetrieved();
+      String errMsg = "Missing updateReservation resContext or reservationId " 
+
+          "or reservation definition.";
+      return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
+    }
+
+    // get reservationId
+    String reservationId = resContext.getReservationId();
+
+    // Check that the reservationId format is accurate
+    try {
+      RouterServerUtil.validateReservationId(reservationId);
+    } catch (IllegalArgumentException e) {
+      routerMetrics.incrUpdateReservationFailedRetrieved();
+      throw e;
+    }
+
+    try {
+      SubClusterInfo subClusterInfo = 
getHomeSubClusterInfoByReservationId(reservationId);
+      DefaultRequestInterceptorREST interceptor = 
getOrCreateInterceptorForSubCluster(
+          subClusterInfo.getSubClusterId(), 
subClusterInfo.getRMWebServiceAddress());
+      HttpServletRequest hsrCopy = clone(hsr);
+      Response response = interceptor.updateReservation(resContext, hsrCopy);
+      if (response != null) {
+        return response;
+      }
+    } catch (Exception e) {
+      routerMetrics.incrUpdateReservationFailedRetrieved();
+      RouterServerUtil.logAndThrowRunTimeException("updateReservation 
Failed.", e);
+    }
+
+    // throw an exception
+    routerMetrics.incrUpdateReservationFailedRetrieved();
+    throw new YarnRuntimeException("updateReservation Failed, reservationId = 
" + reservationId);
   }
 
   @Override
   public Response deleteReservation(ReservationDeleteRequestInfo resContext,
       HttpServletRequest hsr)
       throws AuthorizationException, IOException, InterruptedException {
-    throw new NotImplementedException("Code is not implemented");
+
+    // parameter verification
+    if (resContext == null || resContext.getReservationId() == null) {
+      routerMetrics.incrDeleteReservationFailedRetrieved();
+      String errMsg = "Missing deleteReservation request or reservationId.";
+      return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
+    }
+
+    // get ReservationId
+    String reservationId = resContext.getReservationId();
+
+    // Check that the reservationId format is accurate
+    try {
+      RouterServerUtil.validateReservationId(reservationId);
+    } catch (IllegalArgumentException e) {
+      routerMetrics.incrDeleteReservationFailedRetrieved();
+      throw e;
+    }
+
+    try {
+      SubClusterInfo subClusterInfo = 
getHomeSubClusterInfoByReservationId(reservationId);
+      DefaultRequestInterceptorREST interceptor = 
getOrCreateInterceptorForSubCluster(
+          subClusterInfo.getSubClusterId(), 
subClusterInfo.getRMWebServiceAddress());
+      HttpServletRequest hsrCopy = clone(hsr);
+      Response response = interceptor.deleteReservation(resContext, hsrCopy);
+      if (response != null) {
+        return response;
+      }
+    } catch (Exception e) {
+      routerMetrics.incrDeleteReservationFailedRetrieved();
+      RouterServerUtil.logAndThrowRunTimeException("deleteReservation 
Failed.", e);
+    }
+
+    // throw an exception
+    routerMetrics.incrDeleteReservationFailedRetrieved();
+    throw new YarnRuntimeException("deleteReservation Failed, reservationId = 
" + reservationId);
   }
 
   @Override
@@ -1627,9 +1841,9 @@ public class FederationInterceptorREST extends 
AbstractRESTRequestInterceptor {
       throw new IllegalArgumentException("Parameter error, the reservationId 
is empty or null.");
     }
 
-    // Check that the appId format is accurate
+    // Check that the reservationId format is accurate
     try {
-      ReservationId.parseReservationId(reservationId);
+      RouterServerUtil.validateReservationId(reservationId);
     } catch (IllegalArgumentException e) {
       routerMetrics.incrListReservationFailedRetrieved();
       throw e;
@@ -2190,6 +2404,10 @@ public class FederationInterceptorREST extends 
AbstractRESTRequestInterceptor {
   }
 
   @VisibleForTesting
+  public Map<SubClusterId, DefaultRequestInterceptorREST> getInterceptors() {
+    return interceptors;
+  }
+
   public void setAllowPartialResult(boolean allowPartialResult) {
     this.allowPartialResult = allowPartialResult;
   }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterServerUtil.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterServerUtil.java
new file mode 100644
index 00000000000..e82f67d12d5
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterServerUtil.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */package org.apache.hadoop.yarn.server.router;
+
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static 
org.apache.hadoop.yarn.server.router.webapp.TestFederationInterceptorREST.getReservationSubmissionRequestInfo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TestRouterServerUtil {
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(TestRouterServerUtil.class);
+
+  @Test
+  public void testConvertReservationDefinition() {
+    // Fixture: a submission request whose definition serves as the expected baseline
+    ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
+    ReservationSubmissionRequestInfo requestInfo =
+        getReservationSubmissionRequestInfo(reservationId);
+    ReservationDefinitionInfo expectDefinitionInfo = 
requestInfo.getReservationDefinition();
+
+    // Convert the REST-level ReservationDefinitionInfo into a ReservationDefinition
+    ReservationDefinition convertDefinition =
+        RouterServerUtil.convertReservationDefinition(expectDefinitionInfo);
+
+    // The converted definition must exist and carry the same scalar fields
+    assertNotNull(convertDefinition);
+    assertEquals(expectDefinitionInfo.getArrival(), 
convertDefinition.getArrival());
+    assertEquals(expectDefinitionInfo.getDeadline(), 
convertDefinition.getDeadline());
+
+    Priority priority = convertDefinition.getPriority();
+    assertNotNull(priority);
+    assertEquals(expectDefinitionInfo.getPriority(), priority.getPriority());
+    assertEquals(expectDefinitionInfo.getRecurrenceExpression(),
+        convertDefinition.getRecurrenceExpression());
+    assertEquals(expectDefinitionInfo.getReservationName(), 
convertDefinition.getReservationName());
+
+    ReservationRequestsInfo expectRequestsInfo = 
expectDefinitionInfo.getReservationRequests();
+    List<ReservationRequestInfo> expectRequestsInfoList =
+        expectRequestsInfo.getReservationRequest();
+
+    ReservationRequests convertReservationRequests =
+        convertDefinition.getReservationRequests();
+    assertNotNull(convertReservationRequests);
+
+    List<ReservationRequest> convertRequestList =
+        convertReservationRequests.getReservationResources();
+    assertNotNull(convertRequestList);
+    assertEquals(1, convertRequestList.size());
+
+    ReservationRequestInfo expectResRequestInfo = 
expectRequestsInfoList.get(0);
+    ReservationRequest convertResRequest = convertRequestList.get(0);
+    assertNotNull(convertResRequest);
+    assertEquals(expectResRequestInfo.getNumContainers(), 
convertResRequest.getNumContainers());
+    assertEquals(expectResRequestInfo.getDuration(), 
convertResRequest.getDuration());
+    // The converted capability (the value under test) must exist and match the source
+    ResourceInfo expectResourceInfo = expectResRequestInfo.getCapability();
+    Resource convertResource = convertResRequest.getCapability();
+    assertNotNull(convertResource);
+    assertEquals(expectResourceInfo.getMemorySize(), 
convertResource.getMemorySize());
+    assertEquals(expectResourceInfo.getvCores(), 
convertResource.getVirtualCores());
+  }
+
+  @Test
+  public void testConvertReservationDefinitionEmpty() throws Exception {
+    // Incomplete input must make convertReservationDefinition fail with the message below.
+    // Case 1: the ReservationDefinitionInfo itself is null
+    ReservationDefinitionInfo definitionInfo = null;
+
+    // intercept(...) is told to expect a RuntimeException with this message text
+    LambdaTestUtils.intercept(RuntimeException.class,
+        "definitionInfo Or ReservationRequests is Null.",
+        () -> RouterServerUtil.convertReservationDefinition(definitionInfo));
+
+    // Case 2: a definition on which no ReservationRequests has been set
+    ReservationDefinitionInfo definitionInfo2 = new 
ReservationDefinitionInfo();
+
+    // Same failure expected
+    LambdaTestUtils.intercept(RuntimeException.class,
+        "definitionInfo Or ReservationRequests is Null.",
+        () -> RouterServerUtil.convertReservationDefinition(definitionInfo2));
+
+    // Case 3: ReservationRequests is set, but no ReservationRequest was added to it
+    ReservationDefinitionInfo definitionInfo3 = new 
ReservationDefinitionInfo();
+    ReservationRequestsInfo requestsInfo = new ReservationRequestsInfo();
+    definitionInfo3.setReservationRequests(requestsInfo);
+
+    // Same failure expected
+    LambdaTestUtils.intercept(RuntimeException.class,
+        "definitionInfo Or ReservationRequests is Null.",
+        () -> RouterServerUtil.convertReservationDefinition(definitionInfo3));
+  }
+}
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
index a4294bc3610..9b086e51030 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
 import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
 import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
@@ -74,10 +75,17 @@ public abstract class BaseRouterWebServicesTest {
   private Router router;
   public final static int TEST_MAX_CACHE_SIZE = 10;
 
+  public static final String QUEUE_DEFAULT = "default";
+  public static final String QUEUE_DEFAULT_FULL = 
CapacitySchedulerConfiguration.ROOT +
+      CapacitySchedulerConfiguration.DOT + QUEUE_DEFAULT;
+  public static final String QUEUE_DEDICATED = "dedicated";
+  public static final String QUEUE_DEDICATED_FULL = 
CapacitySchedulerConfiguration.ROOT +
+      CapacitySchedulerConfiguration.DOT + QUEUE_DEDICATED;
+
   private RouterWebServices routerWebService;
 
   @Before
-  public void setUp() {
+  public void setUp() throws YarnException, IOException {
     this.conf = createConfiguration();
 
     router = spy(new Router());
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
index 7f73434c766..e2ac5fbf260 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
@@ -27,7 +27,9 @@ import java.util.Map;
 import java.util.HashMap;
 import java.util.Collections;
 import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
@@ -40,6 +42,7 @@ import javax.ws.rs.core.Response.Status;
 
 import org.apache.commons.lang3.EnumUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -47,6 +50,11 @@ import 
org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.util.Sets;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -65,10 +73,11 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
 import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.QueueACL;
-import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -78,7 +87,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
-import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -117,10 +125,19 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
 import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
 import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
 import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
 import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateResponseInfo;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteResponseInfo;
+import org.apache.hadoop.yarn.server.router.RouterServerUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.PartitionInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
@@ -134,6 +151,11 @@ import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
+import static 
org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEFAULT;
+import static 
org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEFAULT_FULL;
+import static 
org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED;
+import static 
org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED_FULL;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -153,19 +175,16 @@ public class MockDefaultRequestInterceptorREST
   private Map<ApplicationId, ApplicationReport> applicationMap = new 
HashMap<>();
   public static final String APP_STATE_RUNNING = "RUNNING";
 
-  private static final String QUEUE_DEFAULT = "default";
-  private static final String QUEUE_DEFAULT_FULL = 
CapacitySchedulerConfiguration.ROOT +
-      CapacitySchedulerConfiguration.DOT + QUEUE_DEFAULT;
-  private static final String QUEUE_DEDICATED = "dedicated";
-  public static final String QUEUE_DEDICATED_FULL = 
CapacitySchedulerConfiguration.ROOT +
-      CapacitySchedulerConfiguration.DOT + QUEUE_DEDICATED;
-
   // duration(milliseconds), 1mins
   public static final long DURATION = 60*1000;
 
   // Containers 4
   public static final int NUM_CONTAINERS = 4;
 
+  private Map<ReservationId, SubClusterId> reservationMap = new HashMap<>();
+  private AtomicLong resCounter = new AtomicLong();
+  private MockRM mockRM = null;
+
   private void validateRunning() throws ConnectException {
     if (!isRunning) {
       throw new ConnectException("RM is stopped");
@@ -859,44 +878,191 @@ public class MockDefaultRequestInterceptorREST
           " Please try again with a valid reservable queue.");
     }
 
-    MockRM mockRM = setupResourceManager();
+    ReservationId reservationID =
+        ReservationId.parseReservationId(reservationId);
 
-    ReservationId reservationID = 
ReservationId.parseReservationId(reservationId);
-    ReservationSystem reservationSystem = mockRM.getReservationSystem();
-    reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
+    if (!reservationMap.containsKey(reservationID)) {
+      throw new NotFoundException("reservationId with id: " + reservationId + 
" not found");
+    }
 
-    // Generate reserved resources
     ClientRMService clientService = mockRM.getClientRMService();
 
-    // arrival time from which the resource(s) can be allocated.
-    long arrival = Time.now();
-
-    // deadline by when the resource(s) must be allocated.
-    // The reason for choosing 1.05 is because this gives an integer
-    // DURATION * 0.05 = 3000(ms)
-    // deadline = arrival + 3000ms
-    long deadline = (long) (arrival + 1.05 * DURATION);
-
-    // In this test of reserved resources, we will apply for 4 containers (1 
core, 1GB memory)
-    // arrival = Time.now(), and make sure deadline - arrival > duration,
-    // the current setting is greater than 3000ms
-    ReservationSubmissionRequest submissionRequest =
-        ReservationSystemTestUtil.createSimpleReservationRequest(
-            reservationID, NUM_CONTAINERS, arrival, deadline, DURATION);
-    clientService.submitReservation(submissionRequest);
-
     // listReservations
     ReservationListRequest request = ReservationListRequest.newInstance(
-        queue, reservationID.toString(), startTime, endTime, 
includeResourceAllocations);
+        queue, reservationId, startTime, endTime, includeResourceAllocations);
     ReservationListResponse resRespInfo = 
clientService.listReservations(request);
     ReservationListInfo resResponse =
         new ReservationListInfo(resRespInfo, includeResourceAllocations);
 
-    if (mockRM != null) {
-      mockRM.stop();
+    return Response.status(Status.OK).entity(resResponse).build();
+  }
+
+  @Override
+  public Response createNewReservation(HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+    // Mock behaviour: refuse the call outright when this instance is flagged as not running.
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
+    }
+    // Build the id from the current time plus this instance's incrementing counter.
+    ReservationId resId = ReservationId.newInstance(Time.now(), 
resCounter.incrementAndGet());
+    LOG.info("Allocated new reservationId: {}.", resId);
+    // Wrap the id's string form in NewReservation and return it with HTTP 200 (OK).
+    NewReservation reservationId = new NewReservation(resId.toString());
+    return Response.status(Status.OK).entity(reservationId).build();
+  }
+
+  @Override
+  public Response submitReservation(ReservationSubmissionRequestInfo 
resContext,
+      HttpServletRequest hsr) throws AuthorizationException, IOException, 
InterruptedException {
+    // Mock submit: converts the REST payload and forwards it to the backing MockRM.
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
     }
 
-    return Response.status(Status.OK).entity(resResponse).build();
+    ReservationId reservationId = 
ReservationId.parseReservationId(resContext.getReservationId());
+    ReservationDefinitionInfo definitionInfo = 
resContext.getReservationDefinition();
+    ReservationDefinition definition =
+            RouterServerUtil.convertReservationDefinition(definitionInfo);
+    ReservationSubmissionRequest request = 
ReservationSubmissionRequest.newInstance(
+            definition, resContext.getQueue(), reservationId);
+    submitReservation(request);
+
+    LOG.info("Reservation submitted: {}.", reservationId);
+    // Record the reservation so list/update/delete can verify that it exists.
+    SubClusterId subClusterId = getSubClusterId();
+    reservationMap.put(reservationId, subClusterId);
+    // No entity is returned; success is signalled with HTTP 202 (ACCEPTED).
+    return Response.status(Status.ACCEPTED).build();
+  }
+
+  private void submitReservation(ReservationSubmissionRequest request) {
+    try {
+      // Synchronize the plan of the dedicated queue before submitting
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
+      // Submit via the MockRM's ClientRMService; checked failures are rethrown unchecked below
+      ClientRMService clientService = mockRM.getClientRMService();
+      clientService.submitReservation(request);
+    } catch (IOException | YarnException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Response updateReservation(ReservationUpdateRequestInfo resContext,
+      HttpServletRequest hsr) throws AuthorizationException, IOException, 
InterruptedException {
+    // Mock update: incomplete input yields HTTP 400, an unknown id a NotFoundException.
+    if (resContext == null || resContext.getReservationId() == null ||
+        resContext.getReservationDefinition() == null) {
+      return Response.status(Status.BAD_REQUEST).build();
+    }
+    // NOTE(review): no isRunning check here, unlike create/submit/delete - confirm intended.
+    String resId = resContext.getReservationId();
+    ReservationId reservationId = ReservationId.parseReservationId(resId);
+
+    if (!reservationMap.containsKey(reservationId)) {
+      throw new NotFoundException("reservationId with id: " + reservationId + 
" not found");
+    }
+
+    // Delegate the actual update to the private MockRM-backed overload
+    updateReservation(resContext);
+
+    ReservationUpdateResponseInfo resRespInfo = new 
ReservationUpdateResponseInfo();
+    return Response.status(Status.OK).entity(resRespInfo).build();
+  }
+
+  private void updateReservation(ReservationUpdateRequestInfo resContext) 
throws IOException {
+    // Validates the REST payload, rebuilds a ReservationDefinition and updates via the MockRM.
+    if (resContext == null) {
+      throw new BadRequestException("Input ReservationSubmissionContext should 
not be null");
+    }
+
+    ReservationDefinitionInfo resInfo = resContext.getReservationDefinition();
+    if (resInfo == null) {
+      throw new BadRequestException("Input ReservationDefinition should not be 
null");
+    }
+
+    ReservationRequestsInfo resReqsInfo = resInfo.getReservationRequests();
+    if (resReqsInfo == null || resReqsInfo.getReservationRequest() == null
+        || resReqsInfo.getReservationRequest().isEmpty()) {
+      throw new BadRequestException("The ReservationDefinition should " +
+          "contain at least one ReservationRequest");
+    }
+
+    if (resContext.getReservationId() == null) {
+      throw new BadRequestException("Update operations must specify an 
existing ReservationId");
+    }
+    // Map the numeric interpreter code to the enum constant at that ordinal position.
+    ReservationRequestInterpreter[] values = 
ReservationRequestInterpreter.values();
+    ReservationRequestInterpreter requestInterpreter =
+        values[resReqsInfo.getReservationRequestsInterpreter()];
+    List<ReservationRequest> list = new ArrayList<>();
+    // Translate every REST-level request into a ReservationRequest record.
+    for (ReservationRequestInfo resReqInfo : 
resReqsInfo.getReservationRequest()) {
+      ResourceInfo rInfo = resReqInfo.getCapability();
+      Resource capability = Resource.newInstance(rInfo.getMemorySize(), 
rInfo.getvCores());
+      int numContainers = resReqInfo.getNumContainers();
+      int minConcurrency = resReqInfo.getMinConcurrency();
+      long duration = resReqInfo.getDuration();
+      ReservationRequest rr = ReservationRequest.newInstance(
+          capability, numContainers, minConcurrency, duration);
+      list.add(rr);
+    }
+
+    ReservationRequests reqs = ReservationRequests.newInstance(list, 
requestInterpreter);
+    ReservationDefinition rDef = ReservationDefinition.newInstance(
+        resInfo.getArrival(), resInfo.getDeadline(), reqs,
+        resInfo.getReservationName(), resInfo.getRecurrenceExpression(),
+        Priority.newInstance(resInfo.getPriority()));
+    ReservationUpdateRequest request = ReservationUpdateRequest.newInstance(
+        rDef, ReservationId.parseReservationId(resContext.getReservationId()));
+    // A checked YarnException from the RM client is surfaced as an unchecked RuntimeException.
+    ClientRMService clientService = mockRM.getClientRMService();
+    try {
+      clientService.updateReservation(request);
+    } catch (YarnException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Override
+  public Response deleteReservation(ReservationDeleteRequestInfo resContext, 
HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
+    }
+    // Mock delete: removes a known reservation from the MockRM and from reservationMap.
+    try {
+      String resId = resContext.getReservationId();
+      ReservationId reservationId = ReservationId.parseReservationId(resId);
+      // Only reservations recorded in reservationMap can be deleted.
+      if (!reservationMap.containsKey(reservationId)) {
+        throw new NotFoundException("reservationId with id: " + reservationId 
+ " not found");
+      }
+
+      ReservationDeleteRequest reservationDeleteRequest =
+          ReservationDeleteRequest.newInstance(reservationId);
+      ClientRMService clientService = mockRM.getClientRMService();
+      clientService.deleteReservation(reservationDeleteRequest);
+      // The local entry is dropped only after the MockRM call returned without throwing.
+      ReservationDeleteResponseInfo resRespInfo = new 
ReservationDeleteResponseInfo();
+      reservationMap.remove(reservationId);
+
+      return Response.status(Status.OK).entity(resRespInfo).build();
+    } catch (YarnException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  public MockRM getMockRM() {
+    return mockRM;
+  }
+
+  @VisibleForTesting
+  public void setMockRM(MockRM mockResourceManager) {
+    this.mockRM = mockResourceManager;
   }
 
   @Override
@@ -939,7 +1105,7 @@ public class MockDefaultRequestInterceptorREST
   public RMQueueAclInfo checkUserAccessToQueue(String queue, String username,
       String queueAclType, HttpServletRequest hsr) throws 
AuthorizationException {
 
-    ResourceManager mockRM = mock(ResourceManager.class);
+    ResourceManager mockResourceManager = mock(ResourceManager.class);
     Configuration conf = new YarnConfiguration();
 
     ResourceScheduler mockScheduler = new CapacityScheduler() {
@@ -959,8 +1125,9 @@ public class MockDefaultRequestInterceptorREST
       }
     };
 
-    when(mockRM.getResourceScheduler()).thenReturn(mockScheduler);
-    MockRMWebServices webSvc = new MockRMWebServices(mockRM, conf, 
mock(HttpServletResponse.class));
+    when(mockResourceManager.getResourceScheduler()).thenReturn(mockScheduler);
+    MockRMWebServices webSvc = new MockRMWebServices(mockResourceManager, conf,
+        mock(HttpServletResponse.class));
     return webSvc.checkUserAccessToQueue(queue, username, queueAclType, hsr);
   }
 
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
index 7c82e71ea9b..14533d10871 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -45,6 +46,11 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -59,8 +65,6 @@ import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
 import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
 import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
-import 
org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
-import 
org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
 import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
@@ -86,6 +90,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
 import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo;
 import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationInfo;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
 import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
@@ -94,6 +99,9 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInf
 import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
 import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
 import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
+import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
 import 
org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
@@ -106,7 +114,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static 
org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.QUEUE_DEDICATED_FULL;
+import static 
org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.DURATION;
+import static 
org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.NUM_CONTAINERS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -130,7 +139,7 @@ public class TestFederationInterceptorREST extends 
BaseRouterWebServicesTest {
   private List<SubClusterId> subClusters;
 
   @Override
-  public void setUp() {
+  public void setUp() throws YarnException, IOException {
     super.setUpConfig();
     interceptor = new TestableFederationInterceptorREST();
 
@@ -156,6 +165,13 @@ public class TestFederationInterceptorREST extends 
BaseRouterWebServicesTest {
       Assert.fail();
     }
 
+    for (SubClusterId subCluster : subClusters) {
+      SubClusterInfo subClusterInfo = 
stateStoreUtil.querySubClusterInfo(subCluster);
+      interceptor.getOrCreateInterceptorForSubCluster(
+          subCluster, subClusterInfo.getRMWebServiceAddress());
+    }
+
+    interceptor.setupResourceManager();
   }
 
   @Override
@@ -1100,14 +1116,9 @@ public class TestFederationInterceptorREST extends 
BaseRouterWebServicesTest {
   @Test
   public void testListReservation() throws Exception {
 
-    // Add ReservationId In stateStore
+    // submitReservation
     ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
-    SubClusterId homeSubClusterId = subClusters.get(0);
-    ReservationHomeSubCluster reservationHomeSubCluster =
-        ReservationHomeSubCluster.newInstance(reservationId, homeSubClusterId);
-    AddReservationHomeSubClusterRequest request =
-        
AddReservationHomeSubClusterRequest.newInstance(reservationHomeSubCluster);
-    stateStore.addReservationHomeSubCluster(request);
+    submitReservation(reservationId);
 
     // Call the listReservation method
     String applyReservationId = reservationId.toString();
@@ -1157,6 +1168,199 @@ public class TestFederationInterceptorREST extends 
BaseRouterWebServicesTest {
     Assert.assertEquals(1024, memory);
   }
 
+  /**
+   * Checks that createNewReservation returns a response whose entity is a
+   * NewReservation carrying an id that contains the text "reservation".
+   */
+  @Test
+  public void testCreateNewReservation() throws Exception {
+    Response newReservationResp = interceptor.createNewReservation(null);
+    Assert.assertNotNull(newReservationResp);
+
+    // The payload must be present and of the expected web-service type.
+    Object payload = newReservationResp.getEntity();
+    Assert.assertNotNull(payload);
+    Assert.assertTrue(payload instanceof NewReservation);
+
+    NewReservation created = (NewReservation) payload;
+    Assert.assertNotNull(created);
+    String createdId = created.getReservationId();
+    Assert.assertTrue(createdId.contains("reservation"));
+  }
+
+  /**
+   * Submits a reservation through the interceptor and verifies that a
+   * subsequent listReservation call for the same id returns exactly that
+   * reservation.
+   */
+  @Test
+  public void testSubmitReservation() throws Exception {
+
+    // submit reservation
+    ReservationId reservationId = ReservationId.newInstance(Time.now(), 2);
+    Response response = submitReservation(reservationId);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
+
+    // look the reservation up again by its id
+    String applyReservationId = reservationId.toString();
+    Response reservationResponse = interceptor.listReservation(
+        QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null);
+    Assert.assertNotNull(reservationResponse);
+
+    Object entity = reservationResponse.getEntity();
+    Assert.assertNotNull(entity);
+    // assertTrue, not assertNotNull: a boxed boolean is never null, so
+    // assertNotNull would pass even for an entity of the wrong type.
+    Assert.assertTrue(entity instanceof ReservationListInfo);
+
+    ReservationListInfo listInfo = (ReservationListInfo) entity;
+
+    List<ReservationInfo> reservationInfos = listInfo.getReservations();
+    Assert.assertNotNull(reservationInfos);
+    Assert.assertEquals(1, reservationInfos.size());
+
+    ReservationInfo reservationInfo = reservationInfos.get(0);
+    Assert.assertNotNull(reservationInfo);
+    // JUnit convention: expected value first, actual value second.
+    Assert.assertEquals(applyReservationId, reservationInfo.getReservationId());
+  }
+
+  /**
+   * Submits a reservation, updates it to 6 containers with a capability of
+   * 2048 memory / 2 vcores, and verifies through listReservation that the
+   * stored definition reflects the updated values.
+   */
+  @Test
+  public void testUpdateReservation() throws Exception {
+    // submit reservation
+    ReservationId reservationId = ReservationId.newInstance(Time.now(), 3);
+    Response response = submitReservation(reservationId);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
+
+    // update reservation
+    ReservationSubmissionRequest resSubRequest =
+        getReservationSubmissionRequest(reservationId, 6, 2048, 2);
+    ReservationDefinition reservationDefinition = resSubRequest.getReservationDefinition();
+    ReservationDefinitionInfo reservationDefinitionInfo =
+        new ReservationDefinitionInfo(reservationDefinition);
+
+    ReservationUpdateRequestInfo updateRequestInfo = new ReservationUpdateRequestInfo();
+    updateRequestInfo.setReservationId(reservationId.toString());
+    updateRequestInfo.setReservationDefinition(reservationDefinitionInfo);
+    Response updateReservationResp = interceptor.updateReservation(updateRequestInfo, null);
+    Assert.assertNotNull(updateReservationResp);
+    Assert.assertEquals(Status.OK.getStatusCode(), updateReservationResp.getStatus());
+
+    // read the reservation back by its id
+    String applyReservationId = reservationId.toString();
+    Response reservationResponse = interceptor.listReservation(
+        QUEUE_DEDICATED_FULL, applyReservationId, -1, -1, false, null);
+    Assert.assertNotNull(reservationResponse);
+
+    Object entity = reservationResponse.getEntity();
+    Assert.assertNotNull(entity);
+    // assertTrue, not assertNotNull: a boxed boolean is never null, so
+    // assertNotNull would pass even for an entity of the wrong type.
+    Assert.assertTrue(entity instanceof ReservationListInfo);
+
+    ReservationListInfo listInfo = (ReservationListInfo) entity;
+
+    List<ReservationInfo> reservationInfos = listInfo.getReservations();
+    Assert.assertNotNull(reservationInfos);
+    Assert.assertEquals(1, reservationInfos.size());
+
+    ReservationInfo reservationInfo = reservationInfos.get(0);
+    Assert.assertNotNull(reservationInfo);
+    // JUnit convention: expected value first, actual value second.
+    Assert.assertEquals(applyReservationId, reservationInfo.getReservationId());
+
+    ReservationDefinitionInfo resDefinitionInfo = reservationInfo.getReservationDefinition();
+    Assert.assertNotNull(resDefinitionInfo);
+
+    ReservationRequestsInfo reservationRequestsInfo = resDefinitionInfo.getReservationRequests();
+    Assert.assertNotNull(reservationRequestsInfo);
+
+    ArrayList<ReservationRequestInfo> reservationRequestInfoList =
+        reservationRequestsInfo.getReservationRequest();
+    Assert.assertNotNull(reservationRequestInfoList);
+    Assert.assertEquals(1, reservationRequestInfoList.size());
+
+    // the single request must carry the updated container count
+    ReservationRequestInfo reservationRequestInfo = reservationRequestInfoList.get(0);
+    Assert.assertNotNull(reservationRequestInfo);
+    Assert.assertEquals(6, reservationRequestInfo.getNumContainers());
+
+    // ... and the updated capability
+    ResourceInfo resourceInfo = reservationRequestInfo.getCapability();
+    Assert.assertNotNull(resourceInfo);
+
+    int vCore = resourceInfo.getvCores();
+    long memory = resourceInfo.getMemorySize();
+    Assert.assertEquals(2, vCore);
+    Assert.assertEquals(2048, memory);
+  }
+
+  /**
+   * Submits a reservation, deletes it through the interceptor and then expects
+   * listReservation for the same id to fail with a "not found" error.
+   */
+  @Test
+  public void testDeleteReservation() throws Exception {
+    // submit reservation
+    ReservationId resId = ReservationId.newInstance(Time.now(), 4);
+    Response submitResp = submitReservation(resId);
+    Assert.assertNotNull(submitResp);
+    Assert.assertEquals(Status.ACCEPTED.getStatusCode(), submitResp.getStatus());
+
+    // list once before deleting; only the presence of a response is checked
+    final String applyResId = resId.toString();
+    Response listResp = interceptor.listReservation(
+        QUEUE_DEDICATED_FULL, applyResId, -1, -1, false, null);
+    Assert.assertNotNull(listResp);
+
+    // delete reservation
+    ReservationDeleteRequestInfo delRequest = new ReservationDeleteRequestInfo();
+    delRequest.setReservationId(applyResId);
+    Response delResp = interceptor.deleteReservation(delRequest, null);
+    Assert.assertNotNull(delResp);
+
+    // listing the deleted reservation must now be rejected
+    String expectedError = "reservationId with id: " + resId + " not found";
+    LambdaTestUtils.intercept(Exception.class, expectedError,
+        () -> interceptor.listReservation(
+            QUEUE_DEDICATED_FULL, applyResId, -1, -1, false, null));
+  }
+
+  /**
+   * Builds the default submission request for the given reservation id and
+   * submits it through the interceptor.
+   *
+   * @param reservationId id of the reservation to submit.
+   * @return the response returned by the interceptor.
+   * @throws IOException if thrown by the interceptor.
+   * @throws InterruptedException if thrown by the interceptor.
+   */
+  private Response submitReservation(ReservationId reservationId)
+       throws IOException, InterruptedException {
+    return interceptor.submitReservation(
+        getReservationSubmissionRequestInfo(reservationId), null);
+  }
+
+  /**
+   * Creates the web-service submission request for a reservation. The
+   * underlying request asks for NUM_CONTAINERS containers, each with a
+   * memory value of 1024 and 1 vcore.
+   *
+   * @param reservationId id of the reservation to describe.
+   * @return the populated submission request info.
+   */
+  public static ReservationSubmissionRequestInfo getReservationSubmissionRequestInfo(
+      ReservationId reservationId) {
+    ReservationSubmissionRequest source =
+        getReservationSubmissionRequest(reservationId, NUM_CONTAINERS, 1024, 1);
+
+    // Copy queue, id and definition over into the REST representation.
+    ReservationSubmissionRequestInfo requestInfo = new ReservationSubmissionRequestInfo();
+    requestInfo.setQueue(source.getQueue());
+    requestInfo.setReservationId(reservationId.toString());
+    requestInfo.setReservationDefinition(
+        new ReservationDefinitionInfo(source.getReservationDefinition()));
+    return requestInfo;
+  }
+
+  /**
+   * Builds a reservation submission request whose arrival time is "now" and
+   * whose deadline lies 1.05 * DURATION after the arrival.
+   *
+   * @param reservationId id of the reservation.
+   * @param numContainers number of containers to ask for.
+   * @param memory memory value of each container's capability.
+   * @param vcore vcores of each container's capability.
+   * @return the submission request.
+   */
+  public static ReservationSubmissionRequest getReservationSubmissionRequest(
+      ReservationId reservationId, int numContainers, int memory, int vcore) {
+
+    // Earliest time from which the resource(s) can be allocated.
+    final long arrival = Time.now();
+
+    // Latest time by which the resource(s) must be allocated. The window is
+    // the full DURATION plus 5% slack, i.e.
+    //   deadline = arrival + DURATION + 0.05 * DURATION
+    // The factor 1.05 was picked so the slack is a whole number of
+    // milliseconds (3000 ms per the original author's note; DURATION itself is
+    // defined in MockDefaultRequestInterceptorREST).
+    final long deadline = (long) (arrival + 1.05 * DURATION);
+
+    return createSimpleReservationRequest(
+        reservationId, numContainers, arrival, deadline, DURATION, memory, vcore);
+  }
+
+  /**
+   * Creates a submission request on QUEUE_DEDICATED_FULL that consists of a
+   * single reservation request interpreted with R_ALL.
+   *
+   * @param reservationId id of the reservation.
+   * @param numContainers number of containers to ask for.
+   * @param arrival arrival time passed to the reservation definition.
+   * @param deadline deadline passed to the reservation definition.
+   * @param duration duration passed to the reservation request.
+   * @param memory memory value of each container's capability.
+   * @param vcore vcores of each container's capability.
+   * @return the submission request.
+   */
+  public static ReservationSubmissionRequest createSimpleReservationRequest(
+      ReservationId reservationId, int numContainers, long arrival,
+      long deadline, long duration, int memory, int vcore) {
+    // one atomic ask is all this request contains
+    Resource capability = Resource.newInstance(memory, vcore);
+    ReservationRequest ask =
+        ReservationRequest.newInstance(capability, numContainers, 1, duration);
+    ReservationRequests asks = ReservationRequests.newInstance(
+        Collections.singletonList(ask), ReservationRequestInterpreter.R_ALL);
+    ReservationDefinition definition = ReservationDefinition.newInstance(
+        arrival, deadline, asks, "testClientRMService#reservation", "0", Priority.UNDEFINED);
+    return ReservationSubmissionRequest.newInstance(
+        definition, QUEUE_DEDICATED_FULL, reservationId);
+  }
+
   @Test
   public void testWebAddressWithScheme() {
     // The style of the web address reported by the subCluster in the 
heartbeat is 0.0.0.0:8000
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java
index 7126ca515c1..31fd756b664 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java
@@ -18,10 +18,25 @@
 
 package org.apache.hadoop.yarn.server.router.webapp;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED_FULL;
+import static 
org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEFAULT_FULL;
+import static 
org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEFAULT;
+import static 
org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED;
 
 /**
  * Extends the FederationInterceptorREST and overrides methods to provide a
@@ -30,7 +45,11 @@ import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 public class TestableFederationInterceptorREST
     extends FederationInterceptorREST {
 
-  private List<SubClusterId> badSubCluster = new ArrayList<SubClusterId>();
+  private List<SubClusterId> badSubCluster = new ArrayList<>();
+  private MockRM mockRM = null;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestableFederationInterceptorREST.class);
 
   /**
    * For testing purpose, some subclusters has to be down to simulate 
particular
@@ -51,4 +70,51 @@ public class TestableFederationInterceptorREST
     interceptor.setRunning(false);
   }
 
+  /**
+   * Starts a single MockRM with the reservation system enabled and hands it to
+   * every sub-cluster interceptor currently registered with this instance.
+   * Calling this again after a successful setup is a no-op.
+   *
+   * @throws IOException wrapping any failure during setup; in that case the
+   *         partially started MockRM is stopped and not retained, so a later
+   *         call can retry.
+   */
+  protected void setupResourceManager() throws IOException {
+
+    if (mockRM != null) {
+      return;
+    }
+
+    // Keep the RM in a local until setup has fully succeeded; publishing it to
+    // the field early would make the guard above skip any retry after a
+    // failure and would leave a half-started RM behind.
+    MockRM rm = null;
+    try {
+
+      DefaultMetricsSystem.setMiniClusterMode(true);
+      CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+
+      // Define default queue
+      conf.setCapacity(QUEUE_DEFAULT_FULL, 20);
+      // Define dedicated queues
+      String[] queues = new String[]{QUEUE_DEFAULT, QUEUE_DEDICATED};
+      conf.setQueues(CapacitySchedulerConfiguration.ROOT, queues);
+      conf.setCapacity(QUEUE_DEDICATED_FULL, 80);
+      conf.setReservable(QUEUE_DEDICATED_FULL, true);
+
+      conf.setClass(YarnConfiguration.RM_SCHEDULER,
+          CapacityScheduler.class, ResourceScheduler.class);
+      conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
+      conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
+
+      rm = new MockRM(conf);
+      rm.start();
+      rm.registerNode("127.0.0.1:5678", 100*1024, 100);
+
+      // Share the one RM with all sub-cluster interceptors.
+      Map<SubClusterId, DefaultRequestInterceptorREST> interceptors = super.getInterceptors();
+      for (DefaultRequestInterceptorREST item : interceptors.values()) {
+        MockDefaultRequestInterceptorREST interceptor = (MockDefaultRequestInterceptorREST) item;
+        interceptor.setMockRM(rm);
+      }
+
+      mockRM = rm;
+    } catch (Exception e) {
+      if (rm != null) {
+        // Best-effort cleanup; never let it mask the original failure.
+        try {
+          rm.stop();
+        } catch (RuntimeException stopFailure) {
+          e.addSuppressed(stopFailure);
+        }
+      }
+      LOG.error("setupResourceManager failed.", e);
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Stops the MockRM created by setupResourceManager (if any) and then shuts
+   * down the parent interceptor. The parent shutdown runs and the MockRM
+   * reference is cleared even if stopping the MockRM throws.
+   */
+  @Override
+  public void shutdown() {
+    try {
+      if (mockRM != null) {
+        mockRM.stop();
+      }
+    } finally {
+      mockRM = null;
+      super.shutdown();
+    }
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to