slfan1989 commented on code in PR #5131:
URL: https://github.com/apache/hadoop/pull/5131#discussion_r1036628806


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java:
##########
@@ -886,45 +1000,608 @@ public UpdateReservationHomeSubClusterResponse 
updateReservationHomeSubCluster(
     return UpdateReservationHomeSubClusterResponse.newInstance();
   }
 
+  /**
+   * ZookeeperFederationStateStore Supports Store NewMasterKey.
+   *
+   * @param request The request contains RouterMasterKey, which is an 
abstraction for DelegationKey
+   * @return routerMasterKeyResponse, the response contains the 
RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest 
request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long start = clock.getTime();
+    // For the verification of the request, after passing the verification,
+    // the request and the internal objects will not be empty and can be used 
directly.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    // Parse the delegationKey from the request and get the ZK storage path.
+    DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
+    String nodeCreatePath = 
getMasterKeyZNodePathByDelegationKey(delegationKey);
+    LOG.debug("Storing RMDelegationKey_{}, ZkNodePath = {}.", 
delegationKey.getKeyId(),
+        nodeCreatePath);
+
+    // Write master key data to zk.
+    try(ByteArrayOutputStream os = new ByteArrayOutputStream();
+        DataOutputStream fsOut = new DataOutputStream(os)) {
+      delegationKey.write(fsOut);
+      put(nodeCreatePath, os.toByteArray(), false);
+    }
+
+    // Get the stored masterKey from zk.
+    RouterMasterKey masterKeyFromZK = getRouterMasterKeyFromZK(nodeCreatePath);
+    long end = clock.getTime();
+    opDurations.addStoreNewMasterKeyDuration(start, end);
+    return RouterMasterKeyResponse.newInstance(masterKeyFromZK);
   }
 
+  /**
+   * ZookeeperFederationStateStore Supports Remove MasterKey.
+   *
+   * @param request The request contains RouterMasterKey, which is an 
abstraction for DelegationKey
+   * @return routerMasterKeyResponse, the response contains the 
RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest 
request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long start = clock.getTime();
+    // For the verification of the request, after passing the verification,
+    // the request and the internal objects will not be empty and can be used 
directly.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    try {
+      // Parse the delegationKey from the request and get the ZK storage path.
+      RouterMasterKey masterKey = request.getRouterMasterKey();
+      DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
+      String nodeRemovePath = 
getMasterKeyZNodePathByDelegationKey(delegationKey);
+      LOG.debug("Removing RMDelegationKey_{}, ZkNodePath = {}.", 
delegationKey.getKeyId(),
+          nodeRemovePath);
+
+      // Check if the path exists, Throws an exception if the path does not 
exist.
+      if (!exists(nodeRemovePath)) {
+        throw new YarnException("ZkNodePath = " + nodeRemovePath + " not 
exists!");
+      }
+
+      // try to remove masterKey.
+      zkManager.delete(nodeRemovePath);
+      long end = clock.getTime();
+      opDurations.removeStoredMasterKeyDuration(start, end);
+      return RouterMasterKeyResponse.newInstance(masterKey);
+    } catch (Exception e) {
+      throw new YarnException(e);
+    }
   }
 
+  /**
+   * ZookeeperFederationStateStore Supports Get MasterKey By DelegationKey.
+   *
+   * @param request The request contains RouterMasterKey, which is an 
abstraction for DelegationKey
+   * @return routerMasterKeyResponse, the response contains the 
RouterMasterKey.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterMasterKeyResponse 
getMasterKeyByDelegationKey(RouterMasterKeyRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long start = clock.getTime();
+    // For the verification of the request, after passing the verification,
+    // the request and the internal objects will not be empty and can be used 
directly.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    try {
+
+      // Parse the delegationKey from the request and get the ZK storage path.
+      DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
+      String nodePath = getMasterKeyZNodePathByDelegationKey(delegationKey);
+
+      // Check if the path exists, Throws an exception if the path does not 
exist.
+      if (!exists(nodePath)) {
+        throw new YarnException("ZkNodePath = " + nodePath + " not exists!");
+      }
+
+      // Get the stored masterKey from zk.
+      RouterMasterKey routerMasterKey = getRouterMasterKeyFromZK(nodePath);
+      long end = clock.getTime();
+      opDurations.getMasterKeyByDelegationKeyDuration(start, end);
+      return RouterMasterKeyResponse.newInstance(routerMasterKey);
+    } catch (Exception e) {
+      throw new YarnException(e);
+    }
+  }
+
+  /**
+   * Get MasterKeyZNodePath based on DelegationKey.
+   *
+   * @param delegationKey delegationKey.
+   * @return masterKey ZNodePath.
+   */
+  private String getMasterKeyZNodePathByDelegationKey(DelegationKey 
delegationKey) {
+    return getMasterKeyZNodePathByKeyId(delegationKey.getKeyId());
   }
 
