Repository: storm
Updated Branches:
  refs/heads/master 8120e15ef -> 4f5fdd928


STORM-3027: Make impersonation optional


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3983a28a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3983a28a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3983a28a

Branch: refs/heads/master
Commit: 3983a28aacf5f5a966e2009e7639ed4c0d0b1335
Parents: 8120e15
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Mar 9 11:12:20 2018 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Apr 11 09:57:42 2018 -0500

----------------------------------------------------------------------
 .../security/auth/ThriftConnectionType.java     | 26 +++++++++++------
 .../auth/digest/DigestSaslTransportPlugin.java  |  4 +--
 .../auth/digest/JassPasswordProvider.java       |  5 ++++
 .../kerberos/KerberosSaslTransportPlugin.java   |  6 ++--
 .../auth/kerberos/ServerCallbackHandler.java    |  8 +++++-
 .../auth/plain/PlainSaslTransportPlugin.java    |  5 ++--
 .../security/auth/sasl/PasswordProvider.java    |  8 ++++++
 .../security/auth/sasl/SaslTransportPlugin.java |  5 ++--
 .../sasl/SimpleSaslServerCallbackHandler.java   | 30 ++++++++++++++------
 9 files changed, 71 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3983a28a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
index 91d280c..4238908 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftConnectionType.java
@@ -30,15 +30,15 @@ import java.util.Map;
 public enum ThriftConnectionType {
     NIMBUS(Config.NIMBUS_THRIFT_TRANSPORT_PLUGIN, Config.NIMBUS_THRIFT_PORT, Config.NIMBUS_QUEUE_SIZE,
         Config.NIMBUS_THRIFT_THREADS, Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Config.STORM_THRIFT_SOCKET_TIMEOUT_MS,
-        WorkerTokenServiceType.NIMBUS),
+        WorkerTokenServiceType.NIMBUS, true),
     SUPERVISOR(Config.SUPERVISOR_THRIFT_TRANSPORT_PLUGIN, Config.SUPERVISOR_THRIFT_PORT, Config.SUPERVISOR_QUEUE_SIZE,
         Config.SUPERVISOR_THRIFT_THREADS, Config.SUPERVISOR_THRIFT_MAX_BUFFER_SIZE,
-        Config.SUPERVISOR_THRIFT_SOCKET_TIMEOUT_MS, WorkerTokenServiceType.SUPERVISOR),
+        Config.SUPERVISOR_THRIFT_SOCKET_TIMEOUT_MS, WorkerTokenServiceType.SUPERVISOR, false),
     //A DRPC token only works for the invocations transport, not for the basic thrift transport.
     DRPC(Config.DRPC_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_PORT, Config.DRPC_QUEUE_SIZE,
-         Config.DRPC_WORKER_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null, null),
+         Config.DRPC_WORKER_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null, null, false),
     DRPC_INVOCATIONS(Config.DRPC_INVOCATIONS_THRIFT_TRANSPORT_PLUGIN, Config.DRPC_INVOCATIONS_PORT, null,
-         Config.DRPC_INVOCATIONS_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null, WorkerTokenServiceType.DRPC),
+         Config.DRPC_INVOCATIONS_THREADS, Config.DRPC_MAX_BUFFER_SIZE, null, WorkerTokenServiceType.DRPC, false),
     LOCAL_FAKE;
 
     private final String transConf;
