This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new 98bcabb592d [FLINK-29622][runtime][security] Start kerberos delegation 
token provider only if the user provided valid credentials
98bcabb592d is described below

commit 98bcabb592dbc0b62fa70f93d4f3063832f12b03
Author: Márton Balassi <[email protected]>
AuthorDate: Wed Oct 19 19:44:42 2022 +0200

    [FLINK-29622][runtime][security] Start kerberos delegation token provider 
only if the user provided valid credentials
    
    Co-authored-by: Gabor Somogyi <[email protected]>
---
 .../token/KerberosDelegationTokenManagerFactory.java     | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
index 60494b105d7..f9cd4fffb07 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 
 /** A factory for {@link KerberosDelegationTokenManager}. */
@@ -40,12 +41,21 @@ public class KerberosDelegationTokenManagerFactory {
             ClassLoader classLoader,
             Configuration configuration,
             @Nullable ScheduledExecutor scheduledExecutor,
-            @Nullable ExecutorService ioExecutor) {
+            @Nullable ExecutorService ioExecutor)
+            throws IOException {
 
         if 
(configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN)) {
             if (HadoopDependency.isHadoopCommonOnClasspath(classLoader)) {
-                return new KerberosDelegationTokenManager(
-                        configuration, scheduledExecutor, ioExecutor);
+                KerberosLoginProvider kerberosLoginProvider =
+                        new KerberosLoginProvider(configuration);
+                if (kerberosLoginProvider.isLoginPossible()) {
+                    return new KerberosDelegationTokenManager(
+                            configuration, scheduledExecutor, ioExecutor);
+                } else {
+                    LOG.info(
+                            "Cannot use kerberos delegation token manager no 
valid kerberos credentials provided.");
+                    return new NoOpDelegationTokenManager();
+                }
             } else {
                 LOG.info(
                         "Cannot use kerberos delegation token manager because 
Hadoop cannot be found in the Classpath.");

Reply via email to