Repository: hadoop Updated Branches: refs/heads/trunk 670879ef4 -> db45f047a
HADOOP-11170. ZKDelegationTokenSecretManager fails to renewToken created by a peer. (Arun Suresh and Gregory Chanan via kasha) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/db45f047 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/db45f047 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/db45f047 Branch: refs/heads/trunk Commit: db45f047ab6b19d8a3e7752bb2cde10827cd8dad Parents: 670879e Author: Karthik Kambatla <[email protected]> Authored: Thu Oct 23 17:04:14 2014 -0700 Committer: Karthik Kambatla <[email protected]> Committed: Thu Oct 23 17:04:14 2014 -0700 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 + .../AbstractDelegationTokenSecretManager.java | 44 +++++-- .../ZKDelegationTokenSecretManager.java | 86 ++++++++---- .../TestZKDelegationTokenSecretManager.java | 131 +++++++++++++++---- 4 files changed, 205 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/db45f047/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index d180ea0..7b48a4b 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -368,6 +368,9 @@ Release 2.7.0 - UNRELEASED HDFS-7227. Fix findbugs warning about NP_DEREFERENCE_OF_READLINE_VALUE in SpanReceiverHost (cmccabe) + HADOOP-11170. ZKDelegationTokenSecretManager fails to renewToken created by + a peer. (Arun Suresh and Gregory Chanan via kasha) + Release 2.6.0 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/db45f047/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java index ebba6a1..ac399ec 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java @@ -20,10 +20,13 @@ package org.apache.hadoop.security.token.delegation; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.io.Text; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.util.Arrays; import java.util.HashMap; @@ -125,7 +128,7 @@ extends AbstractDelegationTokenIdentifier> * Reset all data structures and mutable state. */ public synchronized void reset() { - currentId = 0; + setCurrentKeyId(0); allKeys.clear(); setDelegationTokenSeqNum(0); currentTokens.clear(); @@ -138,8 +141,8 @@ extends AbstractDelegationTokenIdentifier> public synchronized void addKey(DelegationKey key) throws IOException { if (running) // a safety check throw new IOException("Can't add delegation key to a running SecretManager."); - if (key.getKeyId() > currentId) { - currentId = key.getKeyId(); + if (key.getKeyId() > getCurrentKeyId()) { + setCurrentKeyId(key.getKeyId()); } allKeys.put(key.getKeyId(), key); } @@ -186,6 +189,30 @@ extends AbstractDelegationTokenIdentifier> * For subclasses externalizing the storage, for example Zookeeper * based implementations */ + protected synchronized int getCurrentKeyId() { + return currentId; + } + + /** + * For subclasses externalizing the storage, for example Zookeeper + * based implementations + */ + protected synchronized int incrementCurrentKeyId() { + return ++currentId; + } + + /** + * For subclasses externalizing the storage, for example Zookeeper + * based implementations + */ + protected synchronized void setCurrentKeyId(int keyId) { + currentId = keyId; + } + + /** + * For subclasses externalizing the storage, for example Zookeeper + * based implementations + */ protected synchronized int getDelegationTokenSeqNum() { return delegationTokenSequenceNumber; } @@ -282,8 +309,8 @@ extends AbstractDelegationTokenIdentifier> return; } byte[] password = createPassword(identifier.getBytes(), dKey.getKey()); - if (identifier.getSequenceNumber() > delegationTokenSequenceNumber) { - delegationTokenSequenceNumber = identifier.getSequenceNumber(); + if (identifier.getSequenceNumber() > getDelegationTokenSeqNum()) { + setDelegationTokenSeqNum(identifier.getSequenceNumber()); } if (getTokenInfo(identifier) == null) { currentTokens.put(identifier, new DelegationTokenInformation(renewDate, @@ -303,7 +330,7 @@ extends AbstractDelegationTokenIdentifier> /* Create a new currentKey with an estimated expiry date. */ int newCurrentId; synchronized (this) { - newCurrentId = currentId+1; + newCurrentId = incrementCurrentKeyId(); } DelegationKey newKey = new DelegationKey(newCurrentId, System .currentTimeMillis() @@ -311,7 +338,6 @@ extends AbstractDelegationTokenIdentifier> //Log must be invoked outside the lock on 'this' logUpdateMasterKey(newKey); synchronized (this) { - currentId = newKey.getKeyId(); currentKey = newKey; storeDelegationKey(currentKey); } @@ -358,9 +384,9 @@ extends AbstractDelegationTokenIdentifier> sequenceNum = incrementDelegationTokenSeqNum(); identifier.setIssueDate(now); identifier.setMaxDate(now + tokenMaxLifetime); - identifier.setMasterKeyId(currentId); + identifier.setMasterKeyId(currentKey.getKeyId()); identifier.setSequenceNumber(sequenceNum); - LOG.info("Creating password for identifier: " + identifier); + LOG.info("Creating password for identifier: [" + MD5Hash.digest(identifier.getBytes()) + ", " + currentKey.getKeyId() + "]"); byte[] password = createPassword(identifier.getBytes(), currentKey.getKey()); DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/db45f047/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index 5f68844..82dd2da 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -48,7 +48,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -56,6 +55,7 @@ import org.apache.zookeeper.ZooDefs.Perms; import org.apache.zookeeper.client.ZooKeeperSaslClient; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,6 +104,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot"; private static final String ZK_DTSM_SEQNUM_ROOT = "ZKDTSMSeqNumRoot"; + private static final String ZK_DTSM_KEYID_ROOT = "ZKDTSMKeyIdRoot"; private static final String ZK_DTSM_TOKENS_ROOT = "ZKDTSMTokensRoot"; private static final String ZK_DTSM_MASTER_KEY_ROOT = "ZKDTSMMasterKeyRoot"; @@ -119,7 +120,8 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract private final boolean isExternalClient; private final CuratorFramework zkClient; - private SharedCount seqCounter; + private SharedCount delTokSeqCounter; + private SharedCount keyIdSeqCounter; private PathChildrenCache keyCache; private PathChildrenCache tokenCache; private ExecutorService listenerThreadPool; @@ -276,7 +278,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract } @Override - public synchronized void startThreads() throws IOException { + public void startThreads() throws IOException { if (!isExternalClient) { try { zkClient.start(); @@ -285,14 +287,22 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract } } try { - seqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0); - if (seqCounter != null) { - seqCounter.start(); + delTokSeqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0); + if (delTokSeqCounter != null) { + delTokSeqCounter.start(); } } catch (Exception e) { throw new IOException("Could not start Sequence Counter", e); } try { + keyIdSeqCounter = new SharedCount(zkClient, ZK_DTSM_KEYID_ROOT, 0); + if (keyIdSeqCounter != null) { + keyIdSeqCounter.start(); + } + } catch (Exception e) { + throw new IOException("Could not start KeyId Counter", e); + } + try { createPersistentNode(ZK_DTSM_MASTER_KEY_ROOT); createPersistentNode(ZK_DTSM_TOKENS_ROOT); } catch (Exception e) { @@ -402,13 +412,16 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract } @Override - public synchronized void stopThreads() { + public void stopThreads() { try { if (!isExternalClient && (zkClient != null)) { zkClient.close(); } - if (seqCounter != null) { - seqCounter.close(); + if (delTokSeqCounter != null) { + delTokSeqCounter.close(); + } + if (keyIdSeqCounter != null) { + keyIdSeqCounter.close(); } if (keyCache != null) { keyCache.close(); @@ -434,31 +447,47 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract } @Override - protected synchronized int getDelegationTokenSeqNum() { - return seqCounter.getCount(); + protected int getDelegationTokenSeqNum() { + return delTokSeqCounter.getCount(); } @Override - protected synchronized int incrementDelegationTokenSeqNum() { + protected int incrementDelegationTokenSeqNum() { try { - while (!seqCounter.trySetCount(seqCounter.getCount() + 1)) { + while (!delTokSeqCounter.trySetCount(delTokSeqCounter.getCount() + 1)) { } } catch (Exception e) { throw new RuntimeException("Could not increment shared counter !!", e); } - return seqCounter.getCount(); + return delTokSeqCounter.getCount(); } @Override - protected synchronized void setDelegationTokenSeqNum(int seqNum) { + protected void setDelegationTokenSeqNum(int seqNum) { try { - seqCounter.setCount(seqNum); + delTokSeqCounter.setCount(seqNum); } catch (Exception e) { throw new RuntimeException("Could not set shared counter !!", e); } } @Override + protected int getCurrentKeyId() { + return keyIdSeqCounter.getCount(); + } + + @Override + protected int incrementCurrentKeyId() { + try { + while (!keyIdSeqCounter.trySetCount(keyIdSeqCounter.getCount() + 1)) { + } + } catch (Exception e) { + throw new RuntimeException("Could not increment shared keyId counter !!", e); + } + return keyIdSeqCounter.getCount(); + } + + @Override protected DelegationKey getDelegationKey(int keyId) { // First check if its I already have this key DelegationKey key = allKeys.get(keyId); @@ -518,6 +547,11 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident) throws IOException { + return getTokenInfoFromZK(ident, false); + } + + private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident, + boolean quiet) throws IOException { String nodePath = getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber()); @@ -539,7 +573,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract return tokenInfo; } } catch (KeeperException.NoNodeException e) { - LOG.error("No node in path [" + nodePath + "]"); + if (!quiet) { + LOG.error("No node in path [" + nodePath + "]"); + } } catch (Exception ex) { throw new IOException(ex); } @@ -604,7 +640,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract } try { if (zkClient.checkExists().forPath(nodeRemovePath) != null) { - zkClient.delete().forPath(nodeRemovePath); + while(zkClient.checkExists().forPath(nodeRemovePath) != null){ + zkClient.delete().guaranteed().forPath(nodeRemovePath); + } } else { LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath); } @@ -633,10 +671,10 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract + ident.getSequenceNumber()); try { if (zkClient.checkExists().forPath(nodeRemovePath) != null) { + addOrUpdateToken(ident, tokenInfo, true); + } else { addOrUpdateToken(ident, tokenInfo, false); LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath); - } else { - addOrUpdateToken(ident, tokenInfo, true); } } catch (Exception e) { throw new RuntimeException("Could not update Stored Token ZKDTSMDelegationToken_" @@ -656,9 +694,11 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract } try { if (zkClient.checkExists().forPath(nodeRemovePath) != null) { - LOG.debug("Attempted to remove a non-existing znode " + nodeRemovePath); + while(zkClient.checkExists().forPath(nodeRemovePath) != null){ + zkClient.delete().guaranteed().forPath(nodeRemovePath); + } } else { - zkClient.delete().forPath(nodeRemovePath); + LOG.debug("Attempted to remove a non-existing znode " + nodeRemovePath); } } catch (Exception e) { throw new RuntimeException( @@ -682,7 +722,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract tokenOut.writeInt(info.getPassword().length); tokenOut.write(info.getPassword()); if (LOG.isDebugEnabled()) { - LOG.debug((isUpdate ? "Storing " : "Updating ") + LOG.debug((isUpdate ? "Updating " : "Storing ") + "ZKDTSMDelegationToken_" + ident.getSequenceNumber()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/db45f047/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java index eece2de..b3049c4 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java @@ -22,50 +22,127 @@ import org.apache.curator.test.TestingServer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; + +import static org.junit.Assert.fail; + import org.junit.Test; public class TestZKDelegationTokenSecretManager { private static final long DAY_IN_SECS = 86400; + private TestingServer zkServer; + + @Before + public void setup() throws Exception { + zkServer = new TestingServer(); + zkServer.start(); + } + + @After + public void tearDown() throws Exception { + if (zkServer != null) { + zkServer.close(); + } + } + + protected Configuration getSecretConf(String connectString) { + Configuration conf = new Configuration(); + conf.setBoolean(DelegationTokenManager.ENABLE_ZK_KEY, true); + conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString); + conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "testPath"); + conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none"); + conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS); + conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS); + conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS); + conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DAY_IN_SECS); + return conf; + } + @SuppressWarnings("unchecked") @Test - public void testZKDelTokSecretManager() throws Exception { - TestingServer zkServer = new TestingServer(); + public void testMultiNodeOperations() throws Exception { DelegationTokenManager tm1, tm2 = null; - zkServer.start(); + String connectString = zkServer.getConnectString(); + Configuration conf = getSecretConf(connectString); + tm1 = new DelegationTokenManager(conf, new Text("bla")); + tm1.init(); + tm2 = new DelegationTokenManager(conf, new Text("bla")); + tm2.init(); + + Token<DelegationTokenIdentifier> token = + (Token<DelegationTokenIdentifier>) tm1.createToken( + UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token); + tm2.verifyToken(token); + tm2.renewToken(token, "foo"); + tm1.verifyToken(token); + tm1.cancelToken(token, "foo"); try { - String connectString = zkServer.getConnectString(); - Configuration conf = new Configuration(); - conf.setBoolean(DelegationTokenManager.ENABLE_ZK_KEY, true); - conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString); - conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "testPath"); - conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none"); - conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS); - conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS); - conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS); - conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DAY_IN_SECS); - tm1 = new DelegationTokenManager(conf, new Text("foo")); - tm1.init(); - tm2 = new DelegationTokenManager(conf, new Text("foo")); - tm2.init(); - - Token<DelegationTokenIdentifier> token = - (Token<DelegationTokenIdentifier>) tm1.createToken( - UserGroupInformation.getCurrentUser(), "foo"); - Assert.assertNotNull(token); tm2.verifyToken(token); + fail("Expected InvalidToken"); + } catch (SecretManager.InvalidToken it) { + // Ignore + } - token = (Token<DelegationTokenIdentifier>) tm2.createToken( - UserGroupInformation.getCurrentUser(), "bar"); - Assert.assertNotNull(token); + token = (Token<DelegationTokenIdentifier>) tm2.createToken( + UserGroupInformation.getCurrentUser(), "bar"); + Assert.assertNotNull(token); + tm1.verifyToken(token); + tm1.renewToken(token, "bar"); + tm2.verifyToken(token); + tm2.cancelToken(token, "bar"); + try { tm1.verifyToken(token); - } finally { - zkServer.close(); + fail("Expected InvalidToken"); + } catch (SecretManager.InvalidToken it) { + // Ignore + } + } + + @SuppressWarnings("unchecked") + @Test + public void testRenewTokenSingleManager() throws Exception { + DelegationTokenManager tm1 = null; + String connectString = zkServer.getConnectString(); + Configuration conf = getSecretConf(connectString); + tm1 = new DelegationTokenManager(conf, new Text("foo")); + tm1.init(); + + Token<DelegationTokenIdentifier> token = + (Token<DelegationTokenIdentifier>) + tm1.createToken(UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token); + tm1.renewToken(token, "foo"); + tm1.verifyToken(token); + } + + @SuppressWarnings("unchecked") + @Test + public void testCancelTokenSingleManager() throws Exception { + DelegationTokenManager tm1 = null; + String connectString = zkServer.getConnectString(); + Configuration conf = getSecretConf(connectString); + tm1 = new DelegationTokenManager(conf, new Text("foo")); + tm1.init(); + + Token<DelegationTokenIdentifier> token = + (Token<DelegationTokenIdentifier>) + tm1.createToken(UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token); + tm1.cancelToken(token, "foo"); + try { + tm1.verifyToken(token); + fail("Expected InvalidToken"); + } catch (SecretManager.InvalidToken it) { + it.printStackTrace(); } } }
