Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3057#discussion_r95008182
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
 ---
    @@ -71,163 +64,93 @@
         */
        public static void install(SecurityConfiguration config) throws 
Exception {
     
    -           if (!config.securityIsEnabled()) {
    -                   // do not perform any initialization if no Kerberos 
credentials are provided
    -                   return;
    -           }
    -
    -           // establish the JAAS config
    -           JaasConfiguration jaasConfig = new 
JaasConfiguration(config.keytab, config.principal);
    -           
javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
    -
    -           populateSystemSecurityProperties(config.flinkConf);
    -
    -           // establish the UGI login user
    -           UserGroupInformation.setConfiguration(config.hadoopConf);
    -
    -           // only configure Hadoop security if we have security enabled
    -           if (UserGroupInformation.isSecurityEnabled()) {
    -
    -                   final UserGroupInformation loginUser;
    -
    -                   if (config.keytab != null && 
!StringUtils.isBlank(config.principal)) {
    -                           String keytabPath = (new 
File(config.keytab)).getAbsolutePath();
    -
    -                           
UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
    -
    -                           loginUser = UserGroupInformation.getLoginUser();
    -
    -                           // supplement with any available tokens
    -                           String fileLocation = 
System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
    -                           if (fileLocation != null) {
    -                           /*
    -                            * Use reflection API since the API semantics 
are not available in Hadoop1 profile. Below APIs are
    -                            * used in the context of reading the stored 
tokens from UGI.
    -                            * Credentials cred = 
Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
    -                            * loginUser.addCredentials(cred);
    -                           */
    -                                   try {
    -                                           Method 
readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
    -                                                   File.class, 
org.apache.hadoop.conf.Configuration.class);
    -                                           Credentials cred = 
(Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation),
    -                                                   config.hadoopConf);
    -                                           Method addCredentialsMethod = 
UserGroupInformation.class.getMethod("addCredentials",
    -                                                   Credentials.class);
    -                                           
addCredentialsMethod.invoke(loginUser, cred);
    -                                   } catch (NoSuchMethodException e) {
    -                                           LOG.warn("Could not find method 
implementations in the shaded jar. Exception: {}", e);
    -                                   }
    -                           }
    -                   } else {
    -                           // login with current user credentials (e.g. 
ticket cache)
    -                           try {
    -                                   //Use reflection API to get the login 
user object
    -                                   
//UserGroupInformation.loginUserFromSubject(null);
    -                                   Method loginUserFromSubjectMethod = 
UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
    -                                   Subject subject = null;
    -                                   loginUserFromSubjectMethod.invoke(null, 
subject);
    -                           } catch (NoSuchMethodException e) {
    -                                   LOG.warn("Could not find method 
implementations in the shaded jar. Exception: {}", e);
    -                           }
    -
    -                           // note that the stored tokens are read 
automatically
    -                           loginUser = UserGroupInformation.getLoginUser();
    +           // install the security modules
    +           List<SecurityModule> modules = new ArrayList();
    +           try {
    +                   for (Class<? extends SecurityModule> moduleClass : 
config.getSecurityModules()) {
    +                           SecurityModule module = 
moduleClass.newInstance();
    +                           module.install(config);
    +                           modules.add(module);
                        }
    +           }
    +           catch(Exception ex) {
    +                   throw new Exception("unable to establish the security 
context", ex);
    +           }
    +           installedModules = modules;
     
    -                   LOG.info("Hadoop user set to {}", loginUser.toString());
    +           // install a security context
    +           // use the Hadoop login user as the subject of the installed 
security context
    +           if (!(installedContext instanceof NoOpSecurityContext)) {
    +                   LOG.warn("overriding previous security context");
    +           }
    +           UserGroupInformation loginUser = 
UserGroupInformation.getLoginUser();
    +           installedContext = new HadoopSecurityContext(loginUser);
    +   }
     
    -                   boolean delegationToken = false;
    -                   final Text HDFS_DELEGATION_KIND = new 
Text("HDFS_DELEGATION_TOKEN");
    -                   Collection<Token<? extends TokenIdentifier>> usrTok = 
loginUser.getTokens();
    -                   for (Token<? extends TokenIdentifier> token : usrTok) {
    -                           final Text id = new Text(token.getIdentifier());
    -                           LOG.debug("Found user token " + id + " with " + 
token);
    -                           if 
(token.getKind().equals(HDFS_DELEGATION_KIND)) {
    -                                   delegationToken = true;
    +   static void uninstall() {
    +           if(installedModules != null) {
    +                   for (SecurityModule module : 
Lists.reverse(installedModules)) {
    +                           try {
    +                                   module.uninstall();
                                }
    -                   }
    -
    -                   if (!loginUser.hasKerberosCredentials()) {
    -                           //throw an error in non-yarn deployment if 
kerberos cache is not available
    -                           if (!delegationToken) {
    -                                   LOG.error("Hadoop Security is enabled 
but current login user does not have Kerberos Credentials");
    -                                   throw new RuntimeException("Hadoop 
Security is enabled but current login user does not have Kerberos Credentials");
    +                           catch(UnsupportedOperationException e) {
                                }
                        }
    -
    -                   if (!(installedContext instanceof NoOpSecurityContext)) 
{
    -                           LOG.warn("overriding previous security 
context");
    -                   }
    -
    -                   installedContext = new HadoopSecurityContext(loginUser);
    +                   installedModules = null;
                }
    -   }
     
    -   static void clearContext() {
                installedContext = new NoOpSecurityContext();
        }
     
    -   /*
    -    * This method configures some of the system properties that are 
required for ZK and Kafka SASL authentication
    -    * See: 
https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
    -    * See: 
https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
    -    * In this method, setting java.security.auth.login.config 
configuration is configured only to support ZK and
    -    * Kafka current code behavior.
    +   /**
    +    * The global security configuration.
    +    *
    +    * See {@link SecurityOptions} for corresponding configuration options.
         */
    -   private static void populateSystemSecurityProperties(Configuration 
configuration) {
    -           Preconditions.checkNotNull(configuration, "The supplied 
configuration was null");
    +   public static class SecurityConfiguration {
     
    -           boolean disableSaslClient = 
configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE);
    +           private static final List<Class<? extends SecurityModule>> 
DEFAULT_MODULES = Collections.unmodifiableList(
    +                   new ArrayList<Class<? extends SecurityModule>>() {{
    +                           add(HadoopModule.class);
    +                           add(JaasModule.class);
    +                           add(ZooKeeperModule.class);
    +                   }});
     
    -           if (disableSaslClient) {
    -                   LOG.info("SASL client auth for ZK will be disabled");
    -                   //SASL auth is disabled by default but will be enabled 
if specified in configuration
    -                   System.setProperty(ZOOKEEPER_SASL_CLIENT,"false");
    -                   return;
    -           }
    +           private final org.apache.hadoop.conf.Configuration hadoopConf;
     
    -           // load Jaas config file to initialize SASL
    -           final File jaasConfFile;
    -           try {
    -                   Path jaasConfPath = 
Files.createTempFile(JAAS_CONF_FILENAME, "");
    -                   InputStream jaasConfStream = 
SecurityUtils.class.getClassLoader().getResourceAsStream(JAAS_CONF_FILENAME);
    -                   Files.copy(jaasConfStream, jaasConfPath, 
StandardCopyOption.REPLACE_EXISTING);
    -                   jaasConfFile = jaasConfPath.toFile();
    -                   jaasConfFile.deleteOnExit();
    -                   jaasConfStream.close();
    -           } catch (IOException e) {
    -                   throw new RuntimeException("SASL auth is enabled for ZK 
but unable to " +
    -                           "locate pseudo Jaas config provided with 
Flink", e);
    -           }
    +           private final boolean useTicketCache;
     
    -           LOG.info("Enabling {} property with pseudo JAAS config file: 
{}",
    -                           JAVA_SECURITY_AUTH_LOGIN_CONFIG, 
jaasConfFile.getAbsolutePath());
    +           private final String keytab;
     
    -           //ZK client module lookup the configuration to handle SASL.
    -           
//https://github.com/sgroschupf/zkclient/blob/master/src/main/java/org/I0Itec/zkclient/ZkClient.java#L900
    -           System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, 
jaasConfFile.getAbsolutePath());
    -           System.setProperty(ZOOKEEPER_SASL_CLIENT, "true");
    +           private final String principal;
     
    -           String zkSaslServiceName = 
configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
    -           if (!StringUtils.isBlank(zkSaslServiceName)) {
    -                   LOG.info("ZK SASL service name: {} is provided in the 
configuration", zkSaslServiceName);
    -                   System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME, 
zkSaslServiceName);
    -           }
    +           private List<String> loginContextNames;
     
    -   }
    +           private String zkServiceName;
    --- End diff --
    
    Some fields are final, others are not. Can they all be final here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to