This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 37ad3434b887023ad81df0a013d434ca17f04e89
Author: Gabor Somogyi <[email protected]>
AuthorDate: Wed Dec 14 18:16:27 2022 +0100

    [FLINK-30339][runtime][security] Add a unified delegation token manager
---
 docs/content.zh/docs/deployment/config.md          |   5 +
 .../generated/security_auth_kerberos_section.html  |  18 ---
 .../generated/security_configuration.html          |  36 ++---
 .../security_delegation_token_section.html         |  30 ++++
 .../flink/annotation/docs/Documentation.java       |   1 +
 .../flink/configuration/SecurityOptions.java       |  37 +++++
 .../runtime/entrypoint/ClusterEntrypoint.java      |   9 +-
 .../flink/runtime/minicluster/MiniCluster.java     |   9 +-
 ...ger.java => DefaultDelegationTokenManager.java} |  33 ++---
 ...a => DefaultDelegationTokenManagerFactory.java} |  31 +---
 .../token/DefaultDelegationTokenManagerTest.java   | 135 +++++++++++++++++
 .../KerberosDelegationTokenManagerITCase.java      | 160 ---------------------
 .../apache/flink/yarn/YarnClusterDescriptor.java   |   4 +-
 13 files changed, 254 insertions(+), 254 deletions(-)

