This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6e5f8694b66d8dea8ae2926adafdb610c09d9b88
Author: Gabor Somogyi <[email protected]>
AuthorDate: Tue Dec 13 15:48:33 2022 +0100

    [FLINK-30422][runtime][security] Generalize token framework provider API
---
 .../runtime/resourcemanager/ResourceManager.java   |   3 +-
 ...Listener.java => DelegationTokenContainer.java} |  31 +++--
 .../security/token/DelegationTokenManager.java     |  16 ++-
 ...nProvider.java => DelegationTokenProvider.java} |  47 ++++---
 .../security/token/NoOpDelegationTokenManager.java |   5 +-
 .../token/hadoop/HBaseDelegationTokenProvider.java | 153 +++++++++------------
 .../token/hadoop/HadoopDelegationTokenUpdater.java |  18 ++-
 .../hadoop/HadoopFSDelegationTokenProvider.java    |  60 ++++----
 .../hadoop/KerberosDelegationTokenManager.java     | 125 ++++++++---------
 .../flink/runtime/taskexecutor/TaskExecutor.java   |   6 +-
 ...runtime.security.token.DelegationTokenProvider} |   0
 .../token/DelegationTokenContainerTest.java        |  73 ++++++++++
 ... ExceptionThrowingDelegationTokenProvider.java} |  30 ++--
 ...vider.java => TestDelegationTokenProvider.java} |  17 +--
 .../hadoop/HadoopDelegationTokenConverterTest.java |   2 +-
 .../hadoop/HadoopDelegationTokenUpdaterITCase.java |  15 +-
 .../KerberosDelegationTokenManagerITCase.java      |  25 ++--
 ...runtime.security.token.DelegationTokenProvider} |   4 +-
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  10 +-
 19 files changed, 361 insertions(+), 279 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 5d38143974d..2dd390ac711 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -67,7 +67,6 @@ import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.Local;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcServiceUtils;
-import org.apache.flink.runtime.security.token.DelegationTokenListener;
 import org.apache.flink.runtime.security.token.DelegationTokenManager;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.slots.ResourceRequirement;
@@ -117,7 +116,7 @@ import static 
org.apache.flink.util.Preconditions.checkState;
  */
 public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
         extends FencedRpcEndpoint<ResourceManagerId>
