goiri commented on code in PR #4764:
URL: https://github.com/apache/hadoop/pull/4764#discussion_r957553830


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -888,13 +890,88 @@ public MoveApplicationAcrossQueuesResponse 
moveApplicationAcrossQueues(
   @Override
   public GetNewReservationResponse getNewReservation(
       GetNewReservationRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null) {
+      routerMetrics.incrGetNewReservationFailedRetrieved();
+      String errMsg = "Missing getNewReservation request.";
+      RouterServerUtil.logAndThrowException(errMsg, null);
+    }
+
+    long startTime = clock.getTime();
+    Map<SubClusterId, SubClusterInfo> subClustersActive = 
federationFacade.getSubClusters(true);
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+      SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
+      LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId);
+      ApplicationClientProtocol clientRMProxy = 
getClientRMProxyForSubCluster(subClusterId);
+      try {
+        GetNewReservationResponse response = 
clientRMProxy.getNewReservation(request);
+        if (response != null) {
+          long stopTime = clock.getTime();
+          routerMetrics.succeededGetNewReservationRetrieved(stopTime - 
startTime);
+          return response;
+        }
+      } catch (Exception e) {
+        LOG.warn("Unable to create a new Reservation in SubCluster {}.", 
subClusterId.getId(), e);
+        subClustersActive.remove(subClusterId);
+      }
+    }
+
+    routerMetrics.incrGetNewReservationFailedRetrieved();
+    String errMsg = "Failed to create a new reservation.";
+    throw new YarnException(errMsg);
   }
 
   @Override
   public ReservationSubmissionResponse submitReservation(
       ReservationSubmissionRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    if (request == null || request.getReservationId() == null
+        || request.getReservationDefinition() == null || request.getQueue() == 
null) {
+      routerMetrics.incrSubmitReservationFailedRetrieved();
+      RouterServerUtil.logAndThrowException(
+          "Missing submitReservation request or reservationId " +
+          "or reservation definition or queue.", null);
+    }
+
+    long startTime = clock.getTime();
+    ReservationId reservationId = request.getReservationId();
+
+    for (int i = 0; i < numSubmitRetries; i++) {
+      try {
+        // First, Get SubClusterId according to specific strategy.
+        SubClusterId subClusterId = 
policyFacade.getReservationHomeSubCluster(request);
+        LOG.info("submitReservation ReservationId {} try #{} on SubCluster 
{}.",
+            reservationId, i, subClusterId);
+        ReservationHomeSubCluster reservationHomeSubCluster =
+            ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+
+        // Second, determine whether the current ReservationId has a 
corresponding subCluster.
+        // If it does not exist, add it. If it exists, update it.
+        Boolean exists = isExistsReservationHomeSubCluster(reservationId);
+        if(!exists) {

Review Comment:
   `if (!exists) {`



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -1633,4 +1790,49 @@ public FederationStateStoreFacade getFederationFacade() {
   public Map<SubClusterId, ApplicationClientProtocol> getClientRMProxies() {
     return clientRMProxies;
   }
+
+  private Boolean isExistsReservationHomeSubCluster(ReservationId 
reservationId) {
+    try {
+      SubClusterId subClusterId = 
federationFacade.getReservationHomeSubCluster(reservationId);
+      if (subClusterId != null) {
+        return true;
+      }
+    } catch (YarnException e) {
+      LOG.warn("get homeSubCluster by reservationId = {} error.", 
reservationId, e);
+    }
+    return false;
+  }
+
+  private void addReservationHomeSubCluster(ReservationId reservationId,
+      ReservationHomeSubCluster homeSubCluster) throws YarnException {
+    try {
+      // persist the mapping of reservationId and the subClusterId which has
+      // been selected as its home
+      federationFacade.addReservationHomeSubCluster(homeSubCluster);
+    } catch (YarnException e) {
+      RouterServerUtil.logAndThrowException(e,
+          "Unable to insert the ReservationId %s into the 
FederationStateStore.",
+           reservationId);

Review Comment:
   Indentation.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -1633,4 +1790,49 @@ public FederationStateStoreFacade getFederationFacade() {
   public Map<SubClusterId, ApplicationClientProtocol> getClientRMProxies() {
     return clientRMProxies;
   }
+
+  private Boolean isExistsReservationHomeSubCluster(ReservationId 
reservationId) {

Review Comment:
   existsReservationHomeSubCluster()



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to