This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new c820534  [FLINK-18045] Fix Kerberos credentials checking
c820534 is described below

commit c8205341418d9139dc59d2fcd7b5dbae6b4c5f98
Author: Bartosz Krasinski <[email protected]>
AuthorDate: Wed Jun 3 00:20:25 2020 +0200

    [FLINK-18045] Fix Kerberos credentials checking
    
    This closes #12462.
---
 .../org/apache/flink/runtime/util/HadoopUtils.java |  42 +++---
 .../apache/flink/runtime/util/HadoopUtilsTest.java | 147 +++++++++++++++++++++
 .../runtime/security/modules/HadoopModule.java     |   8 +-
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  10 +-
 pom.xml                                            |   1 +
 5 files changed, 181 insertions(+), 27 deletions(-)

diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
index e91cd99..e01d59b 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.util;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -43,7 +44,7 @@ public class HadoopUtils {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(HadoopUtils.class);
 
-       private static final Text HDFS_DELEGATION_TOKEN_KIND = new 
Text("HDFS_DELEGATION_TOKEN");
+       static final Text HDFS_DELEGATION_TOKEN_KIND = new 
Text("HDFS_DELEGATION_TOKEN");
 
        @SuppressWarnings("deprecation")
        public static Configuration 
getHadoopConfiguration(org.apache.flink.configuration.Configuration 
flinkConfiguration) {
@@ -112,29 +113,36 @@ public class HadoopUtils {
                return result;
        }
 
-       public static boolean isCredentialsConfigured(UserGroupInformation ugi, 
boolean useTicketCache) throws Exception {
-               if (UserGroupInformation.isSecurityEnabled()) {
-                       if (useTicketCache && !ugi.hasKerberosCredentials()) {
-                               // a delegation token is an adequate substitute 
in most cases
-                               if (!HadoopUtils.hasHDFSDelegationToken()) {
-                                       LOG.error("Hadoop security is enabled, 
but current login user has neither Kerberos credentials " +
-                                               "nor delegation tokens!");
-                                       return false;
-                               } else {
-                                       LOG.warn("Hadoop security is enabled 
but current login user does not have Kerberos credentials, " +
-                                               "use delegation token instead. 
Flink application will terminate after token expires.");
-                               }
+       public static boolean isKerberosSecurityEnabled(UserGroupInformation 
ugi) {
+               return UserGroupInformation.isSecurityEnabled() && 
ugi.getAuthenticationMethod() == 
UserGroupInformation.AuthenticationMethod.KERBEROS;
+       }
+
+       public static boolean areKerberosCredentialsValid(UserGroupInformation 
ugi, boolean useTicketCache) {
+               Preconditions.checkState(isKerberosSecurityEnabled(ugi));
+
+               // note: UGI::hasKerberosCredentials inaccurately reports false
+               // for logins based on a keytab (fixed in Hadoop 2.6.1, see 
HADOOP-10786),
+               // so we check only in ticket cache scenario.
+               if (useTicketCache && !ugi.hasKerberosCredentials()) {
+                       if (hasHDFSDelegationToken(ugi)) {
+                               LOG.warn("Hadoop security is enabled but 
current login user does not have Kerberos credentials, " +
+                                       "use delegation token instead. Flink 
application will terminate after token expires.");
+                               return true;
+                       } else {
+                               LOG.error("Hadoop security is enabled, but 
current login user has neither Kerberos credentials " +
+                                       "nor delegation tokens!");
+                               return false;
                        }
                }
+
                return true;
        }
 
        /**
-        * Indicates whether the current user has an HDFS delegation token.
+        * Indicates whether the user has an HDFS delegation token.
         */
-       public static boolean hasHDFSDelegationToken() throws Exception {
-               UserGroupInformation loginUser = 
UserGroupInformation.getCurrentUser();
-               Collection<Token<? extends TokenIdentifier>> usrTok = 
loginUser.getTokens();
+       public static boolean hasHDFSDelegationToken(UserGroupInformation ugi) {
+               Collection<Token<? extends TokenIdentifier>> usrTok = 
ugi.getTokens();
                for (Token<? extends TokenIdentifier> token : usrTok) {
                        if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) 
{
                                return true;
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java
new file mode 100644
index 0000000..472a4ff
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.conf.Configuration;
+import 
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.Token;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import sun.security.krb5.KrbException;
+
+import static 
org.apache.flink.runtime.util.HadoopUtils.HDFS_DELEGATION_TOKEN_KIND;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeFalse;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Unit tests for Hadoop utils.
+ */
+public class HadoopUtilsTest extends TestLogger {
+
+       @BeforeClass
+       public static void setPropertiesToEnableKerberosConfigInit() throws 
KrbException {
+               System.setProperty("java.security.krb5.realm", "");
+               System.setProperty("java.security.krb5.kdc", "");
+               System.setProperty("java.security.krb5.conf", "/dev/null");
+               sun.security.krb5.Config.refresh();
+       }
+
+       @Test
+       public void 
testShouldReturnFalseWhenNoKerberosCredentialsOrDelegationTokens() {
+               
UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS));
+               UserGroupInformation userWithoutCredentialsOrTokens = 
createTestUser(AuthenticationMethod.KERBEROS);
+               
assumeFalse(userWithoutCredentialsOrTokens.hasKerberosCredentials());
+
+               boolean isKerberosEnabled = 
HadoopUtils.isKerberosSecurityEnabled(userWithoutCredentialsOrTokens);
+               boolean result = 
HadoopUtils.areKerberosCredentialsValid(userWithoutCredentialsOrTokens, true);
+
+               assertTrue(isKerberosEnabled);
+               assertFalse(result);
+       }
+
+       @Test
+       public void testShouldReturnTrueWhenDelegationTokenIsPresent() {
+               
UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS));
+               UserGroupInformation userWithoutCredentialsButHavingToken = 
createTestUser(AuthenticationMethod.KERBEROS);
+               
userWithoutCredentialsButHavingToken.addToken(getHDFSDelegationToken());
+               
assumeFalse(userWithoutCredentialsButHavingToken.hasKerberosCredentials());
+
+               boolean result = 
HadoopUtils.areKerberosCredentialsValid(userWithoutCredentialsButHavingToken, 
true);
+
+               assertTrue(result);
+       }
+
+       @Test
+       public void testShouldReturnTrueWhenKerberosCredentialsArePresent() {
+               
UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS));
+               UserGroupInformation userWithCredentials = 
Mockito.mock(UserGroupInformation.class);
+               
Mockito.when(userWithCredentials.getAuthenticationMethod()).thenReturn(AuthenticationMethod.KERBEROS);
+               
Mockito.when(userWithCredentials.hasKerberosCredentials()).thenReturn(true);
+
+               boolean result = 
HadoopUtils.areKerberosCredentialsValid(userWithCredentials, true);
+
+               assertTrue(result);
+       }
+
+       @Test
+       public void isKerberosSecurityEnabled_NoKerberos_ReturnsFalse() {
+               
UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.PROXY));
+               UserGroupInformation userWithAuthMethodOtherThanKerberos = 
createTestUser(AuthenticationMethod.PROXY);
+
+               boolean result = 
HadoopUtils.isKerberosSecurityEnabled(userWithAuthMethodOtherThanKerberos);
+
+               assertFalse(result);
+       }
+
+       @Test
+       public void testShouldReturnTrueIfTicketCacheIsNotUsed() {
+               
UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS));
+               UserGroupInformation user = 
createTestUser(AuthenticationMethod.KERBEROS);
+
+               boolean result = HadoopUtils.areKerberosCredentialsValid(user, 
false);
+
+               assertTrue(result);
+       }
+
+       @Test
+       public void testShouldCheckIfTheUserHasHDFSDelegationToken() {
+               UserGroupInformation userWithToken = 
createTestUser(AuthenticationMethod.KERBEROS);
+               userWithToken.addToken(getHDFSDelegationToken());
+
+               boolean result = 
HadoopUtils.hasHDFSDelegationToken(userWithToken);
+
+               assertTrue(result);
+       }
+
+       @Test
+       public void testShouldReturnFalseIfTheUserHasNoHDFSDelegationToken() {
+               UserGroupInformation userWithoutToken = 
createTestUser(AuthenticationMethod.KERBEROS);
+               assumeTrue(userWithoutToken.getTokens().isEmpty());
+
+               boolean result = 
HadoopUtils.hasHDFSDelegationToken(userWithoutToken);
+
+               assertFalse(result);
+       }
+
+       private static Configuration 
getHadoopConfigWithAuthMethod(AuthenticationMethod authenticationMethod) {
+               Configuration conf = new Configuration(true);
+               conf.set("hadoop.security.authentication", 
authenticationMethod.name());
+               return conf;
+       }
+
+       private static UserGroupInformation createTestUser(AuthenticationMethod 
authenticationMethod) {
+               UserGroupInformation user = 
UserGroupInformation.createRemoteUser("test-user");
+               user.setAuthenticationMethod(authenticationMethod);
+               return user;
+       }
+
+       private static Token<DelegationTokenIdentifier> 
getHDFSDelegationToken() {
+               Token<DelegationTokenIdentifier> token = new Token<>();
+               token.setKind(HDFS_DELEGATION_TOKEN_KIND);
+               return token;
+       }
+
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
index b9250e6..f625b73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -137,11 +137,13 @@ public class HadoopModule implements SecurityModule {
                                loginUser = UserGroupInformation.getLoginUser();
                        }
 
