This is an automated email from the ASF dual-hosted git repository.

xyao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6288e15  HADOOP-16828. Zookeeper Delegation Token Manager fetch 
sequence number by batch. Contributed by Fengnan Li.
6288e15 is described below

commit 6288e15118fab65a9a1452898e639313c6996769
Author: Xiaoyu Yao <x...@apache.org>
AuthorDate: Tue Jun 2 11:53:08 2020 -0700

    HADOOP-16828. Zookeeper Delegation Token Manager fetch sequence number by 
batch. Contributed by Fengnan Li.
---
 .../delegation/ZKDelegationTokenSecretManager.java | 54 ++++++++++++++++------
 .../TestZKDelegationTokenSecretManager.java        | 52 +++++++++++++++++++++
 2 files changed, 93 insertions(+), 13 deletions(-)

diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
index f61590c..cd3b8c0 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
@@ -98,12 +98,16 @@ public abstract class 
ZKDelegationTokenSecretManager<TokenIdent extends Abstract
       + "kerberos.keytab";
   public static final String ZK_DTSM_ZK_KERBEROS_PRINCIPAL = ZK_CONF_PREFIX
       + "kerberos.principal";
+  public static final String ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE = ZK_CONF_PREFIX
+      + "token.seqnum.batch.size";
 
   public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3;
   public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000;
   public static final int ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT = 10000;
   public static final int ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT = 10000;
   public static final String ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT = "zkdtsm";
+  // by default it is still incrementing seq number by 1 each time
+  public static final int ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT = 1;
 
   private static Logger LOG = LoggerFactory
       .getLogger(ZKDelegationTokenSecretManager.class);
@@ -135,6 +139,9 @@ public abstract class 
ZKDelegationTokenSecretManager<TokenIdent extends Abstract
   private PathChildrenCache tokenCache;
   private ExecutorService listenerThreadPool;
   private final long shutdownTimeout;