+  /**
+   * Get MasterKeyZNodePath based on KeyId.
+   *
+   * @param keyId master key id.
+   * @return masterKey ZNodePath.
+   */
+  private String getMasterKeyZNodePathByKeyId(int keyId) {
+    String nodeName = ROUTER_RM_DELEGATION_KEY_PREFIX + keyId;
+    return getNodePath(routerRMDTMasterKeysRootPath, nodeName);
+  }
+
+  /**
+   * Get RouterMasterKey from ZK.
+   *
+   * @param nodePath The path where masterKey is stored in zk.
+   *
+   * @return RouterMasterKey.
+   * @throws IOException An IO Error occurred.
+   */
+  private RouterMasterKey getRouterMasterKeyFromZK(String nodePath)
+      throws IOException {
+    try {
+      byte[] data = get(nodePath);
+      if ((data == null) || (data.length == 0)) {
+        return null;
+      }
+
+      ByteArrayInputStream bin = new ByteArrayInputStream(data);
+      DataInputStream din = new DataInputStream(bin);
+      DelegationKey key = new DelegationKey();
+      key.readFields(din);
+
+      return RouterMasterKey.newInstance(key.getKeyId(),
+          ByteBuffer.wrap(key.getEncodedKey()), key.getExpiryDate());
+    } catch (Exception ex) {
+      LOG.error("No node in path {}.", nodePath);
+      throw new IOException(ex);
+    }
+  }
+
+  /**
+   * ZookeeperFederationStateStore Supports Store RMDelegationTokenIdentifier.
+   *
+   * The store token method is meant to be thread-safe
+   * (NOTE: the signature below is not declared synchronized; confirm how thread safety is ensured).
+   *
+   * @param request The request contains RouterRMToken 
(RMDelegationTokenIdentifier and renewDate)
+   * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long start = clock.getTime();
+    // We verify the RouterRMTokenRequest to ensure that the request is not 
empty,
+    // and that the internal RouterStoreToken is not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    try {
+
+      // add delegationToken
+      storeOrUpdateRouterRMDT(request, false);
+
+      // Get the stored delegationToken from ZK and return.
+      RouterStoreToken resultStoreToken = getStoreTokenFromZK(request, false);
+      long end = clock.getTime();
+      opDurations.getStoreNewTokenDuration(start, end);
+      return RouterRMTokenResponse.newInstance(resultStoreToken);
+    } catch (YarnException | IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new YarnException(e);
+    }
   }
 
+  /**
+   * ZookeeperFederationStateStore Supports Update RMDelegationTokenIdentifier.
+   *
+   * The update stored token method (updateStoredToken) is meant to be thread-safe
+   * (NOTE: the signature below is not declared synchronized; confirm how thread safety is ensured).
+   *
+   * @param request The request contains RouterRMToken 
(RMDelegationTokenIdentifier and renewDate)
+   * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long start = clock.getTime();
+    // We verify the RouterRMTokenRequest to ensure that the request is not 
empty,
+    // and that the internal RouterStoreToken is not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    try {
+
+      // get the Token storage path
+      String nodePath = getStoreTokenZNodePathByTokenRequest(request);
+
+      // updateStoredToken needs to determine whether the zkNode exists.
+      // If it exists, update the token data.
+      // If it does not exist, write the new token data directly.
+      boolean pathExists = true;
+      if (!exists(nodePath)) {
+        pathExists = false;
+      }
+
+      if (pathExists) {
+        // update delegationToken
+        storeOrUpdateRouterRMDT(request, true);
+      } else {
+        // add new delegationToken
+        storeNewToken(request);
+      }
+
+      // Get the stored delegationToken from ZK and return.
+      RouterStoreToken resultStoreToken = getStoreTokenFromZK(request, false);
+      long end = clock.getTime();
+      opDurations.updateStoredTokenDuration(start, end);
+      return RouterRMTokenResponse.newInstance(resultStoreToken);
+    } catch (YarnException | IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new YarnException(e);
+    }
   }
 
+  /**
+   * ZookeeperFederationStateStore Supports Remove RMDelegationTokenIdentifier.
+   *
+   * The remove stored token method (removeStoredToken) is meant to be thread-safe
+   * (NOTE: the signature below is not declared synchronized; confirm how thread safety is ensured).
+   *
+   * @param request The request contains RouterRMToken 
(RMDelegationTokenIdentifier and renewDate)
+   * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   * @throws IOException An IO Error occurred.
+   */
   @Override
   public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long start = clock.getTime();
