Repository: storm Updated Branches: refs/heads/master 2709a1cda -> 6a26bef8f
[STORM-2806] Give users an option to disable the login cache Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/864188f1 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/864188f1 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/864188f1 Branch: refs/heads/master Commit: 864188f1cfb18aedccf9e235e18de3b059ebdc80 Parents: 566db74 Author: Ethan Li <[email protected]> Authored: Thu Nov 9 15:54:47 2017 -0600 Committer: Ethan Li <[email protected]> Committed: Thu Nov 9 17:02:19 2017 -0600 ---------------------------------------------------------------------- .../kerberos/KerberosSaslTransportPlugin.java | 90 +++++++++++++------- 1 file changed, 59 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/864188f1/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java index 642be76..b6571ba 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java @@ -53,41 +53,43 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin { public static final String KERBEROS = "GSSAPI"; private static final Logger LOG = LoggerFactory.getLogger(KerberosSaslTransportPlugin.class); private static Map <LoginCacheKey, Login> loginCache = new ConcurrentHashMap<>(); + private static final String DISABLE_LOGIN_CACHE = "disableLoginCache"; private class LoginCacheKey { - private String _keyString = null; + private String keyString = null; - public LoginCacheKey(Configuration conf, String login_context) throws IOException { - if (conf == null) { - throw new IllegalArgumentException("Configuration should not be null"); - } - SortedMap<String, ?> authConf = AuthUtils.pullConfig(conf, login_context); - if (authConf!=null) { + public LoginCacheKey(SortedMap<String, ?> authConf) throws IOException { + if (authConf != null) { StringBuilder stringBuilder = new StringBuilder(); for (String configKey: authConf.keySet()) { + //DISABLE_LOGIN_CACHE indicates whether or not to use the LoginCache. + //So we exclude it from the keyString + if (configKey.equals(DISABLE_LOGIN_CACHE)) { + continue; + } String configValue = (String) authConf.get(configKey); stringBuilder.append(configKey); stringBuilder.append(configValue); } - _keyString = stringBuilder.toString(); + keyString = stringBuilder.toString(); } else { - throw new RuntimeException("Error in parsing the kerberos login Configuration, returned null"); + throw new IllegalArgumentException("Configuration should not be null"); } } @Override public int hashCode() { - return _keyString.hashCode(); + return keyString.hashCode(); } @Override public boolean equals(Object obj) { - return (obj instanceof LoginCacheKey) && _keyString.equals(((LoginCacheKey)obj)._keyString); + return (obj instanceof LoginCacheKey) && keyString.equals(((LoginCacheKey)obj).keyString); } @Override public String toString() { - return (_keyString); + return (keyString); } } @@ -135,29 +137,55 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin { return wrapFactory; } + private Login mkLogin() throws IOException { + try { + //create an authentication callback handler + ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf); + //specify a configuration object to be used + Configuration.setConfiguration(login_conf); + //now login + Login login = new Login(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler); + login.startThreadIfNeeded(); + return login; + } catch (LoginException ex) { + LOG.error("Server failed to login in principal:" + ex, ex); + throw new RuntimeException(ex); + } + } + @Override public TTransport connect(TTransport transport, String serverHost, String asUser) throws TTransportException, IOException { - //create an authentication callback handler - ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf); - //login our user - LoginCacheKey key = new LoginCacheKey(login_conf, AuthUtils.LOGIN_CONTEXT_CLIENT); - Login login = loginCache.get(key); - if (login == null) { - LOG.debug("Kerberos Login was not found in the Login Cache, attempting to contact the Kerberos Server"); - synchronized (loginCache) { - login = loginCache.get(key); - if (login == null) { - try { - //specify a configuration object to be used - Configuration.setConfiguration(login_conf); - //now login - login = new Login(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler); - login.startThreadIfNeeded(); + SortedMap<String, ?> authConf = AuthUtils.pullConfig(login_conf, AuthUtils.LOGIN_CONTEXT_CLIENT); + if (authConf == null) { + throw new RuntimeException("Error in parsing the kerberos login Configuration, returned null"); + } + + boolean disableLoginCache = false; + if (authConf.containsKey(DISABLE_LOGIN_CACHE)) { + disableLoginCache = Boolean.valueOf((String) authConf.get(DISABLE_LOGIN_CACHE)); + } + + Login login; + LoginCacheKey key = new LoginCacheKey(authConf); + if (disableLoginCache) { + LOG.debug("Kerberos Login Cache is disabled, attempting to contact the Kerberos Server"); + login = mkLogin(); + //this is to prevent the potential bug that + //if the Login Cache is (1) enabled, and then (2) disabled and then (3) enabled again, + //and if the LoginCacheKey remains unchanged, (3) will use the Login cache from (1), which could be wrong, + //because the TGT cache (as well as the principle) could have been changed during (2) + loginCache.remove(key); + } else { + LOG.debug("Trying to get the Kerberos Login from the Login Cache"); + login = loginCache.get(key); + if (login == null) { + synchronized (loginCache) { + login = loginCache.get(key); + if (login == null) { + LOG.debug("Kerberos Login was not found in the Login Cache, attempting to contact the Kerberos Server"); + login = mkLogin(); loginCache.put(key, login); - } catch (LoginException ex) { - LOG.error("Server failed to login in principal:" + ex, ex); - throw new RuntimeException(ex); } } }
