Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r95008182 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java --- @@ -71,163 +64,93 @@ */ public static void install(SecurityConfiguration config) throws Exception { - if (!config.securityIsEnabled()) { - // do not perform any initialization if no Kerberos crendetails are provided - return; - } - - // establish the JAAS config - JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal); - javax.security.auth.login.Configuration.setConfiguration(jaasConfig); - - populateSystemSecurityProperties(config.flinkConf); - - // establish the UGI login user - UserGroupInformation.setConfiguration(config.hadoopConf); - - // only configure Hadoop security if we have security enabled - if (UserGroupInformation.isSecurityEnabled()) { - - final UserGroupInformation loginUser; - - if (config.keytab != null && !StringUtils.isBlank(config.principal)) { - String keytabPath = (new File(config.keytab)).getAbsolutePath(); - - UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath); - - loginUser = UserGroupInformation.getLoginUser(); - - // supplement with any available tokens - String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); - if (fileLocation != null) { - /* - * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are - * used in the context of reading the stored tokens from UGI. 
- * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf); - * loginUser.addCredentials(cred); - */ - try { - Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile", - File.class, org.apache.hadoop.conf.Configuration.class); - Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation), - config.hadoopConf); - Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials", - Credentials.class); - addCredentialsMethod.invoke(loginUser, cred); - } catch (NoSuchMethodException e) { - LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); - } - } - } else { - // login with current user credentials (e.g. ticket cache) - try { - //Use reflection API to get the login user object - //UserGroupInformation.loginUserFromSubject(null); - Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class); - Subject subject = null; - loginUserFromSubjectMethod.invoke(null, subject); - } catch (NoSuchMethodException e) { - LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); - } - - // note that the stored tokens are read automatically - loginUser = UserGroupInformation.getLoginUser(); + // install the security modules + List<SecurityModule> modules = new ArrayList(); + try { + for (Class<? 
extends SecurityModule> moduleClass : config.getSecurityModules()) { + SecurityModule module = moduleClass.newInstance(); + module.install(config); + modules.add(module); } + } + catch(Exception ex) { + throw new Exception("unable to establish the security context", ex); + } + installedModules = modules; - LOG.info("Hadoop user set to {}", loginUser.toString()); + // install a security context + // use the Hadoop login user as the subject of the installed security context + if (!(installedContext instanceof NoOpSecurityContext)) { + LOG.warn("overriding previous security context"); + } + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + installedContext = new HadoopSecurityContext(loginUser); + } - boolean delegationToken = false; - final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN"); - Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens(); - for (Token<? extends TokenIdentifier> token : usrTok) { - final Text id = new Text(token.getIdentifier()); - LOG.debug("Found user token " + id + " with " + token); - if (token.getKind().equals(HDFS_DELEGATION_KIND)) { - delegationToken = true; + static void uninstall() { + if(installedModules != null) { + for (SecurityModule module : Lists.reverse(installedModules)) { + try { + module.uninstall(); } - } - - if (!loginUser.hasKerberosCredentials()) { - //throw an error in non-yarn deployment if kerberos cache is not available - if (!delegationToken) { - LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials"); - throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials"); + catch(UnsupportedOperationException e) { } } - - if (!(installedContext instanceof NoOpSecurityContext)) { - LOG.warn("overriding previous security context"); - } - - installedContext = new HadoopSecurityContext(loginUser); + installedModules = null; } - } - static void clearContext() { installedContext 
= new NoOpSecurityContext(); } - /* - * This method configures some of the system properties that are require for ZK and Kafka SASL authentication - * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289 - * See: https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900 - * In this method, setting java.security.auth.login.config configuration is configured only to support ZK and - * Kafka current code behavior. + /** + * The global security configuration. + * + * See {@link SecurityOptions} for corresponding configuration options. */ - private static void populateSystemSecurityProperties(Configuration configuration) { - Preconditions.checkNotNull(configuration, "The supplied configuration was null"); + public static class SecurityConfiguration { - boolean disableSaslClient = configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE); + private static final List<Class<? extends SecurityModule>> DEFAULT_MODULES = Collections.unmodifiableList( + new ArrayList<Class<? 
extends SecurityModule>>() {{ + add(HadoopModule.class); + add(JaasModule.class); + add(ZooKeeperModule.class); + }}); - if (disableSaslClient) { - LOG.info("SASL client auth for ZK will be disabled"); - //SASL auth is disabled by default but will be enabled if specified in configuration - System.setProperty(ZOOKEEPER_SASL_CLIENT,"false"); - return; - } + private final org.apache.hadoop.conf.Configuration hadoopConf; - // load Jaas config file to initialize SASL - final File jaasConfFile; - try { - Path jaasConfPath = Files.createTempFile(JAAS_CONF_FILENAME, ""); - InputStream jaasConfStream = SecurityUtils.class.getClassLoader().getResourceAsStream(JAAS_CONF_FILENAME); - Files.copy(jaasConfStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING); - jaasConfFile = jaasConfPath.toFile(); - jaasConfFile.deleteOnExit(); - jaasConfStream.close(); - } catch (IOException e) { - throw new RuntimeException("SASL auth is enabled for ZK but unable to " + - "locate pseudo Jaas config provided with Flink", e); - } + private final boolean useTicketCache; - LOG.info("Enabling {} property with pseudo JAAS config file: {}", - JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath()); + private final String keytab; - //ZK client module lookup the configuration to handle SASL. 
- //https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900 - System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, jaasConfFile.getAbsolutePath()); - System.setProperty(ZOOKEEPER_SASL_CLIENT, "true"); + private final String principal; - String zkSaslServiceName = configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_SASL_SERVICE_NAME); - if (!StringUtils.isBlank(zkSaslServiceName)) { - LOG.info("ZK SASL service name: {} is provided in the configuration", zkSaslServiceName); - System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME, zkSaslServiceName); - } + private List<String> loginContextNames; - } + private String zkServiceName; --- End diff -- Some fields are final, others are not. Can they all be final here?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---