HADOOP-13487. Hadoop KMS should load old delegation tokens from Zookeeper on 
startup. Contributed by Xiao Chen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f4d4d347
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f4d4d347
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f4d4d347

Branch: refs/heads/YARN-3368
Commit: f4d4d3474cfd2d1f2d243f5ae5cec17af38270b1
Parents: 22fc46d
Author: Xiao Chen <x...@apache.org>
Authored: Mon Aug 22 14:31:13 2016 -0700
Committer: Xiao Chen <x...@apache.org>
Committed: Mon Aug 22 14:42:13 2016 -0700

----------------------------------------------------------------------
 .../ZKDelegationTokenSecretManager.java         | 44 +++++++++
 .../TestZKDelegationTokenSecretManager.java     | 93 +++++++++++++++++++-
 2 files changed, 136 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4d4d347/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
index c3ad9f3..6c66e98 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
@@ -361,6 +361,7 @@ public abstract class 
ZKDelegationTokenSecretManager<TokenIdent extends Abstract
             }
           }
         }, listenerThreadPool);
+        loadFromZKCache(false);
       }
     } catch (Exception e) {
       throw new IOException("Could not start PathChildrenCache for keys", e);
@@ -389,6 +390,7 @@ public abstract class 
ZKDelegationTokenSecretManager<TokenIdent extends Abstract
             }
           }
         }, listenerThreadPool);
+        loadFromZKCache(true);
       }
     } catch (Exception e) {
       throw new IOException("Could not start PathChildrenCache for tokens", e);
@@ -396,6 +398,43 @@ public abstract class 
ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     super.startThreads();
   }
 
+  /**
+   * Load the PathChildrenCache into the in-memory map. Possible caches to be
+   * loaded are keyCache and tokenCache.
+   *
+   * @param isTokenCache true if loading tokenCache, false if loading keyCache.
+   */
+  private void loadFromZKCache(final boolean isTokenCache) {
+    final String cacheName = isTokenCache ? "token" : "key";
+    LOG.info("Starting to load {} cache.", cacheName);
+    final List<ChildData> children;
+    if (isTokenCache) {
+      children = tokenCache.getCurrentData();
+    } else {
+      children = keyCache.getCurrentData();
+    }
+
+    int count = 0;
+    for (ChildData child : children) {
+      try {
+        if (isTokenCache) {
+          processTokenAddOrUpdate(child);
+        } else {
+          processKeyAddOrUpdate(child.getData());
+        }
+      } catch (Exception e) {
+        LOG.info("Ignoring node {} because it failed to load.",
+            child.getPath());
+        LOG.debug("Failure exception:", e);
+        ++count;
+      }
+    }
+    if (count > 0) {
+      LOG.warn("Ignored {} nodes while loading {} cache.", count, cacheName);
+    }
+    LOG.info("Loaded {} cache.", cacheName);
+  }
+
   private void processKeyAddOrUpdate(byte[] data) throws IOException {
     ByteArrayInputStream bin = new ByteArrayInputStream(data);
     DataInputStream din = new DataInputStream(bin);
@@ -890,4 +929,9 @@ public abstract class 
ZKDelegationTokenSecretManager<TokenIdent extends Abstract
   public ExecutorService getListenerThreadPool() {
     return listenerThreadPool;
   }
+
+  @VisibleForTesting
+  DelegationTokenInformation getTokenInfoFromMemory(TokenIdent ident) {
+    return currentTokens.get(ident);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4d4d347/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
index 185a994..c9571ff2 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
+import com.google.common.base.Supplier;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -37,6 +38,7 @@ import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import 
org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
@@ -44,12 +46,18 @@ import 
org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.fail;
 
-import org.junit.Test;
 
 public class TestZKDelegationTokenSecretManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestZKDelegationTokenSecretManager.class);
 
   private static final int TEST_RETRIES = 2;
 
@@ -61,6 +69,9 @@ public class TestZKDelegationTokenSecretManager {
 
   private TestingServer zkServer;
 
+  @Rule
+  public Timeout globalTimeout = new Timeout(300000);
+
   @Before
   public void setup() throws Exception {
     zkServer = new TestingServer();
@@ -382,4 +393,84 @@ public class TestZKDelegationTokenSecretManager {
     }
   }
 
+  @SuppressWarnings({ "unchecked" })
+  @Test
+  public void testNodesLoadedAfterRestart() throws Exception {
+    final String connectString = zkServer.getConnectString();
+    final Configuration conf = getSecretConf(connectString);
+    final int removeScan = 1;
+    // Set the remove scan interval to remove expired tokens
+    conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, removeScan);
+    // Set the update interval to trigger background thread to run. The thread
+    // is hard-coded to sleep at least 5 seconds.
+    conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, 5);
+    // Set token expire time to 5 seconds.
+    conf.setLong(DelegationTokenManager.RENEW_INTERVAL, 5);
+
+    DelegationTokenManager tm =
+        new DelegationTokenManager(conf, new Text("bla"));
+    tm.init();
+    Token<DelegationTokenIdentifier> token =
+        (Token<DelegationTokenIdentifier>) tm
+            .createToken(UserGroupInformation.getCurrentUser(), "good");
+    Assert.assertNotNull(token);
+    Token<DelegationTokenIdentifier> cancelled =
+        (Token<DelegationTokenIdentifier>) tm
+            .createToken(UserGroupInformation.getCurrentUser(), "cancelled");
+    Assert.assertNotNull(cancelled);
+    tm.verifyToken(token);
+    tm.verifyToken(cancelled);
+
+    // Cancel one token, verify it's gone
+    tm.cancelToken(cancelled, "cancelled");
+    final AbstractDelegationTokenSecretManager sm =
+        tm.getDelegationTokenSecretManager();
+    final ZKDelegationTokenSecretManager zksm =
+        (ZKDelegationTokenSecretManager) sm;
+    final AbstractDelegationTokenIdentifier idCancelled =
+        sm.decodeTokenIdentifier(cancelled);
+    LOG.info("Waiting for the cancelled token to be removed");
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        AbstractDelegationTokenSecretManager.DelegationTokenInformation dtinfo 
=
+            zksm.getTokenInfo(idCancelled);
+        return dtinfo == null;
+      }
+    }, 100, 5000);
+
+    // Fake a restart which launches a new tm
+    tm.destroy();
+    tm = new DelegationTokenManager(conf, new Text("bla"));
+    tm.init();
+    final AbstractDelegationTokenSecretManager smNew =
+        tm.getDelegationTokenSecretManager();
+    final ZKDelegationTokenSecretManager zksmNew =
+        (ZKDelegationTokenSecretManager) smNew;
+
+    // The cancelled token should be gone, and not loaded.
+    AbstractDelegationTokenIdentifier id =
+        smNew.decodeTokenIdentifier(cancelled);
+    AbstractDelegationTokenSecretManager.DelegationTokenInformation dtinfo =
+        zksmNew.getTokenInfo(id);
+    Assert.assertNull("canceled dt should be gone!", dtinfo);
+
+    // The good token should be loaded on startup, and removed after expiry.
+    id = smNew.decodeTokenIdentifier(token);
+    dtinfo = zksmNew.getTokenInfoFromMemory(id);
+    Assert.assertNotNull("good dt should be in memory!", dtinfo);
+
+    // Wait for the good token to expire.
+    Thread.sleep(5000);
+    final ZKDelegationTokenSecretManager zksm1 = zksmNew;
+    final AbstractDelegationTokenIdentifier id1 = id;
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        LOG.info("Waiting for the expired token to be removed...");
+        return zksm1.getTokenInfo(id1) == null;
+      }
+    }, 1000, 5000);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to