This is an automated email from the ASF dual-hosted git repository.
mbalassi pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new 98bcabb592d [FLINK-29622][runtime][security] Start kerberos delegation
token provider only if the user provided valid credentials
98bcabb592d is described below
commit 98bcabb592dbc0b62fa70f93d4f3063832f12b03
Author: Márton Balassi <[email protected]>
AuthorDate: Wed Oct 19 19:44:42 2022 +0200
[FLINK-29622][runtime][security] Start kerberos delegation token provider
only if the user provided valid credentials
Co-authored-by: Gabor Somogyi <[email protected]>
---
.../token/KerberosDelegationTokenManagerFactory.java | 16 +++++++++++++---
1 file changed, 13 insertions(+), 3 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
index 60494b105d7..f9cd4fffb07 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.concurrent.ExecutorService;
/** A factory for {@link KerberosDelegationTokenManager}. */
@@ -40,12 +41,21 @@ public class KerberosDelegationTokenManagerFactory {
ClassLoader classLoader,
Configuration configuration,
@Nullable ScheduledExecutor scheduledExecutor,
- @Nullable ExecutorService ioExecutor) {
+ @Nullable ExecutorService ioExecutor)
+ throws IOException {
if
(configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)) {
if (HadoopDependency.isHadoopCommonOnClasspath(classLoader)) {
- return new KerberosDelegationTokenManager(
- configuration, scheduledExecutor, ioExecutor);
+ KerberosLoginProvider kerberosLoginProvider =
+ new KerberosLoginProvider(configuration);
+ if (kerberosLoginProvider.isLoginPossible()) {
+ return new KerberosDelegationTokenManager(
+ configuration, scheduledExecutor, ioExecutor);
+ } else {
+ LOG.info(
+ "Cannot use kerberos delegation token manager no
valid kerberos credentials provided.");
+ return new NoOpDelegationTokenManager();
+ }
} else {
LOG.info(
"Cannot use kerberos delegation token manager because
Hadoop cannot be found in the Classpath.");