http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/resources/flink-jaas.conf ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/resources/flink-jaas.conf b/flink-runtime/src/main/resources/flink-jaas.conf index 7f0f06b..d287ff4 100644 --- a/flink-runtime/src/main/resources/flink-jaas.conf +++ b/flink-runtime/src/main/resources/flink-jaas.conf @@ -1,3 +1,4 @@ +/** ################################################################################ # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -17,10 +18,6 @@ ################################################################################ # We are using this file as an workaround for the Kafka and ZK SASL implementation # since they explicitly look for java.security.auth.login.config property -# The file itself is not used by the application since the internal implementation -# uses a process-wide in-memory java security configuration object. # Please do not edit/delete this file - See FLINK-3929 -sample { - useKeyTab=false - useTicketCache=true; -} +**/ +
http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java index 0570f28..1847ec4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework.overlays; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.clusterframework.ContainerSpecification; import org.junit.Rule; @@ -46,7 +47,7 @@ public class KeytabOverlayTest extends ContainerOverlayTestBase { ContainerSpecification spec = new ContainerSpecification(); overlay.configure(spec); - assertEquals(TARGET_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_KEYTAB_KEY, null)); + assertEquals(TARGET_PATH.getPath(), spec.getDynamicConfiguration().getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB)); checkArtifact(spec, TARGET_PATH); } @@ -64,7 +65,7 @@ public class KeytabOverlayTest extends ContainerOverlayTestBase { final Configuration conf = new Configuration(); File keytab = tempFolder.newFile(); - conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytab.getAbsolutePath()); + conf.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytab.getAbsolutePath()); KeytabOverlay.Builder builder = KeytabOverlay.newBuilder().fromEnvironment(conf); assertEquals(builder.keytabPath, keytab); } http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java deleted file mode 100644 index 89e5ef9..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.security; - -import org.apache.hadoop.security.authentication.util.KerberosUtil; -import org.junit.Test; - -import javax.security.auth.login.AppConfigurationEntry; - -import static org.junit.Assert.assertEquals; - -/** - * Tests for the {@link JaasConfiguration}. - */ -public class JaasConfigurationTest { - - @Test - public void testInvalidKerberosParams() { - String keytab = "user.keytab"; - String principal = null; - try { - new JaasConfiguration(keytab, principal); - } catch(RuntimeException re) { - assertEquals("Both keytab and principal are required and cannot be empty",re.getMessage()); - } - } - - @Test - public void testDefaultAceEntry() { - JaasConfiguration conf = new JaasConfiguration(null,null); - javax.security.auth.login.Configuration.setConfiguration(conf); - final AppConfigurationEntry[] entry = conf.getAppConfigurationEntry("test"); - AppConfigurationEntry ace = entry[0]; - assertEquals(ace.getLoginModuleName(), KerberosUtil.getKrb5LoginModuleName()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java new file mode 100644 index 0000000..4c899e8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security; + +import org.apache.hadoop.security.authentication.util.KerberosUtil; +import org.junit.Test; + +import javax.security.auth.login.AppConfigurationEntry; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Tests for the {@link KerberosUtils}. + */ +public class KerberosUtilsTest { + + @Test + public void testTicketCacheEntry() { + AppConfigurationEntry entry = KerberosUtils.ticketCacheEntry(); + assertNotNull(entry); + } + + @Test + public void testKeytabEntry() { + String keytab = "user.keytab"; + String principal = "user"; + AppConfigurationEntry entry = KerberosUtils.keytabEntry(keytab, principal); + assertNotNull(entry); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java index 2648a7a..c5624f4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java @@ -18,97 +18,64 @@ package org.apache.flink.runtime.security; import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.OperatingSystem; -import org.apache.hadoop.security.UserGroupInformation; +import org.apache.flink.runtime.security.modules.SecurityModule; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.Test; -import java.lang.reflect.Method; +import java.util.Collections; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; /** * Tests for the {@link SecurityUtils}. */ public class SecurityUtilsTest { - @AfterClass - public static void afterClass() { - SecurityUtils.clearContext(); - System.setProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG, ""); - } + static class TestSecurityModule implements SecurityModule { + boolean installed; - @Test - public void testCreateInsecureHadoopCtx() { - SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(new Configuration()); - try { - SecurityUtils.install(sc); - assertEquals(UserGroupInformation.getLoginUser().getUserName(), getOSUserName()); - } catch (Exception e) { - fail(e.getMessage()); + @Override + public void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException { + installed = true; } - } - @Test - public void testInvalidUGIContext() { - try { - new HadoopSecurityContext(null); - } catch (RuntimeException re) { - assertEquals("UGI passed cannot be null",re.getMessage()); + @Override + public void uninstall() throws SecurityInstallException { + installed = false; } } + @AfterClass + public static void afterClass() { + SecurityUtils.uninstall(); + } + @Test - /** - * The Jaas configuration file provided should not be overridden. - */ - public void testJaasPropertyOverride() throws Exception { - String confFile = "jaas.conf"; - System.setProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG, confFile); + public void testModuleInstall() throws Exception { + SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration( + new Configuration(), new org.apache.hadoop.conf.Configuration(), + Collections.singletonList(TestSecurityModule.class)); - SecurityUtils.install(new SecurityUtils.SecurityConfiguration(new Configuration())); + SecurityUtils.install(sc); + assertEquals(1, SecurityUtils.getInstalledModules().size()); + TestSecurityModule testModule = (TestSecurityModule) SecurityUtils.getInstalledModules().get(0); + assertTrue(testModule.installed); - Assert.assertEquals( - confFile, - System.getProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG)); + SecurityUtils.uninstall(); + assertNull(SecurityUtils.getInstalledModules()); + assertFalse(testModule.installed); } + @Test + public void testSecurityContext() throws Exception { + SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration( + new Configuration(), new org.apache.hadoop.conf.Configuration(), + Collections.singletonList(TestSecurityModule.class)); - private String getOSUserName() throws Exception { - String userName = ""; - OperatingSystem os = OperatingSystem.getCurrentOperatingSystem(); - String className; - String methodName; - - switch(os) { - case LINUX: - case MAC_OS: - className = "com.sun.security.auth.module.UnixSystem"; - methodName = "getUsername"; - break; - case WINDOWS: - className = "com.sun.security.auth.module.NTSystem"; - methodName = "getName"; - break; - case SOLARIS: - className = "com.sun.security.auth.module.SolarisSystem"; - methodName = "getUsername"; - break; - case FREE_BSD: - case UNKNOWN: - default: - className = null; - methodName = null; - } + SecurityUtils.install(sc); + assertEquals(HadoopSecurityContext.class, SecurityUtils.getInstalledContext().getClass()); - if( className != null ){ - Class<?> c = Class.forName( className ); - Method method = c.getDeclaredMethod( methodName ); - Object o = c.newInstance(); - userName = (String) method.invoke( o ); - } - return userName; + SecurityUtils.uninstall(); + assertEquals(NoOpSecurityContext.class, SecurityUtils.getInstalledContext().getClass()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java index de715c6..10450c3 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java @@ -18,10 +18,10 @@ package org.apache.flink.test.util; -import org.apache.flink.configuration.ConfigConstants; + import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.hadoop.minikdc.MiniKdc; import org.junit.rules.TemporaryFolder; @@ -30,10 +30,6 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.File; -import java.io.FileWriter; -import java.io.BufferedWriter; -import java.io.PrintWriter; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -111,11 +107,11 @@ public class SecureTestEnvironment { //the context can be reinitialized with Hadoop configuration by calling //ctx.setHadoopConfiguration() for the UGI implementation to work properly. //See Yarn test case module for reference - createJaasConfig(baseDirForSecureRun); Configuration flinkConfig = GlobalConfiguration.loadConfiguration(); - flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab); - flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal); - flinkConfig.setBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE, false); + flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, testKeytab); + flinkConfig.setBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, false); + flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, testPrincipal); + flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, "Client,KafkaClient"); SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig); TestingSecurityContext.install(ctx, getClientSecurityConfigurationMap()); @@ -178,8 +174,8 @@ public class SecureTestEnvironment { conf = flinkConf; } - conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY , testKeytab); - conf.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY , testPrincipal); + conf.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB , testKeytab); + conf.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL , testPrincipal); return conf; } @@ -190,22 +186,19 @@ public class SecureTestEnvironment { if(testZkServerPrincipal != null ) { TestingSecurityContext.ClientSecurityConfiguration zkServer = - new TestingSecurityContext.ClientSecurityConfiguration(testZkServerPrincipal, testKeytab, - "Server", "zk-server"); + new TestingSecurityContext.ClientSecurityConfiguration(testZkServerPrincipal, testKeytab); clientSecurityConfigurationMap.put("Server",zkServer); } if(testZkClientPrincipal != null ) { TestingSecurityContext.ClientSecurityConfiguration zkClient = - new TestingSecurityContext.ClientSecurityConfiguration(testZkClientPrincipal, testKeytab, - "Client", "zk-client"); + new TestingSecurityContext.ClientSecurityConfiguration(testZkClientPrincipal, testKeytab); clientSecurityConfigurationMap.put("Client",zkClient); } if(testKafkaServerPrincipal != null ) { TestingSecurityContext.ClientSecurityConfiguration kafkaServer = - new TestingSecurityContext.ClientSecurityConfiguration(testKafkaServerPrincipal, testKeytab, - "KafkaServer", "kafka-server"); + new TestingSecurityContext.ClientSecurityConfiguration(testKafkaServerPrincipal, testKeytab); clientSecurityConfigurationMap.put("KafkaServer",kafkaServer); } @@ -220,23 +213,4 @@ public class SecureTestEnvironment { return hadoopServicePrincipal; } - /* - * Helper method to create a temporary JAAS configuration file to get around the Kafka and ZK SASL - * implementation lookup java.security.auth.login.config - */ - private static void createJaasConfig(File baseDirForSecureRun) { - - try(FileWriter fw = new FileWriter(new File(baseDirForSecureRun, SecurityUtils.JAAS_CONF_FILENAME), true); - BufferedWriter bw = new BufferedWriter(fw); - PrintWriter out = new PrintWriter(bw)) - { - out.println("sample {"); - out.println("useKeyTab=false"); - out.println("useTicketCache=true;"); - out.println("};"); - } catch (IOException e) { - throw new RuntimeException("Exception occured while trying to create JAAS config.", e); - } - - } } http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java deleted file mode 100644 index 25b2362..0000000 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.util; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.runtime.security.JaasConfiguration; -import org.apache.hadoop.security.authentication.util.KerberosUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.security.auth.login.AppConfigurationEntry; -import java.util.HashMap; -import java.util.Map; - -/** - * {@link TestingJaasConfiguration} for handling the integration test case since it requires to manage - * client principal as well as server principals of Hadoop/ZK which expects the host name to be populated - * in specific way (localhost vs 127.0.0.1). This provides an abstraction to handle more than one Login Module - * since the default {@link JaasConfiguration} behavior only supports global/unique principal identifier - */ - -@Internal -public class TestingJaasConfiguration extends JaasConfiguration { - - private static final Logger LOG = LoggerFactory.getLogger(TestingJaasConfiguration.class); - - public Map<String, TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap; - - TestingJaasConfiguration(String keytab, String principal, Map<String, - TestingSecurityContext.ClientSecurityConfiguration> clientSecurityConfigurationMap) { - super(keytab, principal); - this.clientSecurityConfigurationMap = clientSecurityConfigurationMap; - } - - @Override - public AppConfigurationEntry[] getAppConfigurationEntry(String applicationName) { - - LOG.debug("In TestingJaasConfiguration - Application Requested: {}", applicationName); - - AppConfigurationEntry[] appConfigurationEntry = super.getAppConfigurationEntry(applicationName); - - if(clientSecurityConfigurationMap != null && clientSecurityConfigurationMap.size() > 0) { - - if(clientSecurityConfigurationMap.containsKey(applicationName)) { - - LOG.debug("In TestingJaasConfiguration - Application: {} found in the supplied context", applicationName); - - TestingSecurityContext.ClientSecurityConfiguration conf = clientSecurityConfigurationMap.get(applicationName); - - if(appConfigurationEntry != null && appConfigurationEntry.length > 0) { - - for(int count=0; count < appConfigurationEntry.length; count++) { - - AppConfigurationEntry ace = appConfigurationEntry[count]; - - if (ace.getOptions().containsKey("keyTab")) { - - String keyTab = conf.getKeytab(); - String principal = conf.getPrincipal(); - - LOG.debug("In TestingJaasConfiguration - Application: {} from the supplied context will " + - "use Client Specific Keytab: {} and Principal: {}", applicationName, keyTab, principal); - - Map<String, String> newKeytabKerberosOptions = new HashMap<>(); - newKeytabKerberosOptions.putAll(getKeytabKerberosOptions()); - - newKeytabKerberosOptions.put("keyTab", keyTab); - newKeytabKerberosOptions.put("principal", principal); - - AppConfigurationEntry keytabKerberosAce = new AppConfigurationEntry( - KerberosUtil.getKrb5LoginModuleName(), - AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, - newKeytabKerberosOptions); - appConfigurationEntry = new AppConfigurationEntry[] {keytabKerberosAce}; - - LOG.debug("---->Login Module is using Keytab based configuration<------"); - LOG.debug("Login Module Name: " + keytabKerberosAce.getLoginModuleName()); - LOG.debug("Control Flag: " + keytabKerberosAce.getControlFlag()); - LOG.debug("Options: " + keytabKerberosAce.getOptions()); - } - } - } - } - - } - - return appConfigurationEntry; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java index 4343013..ff1810b 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java @@ -19,10 +19,16 @@ package org.apache.flink.test.util; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.security.DynamicConfiguration; +import org.apache.flink.runtime.security.KerberosUtils; import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.security.modules.JaasModule; +import javax.security.auth.login.AppConfigurationEntry; import java.util.Map; +import static org.apache.flink.util.Preconditions.checkArgument; + /* * Test security context to support handling both client and server principals in MiniKDC * This class is used only in integration test code for connectors like Kafka, HDFS etc., @@ -36,21 +42,20 @@ public class TestingSecurityContext { SecurityUtils.install(config); - // establish the JAAS config for Test environment - TestingJaasConfiguration jaasConfig = new TestingJaasConfiguration(config.getKeytab(), - config.getPrincipal(), clientSecurityConfigurationMap); - javax.security.auth.login.Configuration.setConfiguration(jaasConfig); + // install dynamic JAAS entries + checkArgument(config.getSecurityModules().contains(JaasModule.class)); + DynamicConfiguration jaasConf = (DynamicConfiguration) javax.security.auth.login.Configuration.getConfiguration(); + for(Map.Entry<String,ClientSecurityConfiguration> e : clientSecurityConfigurationMap.entrySet()) { + AppConfigurationEntry entry = KerberosUtils.keytabEntry(e.getValue().getKeytab(), e.getValue().getPrincipal()); + jaasConf.addAppConfigurationEntry(e.getKey(), entry); + } } public static class ClientSecurityConfiguration { - private String principal; - - private String keytab; + private final String principal; - private String moduleName; - - private String jaasServiceName; + private final String keytab; public String getPrincipal() { return principal; @@ -60,21 +65,10 @@ public class TestingSecurityContext { return keytab; } - public String getModuleName() { - return moduleName; - } - - public String getJaasServiceName() { - return jaasServiceName; - } - - public ClientSecurityConfiguration(String principal, String keytab, String moduleName, String jaasServiceName) { + public ClientSecurityConfiguration(String principal, String keytab) { this.principal = principal; this.keytab = keytab; - this.moduleName = moduleName; - this.jaasServiceName = jaasServiceName; } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java index 45fd8d0..d3558a9 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java @@ -18,8 +18,8 @@ package org.apache.flink.yarn; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.test.util.SecureTestEnvironment; import org.apache.flink.test.util.TestingSecurityContext; @@ -53,13 +53,13 @@ public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase { SecureTestEnvironment.getTestKeytab()); Configuration flinkConfig = new Configuration(); - flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, + flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, SecureTestEnvironment.getTestKeytab()); - flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, + flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, SecureTestEnvironment.getHadoopServicePrincipal()); - SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig); - ctx.setHadoopConfiguration(yarnConfiguration); + SecurityUtils.SecurityConfiguration ctx = new SecurityUtils.SecurityConfiguration(flinkConfig, + yarnConfiguration); try { TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap()); http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index dc7cca3..ca8a0da 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -26,6 +26,7 @@ import org.apache.flink.client.cli.CommandLineOptions; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.flink.test.util.TestBaseUtils; @@ -424,8 +425,8 @@ public abstract class YarnTestBase extends TestLogger { LOG.info("writing keytab: " + keytab + " and principal: " + principal + " to config file"); out.println(""); out.println("#Security Configurations Auto Populated "); - out.println(ConfigConstants.SECURITY_KEYTAB_KEY + ": " + keytab); - out.println(ConfigConstants.SECURITY_PRINCIPAL_KEY + ": " + principal); + out.println(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key() + ": " + keytab); + out.println(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key() + ": " + principal); out.println(""); } catch (IOException e) { throw new RuntimeException("Exception occured while trying to append the security configurations.", e); http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index ca18439..70f3222 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -22,11 +22,11 @@ import org.apache.flink.client.CliFrontend; import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; -import org.apache.flink.runtime.security.SecurityUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -404,6 +404,18 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor @Override public YarnClusterClient deploy() { try { + if(UserGroupInformation.isSecurityEnabled()) { + // note: UGI::hasKerberosCredentials inaccurately reports false + // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786), + // so we check only in ticket cache scenario. + boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE); + UserGroupInformation loginUser = UserGroupInformation.getCurrentUser(); + if (useTicketCache && !loginUser.hasKerberosCredentials()) { + LOG.error("Hadoop security is enabled but the login user does not have Kerberos credentials"); + throw new RuntimeException("Hadoop security is enabled but the login user " + + "does not have Kerberos credentials"); + } + } return deployInternal(); } catch (Exception e) { throw new RuntimeException("Couldn't deploy Yarn cluster", e); @@ -583,12 +595,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } } - //check if there is a JAAS config file - File jaasConfigFile = new File(configurationDirectory + File.separator + SecurityUtils.JAAS_CONF_FILENAME); - if (jaasConfigFile.exists() && jaasConfigFile.isFile()) { - effectiveShipFiles.add(jaasConfigFile); - } - addLibFolderToShipFiles(effectiveShipFiles); // add the user jar to the classpath of the to-be-created cluster @@ -730,7 +736,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor // setup security tokens LocalResource keytabResource = null; Path remotePathKeytab = null; - String keytab = flinkConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null); + String keytab = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB); if(keytab != null) { LOG.info("Adding keytab {} to the AM container local resource bucket", keytab); keytabResource = Records.newRecord(LocalResource.class); @@ -773,7 +779,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor if(keytabResource != null) { appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString() ); - String principal = flinkConfiguration.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null); + String principal = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL); appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal ); } http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 1826d43..e4027d4 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; @@ -169,21 +170,27 @@ public class YarnApplicationMasterRunner { final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties); if(keytabPath != null && remoteKeytabPrincipal != null) { - flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath); - flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal); + flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath); + flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal); } - SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(flinkConfig); + org.apache.hadoop.conf.Configuration hadoopConfiguration = null; //To support Yarn Secure Integration Test Scenario File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME); if(krb5Conf.exists() && krb5Conf.canRead()) { String krb5Path = krb5Conf.getAbsolutePath(); LOG.info("KRB5 Conf: {}", krb5Path); - org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); - conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); - conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"); - sc.setHadoopConfiguration(conf); + hadoopConfiguration = new org.apache.hadoop.conf.Configuration(); + hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"); + } + + SecurityUtils.SecurityConfiguration sc; + if(hadoopConfiguration != null) { + sc = new SecurityUtils.SecurityConfiguration(flinkConfig, hadoopConfiguration); + } else { + sc = new SecurityUtils.SecurityConfiguration(flinkConfig); } SecurityUtils.install(sc); @@ -256,8 +263,8 @@ public class YarnApplicationMasterRunner { LOG.info("keytabPath: {}", keytabPath); } if(keytabPath != null && remoteKeytabPrincipal != null) { - config.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath); - config.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal); + config.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath); + config.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal); } // Hadoop/Yarn configuration (loads config data automatically from classpath files) http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java index 015eb1b..059f1aa 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java @@ -25,6 +25,7 @@ import java.util.concurrent.Callable; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.taskmanager.TaskManager; @@ -111,22 +112,28 @@ public class YarnTaskManagerRunner { try { - SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration); + org.apache.hadoop.conf.Configuration hadoopConfiguration = null; //To support Yarn Secure Integration Test Scenario File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME); if(krb5Conf.exists() && krb5Conf.canRead()) { String krb5Path = krb5Conf.getAbsolutePath(); LOG.info("KRB5 Conf: {}", krb5Path); - org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); - conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); - conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"); - sc.setHadoopConfiguration(conf); + hadoopConfiguration = new org.apache.hadoop.conf.Configuration(); + hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"); + } + + SecurityUtils.SecurityConfiguration sc; + if(hadoopConfiguration != null) { + sc = new SecurityUtils.SecurityConfiguration(configuration, hadoopConfiguration); + } else { + sc = new SecurityUtils.SecurityConfiguration(configuration); } if(keytabPath != null && remoteKeytabPrincipal != null) { - configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath); - configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal); + configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath); + configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal); } SecurityUtils.install(sc);
