Repository: hadoop
Updated Branches:
  refs/heads/branch-3.0 cd9078df6 -> aa65f6c1a


YARN-7262. Add a hierarchy into the ZKRMStateStore for delegation token znodes 
to prevent jute buffer overflow (rkanter)

(cherry picked from commit b1de78619f3e5e25d6f9d5eaf41925f22d212fb9)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aa65f6c1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aa65f6c1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aa65f6c1

Branch: refs/heads/branch-3.0
Commit: aa65f6c1ad0eadd0169d89e0ec83fb2b49693ae5
Parents: cd9078d
Author: Robert Kanter <rkan...@apache.org>
Authored: Thu Oct 26 17:47:32 2017 -0700
Committer: Robert Kanter <rkan...@apache.org>
Committed: Thu Oct 26 17:47:51 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |   7 +
 .../src/main/resources/yarn-default.xml         |  18 +
 .../resourcemanager/recovery/RMStateStore.java  |   2 +
 .../recovery/ZKRMStateStore.java                | 411 +++++++++++++------
 .../recovery/TestZKRMStateStore.java            | 372 ++++++++++++++++-
 5 files changed, 673 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa65f6c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 0f4080a..8718809 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -610,6 +610,13 @@ public class YarnConfiguration extends Configuration {
       RM_ZK_PREFIX + "appid-node.split-index";
   public static final int DEFAULT_ZK_APPID_NODE_SPLIT_INDEX = 0;
 
+  /** Index at which the RM Delegation Token ids will be split so that the
+   * delegation token znodes stored in the zookeeper RM state store will be
+   * stored as two different znodes (parent-child). **/
+  public static final  String ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX =
+      RM_ZK_PREFIX + "delegation-token-node.split-index";
+  public static final int DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX = 0;
+
   public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl";
   public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa65f6c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index ccfe10a..06a7add 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -594,6 +594,24 @@
   </property>
 
   <property>
+    <description>Index at which the RM Delegation Token ids will be split so
+      that the delegation token znodes stored in the zookeeper RM state store
+      will be stored as two different znodes (parent-child). The split is done
+      from the end. For instance, with no split, a delegation token znode will
+      be of the form RMDelegationToken_123456789. If the value of this config 
is
+      1, the delegation token znode will be broken into two parts:
+      RMDelegationToken_12345678 and 9 respectively with former being the 
parent
+      node. This config can take values from 0 to 4. 0 means there will be no
+      split. If the value is outside this range, it will be treated as 0 (i.e.
+      no split). A value larger than 0 (up to 4) should be configured if you 
are
+      running a large number of applications, with long-lived delegation tokens
+      and state store operations (e.g. failover) are failing due to LenError in
+      Zookeeper.</description>
+    <name>yarn.resourcemanager.zk-delegation-token-node.split-index</name>
+    <value>0</value>
+  </property>
+
+  <property>
     <description>Specifies the maximum size of the data that can be stored
       in a znode. Value should be same or less than jute.maxbuffer configured
       in zookeeper. Default value configured is 1MB.</description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa65f6c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 35340e6..e8ed0b7 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -89,6 +89,8 @@ public abstract class RMStateStore extends AbstractService {
   @VisibleForTesting
   public static final String RM_APP_ROOT = "RMAppRoot";
   protected static final String RM_DT_SECRET_MANAGER_ROOT = 
"RMDTSecretManagerRoot";
+  protected static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
+      "RMDelegationTokensRoot";
   protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
   protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
   protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa65f6c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 5bff77f..5d3ca45 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -118,6 +118,22 @@ import java.util.Set;
  * |--- RM_DT_SECRET_MANAGER_ROOT
  *        |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
  *        |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
+ *        |       |----- 1
+ *        |       |      |----- (#TokenId barring last character)
+ *        |       |      |       |----- (#Last character of TokenId)
+ *        |       |      ....
+ *        |       |----- 2
+ *        |       |      |----- (#TokenId barring last 2 characters)
+ *        |       |      |       |----- (#Last 2 characters of TokenId)
+ *        |       |      ....
+ *        |       |----- 3
+ *        |       |      |----- (#TokenId barring last 3 characters)
+ *        |       |      |       |----- (#Last 3 characters of TokenId)
+ *        |       |      ....
+ *        |       |----- 4
+ *        |       |      |----- (#TokenId barring last 4 characters)
+ *        |       |      |       |----- (#Last 4 characters of TokenId)
+ *        |       |      ....
  *        |       |----- Token_1
  *        |       |----- Token_2
  *        |       ....
@@ -147,6 +163,11 @@ import java.util.Set;
  * splitting it in 2 parts, depending on a configurable split index. This 
limits
  * the number of application znodes returned in a single call while loading
  * app state.
+ *
+ * Changes from 1.4 to 1.5 - Change the structure of delegation token znode by
+ * splitting it in 2 parts, depending on a configurable split index. This 
limits
+ * the number of delegation token znodes returned in a single call while 
loading
+ * tokens state.
  */
 @Private
 @Unstable
@@ -162,7 +183,7 @@ public class ZKRMStateStore extends RMStateStore {
   @VisibleForTesting
   public static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
   protected static final Version CURRENT_VERSION_INFO = Version
-      .newInstance(1, 4);
+      .newInstance(1, 5);
   @VisibleForTesting
   public static final String RM_APP_ROOT_HIERARCHIES = "HIERARCHIES";
 
@@ -170,6 +191,7 @@ public class ZKRMStateStore extends RMStateStore {
   private String zkRootNodePath;
   private String rmAppRoot;
   private Map<Integer, String> rmAppRootHierarchies;
+  private Map<Integer, String> rmDelegationTokenHierarchies;
   private String rmDTSecretManagerRoot;
   private String dtMasterKeysRootPath;
   private String delegationTokensRootPath;
@@ -180,6 +202,8 @@ public class ZKRMStateStore extends RMStateStore {
   @VisibleForTesting
   protected String znodeWorkingPath;
   private int appIdNodeSplitIndex = 0;
+  @VisibleForTesting
+  protected int delegationTokenNodeSplitIndex = 0;
 
   /* Fencing related variables */
   private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
@@ -212,12 +236,13 @@ public class ZKRMStateStore extends RMStateStore {
   };
 
   /**
-   * Encapsulates full app node path and corresponding split index.
+   * Encapsulates znode path and corresponding split index for hierarchical
+   * znode layouts.
    */
-  private final static class AppNodeSplitInfo {
+  private final static class ZnodeSplitInfo {
     private final String path;
     private final int splitIndex;
-    AppNodeSplitInfo(String path, int splitIndex) {
+    ZnodeSplitInfo(String path, int splitIndex) {
       this.path = path;
       this.splitIndex = splitIndex;
     }
@@ -288,7 +313,7 @@ public class ZKRMStateStore extends RMStateStore {
     appIdNodeSplitIndex =
         conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
             YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
-    if (appIdNodeSplitIndex < 1 || appIdNodeSplitIndex > 4) {
+    if (appIdNodeSplitIndex < 0 || appIdNodeSplitIndex > 4) {
       LOG.info("Invalid value " + appIdNodeSplitIndex + " for config " +
           YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX + " specified. " +
               "Resetting it to " +
@@ -322,12 +347,30 @@ public class ZKRMStateStore extends RMStateStore {
         RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME);
     delegationTokensRootPath = getNodePath(rmDTSecretManagerRoot,
         RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
+    rmDelegationTokenHierarchies = new HashMap<>(5);
+    rmDelegationTokenHierarchies.put(0, delegationTokensRootPath);
+    for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
+      rmDelegationTokenHierarchies.put(splitIndex,
+          getNodePath(delegationTokensRootPath, Integer.toString(splitIndex)));
+    }
     dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
         RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
     amrmTokenSecretManagerRoot =
         getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
     reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
     zkManager = resourceManager.getAndStartZKManager(conf);
+    delegationTokenNodeSplitIndex =
+        conf.getInt(YarnConfiguration.ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX,
+            YarnConfiguration.DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX);
+    if (delegationTokenNodeSplitIndex < 0
+        || delegationTokenNodeSplitIndex > 4) {
+      LOG.info("Invalid value " + delegationTokenNodeSplitIndex + " for config 
"
+          + YarnConfiguration.ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX
+          + " specified.  Resetting it to " +
+          YarnConfiguration.DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX);
+      delegationTokenNodeSplitIndex =
+          YarnConfiguration.DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX;
+    }
   }
 
   @Override
@@ -350,6 +393,9 @@ public class ZKRMStateStore extends RMStateStore {
     create(rmDTSecretManagerRoot);
     create(dtMasterKeysRootPath);
     create(delegationTokensRootPath);
+    for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
+      create(rmDelegationTokenHierarchies.get(splitIndex));
+    }
     create(dtSequenceNumberPath);
     create(amrmTokenSecretManagerRoot);
     create(reservationRoot);
@@ -572,36 +618,63 @@ public class ZKRMStateStore extends RMStateStore {
   }
 
   private void loadRMDelegationTokenState(RMState rmState) throws Exception {
-    List<String> childNodes =
-        getChildren(delegationTokensRootPath);
-
-    for (String childNodeName : childNodes) {
-      String childNodePath =
-          getNodePath(delegationTokensRootPath, childNodeName);
-      byte[] childData = getData(childNodePath);
-
-      if (childData == null) {
-        LOG.warn("Content of " + childNodePath + " is broken.");
+    for (int splitIndex = 0; splitIndex <= 4; splitIndex++) {
+      String tokenRoot = rmDelegationTokenHierarchies.get(splitIndex);
+      if (tokenRoot == null) {
         continue;
       }
-
-      ByteArrayInputStream is = new ByteArrayInputStream(childData);
-
-      try (DataInputStream fsIn = new DataInputStream(is)) {
+      List<String> childNodes = getChildren(tokenRoot);
+      boolean dtNodeFound = false;
+      for (String childNodeName : childNodes) {
         if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
-          RMDelegationTokenIdentifierData identifierData =
-              new RMDelegationTokenIdentifierData();
-          identifierData.readFields(fsIn);
-          RMDelegationTokenIdentifier identifier =
-              identifierData.getTokenIdentifier();
-          long renewDate = identifierData.getRenewDate();
-          rmState.rmSecretManagerState.delegationTokenState.put(identifier,
-              renewDate);
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier
-                + " renewDate=" + renewDate);
+          dtNodeFound = true;
+          String parentNodePath = getNodePath(tokenRoot, childNodeName);
+          if (splitIndex == 0) {
+            loadDelegationTokenFromNode(rmState, parentNodePath);
+          } else {
+            // If znode is partitioned.
+            List<String> leafNodes = getChildren(parentNodePath);
+            for (String leafNodeName : leafNodes) {
+              loadDelegationTokenFromNode(rmState,
+                  getNodePath(parentNodePath, leafNodeName));
+            }
           }
+        } else if (splitIndex == 0
+            && !(childNodeName.equals("1") || childNodeName.equals("2")
+            || childNodeName.equals("3") || childNodeName.equals("4"))) {
+          LOG.debug("Unknown child node with name " + childNodeName + " under" 
+
+              tokenRoot);
+        }
+      }
+      if (splitIndex != delegationTokenNodeSplitIndex && !dtNodeFound) {
+        // If no loaded delegation token exists for a particular split index 
and
+        // the split index for which tokens are being loaded is not the one
+        // configured, then we do not need to keep track of this hierarchy for
+        // storing/updating/removing delegation token znodes.
+        rmDelegationTokenHierarchies.remove(splitIndex);
+      }
+    }
+  }
+
+  private void loadDelegationTokenFromNode(RMState rmState, String path)
+      throws Exception {
+    byte[] data = getData(path);
+    if (data == null) {
+      LOG.warn("Content of " + path + " is broken.");
+    } else {
+      ByteArrayInputStream is = new ByteArrayInputStream(data);
+      try (DataInputStream fsIn = new DataInputStream(is)) {
+        RMDelegationTokenIdentifierData identifierData =
+            new RMDelegationTokenIdentifierData();
+        identifierData.readFields(fsIn);
+        RMDelegationTokenIdentifier identifier =
+            identifierData.getTokenIdentifier();
+        long renewDate = identifierData.getRenewDate();
+        rmState.rmSecretManagerState.delegationTokenState.put(identifier,
+            renewDate);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier
+              + " renewDate=" + renewDate);
         }
       }
     }
@@ -649,8 +722,9 @@ public class ZKRMStateStore extends RMStateStore {
                   getNodePath(parentNodePath, leafNodeName), appIdStr);
             }
           }
-        } else {
-          LOG.info("Unknown child node with name: " + childNodeName);
+        } else if (!childNodeName.equals(RM_APP_ROOT_HIERARCHIES)){
+          LOG.debug("Unknown child node with name " + childNodeName + " under" 
+
+              appRoot);
         }
       }
       if (splitIndex != appIdNodeSplitIndex && !appNodeFound) {
@@ -683,36 +757,36 @@ public class ZKRMStateStore extends RMStateStore {
   }
 
   /**
-   * Get parent app node path based on full path and split index supplied.
-   * @param appIdPath App id path for which parent needs to be returned.
+   * Get znode path based on full path and split index supplied.
+   * @param path path for which parent needs to be returned.
    * @param splitIndex split index.
    * @return parent app node path.
    */
-  private String getSplitAppNodeParent(String appIdPath, int splitIndex) {
-    // Calculated as string upto index (appIdPath Length - split index - 1). We
+  private String getSplitZnodeParent(String path, int splitIndex) {
+    // Calculated as string up to index (path Length - split index - 1). We
     // deduct 1 to exclude path separator.
-    return appIdPath.substring(0, appIdPath.length() - splitIndex - 1);
+    return path.substring(0, path.length() - splitIndex - 1);
   }
 
   /**
-   * Checks if parent app node has no leaf nodes and if it does not have,
-   * removes it. Called while removing application.
-   * @param appIdPath path of app id to be removed.
+   * Checks if parent znode has no leaf nodes and if it does not have,
+   * removes it.
+   * @param path path of znode to be removed.
    * @param splitIndex split index.
    * @throws Exception if any problem occurs while performing ZK operation.
    */
-  private void checkRemoveParentAppNode(String appIdPath, int splitIndex)
+  private void checkRemoveParentZnode(String path, int splitIndex)
       throws Exception {
     if (splitIndex != 0) {
-      String parentAppNode = getSplitAppNodeParent(appIdPath, splitIndex);
+      String parentZnode = getSplitZnodeParent(path, splitIndex);
       List<String> children = null;
       try {
-        children = getChildren(parentAppNode);
+        children = getChildren(parentZnode);
       } catch (KeeperException.NoNodeException ke) {
-        // It should be fine to swallow this exception as the parent app node 
we
+        // It should be fine to swallow this exception as the parent znode we
         // intend to delete is already deleted.
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Unable to remove app parent node " + parentAppNode +
+          LOG.debug("Unable to remove parent node " + parentZnode +
               " as it does not exist.");
         }
         return;
@@ -720,16 +794,16 @@ public class ZKRMStateStore extends RMStateStore {
       // No apps stored under parent path.
       if (children != null && children.isEmpty()) {
         try {
-          zkManager.safeDelete(parentAppNode, zkAcl, fencingNodePath);
+          zkManager.safeDelete(parentZnode, zkAcl, fencingNodePath);
           if (LOG.isDebugEnabled()) {
-            LOG.debug("No leaf app node exists. Removing parent node " +
-                parentAppNode);
+            LOG.debug("No leaf znode exists. Removing parent node " +
+                parentZnode);
           }
         } catch (KeeperException.NotEmptyException ke) {
-          // It should be fine to swallow this exception as the parent app node
+          // It should be fine to swallow this exception as the parent znode
           // has to be deleted only if it has no children. And this node has.
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Unable to remove app parent node " + parentAppNode +
+            LOG.debug("Unable to remove app parent node " + parentZnode +
                 " as it has children.");
           }
         }
@@ -770,7 +844,7 @@ public class ZKRMStateStore extends RMStateStore {
     // Look for paths based on other split indices if path as per split index
     // does not exist.
     if (!exists(nodeUpdatePath)) {
-      AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId.toString());
+      ZnodeSplitInfo alternatePathInfo = getAlternateAppPath(appId.toString());
       if (alternatePathInfo != null) {
         nodeUpdatePath = alternatePathInfo.path;
       } else {
@@ -778,7 +852,7 @@ public class ZKRMStateStore extends RMStateStore {
         pathExists = false;
         if (appIdNodeSplitIndex != 0) {
           String rootNode =
-              getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex);
+              getSplitZnodeParent(nodeUpdatePath, appIdNodeSplitIndex);
           if (!exists(rootNode)) {
             zkManager.safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT,
                 zkAcl, fencingNodePath);
@@ -819,7 +893,7 @@ public class ZKRMStateStore extends RMStateStore {
     String appDirPath = getLeafAppIdNodePath(appId, false);
     // Look for paths based on other split indices.
     if (!exists(appDirPath)) {
-      AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId);
+      ZnodeSplitInfo alternatePathInfo = getAlternateAppPath(appId);
       if (alternatePathInfo == null) {
         if (operation == AppAttemptOp.REMOVE) {
           // Unexpected. Assume that app attempt has been deleted.
@@ -918,7 +992,7 @@ public class ZKRMStateStore extends RMStateStore {
     // Look for paths based on other split indices if path as per configured
     // split index does not exist.
     if (!exists(appIdRemovePath)) {
-      AppNodeSplitInfo alternatePathInfo = getAlternatePath(removeAppId);
+      ZnodeSplitInfo alternatePathInfo = getAlternateAppPath(removeAppId);
       if (alternatePathInfo != null) {
         appIdRemovePath = alternatePathInfo.path;
         splitIndex = alternatePathInfo.splitIndex;
@@ -946,24 +1020,60 @@ public class ZKRMStateStore extends RMStateStore {
           forPath(appIdRemovePath);
     }
     // Check if we should remove the parent app node as well.
-    checkRemoveParentAppNode(appIdRemovePath, splitIndex);
+    checkRemoveParentZnode(appIdRemovePath, splitIndex);
   }
 
   @Override
   protected synchronized void storeRMDelegationTokenState(
       RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
       throws Exception {
-    SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
-    addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false);
-    trx.commit();
+    String nodeCreatePath = getLeafDelegationTokenNodePath(
+        rmDTIdentifier.getSequenceNumber(), true);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing " + DELEGATION_TOKEN_PREFIX
+          + rmDTIdentifier.getSequenceNumber());
+    }
+
+    RMDelegationTokenIdentifierData identifierData =
+        new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate);
+    ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
+    try (DataOutputStream seqOut = new DataOutputStream(seqOs)) {
+      SafeTransaction trx = zkManager.createTransaction(zkAcl,
+          fencingNodePath);
+      trx.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
+          CreateMode.PERSISTENT);
+      // Update Sequence number only while storing DT
+      seqOut.writeInt(rmDTIdentifier.getSequenceNumber());
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Storing " + dtSequenceNumberPath + ". SequenceNumber: "
+            + rmDTIdentifier.getSequenceNumber());
+      }
+
+      trx.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1);
+      trx.commit();
+    }
   }
 
   @Override
   protected synchronized void removeRMDelegationTokenState(
       RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
-    String nodeRemovePath =
-        getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
-            + rmDTIdentifier.getSequenceNumber());
+    String nodeRemovePath = getLeafDelegationTokenNodePath(
+        rmDTIdentifier.getSequenceNumber(), false);
+    int splitIndex = delegationTokenNodeSplitIndex;
+    // Look for paths based on other split indices if path as per configured
+    // split index does not exist.
+    if (!exists(nodeRemovePath)) {
+      ZnodeSplitInfo alternatePathInfo =
+          getAlternateDTPath(rmDTIdentifier.getSequenceNumber());
+      if (alternatePathInfo != null) {
+        nodeRemovePath = alternatePathInfo.path;
+        splitIndex = alternatePathInfo.splitIndex;
+      } else {
+        // Alternate path not found so return.
+        return;
+      }
+    }
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Removing RMDelegationToken_"
@@ -971,62 +1081,41 @@ public class ZKRMStateStore extends RMStateStore {
     }
 
     zkManager.safeDelete(nodeRemovePath, zkAcl, fencingNodePath);
+
+    // Check if we should remove the parent app node as well.
+    checkRemoveParentZnode(nodeRemovePath, splitIndex);
   }
 
   @Override
   protected synchronized void updateRMDelegationTokenState(
       RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
       throws Exception {
-    SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
-    String nodeRemovePath =
-        getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
-            + rmDTIdentifier.getSequenceNumber());
-
-    if (exists(nodeRemovePath)) {
-      // in case znode exists
-      addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, true);
-    } else {
-      // in case znode doesn't exist
-      addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Attempted to update a non-existing znode " + 
nodeRemovePath);
+    String nodeUpdatePath = getLeafDelegationTokenNodePath(
+        rmDTIdentifier.getSequenceNumber(), false);
+    boolean pathExists = true;
+    // Look for paths based on other split indices if path as per split index
+    // does not exist.
+    if (!exists(nodeUpdatePath)) {
+      ZnodeSplitInfo alternatePathInfo =
+          getAlternateDTPath(rmDTIdentifier.getSequenceNumber());
+      if (alternatePathInfo != null) {
+        nodeUpdatePath = alternatePathInfo.path;
+      } else {
+        pathExists = false;
       }
     }
 
-    trx.commit();
-  }
-
-  private void addStoreOrUpdateOps(SafeTransaction trx,
-      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
-      boolean isUpdate) throws Exception {
-    // store RM delegation token
-    String nodeCreatePath = getNodePath(delegationTokensRootPath,
-        DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber());
-    RMDelegationTokenIdentifierData identifierData =
-        new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate);
-    ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
-
-    try (DataOutputStream seqOut = new DataOutputStream(seqOs)) {
-
-      if (isUpdate) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Updating RMDelegationToken_"
-              + rmDTIdentifier.getSequenceNumber());
-        }
-        trx.setData(nodeCreatePath, identifierData.toByteArray(), -1);
-      } else {
-        trx.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
-            CreateMode.PERSISTENT);
-        // Update Sequence number only while storing DT
-        seqOut.writeInt(rmDTIdentifier.getSequenceNumber());
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Storing " + dtSequenceNumberPath + ". SequenceNumber: "
-              + rmDTIdentifier.getSequenceNumber());
-        }
-
-        trx.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1);
+    if (pathExists) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Updating " + DELEGATION_TOKEN_PREFIX
+            + rmDTIdentifier.getSequenceNumber());
       }
+      RMDelegationTokenIdentifierData identifierData =
+          new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate);
+      zkManager.safeSetData(nodeUpdatePath, identifierData.toByteArray(), -1,
+          zkAcl, fencingNodePath);
+    } else {
+      storeRMDelegationTokenState(rmDTIdentifier, renewDate);
     }
   }
 
@@ -1156,19 +1245,19 @@ public class ZKRMStateStore extends RMStateStore {
    * Get alternate path for app id if path according to configured split index
    * does not exist. We look for path based on all possible split indices.
    * @param appId
-   * @return a {@link AppNodeSplitInfo} object containing the path and split
+   * @return a {@link ZnodeSplitInfo} object containing the path and split
    *    index if it exists, null otherwise.
    * @throws Exception if any problem occurs while performing ZK operation.
    */
-  private AppNodeSplitInfo getAlternatePath(String appId) throws Exception {
+  private ZnodeSplitInfo getAlternateAppPath(String appId) throws Exception {
     for (Map.Entry<Integer, String> entry : rmAppRootHierarchies.entrySet()) {
       // Look for other paths
       int splitIndex = entry.getKey();
       if (splitIndex != appIdNodeSplitIndex) {
         String alternatePath =
-            getLeafAppIdNodePath(appId, entry.getValue(), splitIndex, false);
+            getLeafZnodePath(appId, entry.getValue(), splitIndex, false);
         if (exists(alternatePath)) {
-          return new AppNodeSplitInfo(alternatePath, splitIndex);
+          return new ZnodeSplitInfo(alternatePath, splitIndex);
         }
       }
     }
@@ -1176,26 +1265,25 @@ public class ZKRMStateStore extends RMStateStore {
   }
 
   /**
-   * Returns leaf app node path based on app id and passed split index. If the
-   * passed flag createParentIfNotExists is true, also creates the parent app
-   * node if it does not exist.
-   * @param appId application id.
+   * Returns leaf znode path based on node name and passed split index. If the
+   * passed flag createParentIfNotExists is true, also creates the parent znode
+   * if it does not exist.
+   * @param nodeName the node name.
    * @param rootNode app root node based on split index.
-   * @param appIdNodeSplitIdx split index.
-   * @param createParentIfNotExists flag which determines if parent app node
+   * @param splitIdx split index.
+   * @param createParentIfNotExists flag which determines if parent znode
    *     needs to be created(as per split) if it does not exist.
-   * @return leaf app node path.
+   * @return leaf znode path.
    * @throws Exception if any problem occurs while performing ZK operation.
    */
-  private String getLeafAppIdNodePath(String appId, String rootNode,
-      int appIdNodeSplitIdx, boolean createParentIfNotExists) throws Exception 
{
-    if (appIdNodeSplitIdx == 0) {
-      return getNodePath(rootNode, appId);
+  private String getLeafZnodePath(String nodeName, String rootNode,
+      int splitIdx, boolean createParentIfNotExists) throws Exception {
+    if (splitIdx == 0) {
+      return getNodePath(rootNode, nodeName);
     }
-    String nodeName = appId;
-    int splitIdx = nodeName.length() - appIdNodeSplitIdx;
+    int split = nodeName.length() - splitIdx;
     String rootNodePath =
-        getNodePath(rootNode, nodeName.substring(0, splitIdx));
+        getNodePath(rootNode, nodeName.substring(0, split));
     if (createParentIfNotExists && !exists(rootNodePath)) {
       try {
         zkManager.safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT,
@@ -1207,7 +1295,7 @@ public class ZKRMStateStore extends RMStateStore {
         }
       }
     }
-    return getNodePath(rootNodePath, nodeName.substring(splitIdx));
+    return getNodePath(rootNodePath, nodeName.substring(split));
   }
 
   /**
@@ -1222,10 +1310,77 @@ public class ZKRMStateStore extends RMStateStore {
    */
   private String getLeafAppIdNodePath(String appId,
       boolean createParentIfNotExists) throws Exception {
-    return getLeafAppIdNodePath(appId, rmAppRootHierarchies.get(
+    return getLeafZnodePath(appId, rmAppRootHierarchies.get(
         appIdNodeSplitIndex), appIdNodeSplitIndex, createParentIfNotExists);
   }
 
+  /**
+   * Returns leaf delegation token node path based on sequence number and
+   * configured split index. If the passed flag createParentIfNotExists is 
true,
+   * also creates the parent znode if it does not exist.  The sequence number
+   * is padded to be at least 4 digits wide to ensure consistency with the 
split
+   * indexing.
+   * @param rmDTSequenceNumber delegation token sequence number.
+   * @param createParentIfNotExists flag which determines if parent znode
+   *     needs to be created(as per split) if it does not exist.
+   * @return leaf delegation token node path.
+   * @throws Exception if any problem occurs while performing ZK operation.
+   */
+  private String getLeafDelegationTokenNodePath(int rmDTSequenceNumber,
+      boolean createParentIfNotExists) throws Exception {
+    return getLeafDelegationTokenNodePath(rmDTSequenceNumber,
+        createParentIfNotExists, delegationTokenNodeSplitIndex);
+  }
+
+  /**
+   * Returns leaf delegation token node path based on sequence number and
+   * passed split index. If the passed flag createParentIfNotExists is true,
+   * also creates the parent znode if it does not exist.  The sequence number
+   * is padded to be at least 4 digits wide to ensure consistency with the 
split
+   * indexing.
+   * @param rmDTSequenceNumber delegation token sequence number.
+   * @param createParentIfNotExists flag which determines if parent znode
+   *     needs to be created(as per split) if it does not exist.
+   * @param split the split index to use
+   * @return leaf delegation token node path.
+   * @throws Exception if any problem occurs while performing ZK operation.
+   */
+  private String getLeafDelegationTokenNodePath(int rmDTSequenceNumber,
+      boolean createParentIfNotExists, int split) throws Exception {
+    String nodeName = DELEGATION_TOKEN_PREFIX;
+    if (split == 0) {
+      nodeName += rmDTSequenceNumber;
+    } else {
+      nodeName += String.format("%04d", rmDTSequenceNumber);
+    }
+    return getLeafZnodePath(nodeName, rmDelegationTokenHierarchies.get(split),
+        split, createParentIfNotExists);
+  }
+
+  /**
+   * Get alternate path for delegation token if path according to configured
+   * split index does not exist. We look for path based on all possible split
+   * indices.
+   * @param rmDTSequenceNumber delegation token sequence number.
+   * @return a {@link ZnodeSplitInfo} object containing the path and split
+   *    index if it exists, null otherwise.
+   * @throws Exception if any problem occurs while performing ZK operation.
+   */
+  private ZnodeSplitInfo getAlternateDTPath(int rmDTSequenceNumber)
+      throws Exception {
+    // Check all possible paths until we find it
+    for (int splitIndex : rmDelegationTokenHierarchies.keySet()) {
+      if (splitIndex != delegationTokenNodeSplitIndex) {
+        String alternatePath = getLeafDelegationTokenNodePath(
+            rmDTSequenceNumber, false, splitIndex);
+        if (exists(alternatePath)) {
+          return new ZnodeSplitInfo(alternatePath, splitIndex);
+        }
+      }
+    }
+    return null;
+  }
+
   @VisibleForTesting
   byte[] getData(final String path) throws Exception {
     return zkManager.getData(path);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa65f6c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index cdea7df..48163f9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -69,7 +69,6 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Perms;
 import org.apache.zookeeper.data.ACL;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -80,16 +79,20 @@ import com.google.common.collect.Lists;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.crypto.SecretKey;
 
@@ -133,10 +136,9 @@ public class TestZKRMStateStore extends 
RMStateStoreTestBase {
     TestZKRMStateStoreInternal store;
     String workingZnode;
 
-
     class TestZKRMStateStoreInternal extends ZKRMStateStore {
 
-      public TestZKRMStateStoreInternal(Configuration conf, String 
workingZnode)
+      TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
           throws Exception {
         setResourceManager(new ResourceManager());
         init(conf);
@@ -145,7 +147,7 @@ public class TestZKRMStateStore extends 
RMStateStoreTestBase {
         assertTrue(znodeWorkingPath.equals(workingZnode));
       }
 
-      public String getVersionNode() {
+      private String getVersionNode() {
         return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE;
       }
 
@@ -167,11 +169,11 @@ public class TestZKRMStateStore extends 
RMStateStoreTestBase {
         return rootPath + "/" + appPath;
       }
 
-      public String getAppNode(String appId) {
+      private String getAppNode(String appId) {
         return getAppNode(appId, 0);
       }
 
-      public String getAttemptNode(String appId, String attemptId) {
+      private String getAttemptNode(String appId, String attemptId) {
         return getAppNode(appId) + "/" + attemptId;
       }
 
@@ -179,10 +181,28 @@ public class TestZKRMStateStore extends 
RMStateStoreTestBase {
        * Emulating retrying createRootDir not to raise NodeExist exception
        * @throws Exception
        */
-      public void testRetryingCreateRootDir() throws Exception {
+      private void testRetryingCreateRootDir() throws Exception {
         create(znodeWorkingPath);
       }
 
+      private String getDelegationTokenNode(int rmDTSequenceNumber, int 
splitIdx) {
+        String rootPath = workingZnode + "/" + ROOT_ZNODE_NAME + "/" +
+            RM_DT_SECRET_MANAGER_ROOT + "/" +
+            RMStateStore.RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME;
+        String nodeName = DELEGATION_TOKEN_PREFIX;
+        if (splitIdx == 0) {
+          nodeName += rmDTSequenceNumber;
+        } else {
+          nodeName += String.format("%04d", rmDTSequenceNumber);
+        }
+        String path = nodeName;
+        if (splitIdx != 0) {
+          int idx = nodeName.length() - splitIdx;
+          path = splitIdx + "/" + nodeName.substring(0, idx) + "/"
+              + nodeName.substring(idx);
+        }
+        return rootPath + "/" + path;
+      }
     }
 
     private RMStateStore createStore(Configuration conf) throws Exception {
@@ -240,6 +260,17 @@ public class TestZKRMStateStore extends 
RMStateStoreTestBase {
           .forPath(store.getAttemptNode(
               attemptId.getApplicationId().toString(), attemptId.toString()));
     }
+
+    public boolean delegationTokenExists(RMDelegationTokenIdentifier token,
+        int index) throws Exception {
+      int rmDTSequenceNumber = token.getSequenceNumber();
+      return curatorFramework.checkExists().forPath(
+          store.getDelegationTokenNode(rmDTSequenceNumber, index)) != null;
+    }
+
+    public int getDelegationTokenNodeSplitIndex() {
+      return store.delegationTokenNodeSplitIndex;
+    }
   }
 
   @Test (timeout = 60000)
@@ -337,7 +368,8 @@ public class TestZKRMStateStore extends 
RMStateStoreTestBase {
     RMStateStore store = zkTester.getRMStateStore();
     Version defaultVersion = zkTester.getCurrentVersion();
     store.checkVersion();
-    Assert.assertEquals(defaultVersion, store.loadVersion());
+    assertEquals("Store had wrong version",
+        defaultVersion, store.loadVersion());
   }
 
   public static Configuration createHARMConf(String rmIds, String rmId,
@@ -551,11 +583,20 @@ public class TestZKRMStateStore extends 
RMStateStoreTestBase {
             new Text("renewer1"), new Text("realuser1"));
     Long renewDate1 = new Long(System.currentTimeMillis()); 
     dtId1.setSequenceNumber(1111);
+    assertFalse("Token " + dtId1
+        + " should not exist but was found in ZooKeeper",
+        zkTester.delegationTokenExists(dtId1, 0));
     store.storeRMDelegationToken(dtId1, renewDate1);
+    assertFalse("Token " + dtId1
+        + " should not exist but was found in ZooKeeper",
+        zkTester.delegationTokenExists(dtId1, 0));
     assertEquals("RMStateStore should have been in fenced state", true,
         store.isFencedState());
 
     store.updateRMDelegationToken(dtId1, renewDate1);
+    assertFalse("Token " + dtId1
+        + " should not exist but was found in ZooKeeper",
+        zkTester.delegationTokenExists(dtId1, 0));
     assertEquals("RMStateStore should have been in fenced state", true,
         store.isFencedState());
 
@@ -611,7 +652,7 @@ public class TestZKRMStateStore extends 
RMStateStoreTestBase {
     try {
       store.removeApplicationStateInternal(appStateRemoved);
     } catch (KeeperException.NoNodeException nne) {
-      Assert.fail("NoNodeException should not happen.");
+      fail("NoNodeException should not happen.");
     }
     store.close();
   }
@@ -1129,4 +1170,317 @@ public class TestZKRMStateStore extends 
RMStateStoreTestBase {
     // Close the state store.
     store.close();
   }
+
+  private static Configuration createConfForDelegationTokenNodeSplit(
+      int splitIndex) {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX,
+        splitIndex);
+    return conf;
+  }
+
+  private void verifyDelegationTokensStateStore(
+      TestZKRMStateStoreTester zkTester,
+      Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal,
+      Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex,
+      int sequenceNumber) throws Exception {
+    RMStateStore.RMDTSecretManagerState secretManagerState =
+        zkTester.store.loadState().getRMDTSecretManagerState();
+    assertEquals("Unexpected token state",
+        tokensWithRenewal, secretManagerState.getTokenState());
+    assertEquals("Unexpected sequence number", sequenceNumber,
+        secretManagerState.getDTSequenceNumber());
+    for (Map.Entry<RMDelegationTokenIdentifier, Integer> tokenEntry
+        : tokensWithIndex.entrySet()) {
+      assertTrue("Expected to find token " + tokenEntry.getKey()
+          + " in zookeeper but did not",
+          zkTester.delegationTokenExists(tokenEntry.getKey(),
+          tokenEntry.getValue()));
+    }
+  }
+
+  private void verifyDelegationTokenInStateStore(
+      TestZKRMStateStoreTester zkTester, RMDelegationTokenIdentifier token,
+      long renewDate, int index) throws Exception {
+    RMStateStore.RMDTSecretManagerState secretManagerState =
+        zkTester.store.loadState().getRMDTSecretManagerState();
+    Map<RMDelegationTokenIdentifier, Long> tokenState =
+        secretManagerState.getTokenState();
+    assertTrue("token state does not contain " + token,
+        tokenState.containsKey(token));
+    assertTrue("token state does not contain a token with renewal " + 
renewDate,
+        tokenState.containsValue(renewDate));
+    assertTrue("Token " + token + "should exist but was not found in 
ZooKeeper",
+        zkTester.delegationTokenExists(token, index));
+  }
+
+  private RMDelegationTokenIdentifier storeUpdateAndVerifyDelegationToken(
+      TestZKRMStateStoreTester zkTester,
+      Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal,
+      Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex,
+      int sequenceNumber, int split) throws Exception {
+    // Store token
+    RMDelegationTokenIdentifier token =
+        new RMDelegationTokenIdentifier(new Text("owner"),
+            new Text("renewer"), new Text("realuser"));
+    assertFalse("Token should not exist but was found in ZooKeeper",
+        zkTester.delegationTokenExists(token, split));
+    token.setSequenceNumber(sequenceNumber);
+    Long renewDate = System.currentTimeMillis();
+    zkTester.store.storeRMDelegationToken(token, renewDate);
+    modifyRMDelegationTokenState();
+    tokensWithRenewal.put(token, renewDate);
+    tokensWithIndex.put(token, split);
+
+    // Verify the token
+    verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
+        tokensWithIndex, sequenceNumber);
+
+    // Update the token
+    renewDate = System.currentTimeMillis();
+    zkTester.store.updateRMDelegationToken(token, renewDate);
+    tokensWithRenewal.put(token, renewDate);
+    tokensWithIndex.put(token, split);
+
+    // Verify updates
+    verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
+        tokensWithIndex, sequenceNumber);
+
+    return token;
+  }
+
+  @Test
+  public void testDelegationTokenSplitIndexConfig() throws Exception {
+    // Valid values
+    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+    zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(0)).close();
+    assertEquals("Incorrect split index",
+        0, zkTester.getDelegationTokenNodeSplitIndex());
+    zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(1)).close();
+    assertEquals("Incorrect split index",
+        1, zkTester.getDelegationTokenNodeSplitIndex());
+    zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(2)).close();
+    assertEquals("Incorrect split index",
+        2, zkTester.getDelegationTokenNodeSplitIndex());
+    zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(3)).close();
+    assertEquals("Incorrect split index",
+        3, zkTester.getDelegationTokenNodeSplitIndex());
+    zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(4)).close();
+    assertEquals("Incorrect split index",
+        4, zkTester.getDelegationTokenNodeSplitIndex());
+
+    // Invalid values --> override to 0
+    
zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(-1)).close();
+    assertEquals("Incorrect split index",
+        0, zkTester.getDelegationTokenNodeSplitIndex());
+    zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(5)).close();
+    assertEquals("Incorrect split index",
+        0, zkTester.getDelegationTokenNodeSplitIndex());
+  }
+
+  @Test
+  public void testDelegationTokenNodeNoSplit() throws Exception {
+    testDelegationTokenNode(0);
+  }
+
+  @Test
+  public void testDelegationTokenNodeWithSplitOne() throws Exception {
+    testDelegationTokenNode(1);
+  }
+
+  @Test
+  public void testDelegationTokenNodeWithSplitTwo() throws Exception {
+    testDelegationTokenNode(2);
+  }
+
+  @Test
+  public void testDelegationTokenNodeWithSplitThree() throws Exception {
+    testDelegationTokenNode(3);
+  }
+
+  @Test
+  public void testDelegationTokenNodeWithSplitFour() throws Exception {
+    testDelegationTokenNode(4);
+  }
+
+  public void testDelegationTokenNode(int split) throws Exception {
+    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+    Configuration conf = createConfForDelegationTokenNodeSplit(split);
+    RMStateStore store = zkTester.getRMStateStore(conf);
+
+    // Store the token and verify
+    Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal = new HashMap<>();
+    Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex = new 
HashMap<>();
+    int sequenceNumber = 0;
+    RMDelegationTokenIdentifier token = storeUpdateAndVerifyDelegationToken(
+        zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, split);
+
+    // Delete the token and verify
+    store.removeRMDelegationToken(token);
+    RMStateStore.RMDTSecretManagerState state =
+        store.loadState().getRMDTSecretManagerState();
+    tokensWithRenewal.clear();
+    tokensWithIndex.clear();
+    assertEquals("Unexpected token state",
+        tokensWithRenewal, state.getTokenState());
+    assertEquals("Unexpected sequence number",
+        sequenceNumber, state.getDTSequenceNumber());
+    assertFalse("Token should not exist but was found in ZooKeeper",
+        zkTester.delegationTokenExists(token, split));
+    store.close();
+  }
+
+  @Test
+  public void testDelegationTokenNodeWithSplitMultiple() throws Exception {
+    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+    Configuration conf = createConfForDelegationTokenNodeSplit(1);
+    RMStateStore store = zkTester.getRMStateStore(conf);
+
+    // With the split set to 1, we can store 10 tokens under a znode (i.e. 0-9)
+    // Try to store more than 10
+    Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal = new HashMap<>();
+    Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex = new 
HashMap<>();
+    Set<RMDelegationTokenIdentifier> tokensToDelete = new HashSet<>();
+    int sequenceNumber = 0;
+    for (int i = 0; i <= 12; i++) {
+      RMDelegationTokenIdentifier token =
+          new RMDelegationTokenIdentifier(new Text("owner" + i),
+              new Text("renewer" + i), new Text("realuser" + i));
+      sequenceNumber = i;
+      token.setSequenceNumber(sequenceNumber);
+      assertFalse("Token should not exist but was found in ZooKeeper",
+          zkTester.delegationTokenExists(token, 1));
+      Long renewDate = System.currentTimeMillis();
+      store.storeRMDelegationToken(token, renewDate);
+      modifyRMDelegationTokenState();
+      tokensWithRenewal.put(token, renewDate);
+      tokensWithIndex.put(token, 1);
+      switch (i) {
+        case 0:
+        case 3:
+        case 6:
+        case 11:
+          tokensToDelete.add(token);
+          break;
+        default:
+          break;
+      }
+    }
+    // Verify
+    verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
+        tokensWithIndex, sequenceNumber);
+
+    // Try deleting some tokens and adding some new ones
+    for (RMDelegationTokenIdentifier tokenToDelete : tokensToDelete) {
+      store.removeRMDelegationToken(tokenToDelete);
+      tokensWithRenewal.remove(tokenToDelete);
+      tokensWithIndex.remove(tokenToDelete);
+    }
+    for (int i = 13; i <= 22; i++) {
+      RMDelegationTokenIdentifier token =
+          new RMDelegationTokenIdentifier(new Text("owner" + i),
+              new Text("renewer" + i), new Text("realuser" + i));
+      sequenceNumber = i;
+      token.setSequenceNumber(sequenceNumber);
+      Long renewDate = System.currentTimeMillis();
+      store.storeRMDelegationToken(token, renewDate);
+      modifyRMDelegationTokenState();
+      tokensWithRenewal.put(token, renewDate);
+      tokensWithIndex.put(token, 1);
+    }
+    // Verify
+    verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
+        tokensWithIndex, sequenceNumber);
+    for (RMDelegationTokenIdentifier token : tokensToDelete) {
+      assertFalse("Token " + token
+              + " should not exist but was found in ZooKeeper",
+          zkTester.delegationTokenExists(token, 1));
+    }
+    store.close();
+  }
+
+  @Test
+  public void testDelegationTokenNodeWithSplitChangeAcrossRestarts()
+      throws Exception {
+    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+    Map<RMDelegationTokenIdentifier, Long> tokensWithRenewal = new HashMap<>();
+    Map<RMDelegationTokenIdentifier, Integer> tokensWithIndex = new 
HashMap<>();
+    int sequenceNumber = 0;
+
+    // Start the store with index 1
+    Configuration conf = createConfForDelegationTokenNodeSplit(1);
+    RMStateStore store = zkTester.getRMStateStore(conf);
+    // Store a token with index 1
+    RMDelegationTokenIdentifier token1 = storeUpdateAndVerifyDelegationToken(
+        zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, 1);
+    store.close();
+
+    // Start the store with index 2
+    conf = createConfForDelegationTokenNodeSplit(2);
+    store = zkTester.getRMStateStore(conf);
+    // Verify token1 is still there and under the /1/ znode
+    verifyDelegationTokenInStateStore(
+        zkTester, token1, tokensWithRenewal.get(token1), 1);
+    // Store a token with index 2
+    sequenceNumber++;
+    RMDelegationTokenIdentifier token2 = storeUpdateAndVerifyDelegationToken(
+        zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, 2);
+    // Update and verify token1
+    long renewDate1 = System.currentTimeMillis();
+    zkTester.store.updateRMDelegationToken(token1, renewDate1);
+    tokensWithRenewal.put(token1, renewDate1);
+    verifyDelegationTokenInStateStore(
+        zkTester, token1, tokensWithRenewal.get(token1), 1);
+    store.close();
+
+    // Start the store with index 0
+    conf = createConfForDelegationTokenNodeSplit(0);
+    store = zkTester.getRMStateStore(conf);
+    // Verify token1 is still there and under the /1/ znode
+    verifyDelegationTokenInStateStore(
+        zkTester, token1, tokensWithRenewal.get(token1), 1);
+    // Verify token2 is still there and under the /2/ znode
+    verifyDelegationTokenInStateStore(
+        zkTester, token2, tokensWithRenewal.get(token2), 2);
+    // Store a token with no index
+    sequenceNumber++;
+    RMDelegationTokenIdentifier token0 = storeUpdateAndVerifyDelegationToken(
+        zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, 0);
+    store.close();
+
+    // Start the store with index 3
+    conf = createConfForDelegationTokenNodeSplit(3);
+    store = zkTester.getRMStateStore(conf);
+    // Verify token1 is still there and under the /1/ znode
+    verifyDelegationTokenInStateStore(
+        zkTester, token1, tokensWithRenewal.get(token1), 1);
+    // Verify token2 is still there and under the /2/ znode
+    verifyDelegationTokenInStateStore(
+        zkTester, token2, tokensWithRenewal.get(token2), 2);
+    // Verify token0 is still there and under the token root node
+    verifyDelegationTokenInStateStore(
+        zkTester, token0, tokensWithRenewal.get(token0), 0);
+    // Delete all tokens and verify
+    for (RMDelegationTokenIdentifier token : tokensWithRenewal.keySet()) {
+      store.removeRMDelegationToken(token);
+    }
+    tokensWithRenewal.clear();
+    tokensWithIndex.clear();
+    verifyDelegationTokensStateStore(
+        zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber);
+    assertFalse("Token " + token1
+            + " should not exist but was found in ZooKeeper",
+        zkTester.delegationTokenExists(token1, 1));
+    assertFalse("Token " + token1
+            + " should not exist but was found in ZooKeeper",
+        zkTester.delegationTokenExists(token2, 2));
+    assertFalse("Token " + token1
+            + " should not exist but was found in ZooKeeper",
+        zkTester.delegationTokenExists(token0, 0));
+    // Store a token with index 3
+    sequenceNumber++;
+    storeUpdateAndVerifyDelegationToken(zkTester, tokensWithRenewal,
+        tokensWithIndex, sequenceNumber, 3);
+    store.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to