Hexiaoqiao commented on a change in pull request #2047:
URL: https://github.com/apache/hadoop/pull/2047#discussion_r434421860
##########
File path:
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java
##########
@@ -19,38 +19,211 @@
package org.apache.hadoop.hdfs.server.federation.router.security.token;
import
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
import
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import
org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
+import org.apache.hadoop.util.Time;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* Zookeeper based router delegation token store implementation.
*/
public class ZKDelegationTokenSecretManagerImpl extends
ZKDelegationTokenSecretManager<AbstractDelegationTokenIdentifier> {
+ public static final String ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL =
+ "zk-dt-secret-manager.router.token.sync.interval";
+ public static final int ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL_DEFAULT = 5;
+
private static final Logger LOG =
LoggerFactory.getLogger(ZKDelegationTokenSecretManagerImpl.class);
- private Configuration conf = null;
+ private Configuration conf;
+
+ private final ScheduledExecutorService scheduler =
+ Executors.newSingleThreadScheduledExecutor();
+
+ // Local cache of delegation tokens, used for depracating tokens from
Review comment:
depracating -> deprecating?
##########
File path:
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java
##########
@@ -19,38 +19,211 @@
package org.apache.hadoop.hdfs.server.federation.router.security.token;
import
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
import
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import
org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
+import org.apache.hadoop.util.Time;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* Zookeeper based router delegation token store implementation.
*/
public class ZKDelegationTokenSecretManagerImpl extends
ZKDelegationTokenSecretManager<AbstractDelegationTokenIdentifier> {
+ public static final String ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL =
+ "zk-dt-secret-manager.router.token.sync.interval";
+ public static final int ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL_DEFAULT = 5;
+
private static final Logger LOG =
LoggerFactory.getLogger(ZKDelegationTokenSecretManagerImpl.class);
- private Configuration conf = null;
+ private Configuration conf;
+
+ private final ScheduledExecutorService scheduler =
+ Executors.newSingleThreadScheduledExecutor();
+
+ // Local cache of delegation tokens, used for depracating tokens from
+ // currentTokenMap
+ private final Set<AbstractDelegationTokenIdentifier> localTokenCache =
+ new HashSet<>();
+ // Native zk client for getting all tokens
+ private ZooKeeper zookeeper;
+ private final String TOKEN_PATH = "/" + zkClient.getNamespace()
+ + ZK_DTSM_TOKENS_ROOT;
+ // The flag used to issue an extra check before deletion
+ // Since cancel token and token remover thread use the same
+ // API here and one router could have a token that is renewed
+ // by another router, thus token remover should always check ZK
+ // to confirm whether it has been renewed or not
+ private ThreadLocal<Boolean> checkAgainstZkBeforeDeletion =
+ new ThreadLocal<Boolean>() {
+ @Override
+ protected Boolean initialValue() {
+ return true;
+ }
+ };
public ZKDelegationTokenSecretManagerImpl(Configuration conf) {
super(conf);
this.conf = conf;
try {
- super.startThreads();
+ startThreads();
} catch (IOException e) {
LOG.error("Error starting threads for zkDelegationTokens", e);
}
LOG.info("Zookeeper delegation token secret manager instantiated");
}
+ @Override
+ public void startThreads() throws IOException {
+ super.startThreads();
+ // start token cache related work when watcher is disabled
+ if (!isTokenWatcherEnabled()) {
+ LOG.info("Watcher for tokens is disabled in this secret manager");
+ try {
+ // By default set this variable
+ checkAgainstZkBeforeDeletion.set(true);
+ // Ensure the token root path exists
+ if (zkClient.checkExists().forPath(ZK_DTSM_TOKENS_ROOT) == null) {
+ zkClient.create().creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(ZK_DTSM_TOKENS_ROOT);
+ }
+ // Set up zookeeper client
+ try {
+ zookeeper = zkClient.getZookeeperClient().getZooKeeper();
+ } catch (Exception e) {
+ LOG.info("Cannot get zookeeper client ", e);
+ } finally {
+ if (zookeeper == null) {
+ throw new IOException("Zookeeper client is null");
+ }
+ }
+
+ LOG.info("Start loading token cache");
+ long start = Time.now();
+ rebuildTokenCache(true);
+ LOG.info("Loaded token cache in {} milliseconds", Time.now() - start);
+
+ int syncInterval = conf.getInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL,
+ ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL_DEFAULT);
+ scheduler.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ rebuildTokenCache(false);
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }, syncInterval, syncInterval, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Error rebuilding local cache for zkDelegationTokens ", e);
+ }
+ }
+ }
+
+ @Override
+ public void stopThreads() {
+ super.stopThreads();
+ scheduler.shutdown();
+ }
+
@Override
public DelegationTokenIdentifier createIdentifier() {
return new DelegationTokenIdentifier();
}
+
+ /**
+ * This function will rebuild local token cache from zk storage.
+ * It is first called when the secret manager is initialized and
+ * then regularly at a configured interval.
+ *
+ * @param initial whether this is called during initialization
+ * @throws IOException
+ */
+ private void rebuildTokenCache(boolean initial) throws IOException {
+ localTokenCache.clear();
+ // Use bare zookeeper client to get all children since curator will
+ // wrap the same API with a sorting process. This is time consuming given
+ // millions of tokens
+ List<String> zkTokens;
+ try {
+ zkTokens = zookeeper.getChildren(TOKEN_PATH, false);
+ } catch (KeeperException | InterruptedException e) {
+ throw new IOException("Tokens cannot be fetched from path "
+ + TOKEN_PATH, e);
+ }
+ byte[] data;
+ for (String tokenPath : zkTokens) {
+ try {
+ data = zkClient.getData().forPath(
+ ZK_DTSM_TOKENS_ROOT + "/" + tokenPath);
+ } catch (KeeperException.NoNodeException e) {
+ LOG.debug("No node in path [" + tokenPath + "]");
+ continue;
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
+ // Store data to currentTokenMap
+ processTokenAddOrUpdate(data);
Review comment:
It looks like there are two deserialization operations here; it would be
better if we could merge them into one, IMO.
##########
File path:
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java
##########
@@ -19,38 +19,211 @@
package org.apache.hadoop.hdfs.server.federation.router.security.token;
import
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
import
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import
org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
+import org.apache.hadoop.util.Time;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* Zookeeper based router delegation token store implementation.
*/
public class ZKDelegationTokenSecretManagerImpl extends
ZKDelegationTokenSecretManager<AbstractDelegationTokenIdentifier> {
+ public static final String ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL =
+ "zk-dt-secret-manager.router.token.sync.interval";
+ public static final int ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL_DEFAULT = 5;
+
private static final Logger LOG =
LoggerFactory.getLogger(ZKDelegationTokenSecretManagerImpl.class);
- private Configuration conf = null;
+ private Configuration conf;
+
+ private final ScheduledExecutorService scheduler =
+ Executors.newSingleThreadScheduledExecutor();
+
+ // Local cache of delegation tokens, used for depracating tokens from
+ // currentTokenMap
+ private final Set<AbstractDelegationTokenIdentifier> localTokenCache =
+ new HashSet<>();
+ // Native zk client for getting all tokens
+ private ZooKeeper zookeeper;
+ private final String TOKEN_PATH = "/" + zkClient.getNamespace()
+ + ZK_DTSM_TOKENS_ROOT;
+ // The flag used to issue an extra check before deletion
+ // Since cancel token and token remover thread use the same
+ // API here and one router could have a token that is renewed
+ // by another router, thus token remover should always check ZK
+ // to confirm whether it has been renewed or not
+ private ThreadLocal<Boolean> checkAgainstZkBeforeDeletion =
+ new ThreadLocal<Boolean>() {
+ @Override
+ protected Boolean initialValue() {
+ return true;
+ }
+ };
public ZKDelegationTokenSecretManagerImpl(Configuration conf) {
super(conf);
this.conf = conf;
try {
- super.startThreads();
+ startThreads();
} catch (IOException e) {
LOG.error("Error starting threads for zkDelegationTokens", e);
}
LOG.info("Zookeeper delegation token secret manager instantiated");
}
+ @Override
+ public void startThreads() throws IOException {
+ super.startThreads();
+ // start token cache related work when watcher is disabled
+ if (!isTokenWatcherEnabled()) {
+ LOG.info("Watcher for tokens is disabled in this secret manager");
+ try {
+ // By default set this variable
+ checkAgainstZkBeforeDeletion.set(true);
+ // Ensure the token root path exists
+ if (zkClient.checkExists().forPath(ZK_DTSM_TOKENS_ROOT) == null) {
+ zkClient.create().creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(ZK_DTSM_TOKENS_ROOT);
+ }
+ // Set up zookeeper client
+ try {
+ zookeeper = zkClient.getZookeeperClient().getZooKeeper();
+ } catch (Exception e) {
+ LOG.info("Cannot get zookeeper client ", e);
+ } finally {
+ if (zookeeper == null) {
+ throw new IOException("Zookeeper client is null");
+ }
+ }
+
+ LOG.info("Start loading token cache");
+ long start = Time.now();
+ rebuildTokenCache(true);
+ LOG.info("Loaded token cache in {} milliseconds", Time.now() - start);
+
+ int syncInterval = conf.getInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL,
+ ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL_DEFAULT);
+ scheduler.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ rebuildTokenCache(false);
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }, syncInterval, syncInterval, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Error rebuilding local cache for zkDelegationTokens ", e);
+ }
+ }
+ }
+
+ @Override
+ public void stopThreads() {
+ super.stopThreads();
+ scheduler.shutdown();
+ }
+
@Override
public DelegationTokenIdentifier createIdentifier() {
return new DelegationTokenIdentifier();
}
+
+ /**
+ * This function will rebuild local token cache from zk storage.
+ * It is first called when the secret manager is initialized and
+ * then regularly at a configured interval.
+ *
+ * @param initial whether this is called during initialization
+ * @throws IOException
+ */
+ private void rebuildTokenCache(boolean initial) throws IOException {
+ localTokenCache.clear();
+ // Use bare zookeeper client to get all children since curator will
+ // wrap the same API with a sorting process. This is time consuming given
+ // millions of tokens
+ List<String> zkTokens;
+ try {
+ zkTokens = zookeeper.getChildren(TOKEN_PATH, false);
+ } catch (KeeperException | InterruptedException e) {
+ throw new IOException("Tokens cannot be fetched from path "
+ + TOKEN_PATH, e);
+ }
+ byte[] data;
+ for (String tokenPath : zkTokens) {
+ try {
+ data = zkClient.getData().forPath(
+ ZK_DTSM_TOKENS_ROOT + "/" + tokenPath);
+ } catch (KeeperException.NoNodeException e) {
+ LOG.debug("No node in path [" + tokenPath + "]");
+ continue;
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
+ // Store data to currentTokenMap
+ processTokenAddOrUpdate(data);
+ // Store data to localTokenCache for sync
+ AbstractDelegationTokenIdentifier ident = createIdentifier();
+ DataInputStream din = new DataInputStream(new
ByteArrayInputStream(data));
+ ident.readFields(din);
+ localTokenCache.add(ident);
+ }
+ if (!initial) {
+ // Sync zkTokens with local cache, specifically
+ // 1) add/update tokens to local cache from zk, which is done through
+ // processTokenAddOrUpdate above
+ // 2) remove tokens in local cache but not in zk anymore
+ for (AbstractDelegationTokenIdentifier ident : currentTokens.keySet()) {
Review comment:
After this loop, the content of `localTokenCache` is equal to
`currentTokens`, right? If so, should we set currentTokens =
localTokenCache directly? Please correct me if I am wrong.
##########
File path:
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java
##########
@@ -0,0 +1,216 @@
+package org.apache.hadoop.hdfs.server.federation.security.token;
Review comment:
Is the license header missing here?
##########
File path:
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java
##########
@@ -19,38 +19,211 @@
package org.apache.hadoop.hdfs.server.federation.router.security.token;
import
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
import
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import
org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
+import org.apache.hadoop.util.Time;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* Zookeeper based router delegation token store implementation.
*/
public class ZKDelegationTokenSecretManagerImpl extends
ZKDelegationTokenSecretManager<AbstractDelegationTokenIdentifier> {
+ public static final String ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL =
+ "zk-dt-secret-manager.router.token.sync.interval";
Review comment:
a. It is better to keep the same code style.
```
public static final String ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL =
"zk-dt-secret-manager.router.token.sync.interval";
```
->
```
public static final String ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL =
ZK_CONF_PREFIX + "router.token.sync.interval";
```
b. IMO, this is a common improvement; it should not be aimed at the Router
only, right?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]