diff --git a/docs/content.zh/docs/deployment/config.md 
b/docs/content.zh/docs/deployment/config.md
index cb05cd9e358..71a6237f11b 100644
--- a/docs/content.zh/docs/deployment/config.md
+++ b/docs/content.zh/docs/deployment/config.md
@@ -204,6 +204,11 @@ Flink's network connections can be secured via SSL. Please 
refer to the [SSL Set
 
 {{< generated/security_ssl_section >}}
 
+### Delegation token
+
+Flink has a pluggable authentication protocol agnostic delegation token 
framework.
+
+{{< generated/security_delegation_token_section >}}
 
 ### Auth with External Systems
 
diff --git 
a/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html 
b/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
index 0d97daba7f2..ad063064b49 100644
--- a/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
+++ b/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html
@@ -14,12 +14,6 @@
             <td>List&lt;String&gt;</td>
             <td>A comma-separated list of Kerberos-secured Hadoop filesystems 
Flink is going to access. For example, 
security.kerberos.access.hadoopFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003.
 The JobManager needs to have access to these filesystems to retrieve the 
security tokens.</td>
         </tr>
-        <tr>
-            <td><h5>security.kerberos.fetch.delegation-token</h5></td>
-            <td style="word-wrap: break-word;">true</td>
-            <td>Boolean</td>
-            <td>Indicates whether to fetch the delegation tokens for external 
services the Flink job needs to contact. Only HDFS and HBase are supported. It 
is used in Yarn deployments. If true, Flink will fetch HDFS and HBase 
delegation tokens and inject them into Yarn AM containers. If false, Flink will 
assume that the delegation tokens are managed outside of Flink. As a 
consequence, it will not fetch delegation tokens for HDFS and HBase. You may 
need to disable this option, if you rel [...]
-        </tr>
         <tr>
             <td><h5>security.kerberos.login.contexts</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -50,17 +44,5 @@
             <td>Duration</td>
             <td>The time period when keytab login happens automatically in 
order to always have a valid TGT.</td>
         </tr>
-        <tr>
-            <td><h5>security.kerberos.tokens.renewal.retry.backoff</h5></td>
-            <td style="word-wrap: break-word;">1 h</td>
-            <td>Duration</td>
-            <td>The time period how long to wait before retrying to obtain new 
delegation tokens after a failure.</td>
-        </tr>
-        <tr>
-            <td><h5>security.kerberos.tokens.renewal.time-ratio</h5></td>
-            <td style="word-wrap: break-word;">0.75</td>
-            <td>Double</td>
-            <td>Ratio of the tokens's expiration time when new credentials 
should be re-obtained.</td>
-        </tr>
     </tbody>
 </table>
diff --git a/docs/layouts/shortcodes/generated/security_configuration.html 
b/docs/layouts/shortcodes/generated/security_configuration.html
index 9b34482d61b..a7e479fe44a 100644
--- a/docs/layouts/shortcodes/generated/security_configuration.html
+++ b/docs/layouts/shortcodes/generated/security_configuration.html
@@ -14,18 +14,30 @@
             <td>List&lt;String&gt;</td>
             <td>List of factories that should be used to instantiate a 
security context. If multiple are configured, Flink will use the first 
compatible factory. You should have a NoOpSecurityContextFactory in this list 
as a fallback.</td>
         </tr>
+        <tr>
+            <td><h5>security.delegation.tokens.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Indicates whether to start delegation tokens system for 
external services.</td>
+        </tr>
+        <tr>
+            <td><h5>security.delegation.tokens.renewal.retry.backoff</h5></td>
+            <td style="word-wrap: break-word;">1 h</td>
+            <td>Duration</td>
+            <td>The time period how long to wait before retrying to obtain new 
delegation tokens after a failure.</td>
+        </tr>
+        <tr>
+            <td><h5>security.delegation.tokens.renewal.time-ratio</h5></td>
+            <td style="word-wrap: break-word;">0.75</td>
+            <td>Double</td>
+            <td>Ratio of the tokens's expiration time when new credentials 
should be re-obtained.</td>
+        </tr>
         <tr>
             <td><h5>security.kerberos.access.hadoopFileSystems</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>List&lt;String&gt;</td>
             <td>A comma-separated list of Kerberos-secured Hadoop filesystems 
Flink is going to access. For example, 
security.kerberos.access.hadoopFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003.
 The JobManager needs to have access to these filesystems to retrieve the 
security tokens.</td>
         </tr>
-        <tr>
-            <td><h5>security.kerberos.fetch.delegation-token</h5></td>
-            <td style="word-wrap: break-word;">true</td>
-            <td>Boolean</td>
-            <td>Indicates whether to fetch the delegation tokens for external 
services the Flink job needs to contact. Only HDFS and HBase are supported. It 
is used in Yarn deployments. If true, Flink will fetch HDFS and HBase 
delegation tokens and inject them into Yarn AM containers. If false, Flink will 
assume that the delegation tokens are managed outside of Flink. As a 
consequence, it will not fetch delegation tokens for HDFS and HBase. You may 
need to disable this option, if you rel [...]
-        </tr>
         <tr>
             <td><h5>security.kerberos.krb5-conf.path</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -62,18 +74,6 @@
             <td>Duration</td>
             <td>The time period when keytab login happens automatically in 
order to always have a valid TGT.</td>
         </tr>
-        <tr>
-            <td><h5>security.kerberos.tokens.renewal.retry.backoff</h5></td>
-            <td style="word-wrap: break-word;">1 h</td>
-            <td>Duration</td>
-            <td>The time period how long to wait before retrying to obtain new 
delegation tokens after a failure.</td>
-        </tr>
-        <tr>
-            <td><h5>security.kerberos.tokens.renewal.time-ratio</h5></td>
-            <td style="word-wrap: break-word;">0.75</td>
-            <td>Double</td>
-            <td>Ratio of the tokens's expiration time when new credentials 
should be re-obtained.</td>
-        </tr>
         <tr>
             <td><h5>security.module.factory.classes</h5></td>
             <td style="word-wrap: 
break-word;">"org.apache.flink.runtime.security.modules.HadoopModuleFactory";<wbr>"org.apache.flink.runtime.security.modules.JaasModuleFactory";<wbr>"org.apache.flink.runtime.security.modules.ZookeeperModuleFactory"</td>
diff --git 
a/docs/layouts/shortcodes/generated/security_delegation_token_section.html 
b/docs/layouts/shortcodes/generated/security_delegation_token_section.html
new file mode 100644
index 00000000000..771aec31669
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/security_delegation_token_section.html
@@ -0,0 +1,30 @@
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>security.delegation.tokens.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Indicates whether to start delegation tokens system for 
external services.</td>
+        </tr>
+        <tr>
+            <td><h5>security.delegation.tokens.renewal.retry.backoff</h5></td>
+            <td style="word-wrap: break-word;">1 h</td>
+            <td>Duration</td>
+            <td>The time period how long to wait before retrying to obtain new 
delegation tokens after a failure.</td>
+        </tr>
+        <tr>
+            <td><h5>security.delegation.tokens.renewal.time-ratio</h5></td>
+            <td style="word-wrap: break-word;">0.75</td>
+            <td>Double</td>
+            <td>Ratio of the tokens's expiration time when new credentials 
should be re-obtained.</td>
+        </tr>
+    </tbody>
+</table>
diff --git 
a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
 
b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
index 7bdb196e661..7bab129bcba 100644
--- 
a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
+++ 
b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
@@ -72,6 +72,7 @@ public final class Documentation {
 
         public static final String SECURITY_SSL = "security_ssl";
         public static final String SECURITY_AUTH_KERBEROS = 
"security_auth_kerberos";
+        public static final String SECURITY_DELEGATION_TOKEN = 
"security_delegation_token";
         public static final String SECURITY_AUTH_ZOOKEEPER = 
"security_auth_zk";
 
         public static final String STATE_BACKEND_ROCKSDB = 
"state_backend_rocksdb";
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 9205bd32e40..e90faaea21d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -110,6 +110,8 @@ public class SecurityOptions {
                                     + " (for example, `Client,KafkaClient` to 
use the credentials for ZooKeeper authentication and for"
                                     + " Kafka authentication)");
 
+    /** @deprecated Use {@link #DELEGATION_TOKENS_ENABLED}. */
+    @Deprecated
     @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
     public static final ConfigOption<Boolean> KERBEROS_FETCH_DELEGATION_TOKEN =
             key("security.kerberos.fetch.delegation-token")
@@ -132,6 +134,8 @@ public class SecurityOptions {
                     .withDescription(
                             "The time period when keytab login happens 
automatically in order to always have a valid TGT.");
 
+    /** @deprecated Use {@link #DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF}. */
+    @Deprecated
     @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
     public static final ConfigOption<Duration> 
KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF =
             key("security.kerberos.tokens.renewal.retry.backoff")
@@ -140,6 +144,8 @@ public class SecurityOptions {
                     .withDescription(
                             "The time period how long to wait before retrying 
to obtain new delegation tokens after a failure.");
 
+    /** @deprecated Use {@link #DELEGATION_TOKENS_RENEWAL_TIME_RATIO}. */
+    @Deprecated
     @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
     public static final ConfigOption<Double> 
KERBEROS_TOKENS_RENEWAL_TIME_RATIO =
             key("security.kerberos.tokens.renewal.time-ratio")
@@ -160,6 +166,37 @@ public class SecurityOptions {
                                     + 
"security.kerberos.access.hadoopFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003.
 "
                                     + "The JobManager needs to have access to 
these filesystems to retrieve the security tokens.");
 
+    // ------------------------------------------------------------------------
+    //  Delegation Token Options
+    // ------------------------------------------------------------------------
+
+    @Documentation.Section(Documentation.Sections.SECURITY_DELEGATION_TOKEN)
+    public static final ConfigOption<Boolean> DELEGATION_TOKENS_ENABLED =
+            key("security.delegation.tokens.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDeprecatedKeys(KERBEROS_FETCH_DELEGATION_TOKEN.key())
+                    .withDescription(
+                            "Indicates whether to start delegation tokens 
system for external services.");
+
+    @Documentation.Section(Documentation.Sections.SECURITY_DELEGATION_TOKEN)
+    public static final ConfigOption<Duration> 
DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF =
+            key("security.delegation.tokens.renewal.retry.backoff")
+                    .durationType()
+                    .defaultValue(Duration.ofHours(1))
+                    
.withDeprecatedKeys(KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF.key())
+                    .withDescription(
+                            "The time period how long to wait before retrying 
to obtain new delegation tokens after a failure.");
+
+    @Documentation.Section(Documentation.Sections.SECURITY_DELEGATION_TOKEN)
+    public static final ConfigOption<Double> 
DELEGATION_TOKENS_RENEWAL_TIME_RATIO =
+            key("security.delegation.tokens.renewal.time-ratio")
+                    .doubleType()
+                    .defaultValue(0.75)
+                    
.withDeprecatedKeys(KERBEROS_TOKENS_RENEWAL_TIME_RATIO.key())
+                    .withDescription(
+                            "Ratio of the tokens's expiration time when new 
credentials should be re-obtained.");
+
     // ------------------------------------------------------------------------
     //  ZooKeeper Security Options
     // ------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 243dbe059a6..92c1a5ff79d 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -65,8 +65,8 @@ import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.security.contexts.SecurityContext;
+import 
org.apache.flink.runtime.security.token.DefaultDelegationTokenManagerFactory;
 import org.apache.flink.runtime.security.token.DelegationTokenManager;
-import 
org.apache.flink.runtime.security.token.hadoop.KerberosDelegationTokenManagerFactory;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import 
org.apache.flink.runtime.webmonitor.retriever.impl.RpcMetricQueryServiceRetriever;
 import org.apache.flink.util.AutoCloseableAsync;
@@ -389,11 +389,8 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
             configuration.setString(BlobServerOptions.PORT, 
String.valueOf(blobServer.getPort()));
             heartbeatServices = createHeartbeatServices(configuration);
             delegationTokenManager =
-                    KerberosDelegationTokenManagerFactory.create(
-                            getClass().getClassLoader(),
-                            configuration,
-                            commonRpcService.getScheduledExecutor(),
-                            ioExecutor);
+                    DefaultDelegationTokenManagerFactory.create(
+                            configuration, 
commonRpcService.getScheduledExecutor(), ioExecutor);
             metricRegistry = createMetricRegistry(configuration, 
pluginManager, rpcSystem);
 
             final RpcService metricQueryServiceRpcService =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index a4c2fd06968..98614e3ecc9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -89,8 +89,8 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcSystem;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import 
org.apache.flink.runtime.security.token.DefaultDelegationTokenManagerFactory;
 import org.apache.flink.runtime.security.token.DelegationTokenManager;
-import 
org.apache.flink.runtime.security.token.hadoop.KerberosDelegationTokenManagerFactory;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
@@ -428,11 +428,8 @@ public class MiniCluster implements AutoCloseableAsync {
                 heartbeatServices = 
HeartbeatServices.fromConfiguration(configuration);
 
                 delegationTokenManager =
-                        KerberosDelegationTokenManagerFactory.create(
-                                getClass().getClassLoader(),
-                                configuration,
-                                commonRpcService.getScheduledExecutor(),
-                                ioExecutor);
+                        DefaultDelegationTokenManagerFactory.create(
+                                configuration, 
commonRpcService.getScheduledExecutor(), ioExecutor);
 
                 blobCacheService =
                         BlobUtils.createBlobCacheService(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
similarity index 89%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
index 98bab881b1f..1703d796dec 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
@@ -16,14 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token.hadoop;
+package org.apache.flink.runtime.security.token;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.security.token.DelegationTokenContainer;
-import org.apache.flink.runtime.security.token.DelegationTokenManager;
-import org.apache.flink.runtime.security.token.DelegationTokenProvider;
+import 
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenUpdater;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
@@ -44,8 +42,8 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
-import static 
org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF;
-import static 
org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_TIME_RATIO;
+import static 
org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF;
+import static 
org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_TIME_RATIO;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -53,14 +51,14 @@ import static 
org.apache.flink.util.Preconditions.checkState;
  * Manager for delegation tokens in a Flink cluster.
  *
  * <p>When delegation token renewal is enabled, this manager will make sure 
long-running apps can
- * run without interruption while accessing secured services. It periodically 
logs in to the KDC
- * with user-provided credentials, and contacts all the configured secure 
services to obtain
- * delegation tokens to be distributed to the rest of the application.
+ * run without interruption while accessing secured services. It periodically 
contacts all the
+ * configured secure services to obtain delegation tokens to be distributed to 
the rest of the
+ * application.
  */
 @Internal
-public class KerberosDelegationTokenManager implements DelegationTokenManager {
+public class DefaultDelegationTokenManager implements DelegationTokenManager {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(KerberosDelegationTokenManager.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDelegationTokenManager.class);
 
     private final Configuration configuration;
 
@@ -82,14 +80,14 @@ public class KerberosDelegationTokenManager implements 
DelegationTokenManager {
 
     @Nullable private Listener listener;
 
-    public KerberosDelegationTokenManager(
+    public DefaultDelegationTokenManager(
             Configuration configuration,
             @Nullable ScheduledExecutor scheduledExecutor,
             @Nullable ExecutorService ioExecutor) {
         this.configuration = checkNotNull(configuration, "Flink configuration 
must not be null");
-        this.tokensRenewalTimeRatio = 
configuration.get(KERBEROS_TOKENS_RENEWAL_TIME_RATIO);
+        this.tokensRenewalTimeRatio = 
configuration.get(DELEGATION_TOKENS_RENEWAL_TIME_RATIO);
         this.renewalRetryBackoffPeriod =
-                
configuration.get(KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis();
+                
configuration.get(DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis();
         this.delegationTokenProviders = loadProviders();
         this.scheduledExecutor = scheduledExecutor;
         this.ioExecutor = ioExecutor;
@@ -120,13 +118,10 @@ public class KerberosDelegationTokenManager implements 
DelegationTokenManager {
                             provider.serviceName());
                 }
             } catch (Exception | NoClassDefFoundError e) {
-                LOG.error(
+                LOG.warn(
                         "Failed to initialize delegation token provider {}",
                         provider.serviceName(),
                         e);
-                if (!(e instanceof NoClassDefFoundError)) {
-                    throw new FlinkRuntimeException(e);
-                }
             }
         }
 
@@ -138,7 +133,7 @@ public class KerberosDelegationTokenManager implements 
DelegationTokenManager {
     @VisibleForTesting
     boolean isProviderEnabled(String serviceName) {
         return configuration.getBoolean(
-                String.format("security.kerberos.token.provider.%s.enabled", 
serviceName), true);
+                String.format("security.delegation.token.provider.%s.enabled", 
serviceName), true);
     }
 
     @VisibleForTesting
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerFactory.java
similarity index 51%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerFactory.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerFactory.java
index 3a812bfc769..025b535bd67 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerFactory.java
@@ -16,14 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token.hadoop;
+package org.apache.flink.runtime.security.token;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.runtime.hadoop.HadoopDependency;
-import org.apache.flink.runtime.security.token.DelegationTokenManager;
-import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 
 import org.slf4j.Logger;
@@ -34,37 +31,21 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 
-/** A factory for {@link KerberosDelegationTokenManager}. */
+/** A factory for {@link DefaultDelegationTokenManager}. */
 @Internal
-public class KerberosDelegationTokenManagerFactory {
+public class DefaultDelegationTokenManagerFactory {
 
     private static final Logger LOG =
-            
LoggerFactory.getLogger(KerberosDelegationTokenManagerFactory.class);
+            
LoggerFactory.getLogger(DefaultDelegationTokenManagerFactory.class);
 
     public static DelegationTokenManager create(
-            ClassLoader classLoader,
             Configuration configuration,
             @Nullable ScheduledExecutor scheduledExecutor,
             @Nullable ExecutorService ioExecutor)
             throws IOException {
 
-        if 
(configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)) {
-            if (HadoopDependency.isHadoopCommonOnClasspath(classLoader)) {
-                KerberosLoginProvider kerberosLoginProvider =
-                        new KerberosLoginProvider(configuration);
-                if (kerberosLoginProvider.isLoginPossible()) {
-                    return new KerberosDelegationTokenManager(
-                            configuration, scheduledExecutor, ioExecutor);
-                } else {
-                    LOG.info(
-                            "Cannot use kerberos delegation token manager no 
valid kerberos credentials provided.");
-                    return new NoOpDelegationTokenManager();
-                }
-            } else {
-                LOG.info(
-                        "Cannot use kerberos delegation token manager because 
Hadoop cannot be found in the Classpath.");
-                return new NoOpDelegationTokenManager();
-            }
+        if 
(configuration.getBoolean(SecurityOptions.DELEGATION_TOKENS_ENABLED)) {
+            return new DefaultDelegationTokenManager(configuration, 
scheduledExecutor, ioExecutor);
         } else {
             return new NoOpDelegationTokenManager();
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
new file mode 100644
index 00000000000..8092f330a99
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security.token;
+
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Clock;
+import java.time.ZoneId;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.time.Instant.ofEpochMilli;
+import static 
org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_TIME_RATIO;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for {@link DelegationTokenManager}. */
+public class DefaultDelegationTokenManagerTest {
+
+    @BeforeEach
+    public void beforeEach() {
+        ExceptionThrowingDelegationTokenProvider.reset();
+    }
+
+    @AfterAll
+    public static void afterAll() {
+        ExceptionThrowingDelegationTokenProvider.reset();
+    }
+
+    @Test
+    public void isProviderEnabledMustGiveBackTrueByDefault() {
+        Configuration configuration = new Configuration();
+        DefaultDelegationTokenManager delegationTokenManager =
+                new DefaultDelegationTokenManager(configuration, null, null);
+
+        assertTrue(delegationTokenManager.isProviderEnabled("test"));
+    }
+
+    @Test
+    public void isProviderEnabledMustGiveBackFalseWhenDisabled() {
+        Configuration configuration = new Configuration();
+        
configuration.setBoolean("security.delegation.token.provider.test.enabled", 
false);
+        DefaultDelegationTokenManager delegationTokenManager =
+                new DefaultDelegationTokenManager(configuration, null, null);
+
+        assertFalse(delegationTokenManager.isProviderEnabled("test"));
+    }
+
+    @Test
+    public void configurationIsNullMustFailFast() {
+        assertThrows(Exception.class, () -> new 
DefaultDelegationTokenManager(null, null, null));
+    }
+
+    @Test
+    public void testAllProvidersLoaded() {
+        Configuration configuration = new Configuration();
+        
configuration.setBoolean("security.delegation.token.provider.throw.enabled", 
false);
+        DefaultDelegationTokenManager delegationTokenManager =
+                new DefaultDelegationTokenManager(configuration, null, null);
+
+        assertEquals(3, 
delegationTokenManager.delegationTokenProviders.size());
+        assertTrue(delegationTokenManager.isProviderLoaded("hadoopfs"));
+        assertTrue(delegationTokenManager.isProviderLoaded("hbase"));
+        assertTrue(delegationTokenManager.isProviderLoaded("test"));
+        assertTrue(ExceptionThrowingDelegationTokenProvider.constructed);
+        assertFalse(delegationTokenManager.isProviderLoaded("throw"));
+    }
+
+    @Test
+    public void startTokensUpdateShouldScheduleRenewal() {
+        final ManuallyTriggeredScheduledExecutor scheduledExecutor =
+                new ManuallyTriggeredScheduledExecutor();
+        final ManuallyTriggeredScheduledExecutorService scheduler =
+                new ManuallyTriggeredScheduledExecutorService();
+
+        ExceptionThrowingDelegationTokenProvider.addToken = true;
+        Configuration configuration = new Configuration();
+        
configuration.setBoolean("security.delegation.token.provider.throw.enabled", 
true);
+        AtomicInteger startTokensUpdateCallCount = new AtomicInteger(0);
+        DefaultDelegationTokenManager delegationTokenManager =
+                new DefaultDelegationTokenManager(configuration, 
scheduledExecutor, scheduler) {
+                    @Override
+                    void startTokensUpdate() {
+                        startTokensUpdateCallCount.incrementAndGet();
+                        super.startTokensUpdate();
+                    }
+                };
+
+        delegationTokenManager.startTokensUpdate();
+        ExceptionThrowingDelegationTokenProvider.throwInUsage = true;
+        scheduledExecutor.triggerScheduledTasks();
+        scheduler.triggerAll();
+        ExceptionThrowingDelegationTokenProvider.throwInUsage = false;
+        scheduledExecutor.triggerScheduledTasks();
+        scheduler.triggerAll();
+        delegationTokenManager.stopTokensUpdate();
+
+        assertEquals(3, startTokensUpdateCallCount.get());
+    }
+
+    @Test
+    public void calculateRenewalDelayShouldConsiderRenewalRatio() {
+        Configuration configuration = new Configuration();
+        
configuration.setBoolean("security.delegation.token.provider.throw.enabled", 
false);
+        configuration.set(DELEGATION_TOKENS_RENEWAL_TIME_RATIO, 0.5);
+        DefaultDelegationTokenManager delegationTokenManager =
+                new DefaultDelegationTokenManager(configuration, null, null);
+
+        Clock constantClock = Clock.fixed(ofEpochMilli(100), 
ZoneId.systemDefault());
+        assertEquals(50, 
delegationTokenManager.calculateRenewalDelay(constantClock, 200));
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java
deleted file mode 100644
index 45b04eeeff0..00000000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.security.token.hadoop;
-
-import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
-import org.apache.flink.runtime.security.token.DelegationTokenManager;
-import 
org.apache.flink.runtime.security.token.ExceptionThrowingDelegationTokenProvider;
-import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
-
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.jupiter.api.Test;
-import org.mockito.MockedStatic;
-
-import java.time.Clock;
-import java.time.ZoneId;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static java.time.Instant.ofEpochMilli;
-import static 
org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_TIME_RATIO;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
-
-/**
- * Test for {@link DelegationTokenManager}.
- *
- * <p>This class is an ITCase because the mocking breaks the {@link 
UserGroupInformation} class for
- * other tests.
- */
-public class KerberosDelegationTokenManagerITCase {
-
-    @Test
-    public void isProviderEnabledMustGiveBackTrueByDefault() {
-        ExceptionThrowingDelegationTokenProvider.reset();
-        Configuration configuration = new Configuration();
-        KerberosDelegationTokenManager delegationTokenManager =
-                new KerberosDelegationTokenManager(configuration, null, null);
-
-        assertTrue(delegationTokenManager.isProviderEnabled("test"));
-    }
-
-    @Test
-    public void isProviderEnabledMustGiveBackFalseWhenDisabled() {
-        ExceptionThrowingDelegationTokenProvider.reset();
-        Configuration configuration = new Configuration();
-        
configuration.setBoolean("security.kerberos.token.provider.test.enabled", 
false);
-        KerberosDelegationTokenManager delegationTokenManager =
-                new KerberosDelegationTokenManager(configuration, null, null);
-
-        assertFalse(delegationTokenManager.isProviderEnabled("test"));
-    }
-
-    @Test
-    public void configurationIsNullMustFailFast() {
-        assertThrows(Exception.class, () -> new 
KerberosDelegationTokenManager(null, null, null));
-    }
-
-    @Test
-    public void oneProviderThrowsExceptionMustFailFast() {
-        assertThrows(
-                Exception.class,
-                () -> {
-                    try {
-                        ExceptionThrowingDelegationTokenProvider.reset();
-                        ExceptionThrowingDelegationTokenProvider.throwInInit = 
true;
-                        new KerberosDelegationTokenManager(new 
Configuration(), null, null);
-                    } finally {
-                        ExceptionThrowingDelegationTokenProvider.reset();
-                    }
-                });
-    }
-
-    @Test
-    public void testAllProvidersLoaded() {
-        ExceptionThrowingDelegationTokenProvider.reset();
-        Configuration configuration = new Configuration();
-        
configuration.setBoolean("security.kerberos.token.provider.throw.enabled", 
false);
-        KerberosDelegationTokenManager delegationTokenManager =
-                new KerberosDelegationTokenManager(configuration, null, null);
-
-        assertEquals(3, 
delegationTokenManager.delegationTokenProviders.size());
-        assertTrue(delegationTokenManager.isProviderLoaded("hadoopfs"));
-        assertTrue(delegationTokenManager.isProviderLoaded("hbase"));
-        assertTrue(delegationTokenManager.isProviderLoaded("test"));
-        assertTrue(ExceptionThrowingDelegationTokenProvider.constructed);
-        assertFalse(delegationTokenManager.isProviderLoaded("throw"));
-    }
-
-    @Test
-    public void startTokensUpdateShouldScheduleRenewal() {
-        final ManuallyTriggeredScheduledExecutor scheduledExecutor =
-                new ManuallyTriggeredScheduledExecutor();
-        final ManuallyTriggeredScheduledExecutorService scheduler =
-                new ManuallyTriggeredScheduledExecutorService();
-
-        try (MockedStatic<UserGroupInformation> ugi = 
mockStatic(UserGroupInformation.class)) {
-            UserGroupInformation userGroupInformation = 
mock(UserGroupInformation.class);
-            
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
-
-            ExceptionThrowingDelegationTokenProvider.reset();
-            ExceptionThrowingDelegationTokenProvider.addToken = true;
-            Configuration configuration = new Configuration();
-            
configuration.setBoolean("security.kerberos.token.provider.throw.enabled", 
true);
-            AtomicInteger startTokensUpdateCallCount = new AtomicInteger(0);
-            KerberosDelegationTokenManager delegationTokenManager =
-                    new KerberosDelegationTokenManager(
-                            configuration, scheduledExecutor, scheduler) {
-                        @Override
-                        void startTokensUpdate() {
-                            startTokensUpdateCallCount.incrementAndGet();
-                            super.startTokensUpdate();
-                        }
-                    };
-
-            delegationTokenManager.startTokensUpdate();
-            ExceptionThrowingDelegationTokenProvider.throwInUsage = true;
-            scheduledExecutor.triggerScheduledTasks();
-            scheduler.triggerAll();
-            ExceptionThrowingDelegationTokenProvider.throwInUsage = false;
-            scheduledExecutor.triggerScheduledTasks();
-            scheduler.triggerAll();
-            delegationTokenManager.stopTokensUpdate();
-
-            assertEquals(3, startTokensUpdateCallCount.get());
-        }
-    }
-
-    @Test
-    public void calculateRenewalDelayShouldConsiderRenewalRatio() {
-        ExceptionThrowingDelegationTokenProvider.reset();
-        Configuration configuration = new Configuration();
-        
configuration.setBoolean("security.kerberos.token.provider.throw.enabled", 
false);
-        configuration.set(KERBEROS_TOKENS_RENEWAL_TIME_RATIO, 0.5);
-        KerberosDelegationTokenManager delegationTokenManager =
-                new KerberosDelegationTokenManager(configuration, null, null);
-
-        Clock constantClock = Clock.fixed(ofEpochMilli(100), 
ZoneId.systemDefault());
-        assertEquals(50, 
delegationTokenManager.calculateRenewalDelay(constantClock, 200));
-    }
-}
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 7cd065cf206..f692a501d41 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -52,10 +52,10 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
+import org.apache.flink.runtime.security.token.DefaultDelegationTokenManager;
 import org.apache.flink.runtime.security.token.DelegationTokenContainer;
 import org.apache.flink.runtime.security.token.DelegationTokenManager;
 import 
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter;
-import 
org.apache.flink.runtime.security.token.hadoop.KerberosDelegationTokenManager;
 import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider;
 import org.apache.flink.runtime.util.HadoopUtils;
 import org.apache.flink.util.CollectionUtil;
@@ -1296,7 +1296,7 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
         LOG.info("Adding delegation tokens to the AM container.");
 
         DelegationTokenManager delegationTokenManager =
-                new KerberosDelegationTokenManager(flinkConfiguration, null, 
null);
+                new DefaultDelegationTokenManager(flinkConfiguration, null, 
null);
         DelegationTokenContainer container = new DelegationTokenContainer();
         delegationTokenManager.obtainDelegationTokens(container);
 


Reply via email to