http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/main/resources/flink-jaas.conf
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/flink-jaas.conf b/flink-runtime/src/main/resources/flink-jaas.conf
index 7f0f06b..d287ff4 100644
--- a/flink-runtime/src/main/resources/flink-jaas.conf
+++ b/flink-runtime/src/main/resources/flink-jaas.conf
@@ -1,3 +1,4 @@
+/**
 
################################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
@@ -17,10 +18,6 @@
 
################################################################################
 # We are using this file as an workaround for the Kafka and ZK SASL implementation
 # since they explicitly look for java.security.auth.login.config property
-# The file itself is not used by the application since the internal implementation
-# uses a process-wide in-memory java security configuration object.
 # Please do not edit/delete this file - See FLINK-3929
-sample {
-  useKeyTab=false
-  useTicketCache=true;
-}
+**/
+

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
index 0570f28..1847ec4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/overlays/KeytabOverlayTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework.overlays;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.junit.Rule;
@@ -46,7 +47,7 @@ public class KeytabOverlayTest extends ContainerOverlayTestBase {
                ContainerSpecification spec = new ContainerSpecification();
                overlay.configure(spec);
 
-               assertEquals(TARGET_PATH.getPath(), spec.getDynamicConfiguration().getString(ConfigConstants.SECURITY_KEYTAB_KEY, null));
+               assertEquals(TARGET_PATH.getPath(), spec.getDynamicConfiguration().getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB));
                checkArtifact(spec, TARGET_PATH);
        }
 
@@ -64,7 +65,7 @@ public class KeytabOverlayTest extends ContainerOverlayTestBase {
                final Configuration conf = new Configuration();
                File keytab = tempFolder.newFile();
 
-               conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytab.getAbsolutePath());
+               conf.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytab.getAbsolutePath());
                KeytabOverlay.Builder builder = KeytabOverlay.newBuilder().fromEnvironment(conf);
                assertEquals(builder.keytabPath, keytab);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
