This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 8751e69  [FLINK-15561] Unify Kerberos credentials checking
8751e69 is described below

commit 8751e69037d8a9b1756b75eed62a368c3ef29137
Author: Rong Rong <[email protected]>
AuthorDate: Sun Jan 12 15:16:51 2020 -0800

    [FLINK-15561] Unify Kerberos credentials checking
    
    Before, we had duplicate code in HadoopModule and YarnClusterDescriptor,
    now we use the same code for both. That code is refactored to a util.
---
 .../java/org/apache/flink/runtime/util/HadoopUtils.java | 17 +++++++++++++++++
 .../flink/runtime/security/modules/HadoopModule.java    | 15 +++------------
 .../org/apache/flink/yarn/YarnClusterDescriptor.java    | 10 +++++-----
 3 files changed, 25 insertions(+), 17 deletions(-)

diff --git 
a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
 
b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
index f9244d3..e91cd99 100644
--- 
a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
+++ 
b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -112,6 +112,23 @@ public class HadoopUtils {
                return result;
        }
 
+       public static boolean isCredentialsConfigured(UserGroupInformation ugi, 
boolean useTicketCache) throws Exception {
+               if (UserGroupInformation.isSecurityEnabled()) {
+                       if (useTicketCache && !ugi.hasKerberosCredentials()) {
+                               // a delegation token is an adequate substitute 
in most cases
+                               if (!HadoopUtils.hasHDFSDelegationToken()) {
+                                       LOG.error("Hadoop security is enabled, 
but current login user has neither Kerberos credentials " +
+                                               "nor delegation tokens!");
+                                       return false;
+                               } else {
+                                       LOG.warn("Hadoop security is enabled 
but current login user does not have Kerberos credentials, " +
+                                               "use delegation token instead. 
Flink application will terminate after token expires.");
+                               }
+                       }
+               }
+               return true;
+       }
+
        /**
         * Indicates whether the current user has an HDFS delegation token.
         */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
index 1e045ad..b9250e6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -137,19 +137,10 @@ public class HadoopModule implements SecurityModule {
                                loginUser = UserGroupInformation.getLoginUser();
                        }
 
-                       if (UserGroupInformation.isSecurityEnabled()) {
-                               // note: UGI::hasKerberosCredentials 
inaccurately reports false
-                               // for logins based on a keytab (fixed in 
Hadoop 2.6.1, see HADOOP-10786),
-                               // so we check only in ticket cache scenario.
-                               if (securityConfig.useTicketCache() && 
!loginUser.hasKerberosCredentials()) {
-                                       // a delegation token is an adequate 
substitute in most cases
-                                       if 
(!HadoopUtils.hasHDFSDelegationToken()) {
-                                               LOG.warn("Hadoop security is 
enabled but current login user does not have Kerberos credentials");
-                                       }
-                               }
-                       }
+                       boolean isCredentialsConfigured = 
HadoopUtils.isCredentialsConfigured(
+                               loginUser, securityConfig.useTicketCache());
 
-                       LOG.info("Hadoop user set to {}", loginUser);
+                       LOG.info("Hadoop user set to {}, credentials check 
status: {}", loginUser, isCredentialsConfigured);
 
                } catch (Throwable ex) {
                        throw new SecurityInstallException("Unable to set the 
Hadoop login user", ex);
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 4a905ae..31bd6a1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -43,6 +43,7 @@ import 
org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.util.HadoopUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
@@ -432,12 +433,11 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
                        // so we check only in ticket cache scenario.
                        boolean useTicketCache = 
flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
 
-                       UserGroupInformation loginUser = 
UserGroupInformation.getCurrentUser();
-                       if (loginUser.getAuthenticationMethod() == 
UserGroupInformation.AuthenticationMethod.KERBEROS
-                                       && useTicketCache && 
!loginUser.hasKerberosCredentials()) {
-                               LOG.error("Hadoop security with Kerberos is 
enabled but the login user does not have Kerberos credentials");
+                       boolean isCredentialsConfigured = 
HadoopUtils.isCredentialsConfigured(
+                               UserGroupInformation.getCurrentUser(), 
useTicketCache);
+                       if (!isCredentialsConfigured) {
                                throw new RuntimeException("Hadoop security 
with Kerberos is enabled but the login user " +
-                                               "does not have Kerberos 
credentials");
+                                       "does not have Kerberos credentials or 
delegation tokens!");
                        }
                }
 

Reply via email to