http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index f4c2032..848013c 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -59,7 +60,6 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -341,26 +341,8 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
 
        @Override
        public YarnClusterClient deploy() {
-
                try {
-
-                       UserGroupInformation.setConfiguration(conf);
-                       UserGroupInformation ugi = 
UserGroupInformation.getCurrentUser();
-
-                       if (UserGroupInformation.isSecurityEnabled()) {
-                               if (!ugi.hasKerberosCredentials()) {
-                                       throw new YarnDeploymentException("In 
secure mode. Please provide Kerberos credentials in order to authenticate. " +
-                                               "You may use kinit to 
authenticate and request a TGT from the Kerberos server.");
-                               }
-                               return ugi.doAs(new 
PrivilegedExceptionAction<YarnClusterClient>() {
-                                       @Override
-                                       public YarnClusterClient run() throws 
Exception {
-                                               return deployInternal();
-                                       }
-                               });
-                       } else {
-                               return deployInternal();
-                       }
+                       return deployInternal();
                } catch (Exception e) {
                        throw new RuntimeException("Couldn't deploy Yarn 
cluster", e);
                }
@@ -539,9 +521,13 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        }
                }
 
-               addLibFolderToShipFiles(effectiveShipFiles);
+               //check if there is a JAAS config file
+               File jaasConfigFile = new File(configurationDirectory + 
File.separator + SecurityContext.JAAS_CONF_FILENAME);
+               if (jaasConfigFile.exists() && jaasConfigFile.isFile()) {
+                       effectiveShipFiles.add(jaasConfigFile);
+               }
 
-               final ContainerLaunchContext amContainer = 
setupApplicationMasterContainer(hasLogback, hasLog4j);
+               addLibFolderToShipFiles(effectiveShipFiles);
 
                // Set-up ApplicationSubmissionContext for the application
                ApplicationSubmissionContext appContext = 
yarnApplication.getApplicationSubmissionContext();
@@ -626,8 +612,53 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                FsPermission permission = new FsPermission(FsAction.ALL, 
FsAction.NONE, FsAction.NONE);
                fs.setPermission(sessionFilesDir, permission); // set 
permission for path.
 
+               //To support Yarn Secure Integration Test Scenario
+               //In Integration test setup, the Yarn containers created by 
YarnMiniCluster do not have the Yarn site XML
+               //and KRB5 configuration files. We are adding these files as 
container local resources for the container
+               //applications (JM/TMs) to have proper secure cluster setup
+               Path remoteKrb5Path = null;
+               Path remoteYarnSiteXmlPath = null;
+               boolean hasKrb5 = false;
+               if(System.getenv("IN_TESTS") != null) {
+                       String krb5Config = 
System.getProperty("java.security.krb5.conf");
+                       if(krb5Config != null && krb5Config.length() != 0) {
+                               File krb5 = new File(krb5Config);
+                               LOG.info("Adding KRB5 configuration {} to the 
AM container local resource bucket", krb5.getAbsolutePath());
+                               LocalResource krb5ConfResource = 
Records.newRecord(LocalResource.class);
+                               Path krb5ConfPath = new 
Path(krb5.getAbsolutePath());
+                               remoteKrb5Path = Utils.setupLocalResource(fs, 
appId.toString(), krb5ConfPath, krb5ConfResource, fs.getHomeDirectory());
+                               localResources.put(Utils.KRB5_FILE_NAME, 
krb5ConfResource);
+
+                               File f = new 
File(System.getenv("YARN_CONF_DIR"),Utils.YARN_SITE_FILE_NAME);
+                               LOG.info("Adding Yarn configuration {} to the 
AM container local resource bucket", f.getAbsolutePath());
+                               LocalResource yarnConfResource = 
Records.newRecord(LocalResource.class);
+                               Path yarnSitePath = new 
Path(f.getAbsolutePath());
+                               remoteYarnSiteXmlPath = 
Utils.setupLocalResource(fs, appId.toString(), yarnSitePath, yarnConfResource, 
fs.getHomeDirectory());
+                               localResources.put(Utils.YARN_SITE_FILE_NAME, 
yarnConfResource);
+
+                               hasKrb5 = true;
+                       }
+               }
+
                // setup security tokens
-               Utils.setTokensFor(amContainer, paths, conf);
+               LocalResource keytabResource = null;
+               Path remotePathKeytab = null;
+               String keytab = 
flinkConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
+               if(keytab != null) {
+                       LOG.info("Adding keytab {} to the AM container local 
resource bucket", keytab);
+                       keytabResource = Records.newRecord(LocalResource.class);
+                       Path keytabPath = new Path(keytab);
+                       remotePathKeytab = Utils.setupLocalResource(fs, 
appId.toString(), keytabPath, keytabResource, fs.getHomeDirectory());
+                       localResources.put(Utils.KEYTAB_FILE_NAME, 
keytabResource);
+               }
+
+               final ContainerLaunchContext amContainer = 
setupApplicationMasterContainer(hasLogback, hasLog4j, hasKrb5);
+
+               if ( UserGroupInformation.isSecurityEnabled() && keytab == null 
) {
+                       //set tokens only when keytab is not provided
+                       LOG.info("Adding delegation token to the AM 
container..");
+                       Utils.setTokensFor(amContainer, paths, conf);
+               }
 
                amContainer.setLocalResources(localResources);
                fs.close();
@@ -646,11 +677,25 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
                appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, 
fs.getHomeDirectory().toString());
                appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, 
envShipFileList.toString());
-               appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, 
UserGroupInformation.getCurrentUser().getShortUserName());
                appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, 
String.valueOf(slots));
                appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, 
String.valueOf(detached));
                appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, 