@@ -49,20 +49,21 @@ public enum ThriftConnectionType {
     private final String socketTimeoutConf;
     private final boolean isFake;
     private final WorkerTokenServiceType wtType;
+    private final boolean impersonationAllowed;
 
     ThriftConnectionType() {
-        this(null, null, null, null, null, null, true, null);
+        this(null, null, null, null, null, null, true, null, false);
     }
     
     ThriftConnectionType(String transConf, String portConf, String qConf,
                          String threadsConf, String buffConf, String socketTimeoutConf,
-                         WorkerTokenServiceType wtType) {
-        this(transConf, portConf, qConf, threadsConf, buffConf, socketTimeoutConf, false, wtType);
+                         WorkerTokenServiceType wtType, boolean impersonationAllowed) {
+        this(transConf, portConf, qConf, threadsConf, buffConf, socketTimeoutConf, false, wtType, impersonationAllowed);
     }
     
     ThriftConnectionType(String transConf, String portConf, String qConf,
                          String threadsConf, String buffConf, String socketTimeoutConf, boolean isFake,
-                         WorkerTokenServiceType wtType) {
+                         WorkerTokenServiceType wtType, boolean impersonationAllowed) {
         this.transConf = transConf;
         this.portConf = portConf;
         this.qConf = qConf;
@@ -71,6 +72,7 @@ public enum ThriftConnectionType {
         this.socketTimeoutConf = socketTimeoutConf;
         this.isFake = isFake;
         this.wtType = wtType;
+        this.impersonationAllowed = impersonationAllowed;
     }
 
     public boolean isFake() {
@@ -126,4 +128,12 @@ public enum ThriftConnectionType {
     public WorkerTokenServiceType getWtType() {
         return wtType;
     }
+
+    /**
+     * Check if SASL impersonation is allowed for this transport type.
+     * @return true if it is else false.
+     */
+    public boolean isImpersonationAllowed() {
+        return impersonationAllowed;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3983a28a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
index 1272712..4181932 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
@@ -41,9 +41,9 @@ public class DigestSaslTransportPlugin extends SaslTransportPlugin {
     public static final String DIGEST = "DIGEST-MD5";
     private static final Logger LOG = LoggerFactory.getLogger(DigestSaslTransportPlugin.class);
 
-    protected TTransportFactory getServerTransportFactory() throws IOException {
+    protected TTransportFactory getServerTransportFactory(boolean impersonationAllowed) throws IOException {
         //create an authentication callback handler
-        CallbackHandler serverCallbackHandler = new SimpleSaslServerCallbackHandler(
+        CallbackHandler serverCallbackHandler = new SimpleSaslServerCallbackHandler(impersonationAllowed,
             new WorkerTokenAuthorizer(conf, type),
             new JassPasswordProvider(loginConf));
 

http://git-wip-us.apache.org/repos/asf/storm/blob/3983a28a/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java
index bb3f1bf..7858bed 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/digest/JassPasswordProvider.java
@@ -82,4 +82,9 @@ public class JassPasswordProvider implements PasswordProvider {
     public Optional<char[]> getPasswordFor(String user) {
         return Optional.ofNullable(credentials.get(user));
     }
+
+    @Override
+    public boolean isImpersonationAllowed() {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3983a28a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
index 157ae54..f327914 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
@@ -99,9 +99,9 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
     }
 
     @Override
-    public TTransportFactory getServerTransportFactory() throws IOException {
+    public TTransportFactory getServerTransportFactory(boolean impersonationAllowed) throws IOException {
         //create an authentication callback handler
-        CallbackHandler server_callback_handler = new ServerCallbackHandler(loginConf);
+        CallbackHandler server_callback_handler = new ServerCallbackHandler(loginConf, impersonationAllowed);
         
         //login our principal
         Subject subject = null;
@@ -138,7 +138,7 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
 
         //Also add in support for worker tokens
         factory.addServerDefinition(DIGEST, AuthUtils.SERVICE, "localhost", null,
-            new SimpleSaslServerCallbackHandler(new WorkerTokenAuthorizer(conf, type)));
+            new SimpleSaslServerCallbackHandler(impersonationAllowed, new WorkerTokenAuthorizer(conf, type)));
 
         //create a wrap transport factory so that we could apply user credential during connections
         TUGIAssumingTransportFactory wrapFactory = new TUGIAssumingTransportFactory(factory, subject); 

http://git-wip-us.apache.org/repos/asf/storm/blob/3983a28a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
index 4da8652..b280187 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/ServerCallbackHandler.java
@@ -36,8 +36,10 @@ import java.io.IOException;
  */
 public class ServerCallbackHandler implements CallbackHandler {
     private static final Logger LOG = LoggerFactory.getLogger(ServerCallbackHandler.class);
+    private final boolean impersonationAllowed;
 
-    public ServerCallbackHandler(Configuration configuration) throws IOException {
+    public ServerCallbackHandler(Configuration configuration, boolean impersonationAllowed) throws IOException {
+        this.impersonationAllowed = impersonationAllowed;
         if (configuration == null) {
             return;
         }
@@ -94,6 +96,10 @@ public class ServerCallbackHandler implements CallbackHandler {
             //When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We
             //add the authNid as the real user in reqContext's subject which will be used during authorization.
             if (!ac.getAuthenticationID().equals(ac.getAuthorizationID())) {
+                if (!impersonationAllowed) {
+                    throw new IllegalArgumentException(ac.getAuthenticationID() + " attempting to impersonate " + ac.getAuthorizationID()
+                        + ".  This is not allowed by this server.");
+                }
                 ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(ac.getAuthenticationID()));
             } else {
                 ReqContext.context().setRealPrincipal(null);

http://git-wip-us.apache.org/repos/asf/storm/blob/3983a28a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java
index 2df61c1..cc1c997 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/plain/PlainSaslTransportPlugin.java
@@ -44,9 +44,10 @@ public class PlainSaslTransportPlugin extends SaslTransportPlugin {
     private static final Logger LOG = LoggerFactory.getLogger(PlainSaslTransportPlugin.class);
 
     @Override
-    protected TTransportFactory getServerTransportFactory() throws IOException {
+    protected TTransportFactory getServerTransportFactory(boolean impersonationAllowed) throws IOException {
         //create an authentication callback handler
-        CallbackHandler serverCallbackHandler = new SimpleSaslServerCallbackHandler((userName) -> Optional.of("password".toCharArray()));
+        CallbackHandler serverCallbackHandler = new SimpleSaslServerCallbackHandler(impersonationAllowed,
+            (userName) -> Optional.of("password".toCharArray()));
         if (Security.getProvider(SaslPlainServer.SecurityProvider.SASL_PLAIN_SERVER) == null) {
             Security.addProvider(new SaslPlainServer.SecurityProvider());
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/3983a28a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/PasswordProvider.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/PasswordProvider.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/PasswordProvider.java
index d1e0c0f..497d6db 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/PasswordProvider.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/PasswordProvider.java
@@ -35,6 +35,14 @@ public interface PasswordProvider {
     Optional<char[]> getPasswordFor(String user);
 
     /**
+     * Should impersonation be allowed by this password provider.  The default is false.
+     * @return true if it should else false.
+     */
+    default boolean isImpersonationAllowed() {
+        return false;
+    }
+
+    /**
      * Convert the supplied user name to the actual user name that should be used
      * in the system.  This may be called on any name.  If it cannot be translated
      * then a null may be returned or an exception thrown.  If getPassword returns successfully

http://git-wip-us.apache.org/repos/asf/storm/blob/3983a28a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
index 71cdbf5..b4f09e4 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
@@ -68,7 +68,7 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
     public TServer getServer(TProcessor processor) throws IOException, TTransportException {
         int configuredPort = type.getPort(conf);
         Integer socketTimeout = type.getSocketTimeOut(conf);
-        TTransportFactory serverTransportFactory = getServerTransportFactory();
+        TTransportFactory serverTransportFactory = getServerTransportFactory(type.isImpersonationAllowed());
         TServerSocket serverTransport = null;
         if (socketTimeout != null) {
             serverTransport = new TServerSocket(configuredPort, socketTimeout);
@@ -100,10 +100,11 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
 
     /**
      * Create the transport factory needed for serving.  All subclass must implement this method.
+     * @param impersonationAllowed true if SASL impersonation should be allowed, else false.
      * @return server transport factory
      * @throws IOException on any error.
      */
-    protected abstract TTransportFactory getServerTransportFactory() throws IOException;
+    protected abstract TTransportFactory getServerTransportFactory(boolean impersonationAllowed) throws IOException;
     
     @Override
     public int getPort() {

http://git-wip-us.apache.org/repos/asf/storm/blob/3983a28a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java
index d64fa1b..66488b9 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SimpleSaslServerCallbackHandler.java
@@ -31,28 +31,33 @@ import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.sasl.AuthorizeCallback;
 import javax.security.sasl.RealmCallback;
 import org.apache.storm.security.auth.ReqContext;
+import org.apache.storm.streams.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SimpleSaslServerCallbackHandler implements CallbackHandler {
     private static final Logger LOG = LoggerFactory.getLogger(SimpleSaslServerCallbackHandler.class);
     private final List<PasswordProvider> providers;
+    private final boolean impersonationAllowed;
 
     /**
      * Constructor with different password providers.
+     * @param impersonationAllowed true if impersonation is allowed else false.
      * @param providers what will provide a password.  They will be checked in order, and the first one to
      *     return a password wins.
      */
-    public SimpleSaslServerCallbackHandler(PasswordProvider ... providers) {
-        this(Arrays.asList(providers));
+    public SimpleSaslServerCallbackHandler(boolean impersonationAllowed, PasswordProvider ... providers) {
+        this(impersonationAllowed, Arrays.asList(providers));
     }
 
     /**
      * Constructor with different password providers.
+     * @param impersonationAllowed true if impersonation is allowed else false.
      * @param providers what will provide a password.  They will be checked in order, and the first one to
      *     return a password wins.
      */
-    public SimpleSaslServerCallbackHandler(List<PasswordProvider> providers) {
+    public SimpleSaslServerCallbackHandler(boolean impersonationAllowed, List<PasswordProvider> providers) {
+        this.impersonationAllowed = impersonationAllowed;
         this.providers = new ArrayList<>(providers);
     }
 
@@ -82,12 +87,12 @@ public class SimpleSaslServerCallbackHandler implements CallbackHandler {
         }
     }
 
-    private String translateName(String orig) {
+    private Pair<String, Boolean> translateName(String orig) {
         for (PasswordProvider provider: providers) {
             try {
                 String ret = provider.userName(orig);
                 if (ret != null) {
-                    return ret;
+                    return Pair.of(ret, provider.isImpersonationAllowed());
                 }
             } catch (Exception e) {
                 //Translating the name (this call) happens in a different callback from validating
@@ -103,7 +108,7 @@ public class SimpleSaslServerCallbackHandler implements CallbackHandler {
         // In the worst case we will return a serialized name after a password provider said that the password
         // was okay.  In that case the ACLs are likely to prevent the request from going through anyways.
         // But that is only if there is a bug in one of the password providers.
-        return orig;
+        return Pair.of(orig, false);
     }
 
     @Override
@@ -152,14 +157,19 @@ public class SimpleSaslServerCallbackHandler implements CallbackHandler {
         }
 
         if (ac != null) {
+            boolean allowImpersonation = impersonationAllowed;
             String nid = ac.getAuthenticationID();
             if (nid != null) {
-                nid = translateName(nid);
+                Pair<String, Boolean> tmp = translateName(nid);
+                nid = tmp.getFirst();
+                allowImpersonation = allowImpersonation && tmp.getSecond();
             }
 
             String zid = ac.getAuthorizationID();
             if (zid != null) {
-                zid = translateName(zid);
+                Pair<String, Boolean> tmp = translateName(zid);
+                zid = tmp.getFirst();
+                allowImpersonation = allowImpersonation && tmp.getSecond();
             }
             LOG.info("Successfully authenticated client: authenticationID = {} authorizationID = {}",
                 nid, zid);
@@ -177,6 +187,10 @@ public class SimpleSaslServerCallbackHandler implements CallbackHandler {
             if (!nid.equals(zid)) {
                 LOG.info("Impersonation attempt  authenticationID = {} authorizationID = {}",
                     nid,  zid);
+                if (!allowImpersonation) {
+                    throw new IllegalArgumentException(ac.getAuthenticationID() + " attempting to impersonate " + ac.getAuthorizationID()
+                        + ".  This is not allowed.");
+                }
                 ReqContext.context().setRealPrincipal(new 
SaslTransportPlugin.User(nid));
             } else {
                 ReqContext.context().setRealPrincipal(null);

Reply via email to