-        implements DelegationTokenListener, ResourceManagerGateway {
+        implements DelegationTokenManager.Listener, ResourceManagerGateway {
 
     public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenContainer.java
similarity index 61%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenListener.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenContainer.java
index 9a29e1acf3b..0f11452449b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenListener.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenContainer.java
@@ -18,17 +18,26 @@
 
 package org.apache.flink.runtime.security.token;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.Experimental;
 
-/**
- * Listener for delegation tokens state changes in the {@link 
DelegationTokenManager}.
- *
- * <p>By registering it in the manager one can receive callbacks when events 
are happening related
- * to delegation tokens.
- */
-@Internal
-public interface DelegationTokenListener {
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Container for delegation tokens. */
+@Experimental
+public class DelegationTokenContainer implements Serializable {
+    private Map<String, byte[]> tokens = new HashMap<>();
+
+    public Map<String, byte[]> getTokens() {
+        return tokens;
+    }
+
+    public void addToken(String key, byte[] value) {
+        tokens.put(key, value);
+    }
 
-    /** Callback function when new delegation tokens obtained. */
-    void onNewTokensObtained(byte[] tokens) throws Exception;
+    public boolean hasTokens() {
+        return !tokens.isEmpty();
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
index 8a00c9ae154..57f53c75ccc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
@@ -20,8 +20,6 @@ package org.apache.flink.runtime.security.token;
 
 import org.apache.flink.annotation.Internal;
 
-import org.apache.hadoop.security.Credentials;
-
 /**
  * Manager for delegation tokens in a Flink cluster.
  *
@@ -31,17 +29,27 @@ import org.apache.hadoop.security.Credentials;
  */
 @Internal
 public interface DelegationTokenManager {
+    /**
+     * Listener for events in the {@link DelegationTokenManager}.
+     *
+     * <p>By registering it in the manager one can receive callbacks when 
events are happening.
+     */
+    @Internal
+    interface Listener {
+        /** Callback function when new delegation tokens obtained. */
+        void onNewTokensObtained(byte[] tokens) throws Exception;
+    }
 
     /**
      * Obtains new tokens in a one-time fashion and leaves it up to the caller 
to distribute them.
      */
-    void obtainDelegationTokens(Credentials credentials) throws Exception;
+    void obtainDelegationTokens(DelegationTokenContainer container) throws 
Exception;
 
     /**
      * Creates a re-occurring task which obtains new tokens and automatically 
distributes them to
      * task managers.
      */
-    void start(DelegationTokenListener delegationTokenListener) throws 
Exception;
+    void start(Listener listener) throws Exception;
 
     /** Stops re-occurring token obtain task. */
     void stop();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
similarity index 60%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenProvider.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
index 061f32c6d6d..d98e5816fa9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenProvider.java
@@ -16,27 +16,44 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token.hadoop;
+package org.apache.flink.runtime.security.token;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.security.token.DelegationTokenManager;
-
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
 
 import java.util.Optional;
 
 /**
- * Delegation token provider API for Hadoop components. Instances of token 
providers are loaded by
- * {@link DelegationTokenManager} through service loader.
- *
- * <h2>Important Notes</h2>
- *
- * <p>Tokens are stored in {@link UserGroupInformation}
+ * Delegation token provider API. Instances of {@link 
DelegationTokenProvider}s are loaded by {@link
+ * DelegationTokenManager} through service loader.
  */
 @Experimental
-public interface HadoopDelegationTokenProvider {
+public interface DelegationTokenProvider {
+    /** Container for obtained delegation tokens. */
+    class ObtainedDelegationTokens {
+        /** Serialized form of delegation tokens. */
+        private byte[] tokens;
+
+        /**
+         * Time until the tokens are valid, if valid forever then 
`Optional.empty()` should be
+         * returned.
+         */
+        private Optional<Long> validUntil;
+
+        public ObtainedDelegationTokens(byte[] tokens, Optional<Long> 
validUntil) {
+            this.tokens = tokens;
+            this.validUntil = validUntil;
+        }
+
+        public byte[] getTokens() {
+            return tokens;
+        }
+
+        public Optional<Long> getValidUntil() {
+            return validUntil;
+        }
+    }
+
     /** Name of the service to provide delegation tokens. This name should be 
unique. */
     String serviceName();
 
@@ -57,9 +74,7 @@ public interface HadoopDelegationTokenProvider {
     /**
      * Obtain delegation tokens for this service.
      *
-     * @param credentials Credentials to add tokens and security keys to.
-     * @return If the returned tokens are renewable and can be renewed, return 
the time of the next
-     *     renewal, otherwise `Optional.empty()` should be returned.
+     * @return the obtained delegation tokens.
      */
-    Optional<Long> obtainDelegationTokens(Credentials credentials) throws 
Exception;
+    ObtainedDelegationTokens obtainDelegationTokens() throws Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/NoOpDelegationTokenManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/NoOpDelegationTokenManager.java
index 92063bc07eb..cab68f825e6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/NoOpDelegationTokenManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/NoOpDelegationTokenManager.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.security.token;
 
 import org.apache.flink.annotation.Internal;
 
-import org.apache.hadoop.security.Credentials;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,12 +34,12 @@ public class NoOpDelegationTokenManager implements 
DelegationTokenManager {
     }
 
     @Override
-    public void obtainDelegationTokens(Credentials credentials) {
+    public void obtainDelegationTokens(DelegationTokenContainer container) {
         LOG.debug("obtainDelegationTokens");
     }
 
     @Override
-    public void start(DelegationTokenListener delegationTokenListener) {
+    public void start(Listener listener) {
         LOG.debug("start");
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
index 4441267f355..572a49eb8c3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.token.DelegationTokenProvider;
 import org.apache.flink.runtime.util.HadoopUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -42,7 +43,7 @@ import java.util.Optional;
  * tend to move this but that would be a breaking change.
  */
 @Experimental
-public class HBaseDelegationTokenProvider implements 
HadoopDelegationTokenProvider {
+public class HBaseDelegationTokenProvider implements DelegationTokenProvider {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
 
@@ -89,7 +90,7 @@ public class HBaseDelegationTokenProvider implements 
HadoopDelegationTokenProvid
     }
 
     @Override
-    public boolean delegationTokensRequired() {
+    public boolean delegationTokensRequired() throws Exception {
         try {
             if 
(!HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())) 
{
                 return false;
@@ -99,96 +100,74 @@ public class HBaseDelegationTokenProvider implements 
HadoopDelegationTokenProvid
             return false;
         }
         return Objects.nonNull(hbaseConf)
-                && 
hbaseConf.get("hbase.security.authentication").equals("kerberos");
+                && 
hbaseConf.get("hbase.security.authentication").equals("kerberos")
+                && kerberosLoginProvider.isLoginPossible();
     }
 
     @Override
-    public Optional<Long> obtainDelegationTokens(Credentials credentials) 
throws Exception {
-        if (kerberosLoginProvider.isLoginPossible()) {
-            UserGroupInformation freshUGI = 
kerberosLoginProvider.doLoginAndReturnUGI();
-            return freshUGI.doAs(
-                    (PrivilegedExceptionAction<Optional<Long>>)
-                            () -> {
-                                Token<?> token;
-                                try {
-                                    Preconditions.checkNotNull(hbaseConf);
-                                    try {
-                                        LOG.info("Obtaining Kerberos security 
token for HBase");
-                                        // ----
-                                        // Intended call: 
Token<AuthenticationTokenIdentifier> token
-                                        // =
-                                        // TokenUtil.obtainToken(conf);
-                                        token =
-                                                (Token<?>)
-                                                        Class.forName(
-                                                                        
"org.apache.hadoop.hbase.security.token.TokenUtil")
-                                                                .getMethod(
-                                                                        
"obtainToken",
-                                                                        
org.apache.hadoop.conf
-                                                                               
 .Configuration
-                                                                               
 .class)
-                                                                .invoke(null, 
hbaseConf);
-                                    } catch (NoSuchMethodException e) {
-                                        // for HBase 2
-
-                                        // ----
-                                        // Intended call: ConnectionFactory 
connectionFactory =
-                                        // 
ConnectionFactory.createConnection(conf);
-                                        Closeable connectionFactory =
-                                                (Closeable)
-                                                        Class.forName(
-                                                                        
"org.apache.hadoop.hbase.client.ConnectionFactory")
-                                                                .getMethod(
-                                                                        
"createConnection",
-                                                                        
org.apache.hadoop.conf
-                                                                               
 .Configuration
-                                                                               
 .class)
-                                                                .invoke(null, 
hbaseConf);
-                                        // ----
-                                        Class<?> connectionClass =
+    public ObtainedDelegationTokens obtainDelegationTokens() throws Exception {
+        UserGroupInformation freshUGI = 
kerberosLoginProvider.doLoginAndReturnUGI();
+        return freshUGI.doAs(
+                (PrivilegedExceptionAction<ObtainedDelegationTokens>)
+                        () -> {
+                            Token<?> token;
+                            Preconditions.checkNotNull(hbaseConf);
+                            try {
+                                LOG.info("Obtaining Kerberos security token 
for HBase");
+                                // ----
+                                // Intended call: 
Token<AuthenticationTokenIdentifier> token
+                                // =
+                                // TokenUtil.obtainToken(conf);
+                                token =
+                                        (Token<?>)
                                                 Class.forName(
-                                                        
"org.apache.hadoop.hbase.client.Connection");
-                                        // ----
-                                        // Intended call: 
Token<AuthenticationTokenIdentifier> token
-                                        // =
-                                        // 
TokenUtil.obtainToken(connectionFactory);
-                                        token =
-                                                (Token<?>)
-                                                        Class.forName(
-                                                                        
"org.apache.hadoop.hbase.security.token.TokenUtil")
-                                                                .getMethod(
-                                                                        
"obtainToken",
-                                                                        
connectionClass)
-                                                                .invoke(null, 
connectionFactory);
-                                        if (null != connectionFactory) {
-                                            connectionFactory.close();
-                                        }
-                                    }
-                                    if (token == null) {
-                                        LOG.error("No Kerberos security token 
for HBase available");
-                                    } else {
-                                        
credentials.addToken(token.getService(), token);
-                                        LOG.info(
-                                                "Added HBase Kerberos security 
token to credentials.");
-                                    }
-                                } catch (ClassNotFoundException
-                                        | NoSuchMethodException
-                                        | IllegalAccessException
-                                        | InvocationTargetException
-                                        | IOException e) {
-                                    LOG.info(
-                                            "HBase is not available (failed to 
obtain delegation tokens): {} : \"{}\".",
-                                            e.getClass().getSimpleName(),
-                                            e.getMessage());
+                                                                
"org.apache.hadoop.hbase.security.token.TokenUtil")
+                                                        .getMethod(
+                                                                "obtainToken",
+                                                                
org.apache.hadoop.conf.Configuration
+                                                                        .class)
+                                                        .invoke(null, 
hbaseConf);
+                            } catch (NoSuchMethodException e) {
+                                // for HBase 2
+
+                                // ----
+                                // Intended call: ConnectionFactory 
connectionFactory =
+                                // ConnectionFactory.createConnection(conf);
+                                Closeable connectionFactory =
+                                        (Closeable)
+                                                Class.forName(
+                                                                
"org.apache.hadoop.hbase.client.ConnectionFactory")
+                                                        .getMethod(
+                                                                
"createConnection",
+                                                                
org.apache.hadoop.conf.Configuration
+                                                                        .class)
+                                                        .invoke(null, 
hbaseConf);
+                                // ----
+                                Class<?> connectionClass =
+                                        
Class.forName("org.apache.hadoop.hbase.client.Connection");
+                                // ----
+                                // Intended call: 
Token<AuthenticationTokenIdentifier> token
+                                // =
+                                // TokenUtil.obtainToken(connectionFactory);
+                                token =
+                                        (Token<?>)
+                                                Class.forName(
+                                                                
"org.apache.hadoop.hbase.security.token.TokenUtil")
+                                                        
.getMethod("obtainToken", connectionClass)
+                                                        .invoke(null, 
connectionFactory);
+                                if (null != connectionFactory) {
+                                    connectionFactory.close();
                                 }
+                            }
 
-                                // HBase does not support to renew the 
delegation token currently
-                                // 
https://cwiki.apache.org/confluence/display/HADOOP2/Hbase+HBaseTokenAuthentication
-                                return Optional.empty();
-                            });
-        } else {
-            LOG.info("Real user has no kerberos credentials so no tokens 
obtained");
-            return Optional.empty();
-        }
+                            Credentials credentials = new Credentials();
+                            credentials.addToken(token.getService(), token);
+
+                            // HBase does not support to renew the delegation 
token currently
+                            // 
https://cwiki.apache.org/confluence/display/HADOOP2/Hbase+HBaseTokenAuthentication
+                            return new ObtainedDelegationTokens(
+                                    
HadoopDelegationTokenConverter.serialize(credentials),
+                                    Optional.empty());
+                        });
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java
index a4528cfad54..d6ec75797a9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java
@@ -19,14 +19,14 @@
 package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.token.DelegationTokenContainer;
+import org.apache.flink.util.InstantiationUtil;
 
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 /** Delegation token updater functionality. */
 @Internal
 public final class HadoopDelegationTokenUpdater {
@@ -36,11 +36,17 @@ public final class HadoopDelegationTokenUpdater {
     private HadoopDelegationTokenUpdater() {}
 
     /** Updates delegation tokens for the current user. */
-    public static void addCurrentUserCredentials(byte[] credentialsBytes) 
throws IOException {
-        if (credentialsBytes == null || credentialsBytes.length == 0) {
-            throw new IllegalArgumentException("Illegal credentials tried to 
be set");
+    public static void addCurrentUserCredentials(byte[] containerBytes) throws 
Exception {
+        if (containerBytes == null || containerBytes.length == 0) {
+            throw new IllegalArgumentException("Illegal container tried to be 
processed");
+        }
+        DelegationTokenContainer container =
+                InstantiationUtil.deserializeObject(
+                        containerBytes, 
HadoopDelegationTokenUpdater.class.getClassLoader());
+        Credentials credentials = new Credentials();
+        for (byte[] v : container.getTokens().values()) {
+            credentials.addAll(HadoopDelegationTokenConverter.deserialize(v));
         }
-        Credentials credentials = 
HadoopDelegationTokenConverter.deserialize(credentialsBytes);
         LOG.info("Updating delegation tokens for current user");
         dumpAllTokens(credentials);
         UserGroupInformation.getCurrentUser().addCredentials(credentials);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
index 81d25eb0f51..a6a13f3a6cb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.security.token.DelegationTokenProvider;
 import org.apache.flink.runtime.util.HadoopUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -47,7 +48,7 @@ import java.util.Set;
 
 /** Delegation token provider for Hadoop filesystems. */
 @Experimental
-public class HadoopFSDelegationTokenProvider implements 
HadoopDelegationTokenProvider {
+public class HadoopFSDelegationTokenProvider implements 
DelegationTokenProvider {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(HadoopFSDelegationTokenProvider.class);
@@ -74,37 +75,38 @@ public class HadoopFSDelegationTokenProvider implements 
HadoopDelegationTokenPro
 
     @Override
     public boolean delegationTokensRequired() throws Exception {
-        return 
HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser());
+        return 
HadoopUtils.isKerberosSecurityEnabled(UserGroupInformation.getCurrentUser())
+                && kerberosLoginProvider.isLoginPossible();
     }
 
     @Override
-    public Optional<Long> obtainDelegationTokens(Credentials credentials) 
throws Exception {
-        if (kerberosLoginProvider.isLoginPossible()) {
-            UserGroupInformation freshUGI = 
kerberosLoginProvider.doLoginAndReturnUGI();
-            return freshUGI.doAs(
-                    (PrivilegedExceptionAction<Optional<Long>>)
-                            () -> {
-                                Clock clock = Clock.systemDefaultZone();
-                                Set<FileSystem> fileSystemsToAccess = 
getFileSystemsToAccess();
-
-                                obtainDelegationTokens(
-                                        getRenewer(), fileSystemsToAccess, 
credentials);
-
-                                // Get the token renewal interval if it is not 
set. It will be
-                                // called
-                                // only once.
-                                if (tokenRenewalInterval == null) {
-                                    tokenRenewalInterval =
-                                            getTokenRenewalInterval(clock, 
fileSystemsToAccess);
-                                }
-                                return tokenRenewalInterval.flatMap(
-                                        interval ->
-                                                getTokenRenewalDate(clock, 
credentials, interval));
-                            });
-        } else {
-            LOG.info("Real user has no kerberos credentials so no tokens 
obtained");
-            return Optional.empty();
-        }
+    public ObtainedDelegationTokens obtainDelegationTokens() throws Exception {
+        UserGroupInformation freshUGI = 
kerberosLoginProvider.doLoginAndReturnUGI();
+        return freshUGI.doAs(
+                (PrivilegedExceptionAction<ObtainedDelegationTokens>)
+                        () -> {
+                            Credentials credentials = new Credentials();
+                            Clock clock = Clock.systemDefaultZone();
+                            Set<FileSystem> fileSystemsToAccess = 
getFileSystemsToAccess();
+
+                            obtainDelegationTokens(getRenewer(), 
fileSystemsToAccess, credentials);
+
+                            // Get the token renewal interval if it is not 
set. It will be
+                            // called
+                            // only once.
+                            if (tokenRenewalInterval == null) {
+                                tokenRenewalInterval =
+                                        getTokenRenewalInterval(clock, 
fileSystemsToAccess);
+                            }
+                            Optional<Long> validUntil =
+                                    tokenRenewalInterval.flatMap(
+                                            interval ->
+                                                    getTokenRenewalDate(
+                                                            clock, 
credentials, interval));
+                            return new ObtainedDelegationTokens(
+                                    
HadoopDelegationTokenConverter.serialize(credentials),
+                                    validUntil);
+                        });
     }
 
     @VisibleForTesting
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java
index 60b754bd450..98bab881b1f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java
@@ -21,12 +21,13 @@ package org.apache.flink.runtime.security.token.hadoop;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.security.token.DelegationTokenListener;
+import org.apache.flink.runtime.security.token.DelegationTokenContainer;
 import org.apache.flink.runtime.security.token.DelegationTokenManager;
+import org.apache.flink.runtime.security.token.DelegationTokenProvider;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 
-import org.apache.hadoop.security.Credentials;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,9 +68,7 @@ public class KerberosDelegationTokenManager implements 
DelegationTokenManager {
 
     private final long renewalRetryBackoffPeriod;
 
-    private final KerberosLoginProvider kerberosLoginProvider;
-
-    @VisibleForTesting final Map<String, HadoopDelegationTokenProvider> 
delegationTokenProviders;
+    @VisibleForTesting final Map<String, DelegationTokenProvider> 
delegationTokenProviders;
 
     @Nullable private final ScheduledExecutor scheduledExecutor;
 
@@ -81,7 +80,7 @@ public class KerberosDelegationTokenManager implements 
DelegationTokenManager {
     @Nullable
     private ScheduledFuture<?> tokensUpdateFuture;
 
-    @Nullable private DelegationTokenListener delegationTokenListener;
+    @Nullable private Listener listener;
 
     public KerberosDelegationTokenManager(
             Configuration configuration,
@@ -91,26 +90,29 @@ public class KerberosDelegationTokenManager implements 
DelegationTokenManager {
         this.tokensRenewalTimeRatio = 
configuration.get(KERBEROS_TOKENS_RENEWAL_TIME_RATIO);
         this.renewalRetryBackoffPeriod =
                 
configuration.get(KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis();
-        this.kerberosLoginProvider = new KerberosLoginProvider(configuration);
         this.delegationTokenProviders = loadProviders();
         this.scheduledExecutor = scheduledExecutor;
         this.ioExecutor = ioExecutor;
     }
 
-    private Map<String, HadoopDelegationTokenProvider> loadProviders() {
+    private Map<String, DelegationTokenProvider> loadProviders() {
         LOG.info("Loading delegation token providers");
 
-        ServiceLoader<HadoopDelegationTokenProvider> serviceLoader =
-                ServiceLoader.load(HadoopDelegationTokenProvider.class);
+        ServiceLoader<DelegationTokenProvider> serviceLoader =
+                ServiceLoader.load(DelegationTokenProvider.class);
 
-        Map<String, HadoopDelegationTokenProvider> providers = new HashMap<>();
-        for (HadoopDelegationTokenProvider provider : serviceLoader) {
+        Map<String, DelegationTokenProvider> providers = new HashMap<>();
+        for (DelegationTokenProvider provider : serviceLoader) {
             try {
                 if (isProviderEnabled(provider.serviceName())) {
                     provider.init(configuration);
                     LOG.info(
                             "Delegation token provider {} loaded and 
initialized",
                             provider.serviceName());
+                    checkState(
+                            !providers.containsKey(provider.serviceName()),
+                            "Delegation token provider with service name {} 
has multiple implementations",
+                            provider.serviceName());
                     providers.put(provider.serviceName(), provider);
                 } else {
                     LOG.info(
@@ -148,47 +150,47 @@ public class KerberosDelegationTokenManager implements 
DelegationTokenManager {
      * Obtains new tokens in a one-time fashion and leaves it up to the caller 
to distribute them.
      */
     @Override
-    public void obtainDelegationTokens(Credentials credentials) throws 
Exception {
+    public void obtainDelegationTokens(DelegationTokenContainer container) 
throws Exception {
         LOG.info("Obtaining delegation tokens");
-        obtainDelegationTokensAndGetNextRenewal(credentials);
+        obtainDelegationTokensAndGetNextRenewal(container);
         LOG.info("Delegation tokens obtained successfully");
     }
 
-    protected Optional<Long> 
obtainDelegationTokensAndGetNextRenewal(Credentials credentials) {
-        Optional<Long> nextRenewal =
-                delegationTokenProviders.values().stream()
-                        .map(
-                                provider -> {
-                                    try {
-                                        Optional<Long> nr = Optional.empty();
-                                        if 
(provider.delegationTokensRequired()) {
-                                            LOG.debug(
-                                                    "Obtaining delegation 
token for service {}",
-                                                    provider.serviceName());
-                                            nr = 
provider.obtainDelegationTokens(credentials);
-                                            LOG.debug(
-                                                    "Obtained delegation token 
for service {} successfully",
-                                                    provider.serviceName());
-                                        } else {
-                                            LOG.debug(
-                                                    "Service {} does not need 
to obtain delegation token",
-                                                    provider.serviceName());
-                                        }
-                                        return nr;
-                                    } catch (Exception e) {
-                                        LOG.error(
-                                                "Failed to obtain delegation 
token for provider {}",
-                                                provider.serviceName(),
-                                                e);
-                                        throw new FlinkRuntimeException(e);
-                                    }
-                                })
-                        .flatMap(nr -> 
nr.map(Stream::of).orElseGet(Stream::empty))
-                        .min(Long::compare);
-
-        HadoopDelegationTokenUpdater.dumpAllTokens(credentials);
-
-        return nextRenewal;
+    protected Optional<Long> obtainDelegationTokensAndGetNextRenewal(
+            DelegationTokenContainer container) {
+        return delegationTokenProviders.values().stream()
+                .map(
+                        p -> {
+                            Optional<Long> nr = Optional.empty();
+                            try {
+                                if (p.delegationTokensRequired()) {
+                                    LOG.debug(
+                                            "Obtaining delegation token for 
service {}",
+                                            p.serviceName());
+                                    
DelegationTokenProvider.ObtainedDelegationTokens t =
+                                            p.obtainDelegationTokens();
+                                    checkNotNull(t, "Obtained delegation 
tokens must not be null");
+                                    container.addToken(p.serviceName(), 
t.getTokens());
+                                    nr = t.getValidUntil();
+                                    LOG.debug(
+                                            "Obtained delegation token for 
service {} successfully",
+                                            p.serviceName());
+                                } else {
+                                    LOG.debug(
+                                            "Service {} does not need to 
obtain delegation token",
+                                            p.serviceName());
+                                }
+                            } catch (Exception e) {
+                                LOG.error(
+                                        "Failed to obtain delegation token for 
provider {}",
+                                        p.serviceName(),
+                                        e);
+                                throw new FlinkRuntimeException(e);
+                            }
+                            return nr;
+                        })
+                .flatMap(nr -> nr.map(Stream::of).orElseGet(Stream::empty))
+                .min(Long::compare);
     }
 
     /**
@@ -196,20 +198,14 @@ public class KerberosDelegationTokenManager implements 
DelegationTokenManager {
      * task managers.
      */
     @Override
-    public void start(DelegationTokenListener delegationTokenListener) throws 
Exception {
+    public void start(Listener listener) throws Exception {
         checkNotNull(scheduledExecutor, "Scheduled executor must not be null");
         checkNotNull(ioExecutor, "IO executor must not be null");
-        this.delegationTokenListener =
-                checkNotNull(delegationTokenListener, "Delegation token 
listener must not be null");
+        this.listener = checkNotNull(listener, "Listener must not be null");
         synchronized (tokensUpdateFutureLock) {
             checkState(tokensUpdateFuture == null, "Manager is already 
started");
         }
 
-        if (!kerberosLoginProvider.isLoginPossible()) {
-            LOG.info("Renewal is NOT possible, skipping to start renewal 
task");
-            return;
-        }
-
         startTokensUpdate();
     }
 
@@ -217,17 +213,16 @@ public class KerberosDelegationTokenManager implements 
DelegationTokenManager {
     void startTokensUpdate() {
         try {
             LOG.info("Starting tokens update task");
-            Credentials credentials = new Credentials();
-            Optional<Long> nextRenewal = 
obtainDelegationTokensAndGetNextRenewal(credentials);
-
-            if (credentials.numberOfTokens() > 0) {
-                byte[] credentialsBytes = 
HadoopDelegationTokenConverter.serialize(credentials);
+            DelegationTokenContainer container = new 
DelegationTokenContainer();
+            Optional<Long> nextRenewal = 
obtainDelegationTokensAndGetNextRenewal(container);
 
-                
HadoopDelegationTokenUpdater.addCurrentUserCredentials(credentialsBytes);
+            if (container.hasTokens()) {
+                byte[] containerBytes = 
InstantiationUtil.serializeObject(container);
+                
HadoopDelegationTokenUpdater.addCurrentUserCredentials(containerBytes);
 
                 LOG.info("Notifying listener about new tokens");
-                checkNotNull(delegationTokenListener, "Listener must not be 
null");
-                delegationTokenListener.onNewTokensObtained(credentialsBytes);
+                checkNotNull(listener, "Listener must not be null");
+                listener.onNewTokensObtained(containerBytes);
                 LOG.info("Listener notified successfully");
             } else {
                 LOG.warn("No tokens obtained so skipping listener 
notification");
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 24384585475..bd8941622f0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -2376,11 +2376,11 @@ public class TaskExecutor extends RpcEndpoint 
implements TaskExecutorGateway {
             final ClusterInformation clusterInformation = 
success.getClusterInformation();
             final ResourceManagerGateway resourceManagerGateway = 
connection.getTargetGateway();
 
-            if (success.getInitialTokens() != null) {
+            byte[] tokens = success.getInitialTokens();
+            if (tokens != null) {
                 try {
                     log.info("Receive initial delegation tokens from resource 
manager");
-                    HadoopDelegationTokenUpdater.addCurrentUserCredentials(
-                            success.getInitialTokens());
+                    
HadoopDelegationTokenUpdater.addCurrentUserCredentials(tokens);
                 } catch (Throwable t) {
                     log.error("Could not update delegation tokens.", t);
                     ExceptionUtils.rethrowIfFatalError(t);
diff --git 
a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider
 
b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
similarity index 100%
rename from 
flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider
rename to 
flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenContainerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenContainerTest.java
new file mode 100644
index 00000000000..ed63463231f
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenContainerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for {@link DelegationTokenContainer}. */
+class DelegationTokenContainerTest {
+    private static final String TOKEN_KEY = "TEST_TOKEN_KEY";
+    private static final String TOKEN_VALUE = "TEST_TOKEN_VALUE";
+
+    @Test
+    public void testRoundTrip() throws Exception {
+        final DelegationTokenContainer container = new 
DelegationTokenContainer();
+        container.addToken(TOKEN_KEY, TOKEN_VALUE.getBytes());
+
+        final byte[] containerBytes = 
InstantiationUtil.serializeObject(container);
+        final DelegationTokenContainer deserializedContainer =
+                InstantiationUtil.deserializeObject(containerBytes, 
getClass().getClassLoader());
+
+        final Map<String, byte[]> genericTokens = 
deserializedContainer.getTokens();
+        assertEquals(1, genericTokens.size());
+        assertArrayEquals(TOKEN_VALUE.getBytes(), 
genericTokens.get(TOKEN_KEY));
+    }
+
+    @Test
+    public void getTokenShouldReturnNullWhenNoTokens() {
+        final DelegationTokenContainer container = new 
DelegationTokenContainer();
+
+        assertNull(container.getTokens().get(TOKEN_KEY));
+    }
+
+    @Test
+    public void hasTokensShouldReturnFalseWhenNoTokens() {
+        final DelegationTokenContainer container = new 
DelegationTokenContainer();
+
+        assertFalse(container.hasTokens());
+    }
+
+    @Test
+    public void hasTokensShouldReturnTrueWithGenericToken() {
+        final DelegationTokenContainer container = new 
DelegationTokenContainer();
+        container.addToken(TOKEN_KEY, TOKEN_VALUE.getBytes());
+
+        assertTrue(container.hasTokens());
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/ExceptionThrowingHadoopDelegationTokenProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java
similarity index 67%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/ExceptionThrowingHadoopDelegationTokenProvider.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java
index 234b0b68b13..05b2875a8d1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/ExceptionThrowingHadoopDelegationTokenProvider.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenProvider.java
@@ -16,34 +16,30 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token.hadoop;
+package org.apache.flink.runtime.security.token;
 
 import org.apache.flink.configuration.Configuration;
 
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
-
 import java.util.Optional;
 
 /**
- * An example implementation of {@link HadoopDelegationTokenProvider} which 
throws exception when
- * enabled.
+ * An example implementation of {@link DelegationTokenProvider} which throws 
exception when enabled.
  */
-public class ExceptionThrowingHadoopDelegationTokenProvider
-        implements HadoopDelegationTokenProvider {
+public class ExceptionThrowingDelegationTokenProvider implements 
DelegationTokenProvider {
 
     public static volatile boolean throwInInit = false;
     public static volatile boolean throwInUsage = false;
+    public static volatile boolean addToken = false;
     public static volatile boolean constructed = false;
 
     public static void reset() {
         throwInInit = false;
         throwInUsage = false;
+        addToken = false;
         constructed = false;
     }
 
-    public ExceptionThrowingHadoopDelegationTokenProvider() {
+    public ExceptionThrowingDelegationTokenProvider() {
         constructed = true;
     }
 
@@ -64,18 +60,18 @@ public class ExceptionThrowingHadoopDelegationTokenProvider
         if (throwInUsage) {
             throw new IllegalArgumentException();
         }
-        return true;
+        return addToken;
     }
 
     @Override
-    public Optional<Long> obtainDelegationTokens(Credentials credentials) {
+    public ObtainedDelegationTokens obtainDelegationTokens() {
         if (throwInUsage) {
             throw new IllegalArgumentException();
         }
-        final Text tokenKind = new Text("TEST_TOKEN_KIND");
-        final Text tokenService = new Text("TEST_TOKEN_SERVICE");
-        credentials.addToken(
-                tokenService, new Token<>(new byte[4], new byte[4], tokenKind, 
tokenService));
-        return Optional.empty();
+        if (addToken) {
+            return new ObtainedDelegationTokens("TEST_TOKEN_VALUE".getBytes(), 
Optional.empty());
+        } else {
+            return null;
+        }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/TestHadoopDelegationTokenProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenProvider.java
similarity index 70%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/TestHadoopDelegationTokenProvider.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenProvider.java
index 7b692ad8da2..a5cf75d3dc4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/TestHadoopDelegationTokenProvider.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenProvider.java
@@ -16,19 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token.hadoop;
+package org.apache.flink.runtime.security.token;
 
 import org.apache.flink.configuration.Configuration;
 
-import org.apache.hadoop.security.Credentials;
-
-import java.util.Optional;
-
-/**
- * An example implementation of {@link
- * 
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider} 
which does nothing.
- */
-public class TestHadoopDelegationTokenProvider implements 
HadoopDelegationTokenProvider {
+/** An example implementation of {@link DelegationTokenProvider} which does 
nothing. */
+public class TestDelegationTokenProvider implements DelegationTokenProvider {
 
     @Override
     public String serviceName() {
@@ -44,7 +37,7 @@ public class TestHadoopDelegationTokenProvider implements 
HadoopDelegationTokenP
     }
 
     @Override
-    public Optional<Long> obtainDelegationTokens(Credentials credentials) {
-        return Optional.empty();
+    public ObtainedDelegationTokens obtainDelegationTokens() {
+        return null;
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverterTest.java
index 8ab3625df6f..5a2e9db21c3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverterTest.java
@@ -32,7 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 public class HadoopDelegationTokenConverterTest {
 
     @Test
-    public void testRoundTrip() throws IOException {
+    public void testRoundTrip() throws IOException, ClassNotFoundException {
         final Text tokenKind = new Text("TEST_TOKEN_KIND");
         final Text tokenService = new Text("TEST_TOKEN_SERVICE");
         Credentials credentials = new Credentials();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java
index 70d1a1cfab4..7da8411dc96 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.security.token.hadoop;
 
+import org.apache.flink.runtime.security.token.DelegationTokenContainer;
+import org.apache.flink.util.InstantiationUtil;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
@@ -27,8 +30,6 @@ import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.MockedStatic;
 
-import java.io.IOException;
-
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -60,25 +61,27 @@ public class HadoopDelegationTokenUpdaterITCase {
                             () ->
                                     
HadoopDelegationTokenUpdater.addCurrentUserCredentials(
                                             credentialsBytes));
-            assertTrue(e.getMessage().contains("Illegal credentials"));
+            assertTrue(e.getMessage().contains("Illegal container"));
         }
     }
 
     @Test
-    public void addCurrentUserCredentialsShouldOverwriteCredentials() throws 
IOException {
+    public void addCurrentUserCredentialsShouldOverwriteCredentials() throws 
Exception {
         final Text tokenKind = new Text("TEST_TOKEN_KIND");
         final Text tokenService = new Text("TEST_TOKEN_SERVICE");
         Credentials credentials = new Credentials();
         credentials.addToken(
                 tokenService, new Token<>(new byte[4], new byte[4], tokenKind, 
tokenService));
-
         byte[] credentialsBytes = 
HadoopDelegationTokenConverter.serialize(credentials);
+        DelegationTokenContainer container = new DelegationTokenContainer();
+        container.addToken("TEST_TOKEN_KEY", credentialsBytes);
+        byte[] containerBytes = InstantiationUtil.serializeObject(container);
 
         try (MockedStatic<UserGroupInformation> ugi = 
mockStatic(UserGroupInformation.class)) {
             UserGroupInformation userGroupInformation = 
mock(UserGroupInformation.class);
             
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
 
-            
HadoopDelegationTokenUpdater.addCurrentUserCredentials(credentialsBytes);
+            
HadoopDelegationTokenUpdater.addCurrentUserCredentials(containerBytes);
             ArgumentCaptor<Credentials> argumentCaptor = 
ArgumentCaptor.forClass(Credentials.class);
             verify(userGroupInformation, 
times(1)).addCredentials(argumentCaptor.capture());
             assertTrue(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java
index fd329dce3e7..45b04eeeff0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.security.token.hadoop;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.runtime.security.token.DelegationTokenManager;
+import 
org.apache.flink.runtime.security.token.ExceptionThrowingDelegationTokenProvider;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 
 import org.apache.hadoop.security.UserGroupInformation;
@@ -50,7 +51,7 @@ public class KerberosDelegationTokenManagerITCase {
 
     @Test
     public void isProviderEnabledMustGiveBackTrueByDefault() {
-        ExceptionThrowingHadoopDelegationTokenProvider.reset();
+        ExceptionThrowingDelegationTokenProvider.reset();
         Configuration configuration = new Configuration();
         KerberosDelegationTokenManager delegationTokenManager =
                 new KerberosDelegationTokenManager(configuration, null, null);
@@ -60,7 +61,7 @@ public class KerberosDelegationTokenManagerITCase {
 
     @Test
     public void isProviderEnabledMustGiveBackFalseWhenDisabled() {
-        ExceptionThrowingHadoopDelegationTokenProvider.reset();
+        ExceptionThrowingDelegationTokenProvider.reset();
         Configuration configuration = new Configuration();
         
configuration.setBoolean("security.kerberos.token.provider.test.enabled", 
false);
         KerberosDelegationTokenManager delegationTokenManager =
@@ -80,18 +81,18 @@ public class KerberosDelegationTokenManagerITCase {
                 Exception.class,
                 () -> {
                     try {
-                        ExceptionThrowingHadoopDelegationTokenProvider.reset();
-                        
ExceptionThrowingHadoopDelegationTokenProvider.throwInInit = true;
+                        ExceptionThrowingDelegationTokenProvider.reset();
+                        ExceptionThrowingDelegationTokenProvider.throwInInit = 
true;
                         new KerberosDelegationTokenManager(new 
Configuration(), null, null);
                     } finally {
-                        ExceptionThrowingHadoopDelegationTokenProvider.reset();
+                        ExceptionThrowingDelegationTokenProvider.reset();
                     }
                 });
     }
 
     @Test
     public void testAllProvidersLoaded() {
-        ExceptionThrowingHadoopDelegationTokenProvider.reset();
+        ExceptionThrowingDelegationTokenProvider.reset();
         Configuration configuration = new Configuration();
         
configuration.setBoolean("security.kerberos.token.provider.throw.enabled", 
false);
         KerberosDelegationTokenManager delegationTokenManager =
@@ -101,7 +102,7 @@ public class KerberosDelegationTokenManagerITCase {
         assertTrue(delegationTokenManager.isProviderLoaded("hadoopfs"));
         assertTrue(delegationTokenManager.isProviderLoaded("hbase"));
         assertTrue(delegationTokenManager.isProviderLoaded("test"));
-        assertTrue(ExceptionThrowingHadoopDelegationTokenProvider.constructed);
+        assertTrue(ExceptionThrowingDelegationTokenProvider.constructed);
         assertFalse(delegationTokenManager.isProviderLoaded("throw"));
     }
 
@@ -116,11 +117,11 @@ public class KerberosDelegationTokenManagerITCase {
             UserGroupInformation userGroupInformation = 
mock(UserGroupInformation.class);
             
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
 
-            ExceptionThrowingHadoopDelegationTokenProvider.reset();
+            ExceptionThrowingDelegationTokenProvider.reset();
+            ExceptionThrowingDelegationTokenProvider.addToken = true;
             Configuration configuration = new Configuration();
             
configuration.setBoolean("security.kerberos.token.provider.throw.enabled", 
true);
             AtomicInteger startTokensUpdateCallCount = new AtomicInteger(0);
-            KerberosLoginProvider kerberosLoginProvider = new 
KerberosLoginProvider(configuration);
             KerberosDelegationTokenManager delegationTokenManager =
                     new KerberosDelegationTokenManager(
                             configuration, scheduledExecutor, scheduler) {
@@ -132,10 +133,10 @@ public class KerberosDelegationTokenManagerITCase {
                     };
 
             delegationTokenManager.startTokensUpdate();
-            ExceptionThrowingHadoopDelegationTokenProvider.throwInUsage = true;
+            ExceptionThrowingDelegationTokenProvider.throwInUsage = true;
             scheduledExecutor.triggerScheduledTasks();
             scheduler.triggerAll();
-            ExceptionThrowingHadoopDelegationTokenProvider.throwInUsage = 
false;
+            ExceptionThrowingDelegationTokenProvider.throwInUsage = false;
             scheduledExecutor.triggerScheduledTasks();
             scheduler.triggerAll();
             delegationTokenManager.stopTokensUpdate();
@@ -146,7 +147,7 @@ public class KerberosDelegationTokenManagerITCase {
 
     @Test
     public void calculateRenewalDelayShouldConsiderRenewalRatio() {
-        ExceptionThrowingHadoopDelegationTokenProvider.reset();
+        ExceptionThrowingDelegationTokenProvider.reset();
         Configuration configuration = new Configuration();
         
configuration.setBoolean("security.kerberos.token.provider.throw.enabled", 
false);
         configuration.set(KERBEROS_TOKENS_RENEWAL_TIME_RATIO, 0.5);
diff --git 
a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider
 
b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
similarity index 83%
rename from 
flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider
rename to 
flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
index 74e3982f7bc..bd3ed1d6482 100644
--- 
a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider
+++ 
b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.DelegationTokenProvider
@@ -13,5 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.runtime.security.token.hadoop.TestHadoopDelegationTokenProvider
-org.apache.flink.runtime.security.token.hadoop.ExceptionThrowingHadoopDelegationTokenProvider
+org.apache.flink.runtime.security.token.TestDelegationTokenProvider
+org.apache.flink.runtime.security.token.ExceptionThrowingDelegationTokenProvider
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 235400fe7ce..7cd065cf206 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
+import org.apache.flink.runtime.security.token.DelegationTokenContainer;
 import org.apache.flink.runtime.security.token.DelegationTokenManager;
 import 
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter;
 import 
org.apache.flink.runtime.security.token.hadoop.KerberosDelegationTokenManager;
@@ -1294,12 +1295,15 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
     private void setTokensFor(ContainerLaunchContext containerLaunchContext) 
throws Exception {
         LOG.info("Adding delegation tokens to the AM container.");
 
-        Credentials credentials = new Credentials();
-
         DelegationTokenManager delegationTokenManager =
                 new KerberosDelegationTokenManager(flinkConfiguration, null, 
null);
-        delegationTokenManager.obtainDelegationTokens(credentials);
+        DelegationTokenContainer container = new DelegationTokenContainer();
+        delegationTokenManager.obtainDelegationTokens(container);
 
+        Credentials credentials = new Credentials();
+        for (byte[] v : container.getTokens().values()) {
+            credentials.addAll(HadoopDelegationTokenConverter.deserialize(v));
+        }
         ByteBuffer tokens = 
ByteBuffer.wrap(HadoopDelegationTokenConverter.serialize(credentials));
         containerLaunchContext.setTokens(tokens);
 

Reply via email to