slfan1989 commented on code in PR #5131: URL: https://github.com/apache/hadoop/pull/5131#discussion_r1034873671
########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java: ########## @@ -886,45 +975,554 @@ public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster( return UpdateReservationHomeSubClusterResponse.newInstance(); } + /** + * ZookeeperFederationStateStore Supports Store NewMasterKey. + * + * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey + * @return routerMasterKeyResponse, the response contains the RouterMasterKey. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override - public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request) + public synchronized RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + long start = clock.getTime(); + // For the verification of the request, after passing the verification, + // the request and the internal objects will not be empty and can be used directly. + FederationRouterRMTokenInputValidator.validate(request); + + // Parse the delegationKey from the request and get the ZK storage path. + DelegationKey delegationKey = convertMasterKeyToDelegationKey(request); + String nodeCreatePath = getMasterKeyZNodePathByDelegationKey(delegationKey); + LOG.debug("Storing RMDelegationKey_{}, ZkNodePath = {}.", delegationKey.getKeyId(), + nodeCreatePath); + + // Write master key data to zk. + try(ByteArrayOutputStream os = new ByteArrayOutputStream(); + DataOutputStream fsOut = new DataOutputStream(os)) { + delegationKey.write(fsOut); + put(nodeCreatePath, os.toByteArray(), false); + } + + // Get the stored masterKey from zk. 
+ RouterMasterKey masterKeyFromZK = getRouterMasterKeyFromZK(nodeCreatePath); + long end = clock.getTime(); + opDurations.addStoreNewMasterKeyDuration(start, end); + return RouterMasterKeyResponse.newInstance(masterKeyFromZK); } + /** + * ZookeeperFederationStateStore Supports Remove MasterKey. + * + * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey + * @return routerMasterKeyResponse, the response contains the RouterMasterKey. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override - public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request) + public synchronized RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + long start = clock.getTime(); + // For the verification of the request, after passing the verification, + // the request and the internal objects will not be empty and can be used directly. + FederationRouterRMTokenInputValidator.validate(request); + + try { + // Parse the delegationKey from the request and get the ZK storage path. + RouterMasterKey masterKey = request.getRouterMasterKey(); + DelegationKey delegationKey = convertMasterKeyToDelegationKey(request); + String nodeRemovePath = getMasterKeyZNodePathByDelegationKey(delegationKey); + LOG.debug("Removing RMDelegationKey_{}, ZkNodePath = {}.", delegationKey.getKeyId(), + nodeRemovePath); + + // Check if the path exists, Throws an exception if the path does not exist. + if (!exists(nodeRemovePath)) { + throw new YarnException("ZkNodePath = " + nodeRemovePath + " not exists!"); + } + + // try to remove masterKey. 
+ zkManager.delete(nodeRemovePath); + long end = clock.getTime(); + opDurations.removeStoredMasterKeyDuration(start, end); + return RouterMasterKeyResponse.newInstance(masterKey); + } catch (Exception e) { + throw new YarnException(e); + } } + /** + * ZookeeperFederationStateStore Supports Remove MasterKey. + * + * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey + * @return routerMasterKeyResponse, the response contains the RouterMasterKey. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + long start = clock.getTime(); + // For the verification of the request, after passing the verification, + // the request and the internal objects will not be empty and can be used directly. + FederationRouterRMTokenInputValidator.validate(request); + + try { + + // Parse the delegationKey from the request and get the ZK storage path. + DelegationKey delegationKey = convertMasterKeyToDelegationKey(request); + String nodePath = getMasterKeyZNodePathByDelegationKey(delegationKey); + + // Check if the path exists, Throws an exception if the path does not exist. + if (!exists(nodePath)) { + throw new YarnException("ZkNodePath = " + nodePath + " not exists!"); + } + + // Get the stored masterKey from zk. + RouterMasterKey routerMasterKey = getRouterMasterKeyFromZK(nodePath); + long end = clock.getTime(); + opDurations.getMasterKeyByDelegationKeyDuration(start, end); + return RouterMasterKeyResponse.newInstance(routerMasterKey); + } catch (Exception e) { + throw new YarnException(e); + } + } + + /** + * Get MasterKeyZNodePath based on DelegationKey. + * + * @param delegationKey delegationKey. + * @return masterKey ZNodePath. 
+ */ + private String getMasterKeyZNodePathByDelegationKey(DelegationKey delegationKey) { + return getMasterKeyZNodePathByKeyId(delegationKey.getKeyId()); + } + + /** + * Get MasterKeyZNodePath based on KeyId. + * + * @param keyId master key id. + * @return masterKey ZNodePath. + */ + private String getMasterKeyZNodePathByKeyId(int keyId) { + String nodeName = ROUTER_RM_DELEGATION_KEY_PREFIX + keyId; + return getNodePath(routerRMDTMasterKeysRootPath, nodeName); + } + + /** + * Get RouterMasterKey from ZK. + * + * @param nodePath The path where masterKey is stored in zk. + * + * @return RouterMasterKey. + * @throws IOException An IO Error occurred. + */ + private RouterMasterKey getRouterMasterKeyFromZK(String nodePath) + throws IOException { + try { + byte[] data = get(nodePath); + if ((data == null) || (data.length == 0)) { + return null; + } + + ByteArrayInputStream bin = new ByteArrayInputStream(data); + DataInputStream din = new DataInputStream(bin); + DelegationKey key = new DelegationKey(); + key.readFields(din); + + return RouterMasterKey.newInstance(key.getKeyId(), + ByteBuffer.wrap(key.getEncodedKey()), key.getExpiryDate()); + } catch (Exception ex) { + LOG.error("No node in path {}.", nodePath); + throw new IOException(ex); + } } + /** + * ZookeeperFederationStateStore Supports Store RMDelegationTokenIdentifier. + * + * The stored token method is a synchronized method + * used to ensure that storeNewToken is a thread-safe method. + * + * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate) + * @return routerRMTokenResponse, the response contains the RouterStoreToken. + * @throws YarnException if the call to the state store is unsuccessful. + * @throws IOException An IO Error occurred. + */ @Override - public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request) + public synchronized RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request) Review Comment: > Can we use a finer grain lock. 
There are many things synchronized that can be done in parallel. Maybe a lock per group too. Could we even do a read write lock? Thank you very much for your suggestion; your understanding is accurate. If we use the `synchronized` keyword at the method level, too much is covered by the lock. While completing PR #5169 (JIRA: YARN-11349. [Federation] Router Support Delegation Token With SQL), I carefully read the `AbstractDelegationTokenSecretManager` code, and I think we should be able to remove the `synchronized` keyword in `ZookeeperFederationStateStore`. I was worried that concurrent access might cause different threads to generate the same primary key data, which would cause errors when storing, so I made the methods that add, delete, and update MasterKey and Token `synchronized`. However, I found that the same primary key cannot actually be generated twice, for the reasons below. > Group1 methods are mainly used to create or remove MasterKey The key information in `RouterMasterKey` is `DelegationKey`, and the primary key of `DelegationKey` is `keyId`. We found that `DelegationKey` is constructed in `AbstractDelegationTokenSecretManager#updateCurrentKey`. - **AbstractDelegationTokenSecretManager#updateCurrentKey** This method is protected by `synchronized` blocks both when generating the primary key `keyId` and when storing the masterKey, so the uniqueness of the primary key is guaranteed. ``` private void updateCurrentKey() throws IOException { LOG.info("Updating the current master key for generating delegation tokens"); int newCurrentId; // 1. newCurrentId is keyId, which has been protected synchronized (this) { newCurrentId = incrementCurrentKeyId(); } DelegationKey newKey = new DelegationKey(newCurrentId, System .currentTimeMillis() + keyUpdateInterval + tokenMaxLifetime, generateSecret()); logUpdateMasterKey(newKey); // 2.
storeDelegationKey calls storeNewMasterKey, this part has been protected synchronized (this) { currentKey = newKey; storeDelegationKey(currentKey); } } protected void storeDelegationKey(DelegationKey key) throws IOException { allKeys.put(key.getKeyId(), key); storeNewMasterKey(key); } ``` The call to `removeStoredMasterKey` is also protected, because it is made from the `synchronized` method `removeExpiredKeys`. - **AbstractDelegationTokenSecretManager#removeExpiredKeys** ``` private synchronized void removeExpiredKeys() { long now = Time.now(); for (Iterator<Map.Entry<Integer, DelegationKey>> it = allKeys.entrySet() .iterator(); it.hasNext();) { Map.Entry<Integer, DelegationKey> e = it.next(); if (e.getValue().getExpiryDate() < now) { it.remove(); // ensure the tokens generated by this current key can be recovered // with this current key after this current key is rolled if(!e.getValue().equals(currentKey)) removeStoredMasterKey(e.getValue()); } } } ``` > Group2 methods are mainly used to create or update and remove Token For `DelegationToken`, the methods that add, update, and remove tokens are likewise all protected. - **AbstractDelegationTokenSecretManager#createPassword** (calls `storeToken`) ``` protected synchronized byte[] createPassword(TokenIdent identifier) { ..... DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)); try { // storeToken METRICS.trackStoreToken(() -> storeToken(identifier, tokenInfo)); } catch (IOException ioe) { LOG.error("Could not store token " + formatTokenId(identifier) + "!!", ioe); } return password; } ``` - **AbstractDelegationTokenSecretManager#renewToken** ``` public synchronized long renewToken(Token<TokenIdent> token, String renewer) throws InvalidToken, IOException { ......
long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval); String trackingId = getTrackingIdIfEnabled(id); DelegationTokenInformation info = new DelegationTokenInformation(renewTime, password, trackingId); if (getTokenInfo(id) == null) { throw new InvalidToken("Renewal request for unknown token " + formatTokenId(id)); } // updateToken METRICS.trackUpdateToken(() -> updateToken(id, info)); return renewTime; } ``` - **AbstractDelegationTokenSecretManager#cancelToken** ``` public synchronized TokenIdent cancelToken(Token<TokenIdent> token, String canceller) throws IOException { ...... DelegationTokenInformation info = currentTokens.remove(id); if (info == null) { throw new InvalidToken("Token not found " + formatTokenId(id)); } // cancelToken METRICS.trackRemoveToken(() -> { removeTokenForOwnerStats(id); removeStoredToken(id); }); return id; } ``` > Group3 methods are used to increment `DelegationTokenSeqNum` and `MasterKeyId` These two methods are also protected in the abstract class. - **AbstractDelegationTokenSecretManager#incrementDelegationTokenSeqNum** ``` protected synchronized int incrementDelegationTokenSeqNum() { return ++delegationTokenSequenceNumber; } ``` - **AbstractDelegationTokenSecretManager#incrementCurrentKeyId** ``` protected synchronized int incrementCurrentKeyId() { return ++currentId; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org