Author: shv
Date: Thu Feb 25 21:43:20 2010
New Revision: 916468
URL: http://svn.apache.org/viewvc?rev=916468&view=rev
Log:
HADOOP-6573. Support for persistent delegation tokens. Contributed by Jitendra
Pandey.
Modified:
hadoop/common/trunk/CHANGES.txt
hadoop/common/trunk/src/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/security/token/delegation/TestDelegationToken.java
Modified: hadoop/common/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=916468&r1=916467&r2=916468&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Thu Feb 25 21:43:20 2010
@@ -170,6 +170,9 @@
HADOOP-6596. Add a version field to the AbstractDelegationTokenIdentifier's
serialized value. (omalley)
+ HADOOP-6573. Support for persistent delegation tokens.
+ (Jitendra Pandey via shv)
+
OPTIMIZATIONS
HADOOP-6467. Improve the performance on HarFileSystem.listStatus(..).
Modified:
hadoop/common/trunk/src/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java?rev=916468&r1=916467&r2=916468&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java Thu Feb 25 21:43:20 2010
@@ -52,23 +52,30 @@
/**
* Cache of currently valid tokens, mapping from DelegationTokenIdentifier
- * to DelegationTokenInformation. Protected by its own lock.
+ * to DelegationTokenInformation. Protected by this object lock.
*/
- private final Map<TokenIdent, DelegationTokenInformation> currentTokens
+ protected final Map<TokenIdent, DelegationTokenInformation> currentTokens
= new HashMap<TokenIdent, DelegationTokenInformation>();
/**
- * Sequence number to create DelegationTokenIdentifier
+ * Sequence number to create DelegationTokenIdentifier.
+ * Protected by this object lock.
*/
- private int delegationTokenSequenceNumber = 0;
+ protected int delegationTokenSequenceNumber = 0;
- private final Map<Integer, DelegationKey> allKeys
+ /**
+ * Access to allKeys is protected by this object lock
+ */
+ protected final Map<Integer, DelegationKey> allKeys
= new HashMap<Integer, DelegationKey>();
/**
- * Access to currentId and currentKey is protected by this object lock.
+ * Access to currentId is protected by this object lock.
+ */
+ protected int currentId = 0;
+ /**
+ * Access to currentKey is protected by this object lock
*/
- private int currentId = 0;
private DelegationKey currentKey;
private long keyUpdateInterval;
@@ -76,7 +83,7 @@
private long tokenRemoverScanInterval;
private long tokenRenewInterval;
private Thread tokenRemoverThread;
- private volatile boolean running;
+ protected volatile boolean running;
public AbstractDelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
@@ -112,27 +119,50 @@
return allKeys.values().toArray(new DelegationKey[0]);
}
- /** Update the current master key */
- private synchronized void updateCurrentKey() throws IOException {
+ protected void logUpdateMasterKey(DelegationKey key) throws IOException {
+ return;
+ }
+
+ /**
+ * Update the current master key
+ * This is called once by startThreads before tokenRemoverThread is created,
+ * and only by tokenRemoverThread afterwards.
+ */
+ private void updateCurrentKey() throws IOException {
LOG.info("Updating the current master key for generating delegation tokens");
/* Create a new currentKey with an estimated expiry date. */
- currentId++;
- currentKey = new DelegationKey(currentId, System.currentTimeMillis()
+ int newCurrentId;
+ synchronized (this) {
+ newCurrentId = currentId+1;
+ }
+ DelegationKey newKey = new DelegationKey(newCurrentId, System
+ .currentTimeMillis()
+ keyUpdateInterval + tokenMaxLifetime, generateSecret());
- allKeys.put(currentKey.getKeyId(), currentKey);
+ //Log must be invoked outside the lock on 'this'
+ logUpdateMasterKey(newKey);
+ synchronized (this) {
+ currentId = newKey.getKeyId();
+ currentKey = newKey;
+ allKeys.put(currentKey.getKeyId(), currentKey);
+ }
}
- /** Update the current master key for generating delegation tokens */
- public synchronized void rollMasterKey() throws IOException {
- removeExpiredKeys();
- /* set final expiry date for retiring currentKey */
- currentKey.setExpiryDate(System.currentTimeMillis() + tokenMaxLifetime);
- /*
- * currentKey might have been removed by removeExpiredKeys(), if
- * updateMasterKey() isn't called at expected interval. Add it back to
- * allKeys just in case.
- */
- allKeys.put(currentKey.getKeyId(), currentKey);
+ /**
+ * Update the current master key for generating delegation tokens
+ * It should be called only by tokenRemoverThread.
+ */
+ void rollMasterKey() throws IOException {
+ synchronized (this) {
+ removeExpiredKeys();
+ /* set final expiry date for retiring currentKey */
+ currentKey.setExpiryDate(System.currentTimeMillis() + tokenMaxLifetime);
+ /*
+ * currentKey might have been removed by removeExpiredKeys(), if
+ * updateMasterKey() isn't called at expected interval. Add it back to
+ * allKeys just in case.
+ */
+ allKeys.put(currentKey.getKeyId(), currentKey);
+ }
updateCurrentKey();
}
@@ -148,35 +178,24 @@
}
@Override
- protected byte[] createPassword(TokenIdent identifier) {
+ protected synchronized byte[] createPassword(TokenIdent identifier) {
int sequenceNum;
- int id;
- DelegationKey key;
- long now = System.currentTimeMillis();
- synchronized (this) {
- id = currentId;
- key = currentKey;
- sequenceNum = ++delegationTokenSequenceNumber;
- }
+ long now = System.currentTimeMillis();
+ sequenceNum = ++delegationTokenSequenceNumber;
identifier.setIssueDate(now);
identifier.setMaxDate(now + tokenMaxLifetime);
- identifier.setMasterKeyId(id);
+ identifier.setMasterKeyId(currentId);
identifier.setSequenceNumber(sequenceNum);
- byte[] password = createPassword(identifier.getBytes(), key.getKey());
- synchronized (currentTokens) {
- currentTokens.put(identifier, new DelegationTokenInformation(now
- + tokenRenewInterval, password));
- }
+ byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
+ currentTokens.put(identifier, new DelegationTokenInformation(now
+ + tokenRenewInterval, password));
return password;
}
@Override
- public byte[] retrievePassword(TokenIdent identifier
- ) throws InvalidToken {
- DelegationTokenInformation info = null;
- synchronized (currentTokens) {
- info = currentTokens.get(identifier);
- }
+ public synchronized byte[] retrievePassword(TokenIdent identifier)
+ throws InvalidToken {
+ DelegationTokenInformation info = currentTokens.get(identifier);
if (info == null) {
throw new InvalidToken("token is expired or doesn't exist");
}
@@ -195,18 +214,14 @@
* @throws InvalidToken if the token is invalid
* @throws AccessControlException if the user can't renew token
*/
- public long renewToken(Token<TokenIdent> token,
+ public synchronized long renewToken(Token<TokenIdent> token,
String renewer) throws InvalidToken, IOException {
long now = System.currentTimeMillis();
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
DataInputStream in = new DataInputStream(buf);
TokenIdent id = createIdentifier();
id.readFields(in);
- synchronized (currentTokens) {
- if (currentTokens.get(id) == null) {
- throw new InvalidToken("Renewal request for unknown token");
- }
- }
+
if (id.getMaxDate() < now) {
throw new InvalidToken("User " + renewer +
" tried to renew an expired token");
@@ -222,36 +237,36 @@
"renewer specified as " +
id.getRenewer());
}
- DelegationKey key = null;
- synchronized (this) {
- key = allKeys.get(id.getMasterKeyId());
- }
+ DelegationKey key = allKeys.get(id.getMasterKeyId());
if (key == null) {
- throw new InvalidToken("Unable to find master key for keyId=" +
- id.getMasterKeyId() +
- " from cache. Failed to renew an unexpired token"+
- " with sequenceNumber=" + id.getSequenceNumber());
+ throw new InvalidToken("Unable to find master key for keyId="
+ + id.getMasterKeyId()
+ + " from cache. Failed to renew an unexpired token"
+ + " with sequenceNumber=" + id.getSequenceNumber());
}
byte[] password = createPassword(token.getIdentifier(), key.getKey());
if (!Arrays.equals(password, token.getPassword())) {
- throw new AccessControlException("Client " + renewer +
- " is trying to renew a token with " +
- "wrong password");
+ throw new AccessControlException("Client " + renewer
+ + " is trying to renew a token with " + "wrong password");
}
- DelegationTokenInformation info = new DelegationTokenInformation(
- Math.min(id.getMaxDate(), now + tokenRenewInterval), password);
- synchronized (currentTokens) {
- currentTokens.put(id, info);
+ long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval);
+ DelegationTokenInformation info = new DelegationTokenInformation(renewTime,
+ password);
+
+ if (currentTokens.get(id) == null) {
+ throw new InvalidToken("Renewal request for unknown token");
}
- return info.getRenewDate();
+ currentTokens.put(id, info);
+ return renewTime;
}
/**
* Cancel a token by removing it from cache.
+ * @return Identifier of the canceled token
* @throws InvalidToken for invalid token
* @throws AccessControlException if the user isn't allowed to cancel
*/
- public void cancelToken(Token<TokenIdent> token,
+ public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
String canceller) throws IOException {
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
DataInputStream in = new DataInputStream(buf);
@@ -262,18 +277,17 @@
}
String owner = id.getUser().getUserName();
Text renewer = id.getRenewer();
- if (!canceller.equals(owner) &&
- (renewer == null || !canceller.equals(renewer.toString()))) {
- throw new AccessControlException(canceller +
- " is not authorized to cancel the token");
+ if (!canceller.equals(owner)
+ && (renewer == null || !canceller.equals(renewer.toString()))) {
+ throw new AccessControlException(canceller
+ + " is not authorized to cancel the token");
}
DelegationTokenInformation info = null;
- synchronized (currentTokens) {
- info = currentTokens.remove(id);
- }
+ info = currentTokens.remove(id);
if (info == null) {
throw new InvalidToken("Token not found");
}
+ return id;
}
/**
@@ -285,16 +299,16 @@
return SecretManager.createSecretKey(key);
}
- /** Utility class to encapsulate a token's renew date and password. */
- private static class DelegationTokenInformation {
+ /** Class to encapsulate a token's renew date and password. */
+ public static class DelegationTokenInformation {
long renewDate;
byte[] password;
- DelegationTokenInformation(long renewDate, byte[] password) {
+ public DelegationTokenInformation(long renewDate, byte[] password) {
this.renewDate = renewDate;
this.password = password;
}
/** returns renew date */
- long getRenewDate() {
+ public long getRenewDate() {
return renewDate;
}
/** returns password */
@@ -304,15 +318,13 @@
}
/** Remove expired delegation tokens from cache */
- private void removeExpiredToken() {
+ private synchronized void removeExpiredToken() {
long now = System.currentTimeMillis();
- synchronized (currentTokens) {
- Iterator<DelegationTokenInformation> i = currentTokens.values().iterator();
- while (i.hasNext()) {
- long renewDate = i.next().getRenewDate();
- if (now > renewDate) {
- i.remove();
- }
+ Iterator<DelegationTokenInformation> i = currentTokens.values().iterator();
+ while (i.hasNext()) {
+ long renewDate = i.next().getRenewDate();
+ if (now > renewDate) {
+ i.remove();
}
}
}
@@ -321,7 +333,9 @@
if (LOG.isDebugEnabled())
LOG.debug("Stopping expired delegation token remover thread");
running = false;
- tokenRemoverThread.interrupt();
+ if (tokenRemoverThread != null) {
+ tokenRemoverThread.interrupt();
+ }
}
private class ExpiredTokenRemover extends Thread {
Modified:
hadoop/common/trunk/src/test/core/org/apache/hadoop/security/token/delegation/TestDelegationToken.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/security/token/delegation/TestDelegationToken.java?rev=916468&r1=916467&r2=916468&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/security/token/delegation/TestDelegationToken.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/security/token/delegation/TestDelegationToken.java Thu Feb 25 21:43:20 2010
@@ -25,7 +25,10 @@
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import junit.framework.Assert;
@@ -36,8 +39,11 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
@@ -91,6 +97,18 @@
protected byte[] createPassword(TestDelegationTokenIdentifier t) {
return super.createPassword(t);
}
+
+ public byte[] createPassword(TestDelegationTokenIdentifier t, DelegationKey key) {
+ return SecretManager.createPassword(t.getBytes(), key.getKey());
+ }
+
+ public Map<TestDelegationTokenIdentifier, DelegationTokenInformation> getAllTokens() {
+ return currentTokens;
+ }
+
+ public DelegationKey getKey(TestDelegationTokenIdentifier id) {
+ return allKeys.get(id.getMasterKeyId());
+ }
}
public static class TokenSelector extends
@@ -299,4 +317,52 @@
dtSecretManager.stopThreads();
}
}
+
+ @Test
+ public void testParallelDelegationTokenCreation() throws Exception {
+ final TestDelegationTokenSecretManager dtSecretManager =
+ new TestDelegationTokenSecretManager(2000, 24 * 60 * 60 * 1000,
+ 7 * 24 * 60 * 60 * 1000, 2000);
+ try {
+ dtSecretManager.startThreads();
+ int numThreads = 100;
+ final int numTokensPerThread = 100;
+ class tokenIssuerThread implements Runnable {
+
+ public void run() {
+ for(int i =0;i <numTokensPerThread; i++) {
+ generateDelegationToken(dtSecretManager, "auser", "arenewer");
+ try {
+ Thread.sleep(250);
+ } catch (Exception e) {
+ }
+ }
+ }
+ }
+ Thread[] issuers = new Thread[numThreads];
+ for (int i =0; i <numThreads; i++) {
+ issuers[i] = new Daemon(new tokenIssuerThread());
+ issuers[i].start();
+ }
+ for (int i =0; i <numThreads; i++) {
+ issuers[i].join();
+ }
+ Map<TestDelegationTokenIdentifier, DelegationTokenInformation> tokenCache = dtSecretManager
+ .getAllTokens();
+ Assert.assertEquals(numTokensPerThread*numThreads, tokenCache.size());
+ Iterator<TestDelegationTokenIdentifier> iter = tokenCache.keySet().iterator();
+ while (iter.hasNext()) {
+ TestDelegationTokenIdentifier id = iter.next();
+ DelegationTokenInformation info = tokenCache.get(id);
+ Assert.assertTrue(info != null);
+ DelegationKey key = dtSecretManager.getKey(id);
+ Assert.assertTrue(key != null);
+ byte[] storedPassword = dtSecretManager.retrievePassword(id);
+ byte[] password = dtSecretManager.createPassword(id, key);
+ Assert.assertTrue(Arrays.equals(password, storedPassword));
+ }
+ } finally {
+ dtSecretManager.stopThreads();
+ }
+ }
}