This is an automated email from the ASF dual-hosted git repository. xyao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 6288e15 HADOOP-16828. Zookeeper Delegation Token Manager fetch sequence number by batch. Contributed by Fengnan Li. 6288e15 is described below commit 6288e15118fab65a9a1452898e639313c6996769 Author: Xiaoyu Yao <x...@apache.org> AuthorDate: Tue Jun 2 11:53:08 2020 -0700 HADOOP-16828. Zookeeper Delegation Token Manager fetch sequence number by batch. Contributed by Fengnan Li. --- .../delegation/ZKDelegationTokenSecretManager.java | 54 ++++++++++++++++------ .../TestZKDelegationTokenSecretManager.java | 52 +++++++++++++++++++++ 2 files changed, 93 insertions(+), 13 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index f61590c..cd3b8c0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -98,12 +98,16 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract + "kerberos.keytab"; public static final String ZK_DTSM_ZK_KERBEROS_PRINCIPAL = ZK_CONF_PREFIX + "kerberos.principal"; + public static final String ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE = ZK_CONF_PREFIX + + "token.seqnum.batch.size"; public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3; public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000; public static final int ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT = 10000; public static final int ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT = 10000; public static final String ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT = "zkdtsm"; + // by default it is still incrementing seq number by 1 each time + public static final int ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT = 1; private static Logger LOG = LoggerFactory .getLogger(ZKDelegationTokenSecretManager.class); @@ -135,6 +139,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract private PathChildrenCache tokenCache; private ExecutorService listenerThreadPool; private final long shutdownTimeout; + private final int seqNumBatchSize; + private int currentSeqNum; + private int currentMaxSeqNum; public ZKDelegationTokenSecretManager(Configuration conf) { super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL, @@ -147,6 +154,8 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000); shutdownTimeout = conf.getLong(ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT); + seqNumBatchSize = conf.getInt(ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE, + ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT); if (CURATOR_TL.get() != null) { zkClient = CURATOR_TL.get().usingNamespace( @@ -322,6 +331,12 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract if (delTokSeqCounter != null) { delTokSeqCounter.start(); } + // the first batch range should be allocated during this starting window + // by calling the incrSharedCount + currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); + currentMaxSeqNum = currentSeqNum + seqNumBatchSize; + LOG.info("Fetched initial range of seq num, from {} to {} ", + currentSeqNum+1, currentMaxSeqNum); } catch (Exception e) { throw new IOException("Could not start Sequence Counter", e); } @@ -562,28 +577,41 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract return delTokSeqCounter.getCount(); } - private void incrSharedCount(SharedCount sharedCount) throws Exception { + private int incrSharedCount(SharedCount sharedCount, int batchSize) + throws Exception { while (true) { // Loop until we successfully increment the counter VersionedValue<Integer> versionedValue = sharedCount.getVersionedValue(); - if (sharedCount.trySetCount(versionedValue, versionedValue.getValue() + 1)) { - break; + if (sharedCount.trySetCount( + versionedValue, versionedValue.getValue() + batchSize)) { + return versionedValue.getValue(); } } } @Override protected int incrementDelegationTokenSeqNum() { - try { - incrSharedCount(delTokSeqCounter); - } catch (InterruptedException e) { - // The ExpirationThread is just finishing.. so dont do anything.. - LOG.debug("Thread interrupted while performing token counter increment", e); - Thread.currentThread().interrupt(); - } catch (Exception e) { - throw new RuntimeException("Could not increment shared counter !!", e); + // The secret manager will keep a local range of seq num which won't be + // seen by peers, so only when the range is exhausted it will ask zk for + // another range again + if (currentSeqNum >= currentMaxSeqNum) { + try { + // after a successful batch request, we can get the range starting point + currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); + currentMaxSeqNum = currentSeqNum + seqNumBatchSize; + LOG.info("Fetched new range of seq num, from {} to {} ", + currentSeqNum+1, currentMaxSeqNum); + } catch (InterruptedException e) { + // The ExpirationThread is just finishing.. so dont do anything.. + LOG.debug( + "Thread interrupted while performing token counter increment", e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + throw new RuntimeException("Could not increment shared counter !!", e); + } } - return delTokSeqCounter.getCount(); + + return ++currentSeqNum; } @Override @@ -603,7 +631,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract @Override protected int incrementCurrentKeyId() { try { - incrSharedCount(keyIdSeqCounter); + incrSharedCount(keyIdSeqCounter, 1); } catch (InterruptedException e) { // The ExpirationThread is just finishing.. so dont do anything.. LOG.debug("Thread interrupted while performing keyId increment", e); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java index c9571ff2..b2e1779 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java @@ -219,6 +219,58 @@ public class TestZKDelegationTokenSecretManager { @SuppressWarnings("unchecked") @Test + public void testMultiNodeCompeteForSeqNum() throws Exception { + DelegationTokenManager tm1, tm2 = null; + String connectString = zkServer.getConnectString(); + Configuration conf = getSecretConf(connectString); + conf.setInt( + ZKDelegationTokenSecretManager.ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE, 1000); + tm1 = new DelegationTokenManager(conf, new Text("bla")); + tm1.init(); + + Token<DelegationTokenIdentifier> token1 = + (Token<DelegationTokenIdentifier>) tm1.createToken( + UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token1); + AbstractDelegationTokenIdentifier id1 = + tm1.getDelegationTokenSecretManager().decodeTokenIdentifier(token1); + Assert.assertEquals( + "Token seq should be the same", 1, id1.getSequenceNumber()); + Token<DelegationTokenIdentifier> token2 = + (Token<DelegationTokenIdentifier>) tm1.createToken( + UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token2); + AbstractDelegationTokenIdentifier id2 = + tm1.getDelegationTokenSecretManager().decodeTokenIdentifier(token2); + Assert.assertEquals( + "Token seq should be the same", 2, id2.getSequenceNumber()); + + tm2 = new DelegationTokenManager(conf, new Text("bla")); + tm2.init(); + + Token<DelegationTokenIdentifier> token3 = + (Token<DelegationTokenIdentifier>) tm2.createToken( + UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token3); + AbstractDelegationTokenIdentifier id3 = + tm2.getDelegationTokenSecretManager().decodeTokenIdentifier(token3); + Assert.assertEquals( + "Token seq should be the same", 1001, id3.getSequenceNumber()); + Token<DelegationTokenIdentifier> token4 = + (Token<DelegationTokenIdentifier>) tm2.createToken( + UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token4); + AbstractDelegationTokenIdentifier id4 = + tm2.getDelegationTokenSecretManager().decodeTokenIdentifier(token4); + Assert.assertEquals( + "Token seq should be the same", 1002, id4.getSequenceNumber()); + + verifyDestroy(tm1, conf); + verifyDestroy(tm2, conf); + } + + @SuppressWarnings("unchecked") + @Test public void testRenewTokenSingleManager() throws Exception { for (int i = 0; i < TEST_RETRIES; i++) { DelegationTokenManager tm1 = null; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org