+    // We verify the RouterRMTokenRequest to ensure that the request is not 
empty,
+    // and that the internal RouterStoreToken is not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    try {
+
+      // get the Token storage path
+      String nodePath = getStoreTokenZNodePathByTokenRequest(request);
+
+      // If the path to be deleted does not exist, throw an exception directly.
+      if (!exists(nodePath)) {
+        throw new YarnException("ZkNodePath = " + nodePath + " not exists!");
+      }
+
+      // Check again, first get the data from ZK,
+      // if the data is not empty, then delete it
+      RouterStoreToken storeToken = getStoreTokenFromZK(request, false);
+      if (storeToken != null) {
+        zkManager.delete(nodePath);
+      }
+
+      // return deleted token data.
+      long end = clock.getTime();
+      opDurations.removeStoredTokenDuration(start, end);
+      return RouterRMTokenResponse.newInstance(storeToken);
+    } catch (YarnException | IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new YarnException(e);
+    }
   }
 
+  /**
+   * The Router Supports GetTokenByRouterStoreToken.
+   *
+   * @param request The request contains RouterRMToken 
(RMDelegationTokenIdentifier and renewDate)
+   * @return RouterRMTokenResponse.
+   * @throws YarnException if the call to the state store is unsuccessful
+   * @throws IOException An IO Error occurred
+   */
   @Override
   public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest 
request)
       throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+
