goiri commented on code in PR #4764:
URL: https://github.com/apache/hadoop/pull/4764#discussion_r954396015


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -925,13 +1041,61 @@ public ReservationListResponse listReservations(
   @Override
   public ReservationUpdateResponse updateReservation(
       ReservationUpdateRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null || request.getReservationId() == null
+            || request.getReservationDefinition() == null) {

Review Comment:
   Indentation



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -888,13 +890,127 @@ public MoveApplicationAcrossQueuesResponse 
moveApplicationAcrossQueues(
   @Override
   public GetNewReservationResponse getNewReservation(
       GetNewReservationRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null) {
+      routerMetrics.incrGetNewReservationFailedRetrieved();
+      String errMsg = "Missing getNewReservation request.";
+      RouterServerUtil.logAndThrowException(errMsg, null);
+    }
+
+    long startTime = clock.getTime();
+    Map<SubClusterId, SubClusterInfo> subClustersActive =
+        federationFacade.getSubClusters(true);
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+      SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
+      LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId);
+      ApplicationClientProtocol clientRMProxy = 
getClientRMProxyForSubCluster(subClusterId);
+      GetNewReservationResponse response = null;
+      try {
+        response = clientRMProxy.getNewReservation(request);
+        if (response != null) {
+          long stopTime = clock.getTime();
+          routerMetrics.succeededGetNewReservationRetrieved(stopTime - 
startTime);
+          return response;
+        }
+      } catch (Exception e) {
+        LOG.warn("Unable to create a new Reservation in SubCluster {}.", 
subClusterId.getId(), e);
+        subClustersActive.remove(subClusterId);
+      }
+    }
+
+    routerMetrics.incrGetNewReservationFailedRetrieved();
+    String errMsg = "Failed to create a new reservation.";
+    throw new YarnException(errMsg);
   }
 
   @Override
   public ReservationSubmissionResponse submitReservation(
       ReservationSubmissionRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null || request.getReservationId() == null
+            || request.getReservationDefinition() == null || 
request.getQueue() == null) {
+      routerMetrics.incrSubmitReservationFailedRetrieved();
+      RouterServerUtil.logAndThrowException(
+          "Missing submitReservation request or reservationId " +
+               "or reservation definition or queue.", null);
+    }
+
+    long startTime = clock.getTime();
+    ReservationId reservationId = request.getReservationId();
+
+    long retryCount = 0;
+    boolean firstRetry = true;
+
+    while (retryCount < numSubmitRetries) {
+
+      SubClusterId subClusterId = 
policyFacade.getReservationHomeSubCluster(request);
+      LOG.info("submitReservation reservationId {} try #{} on SubCluster {}.",
+          reservationId, retryCount, subClusterId);
+
+      ReservationHomeSubCluster reservationHomeSubCluster =
+          ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+
+      // If it is the first attempt,use StateStore to add the
+      // mapping of reservationId and subClusterId.
+      // if the number of attempts is greater than 1, use StateStore to update 
the mapping.
+      if (firstRetry) {
+        try {
+          // persist the mapping of reservationId and the subClusterId which 
has
+          // been selected as its home
+          subClusterId = 
federationFacade.addReservationHomeSubCluster(reservationHomeSubCluster);
+          firstRetry = false;
+        } catch (YarnException e) {
+          routerMetrics.incrSubmitReservationFailedRetrieved();
+          RouterServerUtil.logAndThrowException(e,
+              "Unable to insert the ReservationId %s into the 
FederationStateStore.",
+                   reservationId);
+        }
+      } else {
+        try {
+          // update the mapping of reservationId and the home subClusterId to
+          // the new subClusterId we have selected
+          
federationFacade.updateReservationHomeSubCluster(reservationHomeSubCluster);
+        } catch (YarnException e) {
+          SubClusterId subClusterIdInStateStore =
+              federationFacade.getReservationHomeSubCluster(reservationId);
+          if (subClusterId == subClusterIdInStateStore) {
+            LOG.info("Reservation {} already submitted on SubCluster {}.",
+                reservationId, subClusterId);
+          } else {
+            routerMetrics.incrSubmitReservationFailedRetrieved();
+            RouterServerUtil.logAndThrowException(e,
+                "Unable to update the ReservationId %s into the 
FederationStateStore.",
+                     reservationId);

Review Comment:
   Indentation



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -888,13 +890,127 @@ public MoveApplicationAcrossQueuesResponse 
moveApplicationAcrossQueues(
   @Override
   public GetNewReservationResponse getNewReservation(
       GetNewReservationRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null) {
+      routerMetrics.incrGetNewReservationFailedRetrieved();
+      String errMsg = "Missing getNewReservation request.";
+      RouterServerUtil.logAndThrowException(errMsg, null);
+    }
+
+    long startTime = clock.getTime();
+    Map<SubClusterId, SubClusterInfo> subClustersActive =
+        federationFacade.getSubClusters(true);
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+      SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
+      LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId);
+      ApplicationClientProtocol clientRMProxy = 
getClientRMProxyForSubCluster(subClusterId);
+      GetNewReservationResponse response = null;
+      try {
+        response = clientRMProxy.getNewReservation(request);
+        if (response != null) {
+          long stopTime = clock.getTime();
+          routerMetrics.succeededGetNewReservationRetrieved(stopTime - 
startTime);
+          return response;
+        }
+      } catch (Exception e) {
+        LOG.warn("Unable to create a new Reservation in SubCluster {}.", 
subClusterId.getId(), e);
+        subClustersActive.remove(subClusterId);
+      }
+    }
+
+    routerMetrics.incrGetNewReservationFailedRetrieved();
+    String errMsg = "Failed to create a new reservation.";
+    throw new YarnException(errMsg);
   }
 
   @Override
   public ReservationSubmissionResponse submitReservation(
       ReservationSubmissionRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null || request.getReservationId() == null
+            || request.getReservationDefinition() == null || 
request.getQueue() == null) {
+      routerMetrics.incrSubmitReservationFailedRetrieved();
+      RouterServerUtil.logAndThrowException(
+          "Missing submitReservation request or reservationId " +
+               "or reservation definition or queue.", null);
+    }
+
+    long startTime = clock.getTime();
+    ReservationId reservationId = request.getReservationId();
+
+    long retryCount = 0;
+    boolean firstRetry = true;
+
+    while (retryCount < numSubmitRetries) {
+
+      SubClusterId subClusterId = 
policyFacade.getReservationHomeSubCluster(request);
+      LOG.info("submitReservation reservationId {} try #{} on SubCluster {}.",
+          reservationId, retryCount, subClusterId);
+
+      ReservationHomeSubCluster reservationHomeSubCluster =
+          ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+
+      // If it is the first attempt,use StateStore to add the
+      // mapping of reservationId and subClusterId.
+      // if the number of attempts is greater than 1, use StateStore to update 
the mapping.
+      if (firstRetry) {
+        try {
+          // persist the mapping of reservationId and the subClusterId which 
has
+          // been selected as its home
+          subClusterId = 
federationFacade.addReservationHomeSubCluster(reservationHomeSubCluster);
+          firstRetry = false;
+        } catch (YarnException e) {
+          routerMetrics.incrSubmitReservationFailedRetrieved();
+          RouterServerUtil.logAndThrowException(e,
+              "Unable to insert the ReservationId %s into the 
FederationStateStore.",
+                   reservationId);
+        }
+      } else {
+        try {
+          // update the mapping of reservationId and the home subClusterId to
+          // the new subClusterId we have selected
+          
federationFacade.updateReservationHomeSubCluster(reservationHomeSubCluster);
+        } catch (YarnException e) {
+          SubClusterId subClusterIdInStateStore =
+              federationFacade.getReservationHomeSubCluster(reservationId);
+          if (subClusterId == subClusterIdInStateStore) {
+            LOG.info("Reservation {} already submitted on SubCluster {}.",
+                reservationId, subClusterId);
+          } else {
+            routerMetrics.incrSubmitReservationFailedRetrieved();
+            RouterServerUtil.logAndThrowException(e,
+                "Unable to update the ReservationId %s into the 
FederationStateStore.",
+                     reservationId);
+          }
+        }
+      }
+
+      // Obtain the ApplicationClientProtocol of the corresponding RM 
according to the subClusterId,
+      // and call the submitReservation method, If the request is responded to,
+      // If the request is responded, it will return directly, otherwise 
retryCount+1,
+      // continue to submit other request.
+      try {
+        ApplicationClientProtocol clientRMProxy = 
getClientRMProxyForSubCluster(subClusterId);
+        ReservationSubmissionResponse response = 
clientRMProxy.submitReservation(request);
+        if (response != null) {
+          LOG.info("Reservation {} submitted on {}.", 
request.getReservationId(), subClusterId);
+          long stopTime = clock.getTime();
+          routerMetrics.succeededSubmitReservationRetrieved(stopTime - 
startTime);
+          return response;
+        }
+      } catch (Exception e) {
+        LOG.warn("Unable to submit the reservation {} to SubCluster {} error = 
{}.",
+            reservationId, subClusterId.getId(), e.getMessage(), e);
+      }
+
+      retryCount++;
+    }
+
+    routerMetrics.incrSubmitReservationFailedRetrieved();
+    String msg = String.format("Reservation %s failed to be submitted.",
+        request.getReservationId());

Review Comment:
   reservationId



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -888,13 +890,127 @@ public MoveApplicationAcrossQueuesResponse 
moveApplicationAcrossQueues(
   @Override
   public GetNewReservationResponse getNewReservation(
       GetNewReservationRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null) {
+      routerMetrics.incrGetNewReservationFailedRetrieved();
+      String errMsg = "Missing getNewReservation request.";
+      RouterServerUtil.logAndThrowException(errMsg, null);
+    }
+
+    long startTime = clock.getTime();
+    Map<SubClusterId, SubClusterInfo> subClustersActive =

Review Comment:
   Single line?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java:
##########
@@ -1254,4 +1275,250 @@ public void testNodesToAttributes() throws Exception {
         NodeAttributeType.STRING, "nvida");
     Assert.assertTrue(nodeAttributeMap.get("0-host1").contains(gpu));
   }
+
+  @Test
+  public void testGetNewReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : Get NewReservation request.");
+
+    // null request
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing getNewReservation request.", () -> 
interceptor.getNewReservation(null));
+
+    // normal request
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    ReservationId reservationId = response.getReservationId();
+    Assert.assertNotNull(reservationId);
+    Assert.assertTrue(reservationId.toString().contains("reservation"));
+    Assert.assertEquals(reservationId.getClusterTimestamp(), 
ResourceManager.getClusterTimeStamp());
+  }
+
+  @Test
+  public void testSubmitReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : SubmitReservation request.");
+
+    // get new reservationId
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    // allow plan follower to synchronize, manually trigger an assignment
+    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+    for (MockRM mockRM : mockRMs.values()) {
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan("root.decided", true);
+    }
+
+    // Submit Reservation
+    ReservationId reservationId = response.getReservationId();
+    ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+    ReservationSubmissionRequest rSubmissionRequest = 
ReservationSubmissionRequest.newInstance(
+        rDefinition, "decided", reservationId);
+
+    ReservationSubmissionResponse submissionResponse =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse);
+
+    SubClusterId subClusterId = 
stateStoreUtil.queryReservationHomeSC(reservationId);
+    Assert.assertNotNull(subClusterId);
+    Assert.assertTrue(subClusters.contains(subClusterId));
+  }
+
+  @Test
+  public void testSubmitReservationEmptyRequest() throws Exception {
+    LOG.info("Test FederationClientInterceptor : SubmitReservation request 
empty.");
+
+    // null request1
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation 
definition or queue.",
+        () -> interceptor.submitReservation(null));
+
+    // null request2
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation 
definition or queue.",
+        () -> interceptor.submitReservation(
+        ReservationSubmissionRequest.newInstance(null, null, null)));