deleted file mode 100644
index 89e5ef9..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/JaasConfigurationTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.security;
-
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
-import org.junit.Test;
-
-import javax.security.auth.login.AppConfigurationEntry;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for the {@link JaasConfiguration}.
- */
-public class JaasConfigurationTest {
-
-       @Test
-       public void testInvalidKerberosParams() {
-               String keytab = "user.keytab";
-               String principal = null;
-               try {
-                       new JaasConfiguration(keytab, principal);
-               } catch(RuntimeException re) {
-                       assertEquals("Both keytab and principal are required and cannot be empty",re.getMessage());
-               }
-       }
-
-       @Test
-       public void testDefaultAceEntry() {
-               JaasConfiguration conf = new JaasConfiguration(null,null);
-               javax.security.auth.login.Configuration.setConfiguration(conf);
-               final AppConfigurationEntry[] entry = 
conf.getAppConfigurationEntry("test");
-               AppConfigurationEntry ace = entry[0];
-               assertEquals(ace.getLoginModuleName(), 
KerberosUtil.getKrb5LoginModuleName());
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
new file mode 100644
index 0000000..4c899e8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.junit.Test;
+
+import javax.security.auth.login.AppConfigurationEntry;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for the {@link KerberosUtils}.
+ */
+public class KerberosUtilsTest {
+
+       @Test
+       public void testTicketCacheEntry() {
+               AppConfigurationEntry entry = KerberosUtils.ticketCacheEntry();
+               assertNotNull(entry);
+       }
+
+       @Test
+       public void testKeytabEntry() {
+               String keytab = "user.keytab";
+               String principal = "user";
+               AppConfigurationEntry entry = KerberosUtils.keytabEntry(keytab, principal);
+               assertNotNull(entry);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
index 2648a7a..c5624f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
@@ -18,97 +18,64 @@
 package org.apache.flink.runtime.security;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.flink.runtime.security.modules.SecurityModule;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.Test;
 
-import java.lang.reflect.Method;
+import java.util.Collections;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 /**
  * Tests for the {@link SecurityUtils}.
  */
 public class SecurityUtilsTest {
 
-       @AfterClass
-       public static void afterClass() {
-               SecurityUtils.clearContext();
-               
System.setProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
-       }
+       static class TestSecurityModule implements SecurityModule {
+               boolean installed;
 
-       @Test
-       public void testCreateInsecureHadoopCtx() {
-               SecurityUtils.SecurityConfiguration sc = new 
SecurityUtils.SecurityConfiguration(new Configuration());
-               try {
-                       SecurityUtils.install(sc);
-                       
assertEquals(UserGroupInformation.getLoginUser().getUserName(), 
getOSUserName());
-               } catch (Exception e) {
-                       fail(e.getMessage());
+               @Override
+               public void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException {
+                       installed = true;
                }
-       }
 
-       @Test
-       public void testInvalidUGIContext() {
-               try {
-                       new HadoopSecurityContext(null);
-               } catch (RuntimeException re) {
-                       assertEquals("UGI passed cannot be 
null",re.getMessage());
+               @Override
+               public void uninstall() throws SecurityInstallException {
+                       installed = false;
                }
        }
 
+       @AfterClass
+       public static void afterClass() {
+               SecurityUtils.uninstall();
+       }
+
        @Test
-       /**
-        * The Jaas configuration file provided should not be overridden.
-        */
-       public void testJaasPropertyOverride() throws Exception {
-               String confFile = "jaas.conf";
-               
System.setProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG, confFile);
+       public void testModuleInstall() throws Exception {
+               SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(
+                       new Configuration(), new org.apache.hadoop.conf.Configuration(),
+                       Collections.singletonList(TestSecurityModule.class));
 
-               SecurityUtils.install(new 
SecurityUtils.SecurityConfiguration(new Configuration()));
+               SecurityUtils.install(sc);
+               assertEquals(1, SecurityUtils.getInstalledModules().size());
+               TestSecurityModule testModule = (TestSecurityModule) SecurityUtils.getInstalledModules().get(0);
+               assertTrue(testModule.installed);
 
-               Assert.assertEquals(
-                       confFile,
-                       
System.getProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG));
+               SecurityUtils.uninstall();
+               assertNull(SecurityUtils.getInstalledModules());
+               assertFalse(testModule.installed);
        }
 
+       @Test
+       public void testSecurityContext() throws Exception {
+               SecurityUtils.SecurityConfiguration sc = new 
SecurityUtils.SecurityConfiguration(
+                       new Configuration(), new 
org.apache.hadoop.conf.Configuration(),
+                       Collections.singletonList(TestSecurityModule.class));
 
-       private String getOSUserName() throws Exception {
-               String userName = "";
-               OperatingSystem os = 
OperatingSystem.getCurrentOperatingSystem();
-               String className;
-               String methodName;
-
-               switch(os) {
-                       case LINUX:
-                       case MAC_OS:
-                               className = 
"com.sun.security.auth.module.UnixSystem";
-                               methodName = "getUsername";
-                               break;
-                       case WINDOWS:
-                               className = 
"com.sun.security.auth.module.NTSystem";
-                               methodName = "getName";
-                               break;
-                       case SOLARIS:
-                               className = 
"com.sun.security.auth.module.SolarisSystem";
-                               methodName = "getUsername";
-                               break;
-                       case FREE_BSD:
-                       case UNKNOWN:
-                       default:
-                               className = null;
-                               methodName = null;
-               }
+               SecurityUtils.install(sc);
+               assertEquals(HadoopSecurityContext.class, SecurityUtils.getInstalledContext().getClass());
 
-               if( className != null ){
-                       Class<?> c = Class.forName( className );
-                       Method method = c.getDeclaredMethod( methodName );
-                       Object o = c.newInstance();
-                       userName = (String) method.invoke( o );
-               }
-               return userName;
+               SecurityUtils.uninstall();
+               assertEquals(NoOpSecurityContext.class, SecurityUtils.getInstalledContext().getClass());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
index de715c6..10450c3 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.test.util;
 
-import org.apache.flink.configuration.ConfigConstants;
+
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.junit.rules.TemporaryFolder;
@@ -30,10 +30,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.File;
-import java.io.FileWriter;
-import java.io.BufferedWriter;
-import java.io.PrintWriter;
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -111,11 +107,11 @@ public class SecureTestEnvironment {
                        //the context can be reinitialized with Hadoop 
configuration by calling
                        //ctx.setHadoopConfiguration() for the UGI 
implementation to work properly.
                        //See Yarn test case module for reference
-                       createJaasConfig(baseDirForSecureRun);
                        Configuration flinkConfig = 
GlobalConfiguration.loadConfiguration();
-                       
flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, testKeytab);
-                       
flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, testPrincipal);
-                       
flinkConfig.setBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE, false);
+                       
flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, testKeytab);
+                       
flinkConfig.setBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, false);
+                       
flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, testPrincipal);
+                       
flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, 
"Client,KafkaClient");
                        SecurityUtils.SecurityConfiguration ctx = new 
SecurityUtils.SecurityConfiguration(flinkConfig);
                        TestingSecurityContext.install(ctx, 
getClientSecurityConfigurationMap());
 
@@ -178,8 +174,8 @@ public class SecureTestEnvironment {
                        conf = flinkConf;
                }
 
-               conf.setString(ConfigConstants.SECURITY_KEYTAB_KEY , 
testKeytab);
-               conf.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY , 
testPrincipal);
+               conf.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB , 
testKeytab);
+               conf.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL , 
testPrincipal);
 
                return conf;
        }
