This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 694d2b646f2 HDFS-17148. RBF: SQLDelegationTokenSecretManager must 
cleanup expired tokens in SQL (#5966)
694d2b646f2 is described below

commit 694d2b646f22dfe7dcc5bfe6bc01dfcfa96f0f00
Author: hchaverri <55413673+hchave...@users.noreply.github.com>
AuthorDate: Fri Aug 25 17:44:04 2023 -0700

    HDFS-17148. RBF: SQLDelegationTokenSecretManager must cleanup expired 
tokens in SQL (#5966)
---
 .../AbstractDelegationTokenSecretManager.java      | 14 +++-
 .../SQLDelegationTokenSecretManager.java           | 45 ++++++++++++
 .../token/SQLDelegationTokenSecretManagerImpl.java | 24 +++++++
 .../TestSQLDelegationTokenSecretManagerImpl.java   | 80 ++++++++++++++++++++--
 4 files changed, 155 insertions(+), 8 deletions(-)

diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
index bc0ef624d5a..abc4b3e9933 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
@@ -178,6 +178,14 @@ public abstract class 
AbstractDelegationTokenSecretManager<TokenIdent
     return currentTokens.size();
   }
 
+  /**
+   * Interval for tokens to be renewed.
+   * @return Renew interval in milliseconds.
+   */
+  protected long getTokenRenewInterval() {
+    return this.tokenRenewInterval;
+  }
+
   /** 
    * Add a previously used master key to cache (when NN restarts), 
    * should be called before activate().
@@ -738,7 +746,7 @@ public abstract class 
AbstractDelegationTokenSecretManager<TokenIdent
     Set<TokenIdent> expiredTokens = new HashSet<TokenIdent>();
     synchronized (this) {
       Iterator<Map.Entry<TokenIdent, DelegationTokenInformation>> i =
-          currentTokens.entrySet().iterator();
+          getCandidateTokensForCleanup().entrySet().iterator();
       while (i.hasNext()) {
         Map.Entry<TokenIdent, DelegationTokenInformation> entry = i.next();
         long renewDate = entry.getValue().getRenewDate();
@@ -752,6 +760,10 @@ public abstract class 
AbstractDelegationTokenSecretManager<TokenIdent
     logExpireTokens(expiredTokens);
   }
 
+  protected Map<TokenIdent, DelegationTokenInformation> 
getCandidateTokensForCleanup() {
+    return this.currentTokens;
+  }
+
   protected void logExpireTokens(
       Collection<TokenIdent> expiredTokens) throws IOException {
     for (TokenIdent ident : expiredTokens) {
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java
index 75f00d3f924..d2c41f31d1d 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java
@@ -24,6 +24,8 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.classification.VisibleForTesting;
@@ -50,6 +52,9 @@ public abstract class 
SQLDelegationTokenSecretManager<TokenIdent
   private static final String SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE = 
SQL_DTSM_CONF_PREFIX
       + "token.seqnum.batch.size";
   public static final int DEFAULT_SEQ_NUM_BATCH_SIZE = 10;
+  public static final String SQL_DTSM_TOKEN_MAX_CLEANUP_RESULTS = 
SQL_DTSM_CONF_PREFIX
+      + "token.max.cleanup.results";
+  public static final int SQL_DTSM_TOKEN_MAX_CLEANUP_RESULTS_DEFAULT = 1000;
   public static final String SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION = 
SQL_DTSM_CONF_PREFIX
       + "token.loading.cache.expiration";
   public static final long SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION_DEFAULT =
@@ -63,6 +68,9 @@ public abstract class 
SQLDelegationTokenSecretManager<TokenIdent
   // exhausted, including during initialization.
   private final int seqNumBatchSize;
 
+  // Number of tokens to obtain from SQL during the cleanup process.
+  private final int maxTokenCleanupResults;
+
   // Last sequenceNum in the current batch that has been allocated to a token.
   private int currentSeqNum;
 
@@ -82,6 +90,8 @@ public abstract class 
SQLDelegationTokenSecretManager<TokenIdent
 
     this.seqNumBatchSize = conf.getInt(SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
         DEFAULT_SEQ_NUM_BATCH_SIZE);
+    this.maxTokenCleanupResults = 
conf.getInt(SQL_DTSM_TOKEN_MAX_CLEANUP_RESULTS,
+        SQL_DTSM_TOKEN_MAX_CLEANUP_RESULTS_DEFAULT);
 
     long cacheExpirationMs = 
conf.getTimeDuration(SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION,
         SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION_DEFAULT, 
TimeUnit.MILLISECONDS);
@@ -153,6 +163,39 @@ public abstract class 
SQLDelegationTokenSecretManager<TokenIdent
     return super.cancelToken(token, canceller);
   }
 
+  /**
+   * Obtain a list of tokens that will be considered for cleanup, based on the 
last
+   * time the token was updated in SQL. This list may include tokens that are 
not
+   * expired and should not be deleted (e.g. if the token was last renewed 
using a
+   * higher renewal interval).
+   * The number of results is limited to reduce performance impact. Some level 
of
+   * contention is expected when multiple routers run cleanup simultaneously.
+   * @return Map of tokens that have not been updated in SQL after the token 
renewal
+   *         period.
+   */
+  @Override
+  protected Map<TokenIdent, DelegationTokenInformation> 
getCandidateTokensForCleanup() {
+    Map<TokenIdent, DelegationTokenInformation> tokens = new HashMap<>();
+    try {
+      // Query SQL for tokens that haven't been updated after
+      // the last token renewal period.
+      long maxModifiedTime = Time.now() - getTokenRenewInterval();
+      Map<byte[], byte[]> tokenInfoBytesList = 
selectStaleTokenInfos(maxModifiedTime,
+          this.maxTokenCleanupResults);
+
+      LOG.info("Found {} tokens for cleanup", tokenInfoBytesList.size());
+      for (Map.Entry<byte[], byte[]> tokenInfoBytes : 
tokenInfoBytesList.entrySet()) {
+        TokenIdent tokenIdent = createTokenIdent(tokenInfoBytes.getKey());
+        DelegationTokenInformation tokenInfo = 
createTokenInfo(tokenInfoBytes.getValue());
+        tokens.put(tokenIdent, tokenInfo);
+      }
+    } catch (IOException | SQLException e) {
+      LOG.error("Failed to get candidate tokens for cleanup in SQL secret 
manager", e);
+    }
+
+    return tokens;
+  }
+
   /**
    * Removes the existing TokenInformation from the SQL database to
    * invalidate it.
@@ -415,6 +458,8 @@ public abstract class 
SQLDelegationTokenSecretManager<TokenIdent
   // Token operations in SQL database
   protected abstract byte[] selectTokenInfo(int sequenceNum, byte[] 
tokenIdentifier)
       throws SQLException;
+  protected abstract Map<byte[], byte[]> selectStaleTokenInfos(long 
maxModifiedTime,
+      int maxResults) throws SQLException;
   protected abstract void insertToken(int sequenceNum, byte[] tokenIdentifier, 
byte[] tokenInfo)
       throws SQLException;
   protected abstract void updateToken(int sequenceNum, byte[] tokenIdentifier, 
byte[] tokenInfo)
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java
index 7da54778f31..e85baae0c3a 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java
@@ -23,6 +23,9 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import 
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -150,6 +153,27 @@ public class SQLDelegationTokenSecretManagerImpl
     });
   }
 
+  @Override
+  protected Map<byte[], byte[]> selectStaleTokenInfos(long maxModifiedTime, 
int maxResults)
+      throws SQLException {
+    return retryHandler.execute(() -> {
+      try (Connection connection = connectionFactory.getConnection();
+          PreparedStatement statement = connection.prepareStatement(
+              "SELECT tokenIdentifier, tokenInfo FROM Tokens WHERE 
modifiedTime < ?")) {
+        statement.setTimestamp(1, new Timestamp(maxModifiedTime));
+        statement.setMaxRows(maxResults);
+        try (ResultSet result = statement.executeQuery()) {
+          Map<byte[], byte[]> results = new HashMap<>();
+          while (result.next()) {
+            results.put(result.getBytes("tokenIdentifier"),
+                result.getBytes("tokenInfo"));
+          }
+          return results;
+        }
+      }
+    });
+  }
+
   @Override
   protected void insertDelegationKey(int keyId, byte[] delegationKey) throws 
SQLException {
     retryHandler.execute(() -> {
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java
index 5165f7fd770..679a2dc04da 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java
@@ -30,12 +30,15 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
 import 
org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
+import 
org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
@@ -52,6 +55,7 @@ public class TestSQLDelegationTokenSecretManagerImpl {
   private static final String CONNECTION_URL = "jdbc:derby:memory:TokenStore";
   private static final int TEST_MAX_RETRIES = 3;
   private static final int TOKEN_EXPIRATION_SECONDS = 1;
+  private static final int TOKEN_EXPIRATION_SCAN_SECONDS = 1;
   private static Configuration conf;
 
   @Before
@@ -75,6 +79,7 @@ public class TestSQLDelegationTokenSecretManagerImpl {
     conf.set(SQLConnectionFactory.CONNECTION_DRIVER, 
"org.apache.derby.jdbc.EmbeddedDriver");
     conf.setInt(SQLSecretManagerRetriableHandlerImpl.MAX_RETRIES, 
TEST_MAX_RETRIES);
     conf.setInt(SQLSecretManagerRetriableHandlerImpl.RETRY_SLEEP_TIME_MS, 10);
+    conf.setInt(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, 
TOKEN_EXPIRATION_SCAN_SECONDS);
   }
 
   @AfterClass
@@ -190,6 +195,63 @@ public class TestSQLDelegationTokenSecretManagerImpl {
     }
   }
 
+  @Test
+  public void testRemoveExpiredTokens() throws Exception {
+    DelegationTokenManager tokenManager = 
createTokenManager(getShortLivedTokenConf());
+
+    try {
+      TestDelegationTokenSecretManager secretManager =
+          (TestDelegationTokenSecretManager) 
tokenManager.getDelegationTokenSecretManager();
+
+      // Create token to be constantly renewed.
+      Token<? extends AbstractDelegationTokenIdentifier> token1 =
+          tokenManager.createToken(UserGroupInformation.getCurrentUser(), 
"foo");
+      AbstractDelegationTokenIdentifier tokenId1 =
+          (AbstractDelegationTokenIdentifier) token1.decodeIdentifier();
+
+      // Create token expected to expire soon.
+      long expirationTime2 = Time.now();
+      AbstractDelegationTokenIdentifier tokenId2 = storeToken(secretManager, 
2, expirationTime2);
+
+      // Create token not expected to expire soon.
+      long expirationTime3 = Time.now() + 
TimeUnit.SECONDS.toMillis(TOKEN_EXPIRATION_SECONDS) * 10;
+      AbstractDelegationTokenIdentifier tokenId3 = storeToken(secretManager, 
3, expirationTime3);
+
+      GenericTestUtils.waitFor(() -> {
+        try {
+          // Constantly renew token so it doesn't expire.
+          tokenManager.renewToken(token1, "foo");
+
+          // Wait for cleanup to happen so expired token is deleted from SQL.
+          return !isTokenInSQL(secretManager, tokenId2);
+        } catch (IOException | SQLException e) {
+          throw new RuntimeException(e);
+        }
+      }, 100, 6000);
+
+      Assert.assertTrue("Renewed token must not be cleaned up",
+          isTokenInSQL(secretManager, tokenId1));
+      Assert.assertTrue("Token with future expiration must not be cleaned up",
+          isTokenInSQL(secretManager, tokenId3));
+    } finally {
+      stopTokenManager(tokenManager);
+    }
+  }
+
+  private AbstractDelegationTokenIdentifier storeToken(
+      TestDelegationTokenSecretManager secretManager, int sequenceNum, long 
expirationTime)
+      throws IOException {
+    AbstractDelegationTokenIdentifier tokenId = new 
DelegationTokenIdentifier(new Text("Test"));
+    tokenId.setOwner(new Text("foo"));
+    tokenId.setSequenceNumber(sequenceNum);
+
+    AbstractDelegationTokenSecretManager.DelegationTokenInformation tokenInfo =
+        new 
AbstractDelegationTokenSecretManager.DelegationTokenInformation(expirationTime, 
null);
+    secretManager.storeToken(tokenId, tokenInfo);
+
+    return tokenId;
+  }
+
   private Configuration getShortLivedTokenConf() {
     Configuration shortLivedConf = new Configuration(conf);
     shortLivedConf.setTimeDuration(
@@ -203,13 +265,12 @@ public class TestSQLDelegationTokenSecretManagerImpl {
       TestDelegationTokenSecretManager secretManager, 
AbstractDelegationTokenIdentifier tokenId,
       boolean expectedInSQL) throws SQLException {
     secretManager.removeExpiredStoredToken(tokenId);
-    byte[] tokenInfo = 
secretManager.selectTokenInfo(tokenId.getSequenceNumber(),
-        tokenId.getBytes());
-    if (expectedInSQL) {
-      Assert.assertNotNull("Verify token exists in database", tokenInfo);
-    } else {
-      Assert.assertNull("Verify token was removed from database", tokenInfo);
-    }
+    Assert.assertEquals(expectedInSQL, isTokenInSQL(secretManager, tokenId));
+  }
+
+  private boolean isTokenInSQL(TestDelegationTokenSecretManager secretManager,
+      AbstractDelegationTokenIdentifier tokenId) throws SQLException {
+    return secretManager.selectTokenInfo(tokenId.getSequenceNumber(), 
tokenId.getBytes()) != null;
   }
 
   @Test
@@ -544,6 +605,11 @@ public class TestSQLDelegationTokenSecretManagerImpl {
       super.removeExpiredStoredToken((AbstractDelegationTokenIdentifier) 
tokenId);
     }
 
+    public void storeToken(AbstractDelegationTokenIdentifier ident,
+        DelegationTokenInformation tokenInfo) throws IOException {
+      super.storeToken(ident, tokenInfo);
+    }
+
     public void setReadOnly(boolean readOnly) {
       ((TestConnectionFactory) getConnectionFactory()).readOnly = readOnly;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to