Review Comment:
   indentation is not correct



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -888,13 +890,127 @@ public MoveApplicationAcrossQueuesResponse 
moveApplicationAcrossQueues(
   @Override
   public GetNewReservationResponse getNewReservation(
       GetNewReservationRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null) {
+      routerMetrics.incrGetNewReservationFailedRetrieved();
+      String errMsg = "Missing getNewReservation request.";
+      RouterServerUtil.logAndThrowException(errMsg, null);
+    }
+
+    long startTime = clock.getTime();
+    Map<SubClusterId, SubClusterInfo> subClustersActive =
+        federationFacade.getSubClusters(true);
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+      SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
+      LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId);
+      ApplicationClientProtocol clientRMProxy = 
getClientRMProxyForSubCluster(subClusterId);
+      GetNewReservationResponse response = null;
+      try {
+        response = clientRMProxy.getNewReservation(request);
+        if (response != null) {
+          long stopTime = clock.getTime();
+          routerMetrics.succeededGetNewReservationRetrieved(stopTime - 
startTime);
+          return response;
+        }
+      } catch (Exception e) {
+        LOG.warn("Unable to create a new Reservation in SubCluster {}.", 
subClusterId.getId(), e);
+        subClustersActive.remove(subClusterId);
+      }
+    }
+
+    routerMetrics.incrGetNewReservationFailedRetrieved();
+    String errMsg = "Failed to create a new reservation.";
+    throw new YarnException(errMsg);
   }
 
   @Override
   public ReservationSubmissionResponse submitReservation(
       ReservationSubmissionRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null || request.getReservationId() == null
+            || request.getReservationDefinition() == null || 
request.getQueue() == null) {
+      routerMetrics.incrSubmitReservationFailedRetrieved();
+      RouterServerUtil.logAndThrowException(
+          "Missing submitReservation request or reservationId " +
+               "or reservation definition or queue.", null);
+    }
+
+    long startTime = clock.getTime();
+    ReservationId reservationId = request.getReservationId();
+
+    long retryCount = 0;
+    boolean firstRetry = true;
+
+    while (retryCount < numSubmitRetries) {
+
+      SubClusterId subClusterId = 
policyFacade.getReservationHomeSubCluster(request);
+      LOG.info("submitReservation reservationId {} try #{} on SubCluster {}.",
+          reservationId, retryCount, subClusterId);
+
+      ReservationHomeSubCluster reservationHomeSubCluster =
+          ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+
+      // If it is the first attempt,use StateStore to add the
+      // mapping of reservationId and subClusterId.
+      // if the number of attempts is greater than 1, use StateStore to update 
the mapping.
+      if (firstRetry) {
+        try {
+          // persist the mapping of reservationId and the subClusterId which 
has
+          // been selected as its home
+          subClusterId = 
federationFacade.addReservationHomeSubCluster(reservationHomeSubCluster);
+          firstRetry = false;
+        } catch (YarnException e) {
+          routerMetrics.incrSubmitReservationFailedRetrieved();
+          RouterServerUtil.logAndThrowException(e,
+              "Unable to insert the ReservationId %s into the 
FederationStateStore.",
+                   reservationId);
+        }
+      } else {
+        try {
+          // update the mapping of reservationId and the home subClusterId to
+          // the new subClusterId we have selected
+          
federationFacade.updateReservationHomeSubCluster(reservationHomeSubCluster);
+        } catch (YarnException e) {
+          SubClusterId subClusterIdInStateStore =
+              federationFacade.getReservationHomeSubCluster(reservationId);
+          if (subClusterId == subClusterIdInStateStore) {
+            LOG.info("Reservation {} already submitted on SubCluster {}.",
+                reservationId, subClusterId);
+          } else {
+            routerMetrics.incrSubmitReservationFailedRetrieved();
+            RouterServerUtil.logAndThrowException(e,
+                "Unable to update the ReservationId %s into the 
FederationStateStore.",
+                     reservationId);
+          }
+        }
+      }
+
+      // Obtain the ApplicationClientProtocol of the corresponding RM 
according to the subClusterId,
+      // and call the submitReservation method, If the request is responded to,
+      // If the request is responded, it will return directly, otherwise 
retryCount+1,
+      // continue to submit other request.
+      try {
+        ApplicationClientProtocol clientRMProxy = 
getClientRMProxyForSubCluster(subClusterId);
+        ReservationSubmissionResponse response = 
clientRMProxy.submitReservation(request);
+        if (response != null) {
+          LOG.info("Reservation {} submitted on {}.", 
request.getReservationId(), subClusterId);

Review Comment:
   reservationId is extracted already



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java:
##########
@@ -1254,4 +1275,250 @@ public void testNodesToAttributes() throws Exception {
         NodeAttributeType.STRING, "nvida");
     Assert.assertTrue(nodeAttributeMap.get("0-host1").contains(gpu));
   }
+
+  @Test
+  public void testGetNewReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : Get NewReservation request.");
+
+    // null request
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing getNewReservation request.", () -> 
interceptor.getNewReservation(null));
+
+    // normal request
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    ReservationId reservationId = response.getReservationId();
+    Assert.assertNotNull(reservationId);
+    Assert.assertTrue(reservationId.toString().contains("reservation"));
+    Assert.assertEquals(reservationId.getClusterTimestamp(), 
ResourceManager.getClusterTimeStamp());
+  }
+
+  @Test
+  public void testSubmitReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : SubmitReservation request.");
+
+    // get new reservationId
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    // allow plan follower to synchronize, manually trigger an assignment
+    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+    for (MockRM mockRM : mockRMs.values()) {
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan("root.decided", true);
+    }
+
+    // Submit Reservation
+    ReservationId reservationId = response.getReservationId();
+    ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+    ReservationSubmissionRequest rSubmissionRequest = 
ReservationSubmissionRequest.newInstance(
+        rDefinition, "decided", reservationId);
+
+    ReservationSubmissionResponse submissionResponse =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse);
+
+    SubClusterId subClusterId = 
stateStoreUtil.queryReservationHomeSC(reservationId);
+    Assert.assertNotNull(subClusterId);
+    Assert.assertTrue(subClusters.contains(subClusterId));
+  }
+
+  @Test
+  public void testSubmitReservationEmptyRequest() throws Exception {
+    LOG.info("Test FederationClientInterceptor : SubmitReservation request 
empty.");
+
+    // null request1
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation 
definition or queue.",
+        () -> interceptor.submitReservation(null));
+
+    // null request2
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation 
definition or queue.",
+        () -> interceptor.submitReservation(
+        ReservationSubmissionRequest.newInstance(null, null, null)));
+
+    // null request3
+    ReservationSubmissionRequest request3 =
+        ReservationSubmissionRequest.newInstance(null, "q1", null);
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation 
definition or queue.",
+        () -> interceptor.submitReservation(request3));
+
+    // null request4
+    ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
+    ReservationSubmissionRequest request4 =
+        ReservationSubmissionRequest.newInstance(null, null,  reservationId);
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation 
definition or queue.",
+        () -> interceptor.submitReservation(request4));
+
+    // null request5
+    long defaultDuration = 600000;
+    long arrival = Time.now();
+    long deadline = arrival + (int)(defaultDuration * 1.1);
+
+    ReservationRequest rRequest = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 1, 1, defaultDuration);
+    ReservationRequest[] rRequests = new ReservationRequest[] {rRequest};
+    ReservationDefinition rDefinition = createReservationDefinition(arrival, 
deadline, rRequests,
+        ReservationRequestInterpreter.R_ALL, "u1");
+    ReservationSubmissionRequest request5 =
+        ReservationSubmissionRequest.newInstance(rDefinition, null,  
reservationId);
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation 
definition or queue.",
+        () -> interceptor.submitReservation(request5));
+  }
+
+  @Test
+  public void testSubmitReservationMultipleSubmission() throws Exception {
+    LOG.info("Test FederationClientInterceptor: Submit Reservation - 
Multiple");
+
+    // get new reservationId
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    // allow plan follower to synchronize, manually trigger an assignment
+    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+    for (MockRM mockRM : mockRMs.values()) {
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan("root.decided", true);
+    }
+
+    // First Submit Reservation
+    ReservationId reservationId = response.getReservationId();
+    ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+    ReservationSubmissionRequest rSubmissionRequest = 
ReservationSubmissionRequest.newInstance(
+        rDefinition, "decided", reservationId);
+    ReservationSubmissionResponse submissionResponse =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse);
+
+    SubClusterId subClusterId1 = 
stateStoreUtil.queryReservationHomeSC(reservationId);
+    Assert.assertNotNull(subClusterId1);
+    Assert.assertTrue(subClusters.contains(subClusterId1));
+
+    // First Retry
+    ReservationSubmissionResponse submissionResponse1 =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse1);
+    SubClusterId subClusterId2 = 
stateStoreUtil.queryReservationHomeSC(reservationId);
+    Assert.assertNotNull(subClusterId2);
+    Assert.assertEquals(subClusterId1, subClusterId2);
+  }
+
+  @Test
+  public void testUpdateReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : UpdateReservation request.");
+
+    // get new reservationId
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    // allow plan follower to synchronize, manually trigger an assignment
+    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+    for (MockRM mockRM : mockRMs.values()) {
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan("root.decided", true);
+    }
+
+    // Submit Reservation
+    ReservationId reservationId = response.getReservationId();
+    ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+    ReservationSubmissionRequest rSubmissionRequest = 
ReservationSubmissionRequest.newInstance(
+        rDefinition, "decided", reservationId);
+
+    ReservationSubmissionResponse submissionResponse =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse);
+
+    // Update Reservation
+    ReservationDefinition rDefinition2 = createReservationDefinition(2048, 1);
+    ReservationUpdateRequest updateRequest =
+        ReservationUpdateRequest.newInstance(rDefinition2, reservationId);
+    ReservationUpdateResponse updateResponse =
+        interceptor.updateReservation(updateRequest);
+    Assert.assertNotNull(updateResponse);
+
+    SubClusterId subClusterId = 
stateStoreUtil.queryReservationHomeSC(reservationId);
+    Assert.assertNotNull(subClusterId);
+  }
+
+  @Test
+  public void testDeleteReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : DeleteReservation request.");
+
+    // get new reservationId
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    // allow plan follower to synchronize, manually trigger an assignment
+    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+    for (MockRM mockRM : mockRMs.values()) {
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan("root.decided", true);
+    }
+
+    // Submit Reservation
+    ReservationId reservationId = response.getReservationId();
+    ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+    ReservationSubmissionRequest rSubmissionRequest = 
ReservationSubmissionRequest.newInstance(
+        rDefinition, "decided", reservationId);
+
+    ReservationSubmissionResponse submissionResponse =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse);
+
+    // Delete Reservation
+    ReservationDeleteRequest deleteRequest = 
ReservationDeleteRequest.newInstance(reservationId);
+    ReservationDeleteResponse deleteResponse = 
interceptor.deleteReservation(deleteRequest);
+    Assert.assertNotNull(deleteResponse);
+
+    LambdaTestUtils.intercept(YarnException.class,
+        "Reservation " + reservationId + " does not exist",
+        () -> stateStoreUtil.queryReservationHomeSC(reservationId));
+  }
+
+
+  private ReservationDefinition createReservationDefinition(int memory, int 
core) {
+    // get reservationId
+    long defaultDuration = 600000;

Review Comment:
   constant final etc



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java:
##########
@@ -1254,4 +1275,250 @@ public void testNodesToAttributes() throws Exception {
         NodeAttributeType.STRING, "nvida");
     Assert.assertTrue(nodeAttributeMap.get("0-host1").contains(gpu));
   }
