slfan1989 commented on code in PR #4817:
URL: https://github.com/apache/hadoop/pull/4817#discussion_r961299942


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java:
##########
@@ -1016,30 +1037,342 @@ private static byte[] getByteArray(ByteBuffer bb) {
   @Override
   public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
       AddReservationHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+
+    // validate
+    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+    CallableStatement cstmt = null;
+
+    ReservationHomeSubCluster reservationHomeSubCluster = 
request.getReservationHomeSubCluster();
+    ReservationId reservationId = reservationHomeSubCluster.getReservationId();
+    SubClusterId subClusterId = reservationHomeSubCluster.getHomeSubCluster();
+    SubClusterId subClusterHomeId = null;
+
+    try {
+
+      // Defined the sp_addReservationHomeSubCluster procedure
+      // this procedure requires 4 parameters
+      // Input parameters
+      // 1)IN reservationId_IN varchar(128)
+      // 2)IN homeSubCluster_IN varchar(256)
+      // Output parameters
+      // 3)OUT storedHomeSubCluster_OUT varchar(256)
+      // 4)OUT rowCount_OUT int
+
+      // Call procedure
+      cstmt = getCallableStatement(CALL_SP_ADD_RESERVATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      // 1)IN reservationId_IN varchar(128)
+      cstmt.setString("reservationId_IN", reservationId.toString());
+      // 2)IN homeSubCluster_IN varchar(256)
+      cstmt.setString("homeSubCluster_IN", subClusterId.getId());
+      // 3) OUT storedHomeSubCluster_OUT varchar(256)
+      cstmt.registerOutParameter("storedHomeSubCluster_OUT", 
java.sql.Types.VARCHAR);
+      // 4) OUT rowCount_OUT int
+      cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      cstmt.executeUpdate();
+      long stopTime = clock.getTime();
+
+      // Get SubClusterHome
+      String subClusterHomeIdString = 
cstmt.getString("storedHomeSubCluster_OUT");
+      subClusterHomeId = SubClusterId.newInstance(subClusterHomeIdString);
+
+      // Get rowCount
+      int rowCount = cstmt.getInt("rowCount_OUT");
+
+      // For failover reason, we check the returned subClusterId.
+      // 1.If it is equal to the subClusterId we sent, the call added the new
+      // reservation into FederationStateStore.
+      // 2.If the call returns a different subClusterId
+      // it means we already tried to insert this reservation
+      // but a component (Router/StateStore/RM) failed during the submission.
+      if (subClusterId.equals(subClusterHomeId)) {
+        // if it is equal to 0
+        // it means the call did not add a new reservation into 
FederationStateStore.
+        if (rowCount == 0) {
+          LOG.info("The reservation {} was not inserted in the StateStore 
because it" +
+              " was already present in subCluster {}", reservationId, 
subClusterHomeId);
+        } else if (rowCount != 1) {
+          // if it is different from 1
+          // it means the call had a wrong behavior. Maybe the database is not 
set correctly.
+          FederationStateStoreUtils.logAndThrowStoreException(LOG,
+              "Wrong behavior during the insertion of subCluster %s according 
to reservation %s. " +
+              "The database expects to insert 1 record, but the number of " +
+              "inserted changes is greater than 1, " +
+              "please check the records of the database.",
+              subClusterId, reservationId);
+        }
+      } else {
+        // If it is different from 0,
+        // it means that there is a data situation that does not meet the 
expectations,
+        // and an exception should be thrown at this time
+        if (rowCount != 0) {
+          FederationStateStoreUtils.logAndThrowStoreException(LOG,
+              "The reservation %s does exist but was overwritten.", 
reservationId);
+        }
+        LOG.info("Reservation: {} already present with subCluster: {}.",
+            reservationId, subClusterHomeId);
+      }
+
+      // Record successful call time
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to insert the newly generated reservation %s to subCluster 
%s.",
+          reservationId, subClusterId);
+    } finally {
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
+    }
+
+    return AddReservationHomeSubClusterResponse.newInstance(subClusterHomeId);
   }
 
   @Override
   public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
       GetReservationHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+    // validate
+    FederationReservationHomeSubClusterStoreInputValidator.validate(request);
+
+    CallableStatement cstmt = null;
+    ReservationId reservationId = request.getReservationId();
+    SubClusterId subClusterId = null;
+
+    try {
+
+      // Defined the sp_getReservationHomeSubCluster procedure
+      // this procedure requires 2 parameters
+      // Input parameters
+      // 1)IN reservationId_IN varchar(128)
+      // Output parameters
+      // 2)OUT homeSubCluster_OUT varchar(256)
+
+      cstmt = getCallableStatement(CALL_SP_GET_RESERVATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      // 1)IN reservationId_IN varchar(128)
+      cstmt.setString("reservationId_IN", reservationId.toString());
+      // 2)OUT homeSubCluster_OUT varchar(256)
+      cstmt.registerOutParameter("homeSubCluster_OUT", java.sql.Types.VARCHAR);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      cstmt.execute();
+      long stopTime = clock.getTime();
+
+      // Get Result
+      String subClusterHomeIdString = cstmt.getString("homeSubCluster_OUT");
+
+      if (StringUtils.isNotBlank(subClusterHomeIdString)) {
+        subClusterId = SubClusterId.newInstance(subClusterHomeIdString);
+      } else {
+        // If subClusterHomeIdString blank, we need to throw an exception
+        FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+            "Reservation %s does not exist", reservationId);
+      }
+
+      LOG.info("Got the information about the specified reservation {} in 
subCluster = {}.",
+          reservationId, subClusterId);
+      FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - 
startTime);
+    } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+      FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+          "Unable to obtain the reservation information according to %s.", 
reservationId);
+    } finally {
+      // Return to the pool the CallableStatement
+      FederationStateStoreUtils.returnToPool(LOG, cstmt);
+    }
+
+    ReservationHomeSubCluster homeSubCluster =
+        ReservationHomeSubCluster.newInstance(reservationId, subClusterId);
+    return GetReservationHomeSubClusterResponse.newInstance(homeSubCluster);
   }
 
   @Override
   public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
       GetReservationsHomeSubClusterRequest request) throws YarnException {
-    throw new NotImplementedException("Code is not implemented");
+    CallableStatement cstmt = null;
+    ResultSet rs = null;
+    List<ReservationHomeSubCluster> reservationsHomeSubClusters = new 
ArrayList<>();
+
+    try {
+
+      // Defined the sp_getReservationsHomeSubCluster procedure
+      // This procedure requires no input parameters, but will have 2 output 
parameters
+      // Output parameters
+      // 1)OUT reservationId
+      // 2)OUT homeSubCluster
+
+      cstmt = getCallableStatement(CALL_SP_GET_RESERVATIONS_HOME_SUBCLUSTER);
+
+      // Execute the query
+      long startTime = clock.getTime();
+      rs = cstmt.executeQuery();
+      long stopTime = clock.getTime();
+
+      while (rs.next()) {

Review Comment:
   This api can help to traverse the query results returned by the database, we 
can get the results line by line.
   
   - Method Detail
   Moves the cursor forward one row from its current position. A ResultSet 
cursor is initially positioned before the first row; the first call to the 
method next makes the first row the current row; the second call makes the 
second row the current row, and so on.
   When a call to the next method returns false, the cursor is positioned after 
the last row. 
   
   - Returns
   true if the new current row is valid; false if there are no more rows.
   
   - Throws
   SQLException - if a database access error occurs or this method is called on 
a closed result set.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to