getZookeeperNamespace());
 
+               // 
https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
+               appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, 
UserGroupInformation.getCurrentUser().getUserName());
+
+               if(keytabResource != null) {
+                       appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, 
remotePathKeytab.toString() );
+                       String principal = 
flinkConfiguration.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
+                       appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, 
principal );
+               }
+
+               //To support Yarn Secure Integration Test Scenario
+               if(remoteYarnSiteXmlPath != null && remoteKrb5Path != null) {
+                       appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH, 
remoteYarnSiteXmlPath.toString());
+                       appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, 
remoteKrb5Path.toString() );
+               }
+
                if(dynamicPropertiesEncoded != null) {
                        appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, 
dynamicPropertiesEncoded);
                }
@@ -700,6 +745,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                                throw new YarnDeploymentException("Failed to 
deploy the cluster: " + e.getMessage());
                        }
                        YarnApplicationState appState = 
report.getYarnApplicationState();
+                       LOG.debug("Application State: {}", appState);
                        switch(appState) {
                                case FAILED:
                                case FINISHED:
@@ -996,7 +1042,9 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                }
        }
 
-       protected ContainerLaunchContext 
setupApplicationMasterContainer(boolean hasLogback, boolean hasLog4j) {
+       protected ContainerLaunchContext 
setupApplicationMasterContainer(boolean hasLogback,
+                                                                               
                                                        boolean hasLog4j,
+                                                                               
                                                        boolean hasKrb5) {
                // ------------------ Prepare Application Master Container  
------------------------------
 
                // respect custom JVM options in the YAML file
@@ -1021,6 +1069,12 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        }
                }
 
+               //applicable only for YarnMiniCluster secure test run
+               //krb5.conf file will be available as local resource in JM/TM 
container
+               if(hasKrb5) {
+                       amCommand += " -Djava.security.krb5.conf=krb5.conf";
+               }
+
                amCommand += " " + getApplicationMasterClass().getName() + " "
                        + " 1>"
                        + ApplicationConstants.LOG_DIR_EXPANSION_VAR + 
