This is an automated email from the ASF dual-hosted git repository. mbalassi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 37ad3434b887023ad81df0a013d434ca17f04e89 Author: Gabor Somogyi <[email protected]> AuthorDate: Wed Dec 14 18:16:27 2022 +0100 [FLINK-30339][runtime][security] Add a unified delegation token manager --- docs/content.zh/docs/deployment/config.md | 5 + .../generated/security_auth_kerberos_section.html | 18 --- .../generated/security_configuration.html | 36 ++--- .../security_delegation_token_section.html | 30 ++++ .../flink/annotation/docs/Documentation.java | 1 + .../flink/configuration/SecurityOptions.java | 37 +++++ .../runtime/entrypoint/ClusterEntrypoint.java | 9 +- .../flink/runtime/minicluster/MiniCluster.java | 9 +- ...ger.java => DefaultDelegationTokenManager.java} | 33 ++--- ...a => DefaultDelegationTokenManagerFactory.java} | 31 +--- .../token/DefaultDelegationTokenManagerTest.java | 135 +++++++++++++++++ .../KerberosDelegationTokenManagerITCase.java | 160 --------------------- .../apache/flink/yarn/YarnClusterDescriptor.java | 4 +- 13 files changed, 254 insertions(+), 254 deletions(-) diff --git a/docs/content.zh/docs/deployment/config.md b/docs/content.zh/docs/deployment/config.md index cb05cd9e358..71a6237f11b 100644 --- a/docs/content.zh/docs/deployment/config.md +++ b/docs/content.zh/docs/deployment/config.md @@ -204,6 +204,11 @@ Flink's network connections can be secured via SSL. Please refer to the [SSL Set {{< generated/security_ssl_section >}} +### Delegation token + +Flink has a pluggable, authentication-protocol-agnostic delegation token framework. 
+ +{{< generated/security_delegation_token_section >}} ### Auth with External Systems diff --git a/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html b/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html index 0d97daba7f2..ad063064b49 100644 --- a/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html +++ b/docs/layouts/shortcodes/generated/security_auth_kerberos_section.html @@ -14,12 +14,6 @@ <td>List<String></td> <td>A comma-separated list of Kerberos-secured Hadoop filesystems Flink is going to access. For example, security.kerberos.access.hadoopFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003. The JobManager needs to have access to these filesystems to retrieve the security tokens.</td> </tr> - <tr> - <td><h5>security.kerberos.fetch.delegation-token</h5></td> - <td style="word-wrap: break-word;">true</td> - <td>Boolean</td> - <td>Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. Only HDFS and HBase are supported. It is used in Yarn deployments. If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. If false, Flink will assume that the delegation tokens are managed outside of Flink. As a consequence, it will not fetch delegation tokens for HDFS and HBase. You may need to disable this option, if you rel [...] 
- </tr> <tr> <td><h5>security.kerberos.login.contexts</h5></td> <td style="word-wrap: break-word;">(none)</td> @@ -50,17 +44,5 @@ <td>Duration</td> <td>The time period when keytab login happens automatically in order to always have a valid TGT.</td> </tr> - <tr> - <td><h5>security.kerberos.tokens.renewal.retry.backoff</h5></td> - <td style="word-wrap: break-word;">1 h</td> - <td>Duration</td> - <td>The time period how long to wait before retrying to obtain new delegation tokens after a failure.</td> - </tr> - <tr> - <td><h5>security.kerberos.tokens.renewal.time-ratio</h5></td> - <td style="word-wrap: break-word;">0.75</td> - <td>Double</td> - <td>Ratio of the tokens's expiration time when new credentials should be re-obtained.</td> - </tr> </tbody> </table> diff --git a/docs/layouts/shortcodes/generated/security_configuration.html b/docs/layouts/shortcodes/generated/security_configuration.html index 9b34482d61b..a7e479fe44a 100644 --- a/docs/layouts/shortcodes/generated/security_configuration.html +++ b/docs/layouts/shortcodes/generated/security_configuration.html @@ -14,18 +14,30 @@ <td>List<String></td> <td>List of factories that should be used to instantiate a security context. If multiple are configured, Flink will use the first compatible factory. 
You should have a NoOpSecurityContextFactory in this list as a fallback.</td> </tr> + <tr> + <td><h5>security.delegation.tokens.enabled</h5></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>Indicates whether to start delegation tokens system for external services.</td> + </tr> + <tr> + <td><h5>security.delegation.tokens.renewal.retry.backoff</h5></td> + <td style="word-wrap: break-word;">1 h</td> + <td>Duration</td> + <td>The time period how long to wait before retrying to obtain new delegation tokens after a failure.</td> + </tr> + <tr> + <td><h5>security.delegation.tokens.renewal.time-ratio</h5></td> + <td style="word-wrap: break-word;">0.75</td> + <td>Double</td> + <td>Ratio of the tokens's expiration time when new credentials should be re-obtained.</td> + </tr> <tr> <td><h5>security.kerberos.access.hadoopFileSystems</h5></td> <td style="word-wrap: break-word;">(none)</td> <td>List<String></td> <td>A comma-separated list of Kerberos-secured Hadoop filesystems Flink is going to access. For example, security.kerberos.access.hadoopFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003. The JobManager needs to have access to these filesystems to retrieve the security tokens.</td> </tr> - <tr> - <td><h5>security.kerberos.fetch.delegation-token</h5></td> - <td style="word-wrap: break-word;">true</td> - <td>Boolean</td> - <td>Indicates whether to fetch the delegation tokens for external services the Flink job needs to contact. Only HDFS and HBase are supported. It is used in Yarn deployments. If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM containers. If false, Flink will assume that the delegation tokens are managed outside of Flink. As a consequence, it will not fetch delegation tokens for HDFS and HBase. You may need to disable this option, if you rel [...] 
- </tr> <tr> <td><h5>security.kerberos.krb5-conf.path</h5></td> <td style="word-wrap: break-word;">(none)</td> @@ -62,18 +74,6 @@ <td>Duration</td> <td>The time period when keytab login happens automatically in order to always have a valid TGT.</td> </tr> - <tr> - <td><h5>security.kerberos.tokens.renewal.retry.backoff</h5></td> - <td style="word-wrap: break-word;">1 h</td> - <td>Duration</td> - <td>The time period how long to wait before retrying to obtain new delegation tokens after a failure.</td> - </tr> - <tr> - <td><h5>security.kerberos.tokens.renewal.time-ratio</h5></td> - <td style="word-wrap: break-word;">0.75</td> - <td>Double</td> - <td>Ratio of the tokens's expiration time when new credentials should be re-obtained.</td> - </tr> <tr> <td><h5>security.module.factory.classes</h5></td> <td style="word-wrap: break-word;">"org.apache.flink.runtime.security.modules.HadoopModuleFactory";<wbr>"org.apache.flink.runtime.security.modules.JaasModuleFactory";<wbr>"org.apache.flink.runtime.security.modules.ZookeeperModuleFactory"</td> diff --git a/docs/layouts/shortcodes/generated/security_delegation_token_section.html b/docs/layouts/shortcodes/generated/security_delegation_token_section.html new file mode 100644 index 00000000000..771aec31669 --- /dev/null +++ b/docs/layouts/shortcodes/generated/security_delegation_token_section.html @@ -0,0 +1,30 @@ +<table class="configuration table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Key</th> + <th class="text-left" style="width: 15%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 55%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>security.delegation.tokens.enabled</h5></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>Indicates whether to start delegation tokens system for external services.</td> + </tr> + <tr> + <td><h5>security.delegation.tokens.renewal.retry.backoff</h5></td> + <td 
style="word-wrap: break-word;">1 h</td> + <td>Duration</td> + <td>The time period how long to wait before retrying to obtain new delegation tokens after a failure.</td> + </tr> + <tr> + <td><h5>security.delegation.tokens.renewal.time-ratio</h5></td> + <td style="word-wrap: break-word;">0.75</td> + <td>Double</td> + <td>Ratio of the tokens's expiration time when new credentials should be re-obtained.</td> + </tr> + </tbody> +</table> diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java index 7bdb196e661..7bab129bcba 100644 --- a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java +++ b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java @@ -72,6 +72,7 @@ public final class Documentation { public static final String SECURITY_SSL = "security_ssl"; public static final String SECURITY_AUTH_KERBEROS = "security_auth_kerberos"; + public static final String SECURITY_DELEGATION_TOKEN = "security_delegation_token"; public static final String SECURITY_AUTH_ZOOKEEPER = "security_auth_zk"; public static final String STATE_BACKEND_ROCKSDB = "state_backend_rocksdb"; diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java index 9205bd32e40..e90faaea21d 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java @@ -110,6 +110,8 @@ public class SecurityOptions { + " (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for" + " Kafka authentication)"); + /** @deprecated Use {@link #DELEGATION_TOKENS_ENABLED}. 
*/ + @Deprecated @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS) public static final ConfigOption<Boolean> KERBEROS_FETCH_DELEGATION_TOKEN = key("security.kerberos.fetch.delegation-token") @@ -132,6 +134,8 @@ public class SecurityOptions { .withDescription( "The time period when keytab login happens automatically in order to always have a valid TGT."); + /** @deprecated Use {@link #DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF}. */ + @Deprecated @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS) public static final ConfigOption<Duration> KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF = key("security.kerberos.tokens.renewal.retry.backoff") @@ -140,6 +144,8 @@ public class SecurityOptions { .withDescription( "The time period how long to wait before retrying to obtain new delegation tokens after a failure."); + /** @deprecated Use {@link #DELEGATION_TOKENS_RENEWAL_TIME_RATIO}. */ + @Deprecated @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS) public static final ConfigOption<Double> KERBEROS_TOKENS_RENEWAL_TIME_RATIO = key("security.kerberos.tokens.renewal.time-ratio") @@ -160,6 +166,37 @@ public class SecurityOptions { + "security.kerberos.access.hadoopFileSystems=hdfs://namenode2:9002,hdfs://namenode3:9003. 
" + "The JobManager needs to have access to these filesystems to retrieve the security tokens."); + // ------------------------------------------------------------------------ + // Delegation Token Options + // ------------------------------------------------------------------------ + + @Documentation.Section(Documentation.Sections.SECURITY_DELEGATION_TOKEN) + public static final ConfigOption<Boolean> DELEGATION_TOKENS_ENABLED = + key("security.delegation.tokens.enabled") + .booleanType() + .defaultValue(true) + .withDeprecatedKeys(KERBEROS_FETCH_DELEGATION_TOKEN.key()) + .withDescription( + "Indicates whether to start delegation tokens system for external services."); + + @Documentation.Section(Documentation.Sections.SECURITY_DELEGATION_TOKEN) + public static final ConfigOption<Duration> DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF = + key("security.delegation.tokens.renewal.retry.backoff") + .durationType() + .defaultValue(Duration.ofHours(1)) + .withDeprecatedKeys(KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF.key()) + .withDescription( + "The time period how long to wait before retrying to obtain new delegation tokens after a failure."); + + @Documentation.Section(Documentation.Sections.SECURITY_DELEGATION_TOKEN) + public static final ConfigOption<Double> DELEGATION_TOKENS_RENEWAL_TIME_RATIO = + key("security.delegation.tokens.renewal.time-ratio") + .doubleType() + .defaultValue(0.75) + .withDeprecatedKeys(KERBEROS_TOKENS_RENEWAL_TIME_RATIO.key()) + .withDescription( + "Ratio of the tokens's expiration time when new credentials should be re-obtained."); + // ------------------------------------------------------------------------ // ZooKeeper Security Options // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 243dbe059a6..92c1a5ff79d 100755 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -65,8 +65,8 @@ import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.security.contexts.SecurityContext; +import org.apache.flink.runtime.security.token.DefaultDelegationTokenManagerFactory; import org.apache.flink.runtime.security.token.DelegationTokenManager; -import org.apache.flink.runtime.security.token.hadoop.KerberosDelegationTokenManagerFactory; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.runtime.webmonitor.retriever.impl.RpcMetricQueryServiceRetriever; import org.apache.flink.util.AutoCloseableAsync; @@ -389,11 +389,8 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro configuration.setString(BlobServerOptions.PORT, String.valueOf(blobServer.getPort())); heartbeatServices = createHeartbeatServices(configuration); delegationTokenManager = - KerberosDelegationTokenManagerFactory.create( - getClass().getClassLoader(), - configuration, - commonRpcService.getScheduledExecutor(), - ioExecutor); + DefaultDelegationTokenManagerFactory.create( + configuration, commonRpcService.getScheduledExecutor(), ioExecutor); metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem); final RpcService metricQueryServiceRpcService = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index a4c2fd06968..98614e3ecc9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -89,8 +89,8 @@ import 
org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcSystem; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.runtime.security.token.DefaultDelegationTokenManagerFactory; import org.apache.flink.runtime.security.token.DelegationTokenManager; -import org.apache.flink.runtime.security.token.hadoop.KerberosDelegationTokenManagerFactory; import org.apache.flink.runtime.taskexecutor.TaskExecutor; import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever; @@ -428,11 +428,8 @@ public class MiniCluster implements AutoCloseableAsync { heartbeatServices = HeartbeatServices.fromConfiguration(configuration); delegationTokenManager = - KerberosDelegationTokenManagerFactory.create( - getClass().getClassLoader(), - configuration, - commonRpcService.getScheduledExecutor(), - ioExecutor); + DefaultDelegationTokenManagerFactory.create( + configuration, commonRpcService.getScheduledExecutor(), ioExecutor); blobCacheService = BlobUtils.createBlobCacheService( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java similarity index 89% rename from flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java index 98bab881b1f..1703d796dec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java @@ -16,14 +16,12 @@ * limitations under the License. 
*/ -package org.apache.flink.runtime.security.token.hadoop; +package org.apache.flink.runtime.security.token; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.security.token.DelegationTokenContainer; -import org.apache.flink.runtime.security.token.DelegationTokenManager; -import org.apache.flink.runtime.security.token.DelegationTokenProvider; +import org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenUpdater; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.concurrent.ScheduledExecutor; @@ -44,8 +42,8 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; -import static org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF; -import static org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_TIME_RATIO; +import static org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF; +import static org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_TIME_RATIO; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -53,14 +51,14 @@ import static org.apache.flink.util.Preconditions.checkState; * Manager for delegation tokens in a Flink cluster. * * <p>When delegation token renewal is enabled, this manager will make sure long-running apps can - * run without interruption while accessing secured services. It periodically logs in to the KDC - * with user-provided credentials, and contacts all the configured secure services to obtain - * delegation tokens to be distributed to the rest of the application. + * run without interruption while accessing secured services. 
It periodically contacts all the + * configured secure services to obtain delegation tokens to be distributed to the rest of the + * application. */ @Internal -public class KerberosDelegationTokenManager implements DelegationTokenManager { +public class DefaultDelegationTokenManager implements DelegationTokenManager { - private static final Logger LOG = LoggerFactory.getLogger(KerberosDelegationTokenManager.class); + private static final Logger LOG = LoggerFactory.getLogger(DefaultDelegationTokenManager.class); private final Configuration configuration; @@ -82,14 +80,14 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager { @Nullable private Listener listener; - public KerberosDelegationTokenManager( + public DefaultDelegationTokenManager( Configuration configuration, @Nullable ScheduledExecutor scheduledExecutor, @Nullable ExecutorService ioExecutor) { this.configuration = checkNotNull(configuration, "Flink configuration must not be null"); - this.tokensRenewalTimeRatio = configuration.get(KERBEROS_TOKENS_RENEWAL_TIME_RATIO); + this.tokensRenewalTimeRatio = configuration.get(DELEGATION_TOKENS_RENEWAL_TIME_RATIO); this.renewalRetryBackoffPeriod = - configuration.get(KERBEROS_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis(); + configuration.get(DELEGATION_TOKENS_RENEWAL_RETRY_BACKOFF).toMillis(); this.delegationTokenProviders = loadProviders(); this.scheduledExecutor = scheduledExecutor; this.ioExecutor = ioExecutor; @@ -120,13 +118,10 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager { provider.serviceName()); } } catch (Exception | NoClassDefFoundError e) { - LOG.error( + LOG.warn( "Failed to initialize delegation token provider {}", provider.serviceName(), e); - if (!(e instanceof NoClassDefFoundError)) { - throw new FlinkRuntimeException(e); - } } } @@ -138,7 +133,7 @@ public class KerberosDelegationTokenManager implements DelegationTokenManager { @VisibleForTesting boolean isProviderEnabled(String serviceName) 
{ return configuration.getBoolean( - String.format("security.kerberos.token.provider.%s.enabled", serviceName), true); + String.format("security.delegation.token.provider.%s.enabled", serviceName), true); } @VisibleForTesting diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerFactory.java similarity index 51% rename from flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerFactory.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerFactory.java index 3a812bfc769..025b535bd67 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerFactory.java @@ -16,14 +16,11 @@ * limitations under the License. */ -package org.apache.flink.runtime.security.token.hadoop; +package org.apache.flink.runtime.security.token; import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.SecurityOptions; -import org.apache.flink.runtime.hadoop.HadoopDependency; -import org.apache.flink.runtime.security.token.DelegationTokenManager; -import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager; import org.apache.flink.util.concurrent.ScheduledExecutor; import org.slf4j.Logger; @@ -34,37 +31,21 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.concurrent.ExecutorService; -/** A factory for {@link KerberosDelegationTokenManager}. */ +/** A factory for {@link DefaultDelegationTokenManager}. 
*/ @Internal -public class KerberosDelegationTokenManagerFactory { +public class DefaultDelegationTokenManagerFactory { private static final Logger LOG = - LoggerFactory.getLogger(KerberosDelegationTokenManagerFactory.class); + LoggerFactory.getLogger(DefaultDelegationTokenManagerFactory.class); public static DelegationTokenManager create( - ClassLoader classLoader, Configuration configuration, @Nullable ScheduledExecutor scheduledExecutor, @Nullable ExecutorService ioExecutor) throws IOException { - if (configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)) { - if (HadoopDependency.isHadoopCommonOnClasspath(classLoader)) { - KerberosLoginProvider kerberosLoginProvider = - new KerberosLoginProvider(configuration); - if (kerberosLoginProvider.isLoginPossible()) { - return new KerberosDelegationTokenManager( - configuration, scheduledExecutor, ioExecutor); - } else { - LOG.info( - "Cannot use kerberos delegation token manager no valid kerberos credentials provided."); - return new NoOpDelegationTokenManager(); - } - } else { - LOG.info( - "Cannot use kerberos delegation token manager because Hadoop cannot be found in the Classpath."); - return new NoOpDelegationTokenManager(); - } + if (configuration.getBoolean(SecurityOptions.DELEGATION_TOKENS_ENABLED)) { + return new DefaultDelegationTokenManager(configuration, scheduledExecutor, ioExecutor); } else { return new NoOpDelegationTokenManager(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java new file mode 100644 index 00000000000..8092f330a99 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security.token; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService; +import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Clock; +import java.time.ZoneId; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.time.Instant.ofEpochMilli; +import static org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_TIME_RATIO; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Test for {@link DelegationTokenManager}. 
*/ +public class DefaultDelegationTokenManagerTest { + + @BeforeEach + public void beforeEach() { + ExceptionThrowingDelegationTokenProvider.reset(); + } + + @AfterAll + public static void afterAll() { + ExceptionThrowingDelegationTokenProvider.reset(); + } + + @Test + public void isProviderEnabledMustGiveBackTrueByDefault() { + Configuration configuration = new Configuration(); + DefaultDelegationTokenManager delegationTokenManager = + new DefaultDelegationTokenManager(configuration, null, null); + + assertTrue(delegationTokenManager.isProviderEnabled("test")); + } + + @Test + public void isProviderEnabledMustGiveBackFalseWhenDisabled() { + Configuration configuration = new Configuration(); + configuration.setBoolean("security.delegation.token.provider.test.enabled", false); + DefaultDelegationTokenManager delegationTokenManager = + new DefaultDelegationTokenManager(configuration, null, null); + + assertFalse(delegationTokenManager.isProviderEnabled("test")); + } + + @Test + public void configurationIsNullMustFailFast() { + assertThrows(Exception.class, () -> new DefaultDelegationTokenManager(null, null, null)); + } + + @Test + public void testAllProvidersLoaded() { + Configuration configuration = new Configuration(); + configuration.setBoolean("security.delegation.token.provider.throw.enabled", false); + DefaultDelegationTokenManager delegationTokenManager = + new DefaultDelegationTokenManager(configuration, null, null); + + assertEquals(3, delegationTokenManager.delegationTokenProviders.size()); + assertTrue(delegationTokenManager.isProviderLoaded("hadoopfs")); + assertTrue(delegationTokenManager.isProviderLoaded("hbase")); + assertTrue(delegationTokenManager.isProviderLoaded("test")); + assertTrue(ExceptionThrowingDelegationTokenProvider.constructed); + assertFalse(delegationTokenManager.isProviderLoaded("throw")); + } + + @Test + public void startTokensUpdateShouldScheduleRenewal() { + final ManuallyTriggeredScheduledExecutor scheduledExecutor = + new 
ManuallyTriggeredScheduledExecutor(); + final ManuallyTriggeredScheduledExecutorService scheduler = + new ManuallyTriggeredScheduledExecutorService(); + + ExceptionThrowingDelegationTokenProvider.addToken = true; + Configuration configuration = new Configuration(); + configuration.setBoolean("security.delegation.token.provider.throw.enabled", true); + AtomicInteger startTokensUpdateCallCount = new AtomicInteger(0); + DefaultDelegationTokenManager delegationTokenManager = + new DefaultDelegationTokenManager(configuration, scheduledExecutor, scheduler) { + @Override + void startTokensUpdate() { + startTokensUpdateCallCount.incrementAndGet(); + super.startTokensUpdate(); + } + }; + + delegationTokenManager.startTokensUpdate(); + ExceptionThrowingDelegationTokenProvider.throwInUsage = true; + scheduledExecutor.triggerScheduledTasks(); + scheduler.triggerAll(); + ExceptionThrowingDelegationTokenProvider.throwInUsage = false; + scheduledExecutor.triggerScheduledTasks(); + scheduler.triggerAll(); + delegationTokenManager.stopTokensUpdate(); + + assertEquals(3, startTokensUpdateCallCount.get()); + } + + @Test + public void calculateRenewalDelayShouldConsiderRenewalRatio() { + Configuration configuration = new Configuration(); + configuration.setBoolean("security.delegation.token.provider.throw.enabled", false); + configuration.set(DELEGATION_TOKENS_RENEWAL_TIME_RATIO, 0.5); + DefaultDelegationTokenManager delegationTokenManager = + new DefaultDelegationTokenManager(configuration, null, null); + + Clock constantClock = Clock.fixed(ofEpochMilli(100), ZoneId.systemDefault()); + assertEquals(50, delegationTokenManager.calculateRenewalDelay(constantClock, 200)); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java deleted file mode 100644 index 45b04eeeff0..00000000000 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.security.token.hadoop; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService; -import org.apache.flink.runtime.security.token.DelegationTokenManager; -import org.apache.flink.runtime.security.token.ExceptionThrowingDelegationTokenProvider; -import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; - -import org.apache.hadoop.security.UserGroupInformation; -import org.junit.jupiter.api.Test; -import org.mockito.MockedStatic; - -import java.time.Clock; -import java.time.ZoneId; -import java.util.concurrent.atomic.AtomicInteger; - -import static java.time.Instant.ofEpochMilli; -import static org.apache.flink.configuration.SecurityOptions.KERBEROS_TOKENS_RENEWAL_TIME_RATIO; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static 
org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; - -/** - * Test for {@link DelegationTokenManager}. - * - * <p>This class is an ITCase because the mocking breaks the {@link UserGroupInformation} class for - * other tests. - */ -public class KerberosDelegationTokenManagerITCase { - - @Test - public void isProviderEnabledMustGiveBackTrueByDefault() { - ExceptionThrowingDelegationTokenProvider.reset(); - Configuration configuration = new Configuration(); - KerberosDelegationTokenManager delegationTokenManager = - new KerberosDelegationTokenManager(configuration, null, null); - - assertTrue(delegationTokenManager.isProviderEnabled("test")); - } - - @Test - public void isProviderEnabledMustGiveBackFalseWhenDisabled() { - ExceptionThrowingDelegationTokenProvider.reset(); - Configuration configuration = new Configuration(); - configuration.setBoolean("security.kerberos.token.provider.test.enabled", false); - KerberosDelegationTokenManager delegationTokenManager = - new KerberosDelegationTokenManager(configuration, null, null); - - assertFalse(delegationTokenManager.isProviderEnabled("test")); - } - - @Test - public void configurationIsNullMustFailFast() { - assertThrows(Exception.class, () -> new KerberosDelegationTokenManager(null, null, null)); - } - - @Test - public void oneProviderThrowsExceptionMustFailFast() { - assertThrows( - Exception.class, - () -> { - try { - ExceptionThrowingDelegationTokenProvider.reset(); - ExceptionThrowingDelegationTokenProvider.throwInInit = true; - new KerberosDelegationTokenManager(new Configuration(), null, null); - } finally { - ExceptionThrowingDelegationTokenProvider.reset(); - } - }); - } - - @Test - public void testAllProvidersLoaded() { - ExceptionThrowingDelegationTokenProvider.reset(); - Configuration configuration = new Configuration(); - configuration.setBoolean("security.kerberos.token.provider.throw.enabled", false); - 
KerberosDelegationTokenManager delegationTokenManager = - new KerberosDelegationTokenManager(configuration, null, null); - - assertEquals(3, delegationTokenManager.delegationTokenProviders.size()); - assertTrue(delegationTokenManager.isProviderLoaded("hadoopfs")); - assertTrue(delegationTokenManager.isProviderLoaded("hbase")); - assertTrue(delegationTokenManager.isProviderLoaded("test")); - assertTrue(ExceptionThrowingDelegationTokenProvider.constructed); - assertFalse(delegationTokenManager.isProviderLoaded("throw")); - } - - @Test - public void startTokensUpdateShouldScheduleRenewal() { - final ManuallyTriggeredScheduledExecutor scheduledExecutor = - new ManuallyTriggeredScheduledExecutor(); - final ManuallyTriggeredScheduledExecutorService scheduler = - new ManuallyTriggeredScheduledExecutorService(); - - try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) { - UserGroupInformation userGroupInformation = mock(UserGroupInformation.class); - ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation); - - ExceptionThrowingDelegationTokenProvider.reset(); - ExceptionThrowingDelegationTokenProvider.addToken = true; - Configuration configuration = new Configuration(); - configuration.setBoolean("security.kerberos.token.provider.throw.enabled", true); - AtomicInteger startTokensUpdateCallCount = new AtomicInteger(0); - KerberosDelegationTokenManager delegationTokenManager = - new KerberosDelegationTokenManager( - configuration, scheduledExecutor, scheduler) { - @Override - void startTokensUpdate() { - startTokensUpdateCallCount.incrementAndGet(); - super.startTokensUpdate(); - } - }; - - delegationTokenManager.startTokensUpdate(); - ExceptionThrowingDelegationTokenProvider.throwInUsage = true; - scheduledExecutor.triggerScheduledTasks(); - scheduler.triggerAll(); - ExceptionThrowingDelegationTokenProvider.throwInUsage = false; - scheduledExecutor.triggerScheduledTasks(); - scheduler.triggerAll(); - 
delegationTokenManager.stopTokensUpdate(); - - assertEquals(3, startTokensUpdateCallCount.get()); - } - } - - @Test - public void calculateRenewalDelayShouldConsiderRenewalRatio() { - ExceptionThrowingDelegationTokenProvider.reset(); - Configuration configuration = new Configuration(); - configuration.setBoolean("security.kerberos.token.provider.throw.enabled", false); - configuration.set(KERBEROS_TOKENS_RENEWAL_TIME_RATIO, 0.5); - KerberosDelegationTokenManager delegationTokenManager = - new KerberosDelegationTokenManager(configuration, null, null); - - Clock constantClock = Clock.fixed(ofEpochMilli(100), ZoneId.systemDefault()); - assertEquals(50, delegationTokenManager.calculateRenewalDelay(constantClock, 200)); - } -} diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 7cd065cf206..f692a501d41 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -52,10 +52,10 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec; import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils; +import org.apache.flink.runtime.security.token.DefaultDelegationTokenManager; import org.apache.flink.runtime.security.token.DelegationTokenContainer; import org.apache.flink.runtime.security.token.DelegationTokenManager; import org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter; -import org.apache.flink.runtime.security.token.hadoop.KerberosDelegationTokenManager; import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider; import org.apache.flink.runtime.util.HadoopUtils; import org.apache.flink.util.CollectionUtil; @@ -1296,7 +1296,7 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> { LOG.info("Adding delegation tokens to the AM container."); DelegationTokenManager delegationTokenManager = - new KerberosDelegationTokenManager(flinkConfiguration, null, null); + new DefaultDelegationTokenManager(flinkConfiguration, null, null); DelegationTokenContainer container = new DelegationTokenContainer(); delegationTokenManager.obtainDelegationTokens(container);
