This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new c820534 [FLINK-18045] Fix Kerberos credentials checking
c820534 is described below
commit c8205341418d9139dc59d2fcd7b5dbae6b4c5f98
Author: Bartosz Krasinski <[email protected]>
AuthorDate: Wed Jun 3 00:20:25 2020 +0200
[FLINK-18045] Fix Kerberos credentials checking
This closes #12462.
---
.../org/apache/flink/runtime/util/HadoopUtils.java | 42 +++---
.../apache/flink/runtime/util/HadoopUtilsTest.java | 147 +++++++++++++++++++++
.../runtime/security/modules/HadoopModule.java | 8 +-
.../apache/flink/yarn/YarnClusterDescriptor.java | 10 +-
pom.xml | 1 +
5 files changed, 181 insertions(+), 27 deletions(-)
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
index e91cd99..e01d59b 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.util;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -43,7 +44,7 @@ public class HadoopUtils {
private static final Logger LOG =
LoggerFactory.getLogger(HadoopUtils.class);
- private static final Text HDFS_DELEGATION_TOKEN_KIND = new
Text("HDFS_DELEGATION_TOKEN");
+ static final Text HDFS_DELEGATION_TOKEN_KIND = new
Text("HDFS_DELEGATION_TOKEN");
@SuppressWarnings("deprecation")
public static Configuration
getHadoopConfiguration(org.apache.flink.configuration.Configuration
flinkConfiguration) {
@@ -112,29 +113,36 @@ public class HadoopUtils {
return result;
}
- public static boolean isCredentialsConfigured(UserGroupInformation ugi,
boolean useTicketCache) throws Exception {
- if (UserGroupInformation.isSecurityEnabled()) {
- if (useTicketCache && !ugi.hasKerberosCredentials()) {
- // a delegation token is an adequate substitute
in most cases
- if (!HadoopUtils.hasHDFSDelegationToken()) {
- LOG.error("Hadoop security is enabled,
but current login user has neither Kerberos credentials " +
- "nor delegation tokens!");
- return false;
- } else {
- LOG.warn("Hadoop security is enabled
but current login user does not have Kerberos credentials, " +
- "use delegation token instead.
Flink application will terminate after token expires.");
- }
+ public static boolean isKerberosSecurityEnabled(UserGroupInformation
ugi) {
+ return UserGroupInformation.isSecurityEnabled() &&
ugi.getAuthenticationMethod() ==
UserGroupInformation.AuthenticationMethod.KERBEROS;
+ }
+
+ public static boolean areKerberosCredentialsValid(UserGroupInformation
ugi, boolean useTicketCache) {
+ Preconditions.checkState(isKerberosSecurityEnabled(ugi));
+
+ // note: UGI::hasKerberosCredentials inaccurately reports false
+ // for logins based on a keytab (fixed in Hadoop 2.6.1, see
HADOOP-10786),
+ // so we check only in ticket cache scenario.
+ if (useTicketCache && !ugi.hasKerberosCredentials()) {
+ if (hasHDFSDelegationToken(ugi)) {
+ LOG.warn("Hadoop security is enabled but
current login user does not have Kerberos credentials, " +
+ "use delegation token instead. Flink
application will terminate after token expires.");
+ return true;
+ } else {
+ LOG.error("Hadoop security is enabled, but
current login user has neither Kerberos credentials " +
+ "nor delegation tokens!");
+ return false;
}
}
+
return true;
}
/**
- * Indicates whether the current user has an HDFS delegation token.
+ * Indicates whether the user has an HDFS delegation token.
*/
- public static boolean hasHDFSDelegationToken() throws Exception {
- UserGroupInformation loginUser =
UserGroupInformation.getCurrentUser();
- Collection<Token<? extends TokenIdentifier>> usrTok =
loginUser.getTokens();
+ public static boolean hasHDFSDelegationToken(UserGroupInformation ugi) {
+ Collection<Token<? extends TokenIdentifier>> usrTok =
ugi.getTokens();
for (Token<? extends TokenIdentifier> token : usrTok) {
if (token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND))
{
return true;
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java
new file mode 100644
index 0000000..472a4ff
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/util/HadoopUtilsTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.conf.Configuration;
+import
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.Token;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import sun.security.krb5.KrbException;
+
+import static
org.apache.flink.runtime.util.HadoopUtils.HDFS_DELEGATION_TOKEN_KIND;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeFalse;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Unit tests for Hadoop utils.
+ */
+public class HadoopUtilsTest extends TestLogger {
+
+ @BeforeClass
+ public static void setPropertiesToEnableKerberosConfigInit() throws
KrbException {
+ System.setProperty("java.security.krb5.realm", "");
+ System.setProperty("java.security.krb5.kdc", "");
+ System.setProperty("java.security.krb5.conf", "/dev/null");
+ sun.security.krb5.Config.refresh();
+ }
+
+ @Test
+ public void
testShouldReturnFalseWhenNoKerberosCredentialsOrDelegationTokens() {
+
UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS));
+ UserGroupInformation userWithoutCredentialsOrTokens =
createTestUser(AuthenticationMethod.KERBEROS);
+
assumeFalse(userWithoutCredentialsOrTokens.hasKerberosCredentials());
+
+ boolean isKerberosEnabled =
HadoopUtils.isKerberosSecurityEnabled(userWithoutCredentialsOrTokens);
+ boolean result =
HadoopUtils.areKerberosCredentialsValid(userWithoutCredentialsOrTokens, true);
+
+ assertTrue(isKerberosEnabled);
+ assertFalse(result);
+ }
+
+ @Test
+ public void testShouldReturnTrueWhenDelegationTokenIsPresent() {
+
UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS));
+ UserGroupInformation userWithoutCredentialsButHavingToken =
createTestUser(AuthenticationMethod.KERBEROS);
+
userWithoutCredentialsButHavingToken.addToken(getHDFSDelegationToken());
+
assumeFalse(userWithoutCredentialsButHavingToken.hasKerberosCredentials());
+
+ boolean result =
HadoopUtils.areKerberosCredentialsValid(userWithoutCredentialsButHavingToken,
true);
+
+ assertTrue(result);
+ }
+
+ @Test
+ public void testShouldReturnTrueWhenKerberosCredentialsArePresent() {
+
UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS));
+ UserGroupInformation userWithCredentials =
Mockito.mock(UserGroupInformation.class);
+
Mockito.when(userWithCredentials.getAuthenticationMethod()).thenReturn(AuthenticationMethod.KERBEROS);
+
Mockito.when(userWithCredentials.hasKerberosCredentials()).thenReturn(true);
+
+ boolean result =
HadoopUtils.areKerberosCredentialsValid(userWithCredentials, true);
+
+ assertTrue(result);
+ }
+
+ @Test
+ public void isKerberosSecurityEnabled_NoKerberos_ReturnsFalse() {
+
UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.PROXY));
+ UserGroupInformation userWithAuthMethodOtherThanKerberos =
createTestUser(AuthenticationMethod.PROXY);
+
+ boolean result =
HadoopUtils.isKerberosSecurityEnabled(userWithAuthMethodOtherThanKerberos);
+
+ assertFalse(result);
+ }
+
+ @Test
+ public void testShouldReturnTrueIfTicketCacheIsNotUsed() {
+
UserGroupInformation.setConfiguration(getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS));
+ UserGroupInformation user =
createTestUser(AuthenticationMethod.KERBEROS);
+
+ boolean result = HadoopUtils.areKerberosCredentialsValid(user,
false);
+
+ assertTrue(result);
+ }
+
+ @Test
+ public void testShouldCheckIfTheUserHasHDFSDelegationToken() {
+ UserGroupInformation userWithToken =
createTestUser(AuthenticationMethod.KERBEROS);
+ userWithToken.addToken(getHDFSDelegationToken());
+
+ boolean result =
HadoopUtils.hasHDFSDelegationToken(userWithToken);
+
+ assertTrue(result);
+ }
+
+ @Test
+ public void testShouldReturnFalseIfTheUserHasNoHDFSDelegationToken() {
+ UserGroupInformation userWithoutToken =
createTestUser(AuthenticationMethod.KERBEROS);
+ assumeTrue(userWithoutToken.getTokens().isEmpty());
+
+ boolean result =
HadoopUtils.hasHDFSDelegationToken(userWithoutToken);
+
+ assertFalse(result);
+ }
+
+ private static Configuration
getHadoopConfigWithAuthMethod(AuthenticationMethod authenticationMethod) {
+ Configuration conf = new Configuration(true);
+ conf.set("hadoop.security.authentication",
authenticationMethod.name());
+ return conf;
+ }
+
+ private static UserGroupInformation createTestUser(AuthenticationMethod
authenticationMethod) {
+ UserGroupInformation user =
UserGroupInformation.createRemoteUser("test-user");
+ user.setAuthenticationMethod(authenticationMethod);
+ return user;
+ }
+
+ private static Token<DelegationTokenIdentifier>
getHDFSDelegationToken() {
+ Token<DelegationTokenIdentifier> token = new Token<>();
+ token.setKind(HDFS_DELEGATION_TOKEN_KIND);
+ return token;
+ }
+
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
index b9250e6..f625b73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -137,11 +137,13 @@ public class HadoopModule implements SecurityModule {
loginUser = UserGroupInformation.getLoginUser();
}
- boolean isCredentialsConfigured =
HadoopUtils.isCredentialsConfigured(
- loginUser, securityConfig.useTicketCache());
+ LOG.info("Hadoop user set to {}", loginUser);
- LOG.info("Hadoop user set to {}, credentials check
status: {}", loginUser, isCredentialsConfigured);
+ if (HadoopUtils.isKerberosSecurityEnabled(loginUser)) {
+ boolean isCredentialsConfigured =
HadoopUtils.areKerberosCredentialsValid(loginUser,
securityConfig.useTicketCache());
+ LOG.info("Kerberos security is enabled and
credentials are {}.", isCredentialsConfigured ? "valid" : "invalid");
+ }
} catch (Throwable ex) {
throw new SecurityInstallException("Unable to set the
Hadoop login user", ex);
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 6acd05d..52ac1b1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -468,15 +468,11 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
@Nullable JobGraph jobGraph,
boolean detached) throws Exception {
- if (UserGroupInformation.isSecurityEnabled()) {
- // note: UGI::hasKerberosCredentials inaccurately
reports false
- // for logins based on a keytab (fixed in Hadoop 2.6.1,
see HADOOP-10786),
- // so we check only in ticket cache scenario.
+ final UserGroupInformation currentUser =
UserGroupInformation.getCurrentUser();
+ if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
boolean useTicketCache =
flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
- boolean isCredentialsConfigured =
HadoopUtils.isCredentialsConfigured(
- UserGroupInformation.getCurrentUser(),
useTicketCache);
- if (!isCredentialsConfigured) {
+ if
(!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
throw new RuntimeException("Hadoop security
with Kerberos is enabled but the login user " +
"does not have Kerberos credentials or
delegation tokens!");
}
diff --git a/pom.xml b/pom.xml
index 89f791a..5d76464 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1007,6 +1007,7 @@ under the License.
<arg>--add-exports=java.base/sun.net.util=ALL-UNNAMED</arg>
<arg>--add-exports=java.management/sun.management=ALL-UNNAMED</arg>
<arg>--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED</arg>
+ <arg>--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED</arg>
</compilerArgs>
</configuration>
</plugin>