+
+  @Test
+  public void testGetNewReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : Get NewReservation request.");
+
+    // null request
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing getNewReservation request.", () -> 
interceptor.getNewReservation(null));
+
+    // normal request
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    ReservationId reservationId = response.getReservationId();
+    Assert.assertNotNull(reservationId);
+    Assert.assertTrue(reservationId.toString().contains("reservation"));
+    Assert.assertEquals(reservationId.getClusterTimestamp(), 
ResourceManager.getClusterTimeStamp());
+  }
+
+  @Test
+  public void testSubmitReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : SubmitReservation request.");
+
+    // get new reservationId
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    // allow plan follower to synchronize, manually trigger an assignment
+    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+    for (MockRM mockRM : mockRMs.values()) {
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan("root.decided", true);
+    }
+
+    // Submit Reservation
+    ReservationId reservationId = response.getReservationId();
+    ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+    ReservationSubmissionRequest rSubmissionRequest = 
ReservationSubmissionRequest.newInstance(
+        rDefinition, "decided", reservationId);
+
+    ReservationSubmissionResponse submissionResponse =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse);
+
+    SubClusterId subClusterId = 
stateStoreUtil.queryReservationHomeSC(reservationId);
+    Assert.assertNotNull(subClusterId);
+    Assert.assertTrue(subClusters.contains(subClusterId));
+  }
+
+  @Test
+  public void testSubmitReservationEmptyRequest() throws Exception {
+    LOG.info("Test FederationClientInterceptor : SubmitReservation request 
empty.");
+
+    // null request1
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation 
definition or queue.",
+        () -> interceptor.submitReservation(null));
+
+    // null request2
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation 
definition or queue.",
+        () -> interceptor.submitReservation(
+        ReservationSubmissionRequest.newInstance(null, null, null)));
+
+    // null request3
+    ReservationSubmissionRequest request3 =
+        ReservationSubmissionRequest.newInstance(null, "q1", null);
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation 
definition or queue.",
+        () -> interceptor.submitReservation(request3));
+
+    // null request4
+    ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
+    ReservationSubmissionRequest request4 =
+        ReservationSubmissionRequest.newInstance(null, null,  reservationId);
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation 
definition or queue.",
+        () -> interceptor.submitReservation(request4));
+
+    // null request5
+    long defaultDuration = 600000;
+    long arrival = Time.now();
+    long deadline = arrival + (int)(defaultDuration * 1.1);
+
+    ReservationRequest rRequest = ReservationRequest.newInstance(
+        Resource.newInstance(1024, 1), 1, 1, defaultDuration);
+    ReservationRequest[] rRequests = new ReservationRequest[] {rRequest};
+    ReservationDefinition rDefinition = createReservationDefinition(arrival, 
deadline, rRequests,
+        ReservationRequestInterpreter.R_ALL, "u1");
+    ReservationSubmissionRequest request5 =
+        ReservationSubmissionRequest.newInstance(rDefinition, null,  
reservationId);
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitReservation request or reservationId or reservation 
definition or queue.",
+        () -> interceptor.submitReservation(request5));
+  }
+
+  @Test
+  public void testSubmitReservationMultipleSubmission() throws Exception {
+    LOG.info("Test FederationClientInterceptor: Submit Reservation - 
Multiple");
+
+    // get new reservationId
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    // allow plan follower to synchronize, manually trigger an assignment
+    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+    for (MockRM mockRM : mockRMs.values()) {
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan("root.decided", true);
+    }
+
+    // First Submit Reservation
+    ReservationId reservationId = response.getReservationId();
+    ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+    ReservationSubmissionRequest rSubmissionRequest = 
ReservationSubmissionRequest.newInstance(
+        rDefinition, "decided", reservationId);
+    ReservationSubmissionResponse submissionResponse =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse);
+
+    SubClusterId subClusterId1 = 
stateStoreUtil.queryReservationHomeSC(reservationId);
+    Assert.assertNotNull(subClusterId1);
+    Assert.assertTrue(subClusters.contains(subClusterId1));
+
+    // First Retry
+    ReservationSubmissionResponse submissionResponse1 =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse1);
+    SubClusterId subClusterId2 = 
stateStoreUtil.queryReservationHomeSC(reservationId);
+    Assert.assertNotNull(subClusterId2);
+    Assert.assertEquals(subClusterId1, subClusterId2);
+  }
+
+  @Test
+  public void testUpdateReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : UpdateReservation request.");
+
+    // get new reservationId
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    // allow plan follower to synchronize, manually trigger an assignment
+    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+    for (MockRM mockRM : mockRMs.values()) {
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan("root.decided", true);
+    }
+
+    // Submit Reservation
+    ReservationId reservationId = response.getReservationId();
+    ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+    ReservationSubmissionRequest rSubmissionRequest = 
ReservationSubmissionRequest.newInstance(
+        rDefinition, "decided", reservationId);
+
+    ReservationSubmissionResponse submissionResponse =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse);
+
+    // Update Reservation
+    ReservationDefinition rDefinition2 = createReservationDefinition(2048, 1);
+    ReservationUpdateRequest updateRequest =
+        ReservationUpdateRequest.newInstance(rDefinition2, reservationId);
+    ReservationUpdateResponse updateResponse =
+        interceptor.updateReservation(updateRequest);
+    Assert.assertNotNull(updateResponse);
+
+    SubClusterId subClusterId = 
stateStoreUtil.queryReservationHomeSC(reservationId);
+    Assert.assertNotNull(subClusterId);
+  }
+
+  @Test
+  public void testDeleteReservation() throws Exception {
+    LOG.info("Test FederationClientInterceptor : DeleteReservation request.");
+
+    // get new reservationId
+    GetNewReservationRequest request = GetNewReservationRequest.newInstance();
+    GetNewReservationResponse response = 
interceptor.getNewReservation(request);
+    Assert.assertNotNull(response);
+
+    // allow plan follower to synchronize, manually trigger an assignment
+    Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
+    for (MockRM mockRM : mockRMs.values()) {
+      ReservationSystem reservationSystem = mockRM.getReservationSystem();
+      reservationSystem.synchronizePlan("root.decided", true);
+    }
+
+    // Submit Reservation
+    ReservationId reservationId = response.getReservationId();
+    ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
+    ReservationSubmissionRequest rSubmissionRequest = 
ReservationSubmissionRequest.newInstance(
+        rDefinition, "decided", reservationId);
+
+    ReservationSubmissionResponse submissionResponse =
+        interceptor.submitReservation(rSubmissionRequest);
+    Assert.assertNotNull(submissionResponse);
+
+    // Delete Reservation
+    ReservationDeleteRequest deleteRequest = 
ReservationDeleteRequest.newInstance(reservationId);
+    ReservationDeleteResponse deleteResponse = 
interceptor.deleteReservation(deleteRequest);
+    Assert.assertNotNull(deleteResponse);
+
+    LambdaTestUtils.intercept(YarnException.class,
+        "Reservation " + reservationId + " does not exist",
+        () -> stateStoreUtil.queryReservationHomeSC(reservationId));
+  }
+
+
+  private ReservationDefinition createReservationDefinition(int memory, int 
core) {
+    // get reservationId
+    long defaultDuration = 600000;

Review Comment:
   10 * 60 * 1000



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to