-                       boolean isCredentialsConfigured = 
HadoopUtils.isCredentialsConfigured(
-                               loginUser, securityConfig.useTicketCache());
+                       LOG.info("Hadoop user set to {}", loginUser);
 
-                       LOG.info("Hadoop user set to {}, credentials check 
status: {}", loginUser, isCredentialsConfigured);
+                       if (HadoopUtils.isKerberosSecurityEnabled(loginUser)) {
+                               boolean isCredentialsConfigured = 
HadoopUtils.areKerberosCredentialsValid(loginUser, 
securityConfig.useTicketCache());
 
+                               LOG.info("Kerberos security is enabled and 
credentials are {}.", isCredentialsConfigured ? "valid" : "invalid");
+                       }
                } catch (Throwable ex) {
                        throw new SecurityInstallException("Unable to set the 
Hadoop login user", ex);
                }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 6acd05d..52ac1b1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -468,15 +468,11 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
                        @Nullable JobGraph jobGraph,
                        boolean detached) throws Exception {
 
-               if (UserGroupInformation.isSecurityEnabled()) {
-                       // note: UGI::hasKerberosCredentials inaccurately 
reports false
-                       // for logins based on a keytab (fixed in Hadoop 2.6.1, 
see HADOOP-10786),
-                       // so we check only in ticket cache scenario.
+               final UserGroupInformation currentUser = 
UserGroupInformation.getCurrentUser();
+               if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
                        boolean useTicketCache = 
flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
 
-                       boolean isCredentialsConfigured = 
HadoopUtils.isCredentialsConfigured(
-                               UserGroupInformation.getCurrentUser(), 
useTicketCache);
-                       if (!isCredentialsConfigured) {
+                       if 
(!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
                                throw new RuntimeException("Hadoop security 
with Kerberos is enabled but the login user " +
                                        "does not have Kerberos credentials or 
delegation tokens!");
                        }
diff --git a/pom.xml b/pom.xml
index 89f791a..5d76464 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1007,6 +1007,7 @@ under the License.
                                                                <arg>--add-exports=java.base/sun.net.util=ALL-UNNAMED</arg>
                                                                <arg>--add-exports=java.management/sun.management=ALL-UNNAMED</arg>
                                                                <arg>--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED</arg>
+                                                               <arg>--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED</arg>
                                                        </compilerArgs>
                                                </configuration>
                                        </plugin>

Reply via email to