+    long start = clock.getTime();
+    // We verify the RouterRMTokenRequest to ensure that the request is not 
empty,
+    // and that the internal RouterStoreToken is not empty.
+    FederationRouterRMTokenInputValidator.validate(request);
+
+    try {
+
+      // Before getting the token,
+      // we need to determine whether the path where the token is stored 
exists.
+      // If it doesn't exist, we will throw an exception.
+      String nodePath = getStoreTokenZNodePathByTokenRequest(request);
+      if (!exists(nodePath)) {
+        throw new YarnException("ZkNodePath = " + nodePath + " not exists!");
+      }
+
+      // Get the stored delegationToken from ZK and return.
+      RouterStoreToken resultStoreToken = getStoreTokenFromZK(request, false);
+      // return the retrieved token data.
+      long end = clock.getTime();
+      opDurations.getTokenByRouterStoreTokenDuration(start, end);
+      return RouterRMTokenResponse.newInstance(resultStoreToken);
+    } catch (YarnException | IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new YarnException(e);
+    }
+  }
+
+  /**
+   * Convert MasterKey to DelegationKey.
+   *
+   * Before using this function,
+   * please use FederationRouterRMTokenInputValidator to verify the request.
+   * By default, the request is not empty, and the internal object is not 
empty.
+   *
+   * @param request RouterMasterKeyRequest
+   * @return DelegationKey.
+   */
+  private DelegationKey convertMasterKeyToDelegationKey(RouterMasterKeyRequest 
request) {
+    RouterMasterKey masterKey = request.getRouterMasterKey();
+    return convertMasterKeyToDelegationKey(masterKey);
+  }
+
+  /**
+   * Convert MasterKey to DelegationKey.
+   *
+   * @param masterKey masterKey.
+   * @return DelegationKey.
+   */
+  private DelegationKey convertMasterKeyToDelegationKey(RouterMasterKey 
masterKey) {
+    ByteBuffer keyByteBuf = masterKey.getKeyBytes();
+    byte[] keyBytes = new byte[keyByteBuf.remaining()];
+    keyByteBuf.get(keyBytes);
+    return new DelegationKey(masterKey.getKeyId(), masterKey.getExpiryDate(), 
keyBytes);
+  }
+
+  /**
+   * Check if a path exists in zk.
+   *
+   * @param path Path to be checked.
+   * @return Returns true if the path exists, false if the path does not exist.
+   * @throws Exception When an exception to access zk occurs.
+   */
+  @VisibleForTesting
+  boolean exists(final String path) throws Exception {
+    return zkManager.exists(path);
+  }
+
+  /**
+   * Add or update delegationToken.
+   *
+   * Before using this function,
+   * please use FederationRouterRMTokenInputValidator to verify the request.
+   * By default, the request is not empty, and the internal object is not 
empty.
+   *
+   * @param request storeToken
+   * @param isUpdate true, update the token; false, create a new token.
+   * @throws Exception exception occurs.
+   */
+  private void storeOrUpdateRouterRMDT(RouterRMTokenRequest request,  boolean 
isUpdate)
+      throws Exception {
+
+    RouterStoreToken routerStoreToken  = request.getRouterStoreToken();
+    String nodeCreatePath = getStoreTokenZNodePathByTokenRequest(request);
+    LOG.debug("nodeCreatePath = {}, isUpdate = {}", nodeCreatePath, isUpdate);
+    put(nodeCreatePath, routerStoreToken.toByteArray(), isUpdate);
+  }
+
+  /**
+   * Get ZNode Path of StoreToken.
+   *
+   * Before using this method, we should use 
FederationRouterRMTokenInputValidator
+   * to verify the request, ensure that the request is not empty,
+   * and ensure that the object in the request is not empty.
+   *
+   * @param request RouterRMTokenRequest.
+   * @return RouterRMToken ZNode Path.
+   * @throws IOException io exception occurs.
+   */
+  private String getStoreTokenZNodePathByTokenRequest(RouterRMTokenRequest 
request)
+      throws IOException {
+    RouterStoreToken routerStoreToken = request.getRouterStoreToken();
+    YARNDelegationTokenIdentifier identifier = 
routerStoreToken.getTokenIdentifier();
+    return getStoreTokenZNodePathByIdentifier(identifier);
+  }
+
+  /**
+   * Get ZNode Path of StoreToken.
+   *
+   * @param identifier YARNDelegationTokenIdentifier
+   * @return RouterRMToken ZNode Path.
+   */
+  private String 
getStoreTokenZNodePathByIdentifier(YARNDelegationTokenIdentifier identifier) {
+    String nodePath = getNodePath(routerRMDelegationTokensRootPath,
+        ROUTER_RM_DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
+    return nodePath;
+  }
+
+  /**
+   * Get RouterStoreToken from ZK.
+   *
+   * @param request RouterRMTokenRequest.
+   * @param quiet If true (quiet mode), no error message is logged on failure;
+   * if false (non-quiet mode), an error message is logged on failure.
+   * @return RouterStoreToken.
+   * @throws IOException io exception occurs.
+   */
+  private RouterStoreToken getStoreTokenFromZK(RouterRMTokenRequest request,
+      boolean quiet) throws IOException {
+    RouterStoreToken routerStoreToken = request.getRouterStoreToken();
+    YARNDelegationTokenIdentifier identifier = 
routerStoreToken.getTokenIdentifier();
+    return getStoreTokenFromZK(identifier, quiet);
+  }
+
+  /**
+   * Get RouterStoreToken from ZK.
+   *
+   * @param identifier YARN DelegationToken Identifier
+   * @param quiet Whether it is in quiet mode,
+   *        if it is in quiet mode, no exception information will be output.
+   * @return RouterStoreToken.
+   * @throws IOException io exception occurs.
+   */
+  private RouterStoreToken getStoreTokenFromZK(YARNDelegationTokenIdentifier 
identifier,
+      boolean quiet) throws IOException {
+    // get the Token storage path
+    String nodePath = getStoreTokenZNodePathByIdentifier(identifier);
+    return getStoreTokenFromZK(nodePath, quiet);
+  }
+
+  /**
+   * Get RouterStoreToken from ZK.
+   *
+   * @param nodePath Znode location where data is stored.
+   * @param quiet Whether it is in quiet mode,
+   *        if it is in quiet mode, no exception information will be output.
+   * @return RouterStoreToken.
+   * @throws IOException io exception occurs.
+   */
+  private RouterStoreToken getStoreTokenFromZK(String nodePath, boolean quiet)
+      throws IOException {
+    try {
+      byte[] data = get(nodePath);
+      if ((data == null) || (data.length == 0)) {
+        return null;
+      }
+      ByteArrayInputStream bin = new ByteArrayInputStream(data);
+      DataInputStream din = new DataInputStream(bin);
+      RouterStoreToken storeToken = Records.newRecord(RouterStoreToken.class);
+      storeToken.readFields(din);
+      return storeToken;
+    } catch (Exception ex) {
+      if (!quiet) {
+        LOG.error("No node in path [" + nodePath + "]");

Review Comment:
   Thank you very much for reviewing the code, I will fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to