walterddr commented on a change in pull request #10836:
URL: https://github.com/apache/flink/pull/10836#discussion_r416703366



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
##########
@@ -56,42 +55,52 @@ public static SecurityContext getInstalledContext() {
         * <p>Applies the configuration using the available security modules 
(i.e. Hadoop, JAAS).
         */
        public static void install(SecurityConfiguration config) throws 
Exception {
+               // Install the security modules first before installing the 
security context
+               installModules(config);
+               installContext(config);
+       }
+
+       static void installModules(SecurityConfiguration config) throws 
Exception {
 
-               // install the security modules
+               // install the security module factories
                List<SecurityModule> modules = new ArrayList<>();
-               try {
-                       for (SecurityModuleFactory moduleFactory : 
config.getSecurityModuleFactories()) {
-                               SecurityModule module = 
moduleFactory.createModule(config);
-                               // can be null if a SecurityModule is not 
supported in the current environment
-                               if (module != null) {
-                                       module.install();
-                                       modules.add(module);
-                               }
+               for (String moduleFactoryClass : 
config.getSecurityModuleFactories()) {
+                       SecurityModuleFactory moduleFactory = null;
+                       try {
+                               moduleFactory = 
SecurityFactoryServiceLoader.findModuleFactory(moduleFactoryClass);
+                       } catch (NoMatchSecurityFactoryException ne) {
+                               LOG.error("Unable to instantiate security 
module factory {}", moduleFactoryClass);
+                               throw new IllegalArgumentException("Unable to 
find module factory class", ne);
+                       }
+                       SecurityModule module = 
moduleFactory.createModule(config);
+                       // can be null if a SecurityModule is not supported in 
the current environment
+                       if (module != null) {
+                               module.install();
+                               modules.add(module);
                        }
-               }
-               catch (Exception ex) {
-                       throw new Exception("unable to establish the security 
context", ex);
                }
                installedModules = modules;
+       }
 
-               // First check if we have Hadoop in the ClassPath. If not, we 
simply don't do anything.
-               try {
-                       Class.forName(
-                               
"org.apache.hadoop.security.UserGroupInformation",
-                               false,
-                               SecurityUtils.class.getClassLoader());
-
-                       // install a security context
-                       // use the Hadoop login user as the subject of the 
installed security context
-                       if (!(installedContext instanceof NoOpSecurityContext)) 
{
-                               LOG.warn("overriding previous security 
context");
+       static void installContext(SecurityConfiguration config) throws 
Exception {
+               // install the security context factory
+               for (String contextFactoryClass : 
config.getSecurityContextFactories()) {
+                       try {
+                               SecurityContextFactory contextFactory = 
SecurityFactoryServiceLoader.findContextFactory(contextFactoryClass);
+                               if (contextFactory.isCompatibleWith(config)) {
+                                       // install the first context that's 
compatible.
+                                       installedContext = 
contextFactory.createContext(config);
+                                       break;
+                               } else {
+                                       LOG.warn("Unable to install 
incompatible security context factory {}", contextFactoryClass);
+                               }
+                       } catch (NoMatchSecurityFactoryException ne) {
+                               LOG.warn("Unable to instantiate security 
context factory {}", contextFactoryClass);
                        }
-                       UserGroupInformation loginUser = 
UserGroupInformation.getLoginUser();
-                       installedContext = new HadoopSecurityContext(loginUser);
-               } catch (ClassNotFoundException e) {
-                       LOG.info("Cannot install HadoopSecurityContext because 
Hadoop cannot be found in the Classpath.");
-               } catch (LinkageError e) {

Review comment:
       This Error was not captured correctly




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to