+  private final int seqNumBatchSize;
+  private int currentSeqNum;
+  private int currentMaxSeqNum;
 
   public ZKDelegationTokenSecretManager(Configuration conf) {
     super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
@@ -147,6 +154,8 @@ public abstract class 
ZKDelegationTokenSecretManager<TokenIdent extends Abstract
             DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000);
     shutdownTimeout = conf.getLong(ZK_DTSM_ZK_SHUTDOWN_TIMEOUT,
         ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
+    seqNumBatchSize = conf.getInt(ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
+        ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
     if (CURATOR_TL.get() != null) {
       zkClient =
           CURATOR_TL.get().usingNamespace(
@@ -322,6 +331,12 @@ public abstract class 
ZKDelegationTokenSecretManager<TokenIdent extends Abstract
       if (delTokSeqCounter != null) {
         delTokSeqCounter.start();
       }
+      // the first batch range should be allocated during this starting window
+      // by calling the incrSharedCount
+      currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
+      currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
+      LOG.info("Fetched initial range of seq num, from {} to {} ",
+          currentSeqNum+1, currentMaxSeqNum);
     } catch (Exception e) {
       throw new IOException("Could not start Sequence Counter", e);
     }
@@ -562,28 +577,41 @@ public abstract class 
ZKDelegationTokenSecretManager<TokenIdent extends Abstract
     return delTokSeqCounter.getCount();
   }
 
-  private void incrSharedCount(SharedCount sharedCount) throws Exception {
+  private int incrSharedCount(SharedCount sharedCount, int batchSize)
+      throws Exception {
     while (true) {
       // Loop until we successfully increment the counter
       VersionedValue<Integer> versionedValue = sharedCount.getVersionedValue();
-      if (sharedCount.trySetCount(versionedValue, versionedValue.getValue() + 
1)) {
-        break;
+      if (sharedCount.trySetCount(
+          versionedValue, versionedValue.getValue() + batchSize)) {
+        return versionedValue.getValue();
       }
     }
   }
 
   @Override
   protected int incrementDelegationTokenSeqNum() {
-    try {
-      incrSharedCount(delTokSeqCounter);
-    } catch (InterruptedException e) {
-      // The ExpirationThread is just finishing.. so dont do anything..
-      LOG.debug("Thread interrupted while performing token counter increment", 
e);
-      Thread.currentThread().interrupt();
-    } catch (Exception e) {
-      throw new RuntimeException("Could not increment shared counter !!", e);
+    // The secret manager will keep a local range of seq num which won't be
+    // seen by peers, so only when the range is exhausted it will ask zk for
+    // another range again
+    if (currentSeqNum >= currentMaxSeqNum) {
+      try {
+        // after a successful batch request, we can get the range starting 
point
+        currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
+        currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
+        LOG.info("Fetched new range of seq num, from {} to {} ",
+            currentSeqNum+1, currentMaxSeqNum);
+      } catch (InterruptedException e) {
+        // The ExpirationThread is just finishing.. so dont do anything..
+        LOG.debug(
+            "Thread interrupted while performing token counter increment", e);
+        Thread.currentThread().interrupt();
+      } catch (Exception e) {
+        throw new RuntimeException("Could not increment shared counter !!", e);
+      }
     }
-    return delTokSeqCounter.getCount();
+
+    return ++currentSeqNum;
   }
 
   @Override
@@ -603,7 +631,7 @@ public abstract class 
ZKDelegationTokenSecretManager<TokenIdent extends Abstract
   @Override
   protected int incrementCurrentKeyId() {
     try {
-      incrSharedCount(keyIdSeqCounter);
+      incrSharedCount(keyIdSeqCounter, 1);
     } catch (InterruptedException e) {
       // The ExpirationThread is just finishing.. so dont do anything..
       LOG.debug("Thread interrupted while performing keyId increment", e);
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
index c9571ff2..b2e1779 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
@@ -219,6 +219,58 @@ public class TestZKDelegationTokenSecretManager {
 
   @SuppressWarnings("unchecked")
   @Test
+  public void testMultiNodeCompeteForSeqNum() throws Exception {
+    DelegationTokenManager tm1, tm2 = null;
+    String connectString = zkServer.getConnectString();
+    Configuration conf = getSecretConf(connectString);
+    conf.setInt(
+        ZKDelegationTokenSecretManager.ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE, 1000);
+    tm1 = new DelegationTokenManager(conf, new Text("bla"));
+    tm1.init();
+
+    Token<DelegationTokenIdentifier> token1 =
+        (Token<DelegationTokenIdentifier>) tm1.createToken(
+            UserGroupInformation.getCurrentUser(), "foo");
+    Assert.assertNotNull(token1);
+    AbstractDelegationTokenIdentifier id1 =
+        tm1.getDelegationTokenSecretManager().decodeTokenIdentifier(token1);
+    Assert.assertEquals(
+        "Token seq should be the same", 1, id1.getSequenceNumber());
+    Token<DelegationTokenIdentifier> token2 =
+        (Token<DelegationTokenIdentifier>) tm1.createToken(
+            UserGroupInformation.getCurrentUser(), "foo");
+    Assert.assertNotNull(token2);
+    AbstractDelegationTokenIdentifier id2 =
+        tm1.getDelegationTokenSecretManager().decodeTokenIdentifier(token2);
+    Assert.assertEquals(
+        "Token seq should be the same", 2, id2.getSequenceNumber());
+
+    tm2 = new DelegationTokenManager(conf, new Text("bla"));
+    tm2.init();
+
+    Token<DelegationTokenIdentifier> token3 =
+        (Token<DelegationTokenIdentifier>) tm2.createToken(
+            UserGroupInformation.getCurrentUser(), "foo");
+    Assert.assertNotNull(token3);
+    AbstractDelegationTokenIdentifier id3 =
+        tm2.getDelegationTokenSecretManager().decodeTokenIdentifier(token3);
+    Assert.assertEquals(
+        "Token seq should be the same", 1001, id3.getSequenceNumber());
+    Token<DelegationTokenIdentifier> token4 =
+        (Token<DelegationTokenIdentifier>) tm2.createToken(
+            UserGroupInformation.getCurrentUser(), "foo");
+    Assert.assertNotNull(token4);
+    AbstractDelegationTokenIdentifier id4 =
+        tm2.getDelegationTokenSecretManager().decodeTokenIdentifier(token4);
+    Assert.assertEquals(
+        "Token seq should be the same", 1002, id4.getSequenceNumber());
+
+    verifyDestroy(tm1, conf);
+    verifyDestroy(tm2, conf);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
   public void testRenewTokenSingleManager() throws Exception {
     for (int i = 0; i < TEST_RETRIES; i++) {
       DelegationTokenManager tm1 = null;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to