fengnanli commented on a change in pull request #2047:
URL: https://github.com/apache/hadoop/pull/2047#discussion_r442576731



##########
File path: 
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java
##########
@@ -19,38 +19,211 @@
 package org.apache.hadoop.hdfs.server.federation.router.security.token;
 
 import 
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
 import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import 
org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
+import org.apache.hadoop.util.Time;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Zookeeper based router delegation token store implementation.
  */
 public class ZKDelegationTokenSecretManagerImpl extends
     ZKDelegationTokenSecretManager<AbstractDelegationTokenIdentifier> {
 
+  public static final String ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL =
+      "zk-dt-secret-manager.router.token.sync.interval";
+  public static final int ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL_DEFAULT = 5;
+
   private static final Logger LOG =
       LoggerFactory.getLogger(ZKDelegationTokenSecretManagerImpl.class);
 
-  private Configuration conf = null;
+  private Configuration conf;
+
+  private final ScheduledExecutorService scheduler =
+      Executors.newSingleThreadScheduledExecutor();
+
+  // Local cache of delegation tokens, used for depracating tokens from
+  // currentTokenMap
+  private final Set<AbstractDelegationTokenIdentifier> localTokenCache =
+      new HashSet<>();
+  // Native zk client for getting all tokens
+  private ZooKeeper zookeeper;
+  private final String TOKEN_PATH = "/" + zkClient.getNamespace()
+      + ZK_DTSM_TOKENS_ROOT;
+  // The flag used to issue an extra check before deletion
+  // Since cancel token and token remover thread use the same
+  // API here and one router could have a token that is renewed
+  // by another router, thus token remover should always check ZK
+  // to confirm whether it has been renewed or not
+  private ThreadLocal<Boolean> checkAgainstZkBeforeDeletion =
+      new ThreadLocal<Boolean>() {
+        @Override
+        protected Boolean initialValue() {
+          return true;
+        }
+      };
 
   public ZKDelegationTokenSecretManagerImpl(Configuration conf) {
     super(conf);
     this.conf = conf;
     try {
-      super.startThreads();
+      startThreads();
     } catch (IOException e) {
       LOG.error("Error starting threads for zkDelegationTokens", e);
     }
     LOG.info("Zookeeper delegation token secret manager instantiated");
   }
 
+  @Override
+  public void startThreads() throws IOException {
+    super.startThreads();
+    // start token cache related work when watcher is disabled
+    if (!isTokenWatcherEnabled()) {
+      LOG.info("Watcher for tokens is disabled in this secret manager");
+      try {
+        // By default set this variable
+        checkAgainstZkBeforeDeletion.set(true);
+        // Ensure the token root path exists
+        if (zkClient.checkExists().forPath(ZK_DTSM_TOKENS_ROOT) == null) {
+          zkClient.create().creatingParentsIfNeeded()
+              .withMode(CreateMode.PERSISTENT)
+              .forPath(ZK_DTSM_TOKENS_ROOT);
+        }
+        // Set up zookeeper client
+        try {
+          zookeeper = zkClient.getZookeeperClient().getZooKeeper();
+        } catch (Exception e) {
+          LOG.info("Cannot get zookeeper client ", e);
+        } finally {
+          if (zookeeper == null) {
+            throw new IOException("Zookeeper client is null");
+          }
+        }
+
+        LOG.info("Start loading token cache");
+        long start = Time.now();
+        rebuildTokenCache(true);
+        LOG.info("Loaded token cache in {} milliseconds", Time.now() - start);
+
+        int syncInterval = conf.getInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL,
+            ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL_DEFAULT);
+        scheduler.scheduleAtFixedRate(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              rebuildTokenCache(false);
+            } catch (Exception e) {
+              // ignore
+            }
+          }
+        }, syncInterval, syncInterval, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        LOG.error("Error rebuilding local cache for zkDelegationTokens ", e);
+      }
+    }
+  }
+
+  @Override
+  public void stopThreads() {
+    super.stopThreads();
+    scheduler.shutdown();
+  }
+
   @Override
   public DelegationTokenIdentifier createIdentifier() {
     return new DelegationTokenIdentifier();
   }
+
+  /**
+   * This function will rebuild local token cache from zk storage.
+   * It is first called when the secret manager is initialized and
+   * then regularly at a configured interval.
+   *
+   * @param initial whether this is called during initialization
+   * @throws IOException
+   */
+  private void rebuildTokenCache(boolean initial) throws IOException {
+    localTokenCache.clear();
+    // Use bare zookeeper client to get all children since curator will
+    // wrap the same API with a sorting process. This is time consuming given
+    // millions of tokens
+    List<String> zkTokens;
+    try {
+      zkTokens = zookeeper.getChildren(TOKEN_PATH, false);
+    } catch (KeeperException | InterruptedException e) {
+      throw new IOException("Tokens cannot be fetched from path "
+          + TOKEN_PATH, e);
+    }
+    byte[] data;
+    for (String tokenPath : zkTokens) {
+      try {
+        data = zkClient.getData().forPath(
+            ZK_DTSM_TOKENS_ROOT + "/" + tokenPath);
+      } catch (KeeperException.NoNodeException e) {
+        LOG.debug("No node in path [" + tokenPath + "]");
+        continue;
+      } catch (Exception ex) {
+        throw new IOException(ex);
+      }
+      // Store data to currentTokenMap
+      processTokenAddOrUpdate(data);
+      // Store data to localTokenCache for sync
+      AbstractDelegationTokenIdentifier ident = createIdentifier();
+      DataInputStream din = new DataInputStream(new 
ByteArrayInputStream(data));
+      ident.readFields(din);
+      localTokenCache.add(ident);
+    }
+    if (!initial) {
+      // Sync zkTokens with local cache, specifically
+      // 1) add/update tokens to local cache from zk, which is done through
+      //    processTokenAddOrUpdate above
+      // 2) remove tokens in local cache but not in zk anymore
+      for (AbstractDelegationTokenIdentifier ident : currentTokens.keySet()) {

Review comment:
       `localTokenCache` is a set of token identifiers, while `currentTokens` is the 
map that stores the actual TokenInformation. To do the swap, we would need to 
construct all of the TokenInformation objects, and the swap would need to be 
done in a synchronized block. I think the current approach is simpler and 
follows the logic inside its parent class.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to