walterddr commented on a change in pull request #10836:
URL: https://github.com/apache/flink/pull/10836#discussion_r416703366
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
##########
@@ -56,42 +55,52 @@ public static SecurityContext getInstalledContext() {
* <p>Applies the configuration using the available security modules
(i.e. Hadoop, JAAS).
*/
public static void install(SecurityConfiguration config) throws
Exception {
+ // Install the security modules first before installing the
security context
+ installModules(config);
+ installContext(config);
+ }
+
+ static void installModules(SecurityConfiguration config) throws
Exception {
- // install the security modules
+ // install the security module factories
List<SecurityModule> modules = new ArrayList<>();
- try {
- for (SecurityModuleFactory moduleFactory :
config.getSecurityModuleFactories()) {
- SecurityModule module =
moduleFactory.createModule(config);
- // can be null if a SecurityModule is not
supported in the current environment
- if (module != null) {
- module.install();
- modules.add(module);
- }
+ for (String moduleFactoryClass :
config.getSecurityModuleFactories()) {
+ SecurityModuleFactory moduleFactory = null;
+ try {
+ moduleFactory =
SecurityFactoryServiceLoader.findModuleFactory(moduleFactoryClass);
+ } catch (NoMatchSecurityFactoryException ne) {
+ LOG.error("Unable to instantiate security
module factory {}", moduleFactoryClass);
+ throw new IllegalArgumentException("Unable to
find module factory class", ne);
+ }
+ SecurityModule module =
moduleFactory.createModule(config);
+ // can be null if a SecurityModule is not supported in
the current environment
+ if (module != null) {
+ module.install();
+ modules.add(module);
}
- }
- catch (Exception ex) {
- throw new Exception("unable to establish the security
context", ex);
}
installedModules = modules;
+ }
- // First check if we have Hadoop in the ClassPath. If not, we
simply don't do anything.
- try {
- Class.forName(
-
"org.apache.hadoop.security.UserGroupInformation",
- false,
- SecurityUtils.class.getClassLoader());
-
- // install a security context
- // use the Hadoop login user as the subject of the
installed security context
- if (!(installedContext instanceof NoOpSecurityContext))
{
- LOG.warn("overriding previous security
context");
+ static void installContext(SecurityConfiguration config) throws
Exception {
+ // install the security context factory
+ for (String contextFactoryClass :
config.getSecurityContextFactories()) {
+ try {
+ SecurityContextFactory contextFactory =
SecurityFactoryServiceLoader.findContextFactory(contextFactoryClass);
+ if (contextFactory.isCompatibleWith(config)) {
+ // install the first context that's
compatible.
+ installedContext =
contextFactory.createContext(config);
+ break;
+ } else {
+ LOG.warn("Unable to install
incompatible security context factory {}", contextFactoryClass);
+ }
+ } catch (NoMatchSecurityFactoryException ne) {
+ LOG.warn("Unable to instantiate security
context factory {}", contextFactoryClass);
}
- UserGroupInformation loginUser =
UserGroupInformation.getLoginUser();
- installedContext = new HadoopSecurityContext(loginUser);
- } catch (ClassNotFoundException e) {
- LOG.info("Cannot install HadoopSecurityContext because
Hadoop cannot be found in the Classpath.");
- } catch (LinkageError e) {
Review comment:
This Error was not captured correctly
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]