@@ -190,22 +186,19 @@ public class SecureTestEnvironment {
 
                if(testZkServerPrincipal != null ) {
                        TestingSecurityContext.ClientSecurityConfiguration 
zkServer =
-                                       new 
TestingSecurityContext.ClientSecurityConfiguration(testZkServerPrincipal, 
testKeytab,
-                                                       "Server", "zk-server");
+                                       new 
TestingSecurityContext.ClientSecurityConfiguration(testZkServerPrincipal, 
testKeytab);
                        clientSecurityConfigurationMap.put("Server",zkServer);
                }
 
                if(testZkClientPrincipal != null ) {
                        TestingSecurityContext.ClientSecurityConfiguration 
zkClient =
-                                       new 
TestingSecurityContext.ClientSecurityConfiguration(testZkClientPrincipal, 
testKeytab,
-                                                       "Client", "zk-client");
+                                       new 
TestingSecurityContext.ClientSecurityConfiguration(testZkClientPrincipal, 
testKeytab);
                        clientSecurityConfigurationMap.put("Client",zkClient);
                }
 
                if(testKafkaServerPrincipal != null ) {
                        TestingSecurityContext.ClientSecurityConfiguration 
kafkaServer =
-                                       new 
TestingSecurityContext.ClientSecurityConfiguration(testKafkaServerPrincipal, 
testKeytab,
-                                                       "KafkaServer", 
"kafka-server");
+                                       new 
TestingSecurityContext.ClientSecurityConfiguration(testKafkaServerPrincipal, 
testKeytab);
                        
clientSecurityConfigurationMap.put("KafkaServer",kafkaServer);
                }
 
@@ -220,23 +213,4 @@ public class SecureTestEnvironment {
                return hadoopServicePrincipal;
        }
 
-       /*
-        * Helper method to create a temporary JAAS configuration file to get 
around the Kafka and ZK SASL
-        * implementation lookup java.security.auth.login.config
-        */
-       private static void  createJaasConfig(File baseDirForSecureRun) {
-
-               try(FileWriter fw = new FileWriter(new 
File(baseDirForSecureRun, SecurityUtils.JAAS_CONF_FILENAME), true);
-                       BufferedWriter bw = new BufferedWriter(fw);
-                       PrintWriter out = new PrintWriter(bw))
-               {
-                       out.println("sample {");
-                       out.println("useKeyTab=false");
-                       out.println("useTicketCache=true;");
-                       out.println("};");
-               } catch (IOException e) {
-                       throw new RuntimeException("Exception occured while 
trying to create JAAS config.", e);
-               }
-
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
deleted file mode 100644
index 25b2362..0000000
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingJaasConfiguration.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.security.JaasConfiguration;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.login.AppConfigurationEntry;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * {@link TestingJaasConfiguration} for handling the integration test case 
since it requires to manage
- * client principal as well as server principals of Hadoop/ZK which expects 
the host name to be populated
- * in specific way (localhost vs 127.0.0.1). This provides an abstraction to 
handle more than one Login Module
- * since the default {@link JaasConfiguration} behavior only supports 
global/unique principal identifier
- */
-
-@Internal
-public class TestingJaasConfiguration extends JaasConfiguration {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(TestingJaasConfiguration.class);
-
-       public Map<String, TestingSecurityContext.ClientSecurityConfiguration> 
clientSecurityConfigurationMap;
-
-       TestingJaasConfiguration(String keytab, String principal, Map<String,
-                       TestingSecurityContext.ClientSecurityConfiguration> 
clientSecurityConfigurationMap) {
-               super(keytab, principal);
-               this.clientSecurityConfigurationMap = 
clientSecurityConfigurationMap;
-       }
-
-       @Override
-       public AppConfigurationEntry[] getAppConfigurationEntry(String 
applicationName) {
-
-               LOG.debug("In TestingJaasConfiguration - Application Requested: 
{}", applicationName);
-
-               AppConfigurationEntry[] appConfigurationEntry = 
super.getAppConfigurationEntry(applicationName);
-
-               if(clientSecurityConfigurationMap != null && 
clientSecurityConfigurationMap.size() > 0) {
-
-                       
if(clientSecurityConfigurationMap.containsKey(applicationName)) {
-
-                               LOG.debug("In TestingJaasConfiguration - 
Application: {} found in the supplied context", applicationName);
-
-                               
TestingSecurityContext.ClientSecurityConfiguration conf = 
clientSecurityConfigurationMap.get(applicationName);
-
-                               if(appConfigurationEntry != null && 
appConfigurationEntry.length > 0) {
-
-                                       for(int count=0; count < 
appConfigurationEntry.length; count++) {
-
-                                               AppConfigurationEntry ace = 
appConfigurationEntry[count];
-
-                                               if 
(ace.getOptions().containsKey("keyTab")) {
-
-                                                       String keyTab = 
conf.getKeytab();
-                                                       String principal = 
conf.getPrincipal();
-
-                                                       LOG.debug("In 
TestingJaasConfiguration - Application: {} from the supplied context will " +
-                                                                       "use 
Client Specific Keytab: {} and Principal: {}", applicationName, keyTab, 
principal);
-
-                                                       Map<String, String> 
newKeytabKerberosOptions = new HashMap<>();
-                                                       
newKeytabKerberosOptions.putAll(getKeytabKerberosOptions());
-
-                                                       
newKeytabKerberosOptions.put("keyTab", keyTab);
-                                                       
newKeytabKerberosOptions.put("principal", principal);
-
-                                                       AppConfigurationEntry 
keytabKerberosAce = new AppConfigurationEntry(
-                                                                       
KerberosUtil.getKrb5LoginModuleName(),
-                                                                       
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
-                                                                       
newKeytabKerberosOptions);
-                                                       appConfigurationEntry = 
new AppConfigurationEntry[] {keytabKerberosAce};
-
-                                                       LOG.debug("---->Login 
Module is using Keytab based configuration<------");
-                                                       LOG.debug("Login Module 
Name: " + keytabKerberosAce.getLoginModuleName());
-                                                       LOG.debug("Control 
Flag: " + keytabKerberosAce.getControlFlag());
-                                                       LOG.debug("Options: " + 
keytabKerberosAce.getOptions());
-                                               }
-                                       }
-                               }
-                       }
-
-               }
-
-               return appConfigurationEntry;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
index 4343013..ff1810b 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestingSecurityContext.java
@@ -19,10 +19,16 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.security.DynamicConfiguration;
+import org.apache.flink.runtime.security.KerberosUtils;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.modules.JaasModule;
 
+import javax.security.auth.login.AppConfigurationEntry;
 import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /*
  * Test security context to support handling both client and server principals in MiniKDC
  * This class is used only in integration test code for connectors like Kafka, HDFS etc.,
@@ -36,21 +42,20 @@ public class TestingSecurityContext {
 
                SecurityUtils.install(config);
 
-               // establish the JAAS config for Test environment
-               TestingJaasConfiguration jaasConfig = new 
TestingJaasConfiguration(config.getKeytab(),
-                               config.getPrincipal(), 
clientSecurityConfigurationMap);
-               
javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
+               // install dynamic JAAS entries
+               
checkArgument(config.getSecurityModules().contains(JaasModule.class));
+               DynamicConfiguration jaasConf = (DynamicConfiguration) javax.security.auth.login.Configuration.getConfiguration();
+               for(Map.Entry<String,ClientSecurityConfiguration> e : clientSecurityConfigurationMap.entrySet()) {
+                       AppConfigurationEntry entry = KerberosUtils.keytabEntry(e.getValue().getKeytab(), e.getValue().getPrincipal());
+                       jaasConf.addAppConfigurationEntry(e.getKey(), entry);
+               }
        }
 
        public static class ClientSecurityConfiguration {
 
-               private String principal;
-
-               private String keytab;
+               private final String principal;
 
-               private String moduleName;
-
-               private String jaasServiceName;
+               private final String keytab;
 
                public String getPrincipal() {
                        return principal;
@@ -60,21 +65,10 @@ public class TestingSecurityContext {
                        return keytab;
                }
 
-               public String getModuleName() {
-                       return moduleName;
-               }
-
-               public String getJaasServiceName() {
-                       return jaasServiceName;
-               }
-
-               public ClientSecurityConfiguration(String principal, String keytab, String moduleName, String jaasServiceName) {
+               public ClientSecurityConfiguration(String principal, String keytab) {
                        this.principal = principal;
                        this.keytab = keytab;
-                       this.moduleName = moduleName;
-                       this.jaasServiceName = jaasServiceName;
                }
-
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
index 45fd8d0..d3558a9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.yarn;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.test.util.SecureTestEnvironment;
 import org.apache.flink.test.util.TestingSecurityContext;
@@ -53,13 +53,13 @@ public class YARNSessionFIFOSecuredITCase extends 
YARNSessionFIFOITCase {
                                SecureTestEnvironment.getTestKeytab());
 
                Configuration flinkConfig = new Configuration();
-               flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,
+               flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB,
                                SecureTestEnvironment.getTestKeytab());
-               flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,
+               flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL,
                                
SecureTestEnvironment.getHadoopServicePrincipal());
 
-               SecurityUtils.SecurityConfiguration ctx = new 
SecurityUtils.SecurityConfiguration(flinkConfig);
-               ctx.setHadoopConfiguration(yarnConfiguration);
+               SecurityUtils.SecurityConfiguration ctx = new 
SecurityUtils.SecurityConfiguration(flinkConfig,
+                               yarnConfiguration);
                try {
                        TestingSecurityContext.install(ctx, 
SecureTestEnvironment.getClientSecurityConfigurationMap());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index dc7cca3..ca8a0da 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.test.util.TestBaseUtils;
@@ -424,8 +425,8 @@ public abstract class YarnTestBase extends TestLogger {
                                        LOG.info("writing keytab: " + keytab + 
" and principal: " + principal + " to config file");
                                        out.println("");
                                        out.println("#Security Configurations 
Auto Populated ");
-                                       
out.println(ConfigConstants.SECURITY_KEYTAB_KEY + ": " + keytab);
-                                       
out.println(ConfigConstants.SECURITY_PRINCIPAL_KEY + ": " + principal);
+                                       
out.println(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key() + ": " + keytab);
+                                       
out.println(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key() + ": " + principal);
                                        out.println("");
                                } catch (IOException e) {
                                        throw new RuntimeException("Exception 
occured while trying to append the security configurations.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index ca18439..70f3222 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -22,11 +22,11 @@ import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -404,6 +404,18 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
        @Override
        public YarnClusterClient deploy() {
                try {
+                       if(UserGroupInformation.isSecurityEnabled()) {
+                               // note: UGI::hasKerberosCredentials inaccurately reports false
+                               // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
+                               // so we check only in ticket cache scenario.
+                               boolean useTicketCache = 
flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
+                               UserGroupInformation loginUser = 
UserGroupInformation.getCurrentUser();
+                               if (useTicketCache && 
!loginUser.hasKerberosCredentials()) {
+                                       LOG.error("Hadoop security is enabled but the login user does not have Kerberos credentials");
+                                       throw new RuntimeException("Hadoop security is enabled but the login user " +
+                                                       "does not have Kerberos credentials");
+                               }
+                       }
                        return deployInternal();
                } catch (Exception e) {
                        throw new RuntimeException("Couldn't deploy Yarn 
cluster", e);
@@ -583,12 +595,6 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        }
                }
 
-               //check if there is a JAAS config file
-               File jaasConfigFile = new File(configurationDirectory + 
File.separator + SecurityUtils.JAAS_CONF_FILENAME);
-               if (jaasConfigFile.exists() && jaasConfigFile.isFile()) {
-                       effectiveShipFiles.add(jaasConfigFile);
-               }
-
                addLibFolderToShipFiles(effectiveShipFiles);
 
                // add the user jar to the classpath of the to-be-created 
cluster
@@ -730,7 +736,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                // setup security tokens
                LocalResource keytabResource = null;
                Path remotePathKeytab = null;
-               String keytab = 
flinkConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+               String keytab = 
flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
                if(keytab != null) {
                        LOG.info("Adding keytab {} to the AM container local 
resource bucket", keytab);
                        keytabResource = Records.newRecord(LocalResource.class);
@@ -773,7 +779,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
 
                if(keytabResource != null) {
                        appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, 
remotePathKeytab.toString() );
-                       String principal = 
flinkConfiguration.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
+                       String principal = 
flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
                        appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, 
principal );
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 1826d43..e4027d4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -169,21 +170,27 @@ public class YarnApplicationMasterRunner {
 
                        final Configuration flinkConfig = 
createConfiguration(currDir, dynamicProperties);
                        if(keytabPath != null && remoteKeytabPrincipal != null) 
{
-                               
flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
-                               
flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, 
remoteKeytabPrincipal);
+                               
flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+                               
flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, 
remoteKeytabPrincipal);
                        }
 
-                       SecurityUtils.SecurityConfiguration sc = new 
SecurityUtils.SecurityConfiguration(flinkConfig);
+                       org.apache.hadoop.conf.Configuration 
hadoopConfiguration = null;
 
                        //To support Yarn Secure Integration Test Scenario
                        File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
                        if(krb5Conf.exists() && krb5Conf.canRead()) {
                                String krb5Path = krb5Conf.getAbsolutePath();
                                LOG.info("KRB5 Conf: {}", krb5Path);
-                               org.apache.hadoop.conf.Configuration conf = new 
org.apache.hadoop.conf.Configuration();
-                               
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, 
"kerberos");
-                               
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
-                               sc.setHadoopConfiguration(conf);
+                               hadoopConfiguration = new 
org.apache.hadoop.conf.Configuration();
+                               
hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
 "kerberos");
+                               
hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
 "true");
+                       }
+
+                       SecurityUtils.SecurityConfiguration sc;
+                       if(hadoopConfiguration != null) {
+                               sc = new 
SecurityUtils.SecurityConfiguration(flinkConfig, hadoopConfiguration);
+                       } else {
+                               sc = new 
SecurityUtils.SecurityConfiguration(flinkConfig);
                        }
 
                        SecurityUtils.install(sc);
@@ -256,8 +263,8 @@ public class YarnApplicationMasterRunner {
                                LOG.info("keytabPath: {}", keytabPath);
                        }
                        if(keytabPath != null && remoteKeytabPrincipal != null) 
{
-                               
config.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
-                               
config.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+                               
config.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+                               
config.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, 
remoteKeytabPrincipal);
                        }
 
                        // Hadoop/Yarn configuration (loads config data 
automatically from classpath files)

http://git-wip-us.apache.org/repos/asf/flink/blob/00193f7e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index 015eb1b..059f1aa 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Callable;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -111,22 +112,28 @@ public class YarnTaskManagerRunner {
 
                try {
 
-                       SecurityUtils.SecurityConfiguration sc = new 
SecurityUtils.SecurityConfiguration(configuration);
+                       org.apache.hadoop.conf.Configuration 
hadoopConfiguration = null;
 
                        //To support Yarn Secure Integration Test Scenario
                        File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
                        if(krb5Conf.exists() && krb5Conf.canRead()) {
                                String krb5Path = krb5Conf.getAbsolutePath();
                                LOG.info("KRB5 Conf: {}", krb5Path);
-                               org.apache.hadoop.conf.Configuration conf = new 
org.apache.hadoop.conf.Configuration();
-                               
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, 
"kerberos");
-                               
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
-                               sc.setHadoopConfiguration(conf);
+                               hadoopConfiguration = new 
org.apache.hadoop.conf.Configuration();
+                               
hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
 "kerberos");
+                               
hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
 "true");
+                       }
+
+                       SecurityUtils.SecurityConfiguration sc;
+                       if(hadoopConfiguration != null) {
+                               sc = new 
SecurityUtils.SecurityConfiguration(configuration, hadoopConfiguration);
+                       } else {
+                               sc = new 
SecurityUtils.SecurityConfiguration(configuration);
                        }
 
                        if(keytabPath != null && remoteKeytabPrincipal != null) 
{
-                               
configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
-                               
configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, 
remoteKeytabPrincipal);
+                               
configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+                               
configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, 
remoteKeytabPrincipal);
                        }
 
                        SecurityUtils.install(sc);

Reply via email to