slfan1989 commented on code in PR #4817:
URL: https://github.com/apache/hadoop/pull/4817#discussion_r957972961


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java:
##########
@@ -1016,30 +1035,275 @@ private static byte[] getByteArray(ByteBuffer bb) {
   @Override
   public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
       AddReservationHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+    // validate
+    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+    CallableStatement cstmt = null;
+
+    ReservationHomeSubCluster reservationHomeSubCluster = request.getReservationHomeSubCluster();
+    ReservationId reservationId = reservationHomeSubCluster.getReservationId();
+    SubClusterId subClusterId = reservationHomeSubCluster.getHomeSubCluster();
+    SubClusterId subClusterHomeId = null;
+
+    try {
+      // Call procedure
+      cstmt = getCallableStatement(CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      cstmt.setString(1, reservationId.toString());
+      cstmt.setString(2, subClusterId.getId());
+      cstmt.registerOutParameter(3, java.sql.Types.VARCHAR);
+      cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      cstmt.executeUpdate();
+      long stopTime = clock.getTime();
+
+      // Get SubClusterHome
+      String subClusterHomeIdString = cstmt.getString(3);
+      subClusterHomeId = SubClusterId.newInstance(subClusterHomeIdString);
+
+      // Get rowCount
+      int rowCount = cstmt.getInt(4);
+
+      // For failover reason, we check the returned subClusterId.
+      // 1.If it is equal to the subClusterId we sent, the call added the new
+      // reservation into FederationStateStore.
+      // 2.If the call returns a different subClusterId
+      // it means we already tried to insert this reservation
+      // but a component (Router/StateStore/RM) failed during the submission.
+      if (subClusterId.equals(subClusterHomeId)) {
+        // if it is equal to 0
+        // it means the call did not add a new reservation into FederationStateStore.
+        if (rowCount == 0) {
+          LOG.info("The reservation {} was not inserted in the StateStore because it" +
+              " was already present in subCluster {}", reservationId, subClusterHomeId);
+        } else if (rowCount != 1) {
+          // if it is different from 1
+          // it means the call had a wrong behavior. Maybe the database is not set correctly.
+          FederationStateStoreUtils.logAndThrowStoreException(LOG,
+              "Wrong behavior during the insertion of subCluster %s.", subClusterId);
+        }
+      } else {
+        // If it is different from 0,
+        // it means that there is a data situation that does not meet the expectations,
+        // and an exception should be thrown at this time
+        if (rowCount != 0) {
+          FederationStateStoreUtils.logAndThrowStoreException(LOG,
+              "The reservation %s does exist but was overwritten.", reservationId);
+        }
+        LOG.info("Reservation: {} already present with subCluster: {}.",
+            reservationId, subClusterHomeId);
+      }
+
+      // Record successful call time
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to insert the newly generated reservation %s to subCluster %s.",
+          reservationId, subClusterId);
+    } finally {
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
+    }
+
+    return AddReservationHomeSubClusterResponse.newInstance(subClusterHomeId);
   }
 
   @Override
   public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
       GetReservationHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+    // validate
+    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+
+    CallableStatement cstmt = null;
+    ReservationId reservationId = request.getReservationId();
+    SubClusterId subClusterId = null;
+
+    try {
+      cstmt = getCallableStatement(CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      cstmt.setString(1, reservationId.toString());
+      cstmt.registerOutParameter(2, java.sql.Types.VARCHAR);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      cstmt.execute();
+      long stopTime = clock.getTime();
+
+      // Get Result
+      String subClusterHomeIdString = cstmt.getString(2);
+
+      if (StringUtils.isNotBlank(subClusterHomeIdString)) {
+        subClusterId = SubClusterId.newInstance(subClusterHomeIdString);
+      } else {
+        // If subClusterHomeIdString blank, we need to throw an exception
+        FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+            "Reservation %s does not exist", reservationId);
+      }
+
+      LOG.info("Got the information about the specified reservation {} in subCluster = {}.",
+          reservationId, subClusterId);
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to obtain the reservation information according to %s.", reservationId);
+    } finally {
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
+    }
+
+    ReservationHomeSubCluster homeSubCluster =
+        ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+    return GetReservationHomeSubClusterResponse.newInstance(homeSubCluster);
   }
 
   @Override
   public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
       GetReservationsHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+    CallableStatement cstmt = null;
+    ResultSet rs = null;
+    List<ReservationHomeSubCluster> reservationsHomeSubClusters = new ArrayList<>();
+
+    try {
+      cstmt = getCallableStatement(CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      rs = cstmt.executeQuery();
+      long stopTime = clock.getTime();
+
+      while (rs.next()) {
+        // Extract the output for each tuple
+        String dbReservationId = rs.getString(1);
+        String dbHomeSubCluster = rs.getString(2);
+
+        // Generate parameters
+        ReservationId reservationId = ReservationId.parseReservationId(dbReservationId);
+        SubClusterId homeSubCluster = SubClusterId.newInstance(dbHomeSubCluster);
+        ReservationHomeSubCluster reservationHomeSubCluster =
+            ReservationHomeSubCluster.newInstance(reservationId, homeSubCluster);
+        reservationsHomeSubClusters.add(reservationHomeSubCluster);
+      }
+
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
+    } catch (Exception e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+          "Unable to obtain the information for all the reservations.", e);
+    } finally {
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
+    }
+
+    return GetReservationsHomeSubClusterResponse.newInstance(
+        reservationsHomeSubClusters);
   }
 
   @Override
   public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster(
       DeleteReservationHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+
+    // validate
+    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+
+    CallableStatement cstmt = null;
+    ReservationId reservationId = request.getReservationId();
+
+    try {
+      cstmt = getCallableStatement(CALL_SP_DELETE_RESERVATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      cstmt.setString(1, reservationId.toString());
+      cstmt.registerOutParameter(2, java.sql.Types.INTEGER);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      cstmt.executeUpdate();
+      long stopTime = clock.getTime();
+
+      int rowCount = cstmt.getInt(2);
+
+      // if it is equal to 0 it means the call
+      // did not delete the reservation from FederationStateStore
+      if (rowCount == 0) {
+        FederationStateStoreUtils.logAndThrowStoreException(LOG,
+            "Reservation %s does not exist", reservationId);
+      } else if (rowCount != 1) {
+        // if it is different from 1 it means the call
+        // had a wrong behavior. Maybe the database is not set correctly.
+        FederationStateStoreUtils.logAndThrowStoreException(LOG,
+            "Wrong behavior during deleting the reservation %s.", reservationId);

Review Comment:
   I will describe the exception message in more detail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to