http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index f4c2032..848013c 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.security.SecurityContext; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -59,7 +60,6 @@ import java.io.IOException; import java.io.PrintStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -341,26 +341,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor @Override public YarnClusterClient deploy() { - try { - - UserGroupInformation.setConfiguration(conf); - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - - if (UserGroupInformation.isSecurityEnabled()) { - if (!ugi.hasKerberosCredentials()) { - throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. 
" + - "You may use kinit to authenticate and request a TGT from the Kerberos server."); - } - return ugi.doAs(new PrivilegedExceptionAction<YarnClusterClient>() { - @Override - public YarnClusterClient run() throws Exception { - return deployInternal(); - } - }); - } else { - return deployInternal(); - } + return deployInternal(); } catch (Exception e) { throw new RuntimeException("Couldn't deploy Yarn cluster", e); } @@ -539,9 +521,13 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } } - addLibFolderToShipFiles(effectiveShipFiles); + //check if there is a JAAS config file + File jaasConfigFile = new File(configurationDirectory + File.separator + SecurityContext.JAAS_CONF_FILENAME); + if (jaasConfigFile.exists() && jaasConfigFile.isFile()) { + effectiveShipFiles.add(jaasConfigFile); + } - final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j); + addLibFolderToShipFiles(effectiveShipFiles); // Set-up ApplicationSubmissionContext for the application ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext(); @@ -626,8 +612,53 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE); fs.setPermission(sessionFilesDir, permission); // set permission for path. + //To support Yarn Secure Integration Test Scenario + //In Integration test setup, the Yarn containers created by YarnMiniCluster does not have the Yarn site XML + //and KRB5 configuration files. 
We are adding these files as container local resources for the container + //applications (JM/TMs) to have proper secure cluster setup + Path remoteKrb5Path = null; + Path remoteYarnSiteXmlPath = null; + boolean hasKrb5 = false; + if(System.getenv("IN_TESTS") != null) { + String krb5Config = System.getProperty("java.security.krb5.conf"); + if(krb5Config != null && krb5Config.length() != 0) { + File krb5 = new File(krb5Config); + LOG.info("Adding KRB5 configuration {} to the AM container local resource bucket", krb5.getAbsolutePath()); + LocalResource krb5ConfResource = Records.newRecord(LocalResource.class); + Path krb5ConfPath = new Path(krb5.getAbsolutePath()); + remoteKrb5Path = Utils.setupLocalResource(fs, appId.toString(), krb5ConfPath, krb5ConfResource, fs.getHomeDirectory()); + localResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource); + + File f = new File(System.getenv("YARN_CONF_DIR"),Utils.YARN_SITE_FILE_NAME); + LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath()); + LocalResource yarnConfResource = Records.newRecord(LocalResource.class); + Path yarnSitePath = new Path(f.getAbsolutePath()); + remoteYarnSiteXmlPath = Utils.setupLocalResource(fs, appId.toString(), yarnSitePath, yarnConfResource, fs.getHomeDirectory()); + localResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource); + + hasKrb5 = true; + } + } + // setup security tokens - Utils.setTokensFor(amContainer, paths, conf); + LocalResource keytabResource = null; + Path remotePathKeytab = null; + String keytab = flinkConfiguration.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null); + if(keytab != null) { + LOG.info("Adding keytab {} to the AM container local resource bucket", keytab); + keytabResource = Records.newRecord(LocalResource.class); + Path keytabPath = new Path(keytab); + remotePathKeytab = Utils.setupLocalResource(fs, appId.toString(), keytabPath, keytabResource, fs.getHomeDirectory()); + localResources.put(Utils.KEYTAB_FILE_NAME, 
keytabResource); + } + + final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j, hasKrb5); + + if ( UserGroupInformation.isSecurityEnabled() && keytab == null ) { + //set tokens only when keytab is not provided + LOG.info("Adding delegation token to the AM container.."); + Utils.setTokensFor(amContainer, paths, conf); + } amContainer.setLocalResources(localResources); fs.close(); @@ -646,11 +677,25 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString()); appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString()); appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString()); - appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName()); appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(slots)); appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached)); appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace()); + // https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name + appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName()); + + if(keytabResource != null) { + appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString() ); + String principal = flinkConfiguration.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null); + appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal ); + } + + //To support Yarn Secure Integration Test Scenario + if(remoteYarnSiteXmlPath != null && remoteKrb5Path != null) { + appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString()); + appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString() ); + } + 
if(dynamicPropertiesEncoded != null) { appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded); } @@ -700,6 +745,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor throw new YarnDeploymentException("Failed to deploy the cluster: " + e.getMessage()); } YarnApplicationState appState = report.getYarnApplicationState(); + LOG.debug("Application State: {}", appState); switch(appState) { case FAILED: case FINISHED: @@ -996,7 +1042,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } } - protected ContainerLaunchContext setupApplicationMasterContainer(boolean hasLogback, boolean hasLog4j) { + protected ContainerLaunchContext setupApplicationMasterContainer(boolean hasLogback, + boolean hasLog4j, + boolean hasKrb5) { // ------------------ Prepare Application Master Container ------------------------------ // respect custom JVM options in the YAML file @@ -1021,6 +1069,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor } } + //applicable only for YarnMiniCluster secure test run + //krb5.conf file will be available as local resource in JM/TM container + if(hasKrb5) { + amCommand += " -Djava.security.krb5.conf=krb5.conf"; + } + amCommand += " " + getApplicationMasterClass().getName() + " " + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out"
http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 1496d61..94d4582 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -60,6 +60,14 @@ public final class Utils { private static final Logger LOG = LoggerFactory.getLogger(Utils.class); + /** Keytab file name populated in YARN container */ + public static final String KEYTAB_FILE_NAME = "krb5.keytab"; + + /** KRB5 file name populated in YARN container for secure IT run */ + public static final String KRB5_FILE_NAME = "krb5.conf"; + + /** Yarn site xml file name populated in YARN container for secure IT run */ + public static final String YARN_SITE_FILE_NAME = "yarn-site.xml"; /** * See documentation http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 6619633..efb658a 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.runtime.taskmanager.TaskManager; import 
org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; @@ -40,11 +41,11 @@ import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.runtime.webmonitor.WebMonitor; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; @@ -60,11 +61,10 @@ import scala.concurrent.duration.FiniteDuration; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.security.PrivilegedAction; -import java.util.Collections; -import java.util.HashMap; import java.util.Map; +import java.util.HashMap; import java.util.UUID; +import java.util.Collections; import java.util.concurrent.TimeUnit; import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH; @@ -117,7 +117,7 @@ public class YarnApplicationMasterRunner { /** * The instance entry point for the YARN application master. Obtains user group - * information and calls the main work method {@link #runApplicationMaster()} as a + * information and calls the main work method {@link #runApplicationMaster(Configuration)} as a * privileged action. * * @param args The command line arguments. 
@@ -127,34 +127,66 @@ public class YarnApplicationMasterRunner { try { LOG.debug("All environment variables: {}", ENV); - final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_CLIENT_USERNAME); + final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); require(yarnClientUsername != null, "YARN client user name environment variable {} not set", - YarnConfigKeys.ENV_CLIENT_USERNAME); + YarnConfigKeys.ENV_HADOOP_USER_NAME); - final UserGroupInformation currentUser; - try { - currentUser = UserGroupInformation.getCurrentUser(); - } catch (Throwable t) { - throw new Exception("Cannot access UserGroupInformation information for current user", t); + final String currDir = ENV.get(Environment.PWD.key()); + require(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key()); + LOG.debug("Current working Directory: {}", currDir); + + final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH); + LOG.debug("remoteKeytabPath obtained {}", remoteKeytabPath); + + final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL); + LOG.info("remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal); + + String keytabPath = null; + if(remoteKeytabPath != null) { + File f = new File(currDir, Utils.KEYTAB_FILE_NAME); + keytabPath = f.getAbsolutePath(); + LOG.debug("keytabPath: {}", keytabPath); } - LOG.info("YARN daemon runs as user {}. 
Running Flink Application Master/JobManager as user {}", - currentUser.getShortUserName(), yarnClientUsername); + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + + LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}", + currentUser.getShortUserName(), yarnClientUsername ); + + SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration(); + + //To support Yarn Secure Integration Test Scenario + File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME); + if(krb5Conf.exists() && krb5Conf.canRead()) { + String krb5Path = krb5Conf.getAbsolutePath(); + LOG.info("KRB5 Conf: {}", krb5Path); + org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"); + sc.setHadoopConfiguration(conf); + } - UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername); + // Flink configuration + final Map<String, String> dynamicProperties = + FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES)); + LOG.debug("YARN dynamic properties: {}", dynamicProperties); - // transfer all security tokens, for example for authenticated HDFS and HBase access - for (Token<?> token : currentUser.getTokens()) { - ugi.addToken(token); + final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties); + if(keytabPath != null && remoteKeytabPrincipal != null) { + flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath); + flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal); } + flinkConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir); - // run the actual work in a secured privileged action - return ugi.doAs(new PrivilegedAction<Integer>() { + SecurityContext.install(sc.setFlinkConfiguration(flinkConfig)); + + 
return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() { @Override public Integer run() { - return runApplicationMaster(); + return runApplicationMaster(flinkConfig); } }); + } catch (Throwable t) { // make sure that everything whatever ends up in the log @@ -172,7 +204,7 @@ public class YarnApplicationMasterRunner { * * @return The return code for the Java process. */ - protected int runApplicationMaster() { + protected int runApplicationMaster(Configuration config) { ActorSystem actorSystem = null; WebMonitor webMonitor = null; @@ -194,12 +226,21 @@ public class YarnApplicationMasterRunner { LOG.info("YARN assigned hostname for application master: {}", appMasterHostname); - // Flink configuration - final Map<String, String> dynamicProperties = - FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES)); - LOG.debug("YARN dynamic properties: {}", dynamicProperties); + //Update keytab and principal path to reflect YARN container path location + final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH); - final Configuration config = createConfiguration(currDir, dynamicProperties); + final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL); + + String keytabPath = null; + if(remoteKeytabPath != null) { + File f = new File(currDir, Utils.KEYTAB_FILE_NAME); + keytabPath = f.getAbsolutePath(); + LOG.info("keytabPath: {}", keytabPath); + } + if(keytabPath != null && remoteKeytabPrincipal != null) { + config.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath); + config.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal); + } // Hadoop/Yarn configuration (loads config data automatically from classpath files) final YarnConfiguration yarnConfig = new YarnConfiguration(); @@ -523,8 +564,20 @@ public class YarnApplicationMasterRunner { String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES); require(shipListString != null, 
"Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES); - String yarnClientUsername = env.get(YarnConfigKeys.ENV_CLIENT_USERNAME); - require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_USERNAME); + String yarnClientUsername = env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); + require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_HADOOP_USER_NAME); + + final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH); + LOG.info("TM:remoteKeytabPath obtained {}", remoteKeytabPath); + + final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL); + LOG.info("TM:remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal); + + final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH); + LOG.info("TM:remoteYarnConfPath obtained {}", remoteYarnConfPath); + + final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH); + LOG.info("TM:remotekrb5Path obtained {}", remoteKrb5Path); String classPathString = env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH); require(classPathString != null, "Environment variable %s not set", YarnConfigKeys.ENV_FLINK_CLASSPATH); @@ -537,6 +590,33 @@ public class YarnApplicationMasterRunner { throw new Exception("Could not access YARN's default file system", e); } + //register keytab + LocalResource keytabResource = null; + if(remoteKeytabPath != null) { + LOG.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath); + keytabResource = Records.newRecord(LocalResource.class); + Path keytabPath = new Path(remoteKeytabPath); + Utils.registerLocalResource(yarnFileSystem, keytabPath, keytabResource); + } + + //To support Yarn Secure Integration Test Scenario + LocalResource yarnConfResource = null; + LocalResource krb5ConfResource = null; + boolean hasKrb5 = false; + if(remoteYarnConfPath != null && remoteKrb5Path != null) { + LOG.info("TM:Adding remoteYarnConfPath {} to the container 
local resource bucket", remoteYarnConfPath); + yarnConfResource = Records.newRecord(LocalResource.class); + Path yarnConfPath = new Path(remoteYarnConfPath); + Utils.registerLocalResource(yarnFileSystem, yarnConfPath, yarnConfResource); + + LOG.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path); + krb5ConfResource = Records.newRecord(LocalResource.class); + Path krb5ConfPath = new Path(remoteKrb5Path); + Utils.registerLocalResource(yarnFileSystem, krb5ConfPath, krb5ConfResource); + + hasKrb5 = true; + } + // register Flink Jar with remote HDFS LocalResource flinkJar = Records.newRecord(LocalResource.class); { @@ -563,6 +643,16 @@ public class YarnApplicationMasterRunner { taskManagerLocalResources.put("flink.jar", flinkJar); taskManagerLocalResources.put("flink-conf.yaml", flinkConf); + //To support Yarn Secure Integration Test Scenario + if(yarnConfResource != null && krb5ConfResource != null) { + taskManagerLocalResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource); + taskManagerLocalResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource); + } + + if(keytabResource != null) { + taskManagerLocalResources.put(Utils.KEYTAB_FILE_NAME, keytabResource); + } + // prepare additional files to be shipped for (String pathStr : shipListString.split(",")) { if (!pathStr.isEmpty()) { @@ -582,7 +672,7 @@ public class YarnApplicationMasterRunner { String launchCommand = BootstrapTools.getTaskManagerShellCommand( flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR, - hasLogback, hasLog4j, taskManagerMainClass); + hasLogback, hasLog4j, hasKrb5, taskManagerMainClass); log.info("Starting TaskManagers with command: " + launchCommand); @@ -597,11 +687,17 @@ public class YarnApplicationMasterRunner { containerEnv.put(ENV_FLINK_CLASSPATH, classPathString); Utils.setupYarnClassPath(yarnConfig, containerEnv); - containerEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, yarnClientUsername); + 
containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName()); + + if(remoteKeytabPath != null && remoteKeytabPrincipal != null) { + containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath); + containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal); + } ctx.setEnvironment(containerEnv); try (DataOutputBuffer dob = new DataOutputBuffer()) { + LOG.debug("Adding security tokens to Task Manager Container launch Context...."); UserGroupInformation user = UserGroupInformation.getCurrentUser(); Credentials credentials = user.getCredentials(); credentials.writeTokenStorageToStream(dob); http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java index b14d7b7..ada241c 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java @@ -32,7 +32,6 @@ public class YarnConfigKeys { public final static String ENV_APP_ID = "_APP_ID"; public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR"; public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES"; - public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME"; public static final String ENV_SLOTS = "_SLOTS"; public static final String ENV_DETACHED = "_DETACHED"; public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES"; @@ -41,8 +40,13 @@ public class YarnConfigKeys { public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS). 
+ public final static String KEYTAB_PATH = "_KEYTAB_PATH"; + public final static String KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL"; + public final static String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME"; public static final String ENV_ZOOKEEPER_NAMESPACE = "_ZOOKEEPER_NAMESPACE"; + public static final String ENV_KRB5_PATH = "_KRB5_PATH"; + public static final String ENV_YARN_SITE_XML_PATH = "_YARN_SITE_XML_PATH"; // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java index 9638137..c70a30b 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java @@ -18,22 +18,22 @@ package org.apache.flink.yarn; +import java.io.File; import java.io.IOException; -import java.security.PrivilegedAction; import java.util.Map; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.util.Preconditions; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; import 
org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.slf4j.Logger; @@ -64,8 +64,18 @@ public class YarnTaskManagerRunner { // read the environment variables for YARN final Map<String, String> envs = System.getenv(); - final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_CLIENT_USERNAME); + final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); final String localDirs = envs.get(Environment.LOCAL_DIRS.key()); + LOG.info("Current working/local Directory: {}", localDirs); + + final String currDir = envs.get(Environment.PWD.key()); + LOG.info("Current working Directory: {}", currDir); + + final String remoteKeytabPath = envs.get(YarnConfigKeys.KEYTAB_PATH); + LOG.info("TM: remoteKeytabPath obtained {}", remoteKeytabPath); + + final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL); + LOG.info("TM: remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal); // configure local directory String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null); @@ -78,34 +88,66 @@ public class YarnTaskManagerRunner { "specified in the Flink config: " + flinkTempDirs); } - LOG.info("YARN daemon runs as '" + UserGroupInformation.getCurrentUser().getShortUserName() + - "' setting user to execute Flink TaskManager to '" + yarnClientUsername + "'"); - // tell akka to die in case of an error configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true); - UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername); - for (Token<? 
extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) { - ugi.addToken(toks); + String keytabPath = null; + if(remoteKeytabPath != null) { + File f = new File(currDir, Utils.KEYTAB_FILE_NAME); + keytabPath = f.getAbsolutePath(); + LOG.info("keytabPath: {}", keytabPath); } + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + + LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}", + currentUser.getShortUserName(), yarnClientUsername ); + // Infer the resource identifier from the environment variable String containerID = Preconditions.checkNotNull(envs.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID)); final ResourceID resourceId = new ResourceID(containerID); LOG.info("ResourceID assigned for this container: {}", resourceId); - ugi.doAs(new PrivilegedAction<Object>() { - @Override - public Object run() { - try { - TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager); - } - catch (Throwable t) { - LOG.error("Error while starting the TaskManager", t); - System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE()); - } - return null; + try { + + SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration(); + + //To support Yarn Secure Integration Test Scenario + File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME); + if(krb5Conf.exists() && krb5Conf.canRead()) { + String krb5Path = krb5Conf.getAbsolutePath(); + LOG.info("KRB5 Conf: {}", krb5Path); + org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"); + sc.setHadoopConfiguration(conf); + } + + if(keytabPath != null && remoteKeytabPrincipal != null) { + configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath); + 
configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal); } - }); + configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir); + + SecurityContext.install(sc.setFlinkConfiguration(configuration)); + + SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() { + @Override + public Integer run() { + try { + TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager); + } + catch (Throwable t) { + LOG.error("Error while starting the TaskManager", t); + System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE()); + } + return null; + } + }); + } catch(Exception e) { + LOG.error("Exception occurred while launching Task Manager. Reason: {}", e); + throw new RuntimeException(e); + } + } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 8f02a1c..b5364f0 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -24,11 +24,14 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.commons.lang3.StringUtils; +import org.apache.flink.client.CliFrontend; import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.IllegalConfigurationException; +import 
org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.yarn.AbstractYarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterClient; @@ -460,9 +463,27 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> } } - public static void main(String[] args) { - FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session - System.exit(cli.run(args)); + public static void main(final String[] args) { + final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", true); // no prefix for the YARN session + + String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv(); + GlobalConfiguration.loadConfiguration(confDirPath); + Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(); + flinkConfiguration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, confDirPath); + try { + SecurityContext.install(new SecurityContext.SecurityConfiguration().setFlinkConfiguration(flinkConfiguration)); + int retCode = SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() { + @Override + public Integer run() { + return cli.run(args); + } + }); + System.exit(retCode); + } catch(Exception e) { + e.printStackTrace(); + LOG.error("Exception Occured. Reason: {}", e); + return; + } } @Override @@ -523,6 +544,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> try { return yarnClusterDescriptor.deploy(); } catch (Exception e) { + LOG.error("Error while deploying YARN cluster: "+e.getMessage(), e); throw new RuntimeException("Error deploying the YARN cluster", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 02e868e..5b3148a 100644 --- a/pom.xml +++ b/pom.xml @@ -107,6 +107,13 @@ under the License. 
<jackson.version>2.7.4</jackson.version> <metrics.version>3.1.0</metrics.version> <junit.version>4.11</junit.version> + <!-- + Keeping the MiniKDC version fixed instead of taking hadoop version dependency + to support testing Kafka, ZK etc., modules that do not have a Hadoop dependency. + Starting with Hadoop 3, org.apache.kerby will be used instead of MiniKDC. We may have + to revisit the impact at that time. + --> + <minikdc.version>2.7.2</minikdc.version> </properties> <dependencies> http://git-wip-us.apache.org/repos/asf/flink/blob/25a622fd/tools/log4j-travis.properties ---------------------------------------------------------------------- diff --git a/tools/log4j-travis.properties b/tools/log4j-travis.properties index 53379b4..476cee3 100644 --- a/tools/log4j-travis.properties +++ b/tools/log4j-travis.properties @@ -45,3 +45,6 @@ log4j.logger.org.apache.flink.runtime.leaderretrieval=DEBUG # the tests log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO, console log4j.logger.org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase=INFO, console +log4j.logger.org.apache.flink.streaming.connectors.kafka=INFO, console +log4j.logger.org.I0Itec.zkclient=INFO, console +log4j.logger.org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread=OFF \ No newline at end of file