"/jobmanager.out"

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 1496d61..94d4582 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -60,6 +60,14 @@ public final class Utils {
        
        private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
 
+       /** Keytab file name populated in YARN container */
+       public static final String KEYTAB_FILE_NAME = "krb5.keytab";
+
+       /** KRB5 file name populated in YARN container for secure IT run */
+       public static final String KRB5_FILE_NAME = "krb5.conf";
+
+       /** Yarn site xml file name populated in YARN container for secure IT 
run */
+       public static final String YARN_SITE_FILE_NAME = "yarn-site.xml";
 
        /**
         * See documentation

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 6619633..efb658a 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
@@ -40,11 +41,11 @@ import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -60,11 +61,10 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.HashMap;
 import java.util.UUID;
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
@@ -117,7 +117,7 @@ public class YarnApplicationMasterRunner {
 
        /**
         * The instance entry point for the YARN application master. Obtains 
user group
-        * information and calls the main work method {@link 
#runApplicationMaster()} as a
+        * information and calls the main work method {@link 
#runApplicationMaster(Configuration)} as a
         * privileged action.
         *
         * @param args The command line arguments.
@@ -127,34 +127,66 @@ public class YarnApplicationMasterRunner {
                try {
                        LOG.debug("All environment variables: {}", ENV);
 
-                       final String yarnClientUsername = 
ENV.get(YarnConfigKeys.ENV_CLIENT_USERNAME);
+                       final String yarnClientUsername = 
ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
                        require(yarnClientUsername != null, "YARN client user 
name environment variable {} not set",
-                               YarnConfigKeys.ENV_CLIENT_USERNAME);
+                               YarnConfigKeys.ENV_HADOOP_USER_NAME);
 
-                       final UserGroupInformation currentUser;
-                       try {
-                               currentUser = 
UserGroupInformation.getCurrentUser();
-                       } catch (Throwable t) {
-                               throw new Exception("Cannot access 
UserGroupInformation information for current user", t);
+                       final String currDir = ENV.get(Environment.PWD.key());
+                       require(currDir != null, "Current working directory 
variable (%s) not set", Environment.PWD.key());
+                       LOG.debug("Current working Directory: {}", currDir);
+
+                       final String remoteKeytabPath = 
ENV.get(YarnConfigKeys.KEYTAB_PATH);
+                       LOG.debug("remoteKeytabPath obtained {}", 
remoteKeytabPath);
+
+                       final String remoteKeytabPrincipal = 
ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+                       LOG.info("remoteKeytabPrincipal obtained {}", 
remoteKeytabPrincipal);
+
+                       String keytabPath = null;
+                       if(remoteKeytabPath != null) {
+                               File f = new File(currDir, 
Utils.KEYTAB_FILE_NAME);
+                               keytabPath = f.getAbsolutePath();
+                               LOG.debug("keytabPath: {}", keytabPath);
                        }
 
-                       LOG.info("YARN daemon runs as user {}. Running Flink 
Application Master/JobManager as user {}",
-                               currentUser.getShortUserName(), 
yarnClientUsername);
+                       UserGroupInformation currentUser = 
UserGroupInformation.getCurrentUser();
+
+                       LOG.info("YARN daemon is running as: {} Yarn client 
user obtainer: {}",
+                                       currentUser.getShortUserName(), 
yarnClientUsername );
+
+                       SecurityContext.SecurityConfiguration sc = new 
SecurityContext.SecurityConfiguration();
+
+                       //To support Yarn Secure Integration Test Scenario
+                       File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
+                       if(krb5Conf.exists() && krb5Conf.canRead()) {
+                               String krb5Path = krb5Conf.getAbsolutePath();
+                               LOG.info("KRB5 Conf: {}", krb5Path);
+                               org.apache.hadoop.conf.Configuration conf = new 
org.apache.hadoop.conf.Configuration();
+                               
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, 
"kerberos");
+                               
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+                               sc.setHadoopConfiguration(conf);
+                       }
 
-                       UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(yarnClientUsername);
+                       // Flink configuration
+                       final Map<String, String> dynamicProperties =
+                                       
FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+                       LOG.debug("YARN dynamic properties: {}", 
dynamicProperties);
 
-                       // transfer all security tokens, for example for 
authenticated HDFS and HBase access
-                       for (Token<?> token : currentUser.getTokens()) {
-                               ugi.addToken(token);
+                       final Configuration flinkConfig = 
createConfiguration(currDir, dynamicProperties);
+                       if(keytabPath != null && remoteKeytabPrincipal != null) 
{
+                               
flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+                               
flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, 
remoteKeytabPrincipal);
                        }
+                       
flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir);
 
-                       // run the actual work in a secured privileged action
-                       return ugi.doAs(new PrivilegedAction<Integer>() {
+                       
SecurityContext.install(sc.setFlinkConfiguration(flinkConfig));
+
+                       return SecurityContext.getInstalled().runSecured(new 
SecurityContext.FlinkSecuredRunner<Integer>() {
                                @Override
                                public Integer run() {
-                                       return runApplicationMaster();
+                                       return 
runApplicationMaster(flinkConfig);
                                }
                        });
+
                }
                catch (Throwable t) {
                        // make sure that everything whatever ends up in the log
@@ -172,7 +204,7 @@ public class YarnApplicationMasterRunner {
         *
         * @return The return code for the Java process.
         */
-       protected int runApplicationMaster() {
+       protected int runApplicationMaster(Configuration config) {
                ActorSystem actorSystem = null;
                WebMonitor webMonitor = null;
 
@@ -194,12 +226,21 @@ public class YarnApplicationMasterRunner {
 
                        LOG.info("YARN assigned hostname for application 
master: {}", appMasterHostname);
 
-                       // Flink configuration
-                       final Map<String, String> dynamicProperties =
-                               
FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
-                       LOG.debug("YARN dynamic properties: {}", 
dynamicProperties);
+                       //Update keytab and principal path to reflect YARN 
container path location
+                       final String remoteKeytabPath = 
ENV.get(YarnConfigKeys.KEYTAB_PATH);
 
-                       final Configuration config = 
createConfiguration(currDir, dynamicProperties);
+                       final String remoteKeytabPrincipal = 
ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+
+                       String keytabPath = null;
+                       if(remoteKeytabPath != null) {
+                               File f = new File(currDir, 
Utils.KEYTAB_FILE_NAME);
+                               keytabPath = f.getAbsolutePath();
+                               LOG.info("keytabPath: {}", keytabPath);
+                       }
+                       if(keytabPath != null && remoteKeytabPrincipal != null) 
{
+                               
config.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+                               
config.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
+                       }
 
                        // Hadoop/Yarn configuration (loads config data 
automatically from classpath files)
                        final YarnConfiguration yarnConfig = new 
YarnConfiguration();
@@ -523,8 +564,20 @@ public class YarnApplicationMasterRunner {
                String shipListString = 
env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
                require(shipListString != null, "Environment variable %s not 
set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
 
-               String yarnClientUsername = 
env.get(YarnConfigKeys.ENV_CLIENT_USERNAME);
-               require(yarnClientUsername != null, "Environment variable %s 
not set", YarnConfigKeys.ENV_CLIENT_USERNAME);
+               String yarnClientUsername = 
env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+               require(yarnClientUsername != null, "Environment variable %s 
not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);
+
+               final String remoteKeytabPath = 
env.get(YarnConfigKeys.KEYTAB_PATH);
+               LOG.info("TM:remoteKeytabPath obtained {}", remoteKeytabPath);
+
+               final String remoteKeytabPrincipal = 
env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+               LOG.info("TM:remoteKeytabPrincipal obtained {}", 
remoteKeytabPrincipal);
+
+               final String remoteYarnConfPath = 
env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
+               LOG.info("TM:remoteYarnConfPath obtained {}", 
remoteYarnConfPath);
+
+               final String remoteKrb5Path = 
env.get(YarnConfigKeys.ENV_KRB5_PATH);
+               LOG.info("TM:remotekrb5Path obtained {}", remoteKrb5Path);
 
                String classPathString = 
env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH);
                require(classPathString != null, "Environment variable %s not 
set", YarnConfigKeys.ENV_FLINK_CLASSPATH);
@@ -537,6 +590,33 @@ public class YarnApplicationMasterRunner {
                        throw new Exception("Could not access YARN's default 
file system", e);
                }
 
+               //register keytab
+               LocalResource keytabResource = null;
+               if(remoteKeytabPath != null) {
+                       LOG.info("Adding keytab {} to the AM container local 
resource bucket", remoteKeytabPath);
+                       keytabResource = Records.newRecord(LocalResource.class);
+                       Path keytabPath = new Path(remoteKeytabPath);
+                       Utils.registerLocalResource(yarnFileSystem, keytabPath, 
keytabResource);
+               }
+
+               //To support Yarn Secure Integration Test Scenario
+               LocalResource yarnConfResource = null;
+               LocalResource krb5ConfResource = null;
+               boolean hasKrb5 = false;
+               if(remoteYarnConfPath != null && remoteKrb5Path != null) {
+                       LOG.info("TM:Adding remoteYarnConfPath {} to the 
container local resource bucket", remoteYarnConfPath);
+                       yarnConfResource = 
Records.newRecord(LocalResource.class);
+                       Path yarnConfPath = new Path(remoteYarnConfPath);
+                       Utils.registerLocalResource(yarnFileSystem, 
yarnConfPath, yarnConfResource);
+
+                       LOG.info("TM:Adding remoteKrb5Path {} to the container 
local resource bucket", remoteKrb5Path);
+                       krb5ConfResource = 
Records.newRecord(LocalResource.class);
+                       Path krb5ConfPath = new Path(remoteKrb5Path);
+                       Utils.registerLocalResource(yarnFileSystem, 
krb5ConfPath, krb5ConfResource);
+
+                       hasKrb5 = true;
+               }
+
                // register Flink Jar with remote HDFS
                LocalResource flinkJar = Records.newRecord(LocalResource.class);
                {
@@ -563,6 +643,16 @@ public class YarnApplicationMasterRunner {
                taskManagerLocalResources.put("flink.jar", flinkJar);
                taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
 
+               //To support Yarn Secure Integration Test Scenario
+               if(yarnConfResource != null && krb5ConfResource != null) {
+                       
taskManagerLocalResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);
+                       taskManagerLocalResources.put(Utils.KRB5_FILE_NAME, 
krb5ConfResource);
+               }
+
+               if(keytabResource != null) {
+                       taskManagerLocalResources.put(Utils.KEYTAB_FILE_NAME, 
keytabResource);
+               }
+
                // prepare additional files to be shipped
                for (String pathStr : shipListString.split(",")) {
                        if (!pathStr.isEmpty()) {
@@ -582,7 +672,7 @@ public class YarnApplicationMasterRunner {
 
                String launchCommand = 
BootstrapTools.getTaskManagerShellCommand(
                        flinkConfig, tmParams, ".", 
ApplicationConstants.LOG_DIR_EXPANSION_VAR,
-                       hasLogback, hasLog4j, taskManagerMainClass);
+                       hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
 
                log.info("Starting TaskManagers with command: " + 
launchCommand);
 
@@ -597,11 +687,17 @@ public class YarnApplicationMasterRunner {
                containerEnv.put(ENV_FLINK_CLASSPATH, classPathString);
                Utils.setupYarnClassPath(yarnConfig, containerEnv);
 
-               containerEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, 
yarnClientUsername);
+               containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, 
UserGroupInformation.getCurrentUser().getUserName());
+
+               if(remoteKeytabPath != null && remoteKeytabPrincipal != null) {
+                       containerEnv.put(YarnConfigKeys.KEYTAB_PATH, 
remoteKeytabPath);
+                       containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, 
remoteKeytabPrincipal);
+               }
 
                ctx.setEnvironment(containerEnv);
 
                try (DataOutputBuffer dob = new DataOutputBuffer()) {
+                       LOG.debug("Adding security tokens to Task Manager 
Container launch Context....");
                        UserGroupInformation user = 
UserGroupInformation.getCurrentUser();
                        Credentials credentials = user.getCredentials();
                        credentials.writeTokenStorageToStream(dob);

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
index b14d7b7..ada241c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
@@ -32,7 +32,6 @@ public class YarnConfigKeys {
        public final static String ENV_APP_ID = "_APP_ID";
        public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
        public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
-       public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
        public static final String ENV_SLOTS = "_SLOTS";
        public static final String ENV_DETACHED = "_DETACHED";
        public static final String ENV_DYNAMIC_PROPERTIES = 
"_DYNAMIC_PROPERTIES";
@@ -41,8 +40,13 @@ public class YarnConfigKeys {
 
        public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the 
Flink jar resource location (in HDFS).
 
+       public final static String KEYTAB_PATH = "_KEYTAB_PATH";
+       public final static String KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";
+       public final static String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME";
        public static final String ENV_ZOOKEEPER_NAMESPACE = 
"_ZOOKEEPER_NAMESPACE";
 
+       public static final String ENV_KRB5_PATH = "_KRB5_PATH";
+       public static final String ENV_YARN_SITE_XML_PATH = 
"_YARN_SITE_XML_PATH";
 
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index 9638137..c70a30b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -18,22 +18,22 @@
 
 package org.apache.flink.yarn;
 
+import java.io.File;
 import java.io.IOException;
-import java.security.PrivilegedAction;
 import java.util.Map;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 
 import org.slf4j.Logger;
@@ -64,8 +64,18 @@ public class YarnTaskManagerRunner {
 
                // read the environment variables for YARN
                final Map<String, String> envs = System.getenv();
-               final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_CLIENT_USERNAME);
+               final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
                final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+               LOG.info("Current working/local Directory: {}", localDirs);
+
+               final String currDir = envs.get(Environment.PWD.key());
+               LOG.info("Current working Directory: {}", currDir);
+
+               final String remoteKeytabPath = envs.get(YarnConfigKeys.KEYTAB_PATH);
+               LOG.info("TM: remoteKeytabPath obtained {}", remoteKeytabPath);
+
+               final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+               LOG.info("TM: remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
 
                // configure local directory
                String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
@@ -78,34 +88,66 @@ public class YarnTaskManagerRunner {
                                "specified in the Flink config: " + flinkTempDirs);
                }
 
-               LOG.info("YARN daemon runs as '" + UserGroupInformation.getCurrentUser().getShortUserName() +
-                       "' setting user to execute Flink TaskManager to '" + yarnClientUsername + "'");
-
                // tell akka to die in case of an error
                configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
 
-               UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
-               for (Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
-                       ugi.addToken(toks);
+               String keytabPath = null;
+               if(remoteKeytabPath != null) {
+                       File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+                       keytabPath = f.getAbsolutePath();
+                       LOG.info("keytabPath: {}", keytabPath);
                }
 
+               UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+               LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
+                               currentUser.getShortUserName(), yarnClientUsername );
+
                // Infer the resource identifier from the environment variable
                String containerID = Preconditions.checkNotNull(envs.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID));
                final ResourceID resourceId = new ResourceID(containerID);
                LOG.info("ResourceID assigned for this container: {}", resourceId);
 
-               ugi.doAs(new PrivilegedAction<Object>() {
-                       @Override
-                       public Object run() {
-                               try {
-                                       TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
-                               }
-                               catch (Throwable t) {
-                                       LOG.error("Error while starting the TaskManager", t);
-                                       System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
-                               }
-                               return null;
+               try {
+
+                       SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration();
+
+                       //To support Yarn Secure Integration Test Scenario
+                       File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
+                       if(krb5Conf.exists() && krb5Conf.canRead()) {
+                               String krb5Path = krb5Conf.getAbsolutePath();
+                               LOG.info("KRB5 Conf: {}", krb5Path);
+                               org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+                               conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+                               conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+                               sc.setHadoopConfiguration(conf);
+                       }
+
+                       if(keytabPath != null && remoteKeytabPrincipal != null) {
+                               configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+                               configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal);
                        }
-               });
+                       configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir);
+
+                       SecurityContext.install(sc.setFlinkConfiguration(configuration));
+
+                       SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+                               @Override
+                               public Integer run() {
+                                       try {
+                                               TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
+                                       }
+                                       catch (Throwable t) {
+                                               LOG.error("Error while starting the TaskManager", t);
+                                               System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+                                       }
+                                       return null;
+                               }
+                       });
+               } catch(Exception e) {
+                       LOG.error("Exception occurred while launching Task Manager. Reason: {}", e);
+                       throw new RuntimeException(e);
+               }
+
        }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 8f02a1c..b5364f0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -24,11 +24,14 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
@@ -460,9 +463,27 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
                }
        }
 
-       public static void main(String[] args) {
-               FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
-               System.exit(cli.run(args));
+       public static void main(final String[] args) {
+               final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", true); // no prefix for the YARN session
+
+               String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
+               GlobalConfiguration.loadConfiguration(confDirPath);
+               Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
+               flinkConfiguration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, confDirPath);
+               try {
+                       SecurityContext.install(new SecurityContext.SecurityConfiguration().setFlinkConfiguration(flinkConfiguration));
+                       int retCode = SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() {
+                               @Override
+                               public Integer run() {
+                                       return cli.run(args);
+                               }
+                       });
+                       System.exit(retCode);
+               } catch(Exception e) {
+                       e.printStackTrace();
+                       LOG.error("Exception Occured. Reason: {}", e);
+                       return;
+               }
        }
 
        @Override
@@ -523,6 +544,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
                try {
                        return yarnClusterDescriptor.deploy();
                } catch (Exception e) {
+                       LOG.error("Error while deploying YARN cluster: "+e.getMessage(), e);
                        throw new RuntimeException("Error deploying the YARN cluster", e);
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 02e868e..5b3148a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,6 +107,13 @@ under the License.
                <jackson.version>2.7.4</jackson.version>
                <metrics.version>3.1.0</metrics.version>
                <junit.version>4.11</junit.version>
+               <!--
+                       Keeping the MiniKDC version fixed instead of taking hadoop version dependency
+                       to support testing Kafka, ZK etc., modules that does not have Hadoop dependency
+                       Starting Hadoop 3, org.apache.kerby will be used instead of MiniKDC. We may have
+                       to revisit the impact at that time.
+               -->
+               <minikdc.version>2.7.2</minikdc.version>
        </properties>
 
        <dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/tools/log4j-travis.properties
----------------------------------------------------------------------
diff --git a/tools/log4j-travis.properties b/tools/log4j-travis.properties
index 53379b4..476cee3 100644
--- a/tools/log4j-travis.properties
+++ b/tools/log4j-travis.properties
@@ -45,3 +45,6 @@ log4j.logger.org.apache.flink.runtime.leaderretrieval=DEBUG
 # the tests
 log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO, console
 log4j.logger.org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase=INFO, console
+log4j.logger.org.apache.flink.streaming.connectors.kafka=INFO, console
+log4j.logger.org.I0Itec.zkclient=INFO, console
+log4j.logger.org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread=OFF
\ No newline at end of file

Reply via email to