http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java index aa26fd6..640eb40 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java @@ -111,7 +111,22 @@ public class TestUserBitKerberosEncryption extends BaseTestQuery { connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL); connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL); connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath()); - updateClient(connectionProps); + + newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties()) + .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED, + ConfigValueFactory.fromAnyRef(true)) + .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL, + ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE)) + .withValue(BootStrapContext.SERVICE_PRINCIPAL, + ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL)) + .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION, + ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString())) + .withValue(ExecConstants.AUTHENTICATION_MECHANISMS, + ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos"))) + .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED, + ConfigValueFactory.fromAnyRef(true))); + + updateTestCluster(1, newConfig, connectionProps); // Run few queries using the new client testBuilder() @@ -145,7 +160,22 @@ public class 
TestUserBitKerberosEncryption extends BaseTestQuery { connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL); connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL); connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath()); - updateClient(connectionProps); + + newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties()) + .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED, + ConfigValueFactory.fromAnyRef(true)) + .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL, + ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE)) + .withValue(BootStrapContext.SERVICE_PRINCIPAL, + ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL)) + .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION, + ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString())) + .withValue(ExecConstants.AUTHENTICATION_MECHANISMS, + ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos"))) + .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED, + ConfigValueFactory.fromAnyRef(true))); + + updateTestCluster(1, newConfig, connectionProps); assertTrue(UserRpcMetrics.getInstance().getEncryptedConnectionCount() == 1); assertTrue(UserRpcMetrics.getInstance().getUnEncryptedConnectionCount() == 0); @@ -177,10 +207,24 @@ public class TestUserBitKerberosEncryption extends BaseTestQuery { final Subject clientSubject = JaasKrbUtil.loginUsingKeytab(krbHelper.CLIENT_PRINCIPAL, krbHelper.clientKeytab.getAbsoluteFile()); + newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties()) + .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED, + ConfigValueFactory.fromAnyRef(true)) + .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL, + ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE)) + .withValue(BootStrapContext.SERVICE_PRINCIPAL, + ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL)) + 
.withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION, + ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString())) + .withValue(ExecConstants.AUTHENTICATION_MECHANISMS, + ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos"))) + .withValue(ExecConstants.USER_ENCRYPTION_SASL_ENABLED, + ConfigValueFactory.fromAnyRef(true))); + Subject.doAs(clientSubject, new PrivilegedExceptionAction<Void>() { @Override public Void run() throws Exception { - updateClient(connectionProps); + updateTestCluster(1, newConfig, connectionProps); return null; } });
http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java index 34f75f4..f86d698 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.commons.math3.util.Pair; import org.apache.drill.exec.work.foreman.FragmentsRunner; @@ -205,7 +206,7 @@ public class TestDrillbitResilience extends DrillTest { // create a client final DrillConfig drillConfig = zkHelper.getConfig(); - drillClient = QueryTestUtil.createClient(drillConfig, remoteServiceSet, 1, null); + drillClient = QueryTestUtil.createClient(drillConfig, remoteServiceSet, 1, new Properties()); clearAllInjections(); } http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java index 94c8ebf..1098dc4 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java @@ -87,7 +87,7 @@ public class TestResourceLeak extends DrillTest { bit = new Drillbit(config, serviceSet); bit.run(); - client = QueryTestUtil.createClient(config, serviceSet, 2, null); + 
client = QueryTestUtil.createClient(config, serviceSet, 2, new Properties()); } @Test http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/pom.xml ---------------------------------------------------------------------- diff --git a/exec/rpc/pom.xml b/exec/rpc/pom.xml index ea56574..5ed62ee 100644 --- a/exec/rpc/pom.xml +++ b/exec/rpc/pom.xml @@ -77,6 +77,10 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java ---------------------------------------------------------------------- diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java index 0f4ef1b..4395db3 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java @@ -17,6 +17,10 @@ */ package org.apache.drill.exec.rpc; +import com.google.common.base.Preconditions; +import com.google.protobuf.Internal.EnumLite; +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -32,16 +36,17 @@ import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; - -import java.util.concurrent.TimeUnit; - import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode; - -import com.google.common.base.Preconditions; -import com.google.protobuf.Internal.EnumLite; -import com.google.protobuf.MessageLite; -import com.google.protobuf.Parser; +import 
org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener; +import org.apache.drill.exec.rpc.security.AuthenticatorFactory; +import org.apache.hadoop.security.UserGroupInformation; + +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; /** * @@ -69,6 +74,9 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio private final IdlePingHandler pingHandler; private ConnectionMultiListener.SSLHandshakeListener sslHandshakeListener = null; + // Determines if authentication is completed between client and server + private boolean authComplete = true; + public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType, Class<HR> responseClass, Parser<HR> handshakeParser) { super(rpcMapping); @@ -133,6 +141,19 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio return false; } + /** + * Sets whether authentication is complete. + * @param authComplete - state to set. True means authentication between client and server is completed, false + * means authentication is in progress. + */ + protected void setAuthComplete(boolean authComplete) { + this.authComplete = authComplete; + } + + protected boolean isAuthComplete() { + return authComplete; + } + // Save the SslChannel after the SSL handshake so it can be closed later public void setSslChannel(Channel c) { @@ -180,7 +201,67 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio return (connection != null) && connection.isActive(); } - protected abstract void validateHandshake(HR validateHandshake) throws RpcException; + protected abstract List<String> validateHandshake(HR validateHandshake) throws RpcException; + + /** + * Creates various instances needed to start the SASL handshake.
This is called from + * {@link BasicClient#validateHandshake(MessageLite)} if authentication is required from server side. + * @param connectionHandler - Connection handler used by client's to know about success/failure conditions. + * @param serverAuthMechanisms - List of auth mechanisms configured on server side + */ + protected abstract void prepareSaslHandshake(final RpcConnectionHandler<CC> connectionHandler, + List<String> serverAuthMechanisms) throws RpcException; + + /** + * Main method which starts the SASL handshake for all client channels (user/data/control) once it's determined + * after regular RPC handshake that authentication is required by server side. Once authentication is completed + * then only the underlying channel is made available to clients to send other RPC messages. Success and failure + * events are notified to the connection handler on which client waits. + * @param connectionHandler - Connection handler used by client's to know about success/failure conditions. + * @param saslProperties - SASL related properties needed to create SASL client. + * @param ugi - UserGroupInformation with logged in client side user + * @param authFactory - Authentication factory to use for this SASL handshake. + * @param rpcType - SASL_MESSAGE rpc type. + */ + protected void startSaslHandshake(final RpcConnectionHandler<CC> connectionHandler, + Map<String, ?> saslProperties, UserGroupInformation ugi, + AuthenticatorFactory authFactory, T rpcType) { + final String mechanismName = authFactory.getSimpleName(); + try { + final SaslClient saslClient = authFactory.createSaslClient(ugi, saslProperties); + if (saslClient == null) { + final Exception ex = new SaslException(String.format("Cannot initiate authentication using %s mechanism. 
" + + "Insufficient credentials or selected mechanism doesn't support configured security layers?", mechanismName)); + connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex); + return; + } + connection.setSaslClient(saslClient); + } catch (final SaslException e) { + logger.error("Failed while creating SASL client for SASL handshake for connection", connection.getName()); + connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, e); + return; + } + + logger.debug("Initiating SASL exchange."); + new AuthenticationOutcomeListener<>(this, connection, rpcType, ugi, + new RpcOutcomeListener<Void>() { + @Override + public void failed(RpcException ex) { + connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex); + } + + @Override + public void success(Void value, ByteBuf buffer) { + authComplete = true; + connectionHandler.connectionSucceeded(connection); + } + + @Override + public void interrupted(InterruptedException ex) { + connectionHandler.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex); + } + }).initiate(mechanismName); + } protected void finalizeConnection(HR handshake, CC connection) { // no-op @@ -204,12 +285,6 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio allowInEventLoop, dataBodies); } - // the command itself must be "run" by the caller (to avoid calling inEventLoop) - protected <M extends MessageLite> RpcCommand<M, CC> - getInitialCommand(final RpcCommand<M, CC> command) { - return command; - } - protected void connectAsClient(RpcConnectionHandler<CC> connectionListener, HS handshakeValue, String host, int port) { ConnectionMultiListener<T, CC, HS, HR, BasicClient<T, CC, HS, HR>> cml; http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java ---------------------------------------------------------------------- diff --git 
a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java index 0cdca13..3fee5d7 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java @@ -28,6 +28,7 @@ import org.apache.drill.common.exceptions.DrillException; import org.slf4j.Logger; import java.net.SocketAddress; +import java.util.List; import java.util.concurrent.TimeUnit; /** @@ -151,12 +152,21 @@ public class ConnectionMultiListener<T extends EnumLite, CC extends ClientConnec public void success(HR value, ByteBuf buffer) { // logger.debug("Handshake received. {}", value); try { - parent.validateHandshake(value); + final List<String> serverAuthMechanisms = parent.validateHandshake(value); parent.finalizeConnection(value, parent.connection); - connectionListener.connectionSucceeded(parent.connection); - // logger.debug("Handshake completed succesfully."); + + // If auth is required then start the SASL handshake + if (serverAuthMechanisms != null) { + parent.prepareSaslHandshake(connectionListener, serverAuthMechanisms); + } else { + connectionListener.connectionSucceeded(parent.connection); + logger.debug("Handshake completed successfully."); + } + } catch (NonTransientRpcException ex) { + logger.error("Failure while validating client and server sasl compatibility", ex); + connectionListener.connectionFailed(RpcConnectionHandler.FailureType.AUTHENTICATION, ex); } catch (Exception ex) { - logger.debug("Failure while validating handshake", ex); + logger.error("Failure while validating handshake", ex); connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_VALIDATION, ex); } } http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java 
---------------------------------------------------------------------- diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java index a64a23b..3936170 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java @@ -78,7 +78,7 @@ public abstract class ReconnectingConnection<C extends ClientConnection, HS exte } else { // logger.debug("No connection active, opening client connection."); BasicClient<?, C, HS, ?> client = getNewClient(); - ConnectionListeningFuture<T> future = new ConnectionListeningFuture<>(client.getInitialCommand(cmd)); + ConnectionListeningFuture<T> future = new ConnectionListeningFuture<>(cmd); client.connectAsClient(future, handshake, host, port); future.waitAndRun(); // logger.debug("Connection available and active, command now being run inline."); http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java ---------------------------------------------------------------------- diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java new file mode 100644 index 0000000..5c34d01 --- /dev/null +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticationOutcomeListener.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc.security; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; +import com.google.protobuf.Internal.EnumLite; +import com.google.protobuf.MessageLite; +import io.netty.buffer.ByteBuf; +import org.apache.drill.exec.proto.UserBitShared.SaslMessage; +import org.apache.drill.exec.proto.UserBitShared.SaslStatus; +import org.apache.drill.exec.rpc.BasicClient; +import org.apache.drill.exec.rpc.ClientConnection; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.hadoop.security.UserGroupInformation; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedExceptionAction; +import java.util.EnumMap; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Handles SASL exchange, on the client-side. 
+ * + * @param <T> handshake rpc type + * @param <C> Client connection type + * @param <HS> Handshake send type + * @param <HR> Handshake receive type + */ +public class AuthenticationOutcomeListener<T extends EnumLite, C extends ClientConnection, + HS extends MessageLite, HR extends MessageLite> + implements RpcOutcomeListener<SaslMessage> { + private static final org.slf4j.Logger logger = + org.slf4j.LoggerFactory.getLogger(AuthenticationOutcomeListener.class); + + private static final ImmutableMap<SaslStatus, SaslChallengeProcessor> + CHALLENGE_PROCESSORS; + static { + final Map<SaslStatus, SaslChallengeProcessor> map = new EnumMap<>(SaslStatus.class); + map.put(SaslStatus.SASL_IN_PROGRESS, new SaslInProgressProcessor()); + map.put(SaslStatus.SASL_SUCCESS, new SaslSuccessProcessor()); + map.put(SaslStatus.SASL_FAILED, new SaslFailedProcessor()); + CHALLENGE_PROCESSORS = Maps.immutableEnumMap(map); + } + + private final BasicClient<T, C, HS, HR> client; + private final C connection; + private final T saslRpcType; + private final UserGroupInformation ugi; + private final RpcOutcomeListener<?> completionListener; + + public AuthenticationOutcomeListener(BasicClient<T, C, HS, HR> client, + C connection, T saslRpcType, UserGroupInformation ugi, + RpcOutcomeListener<?> completionListener) { + this.client = client; + this.connection = connection; + this.saslRpcType = saslRpcType; + this.ugi = ugi; + this.completionListener = completionListener; + } + + public void initiate(final String mechanismName) { + logger.trace("Initiating SASL exchange."); + try { + final ByteString responseData; + final SaslClient saslClient = connection.getSaslClient(); + if (saslClient.hasInitialResponse()) { + responseData = ByteString.copyFrom(evaluateChallenge(ugi, saslClient, new byte[0])); + } else { + responseData = ByteString.EMPTY; + } + client.send(new AuthenticationOutcomeListener<>(client, connection, saslRpcType, ugi, completionListener), + connection, + saslRpcType, + 
SaslMessage.newBuilder() + .setMechanism(mechanismName) + .setStatus(SaslStatus.SASL_START) + .setData(responseData) + .build(), + SaslMessage.class, + true /* the connection will not be backed up at this point */); + logger.trace("Initiated SASL exchange."); + } catch (final Exception e) { + completionListener.failed(RpcException.mapException(e)); + } + } + + @Override + public void failed(RpcException ex) { + completionListener.failed(RpcException.mapException(ex)); + } + + @Override + public void success(SaslMessage value, ByteBuf buffer) { + logger.trace("Server responded with message of type: {}", value.getStatus()); + final SaslChallengeProcessor processor = CHALLENGE_PROCESSORS.get(value.getStatus()); + if (processor == null) { + completionListener.failed(RpcException.mapException( + new SaslException("Server sent a corrupt message."))); + } else { + // SaslSuccessProcessor.process disposes saslClient so get mechanism here to use later in logging + final String mechanism = connection.getSaslClient().getMechanismName(); + try { + final SaslChallengeContext<C> context = new SaslChallengeContext<>(value, ugi, connection); + final SaslMessage saslResponse = processor.process(context); + + if (saslResponse != null) { + client.send(new AuthenticationOutcomeListener<>(client, connection, saslRpcType, ugi, completionListener), + connection, saslRpcType, saslResponse, SaslMessage.class, + true /* the connection will not be backed up at this point */); + } else { + // success + completionListener.success(null, null); + if (logger.isTraceEnabled()) { + logger.trace("Successfully authenticated to server using {} mechanism and encryption context: {}", + mechanism, connection.getEncryptionCtxtString()); + } + } + } catch (final Exception e) { + logger.error("Authentication with encryption context: {} using mechanism {} failed with {}", + connection.getEncryptionCtxtString(), mechanism, e.getMessage()); + completionListener.failed(RpcException.mapException(e)); + } + } + } 
+ + @Override + public void interrupted(InterruptedException e) { + completionListener.interrupted(e); + } + + private static class SaslChallengeContext<C extends ClientConnection> { + + final SaslMessage challenge; + final UserGroupInformation ugi; + final C connection; + + SaslChallengeContext(SaslMessage challenge, UserGroupInformation ugi, C connection) { + this.challenge = checkNotNull(challenge); + this.ugi = checkNotNull(ugi); + this.connection = checkNotNull(connection); + } + } + + private interface SaslChallengeProcessor { + + /** + * Process challenge from server, and return a response. + * + * Returns null iff SASL exchange is complete and successful. + * + * @param context challenge context + * @return response + * @throws Exception in case of any failure + */ + <CC extends ClientConnection> + SaslMessage process(SaslChallengeContext<CC> context) throws Exception; + + } + + private static class SaslInProgressProcessor implements SaslChallengeProcessor { + + @Override + public <CC extends ClientConnection> SaslMessage process(SaslChallengeContext<CC> context) throws Exception { + final SaslMessage.Builder response = SaslMessage.newBuilder(); + final SaslClient saslClient = context.connection.getSaslClient(); + + final byte[] responseBytes = evaluateChallenge(context.ugi, saslClient, + context.challenge.getData().toByteArray()); + + final boolean isComplete = saslClient.isComplete(); + logger.trace("Evaluated challenge. Completed? {}.", isComplete); + response.setData(responseBytes != null ? ByteString.copyFrom(responseBytes) : ByteString.EMPTY); + // if isComplete, the client will get one more response from server + response.setStatus(isComplete ? 
SaslStatus.SASL_SUCCESS : SaslStatus.SASL_IN_PROGRESS); + return response.build(); + } + } + + private static class SaslSuccessProcessor implements SaslChallengeProcessor { + + @Override + public <CC extends ClientConnection> SaslMessage process(SaslChallengeContext<CC> context) throws Exception { + final SaslClient saslClient = context.connection.getSaslClient(); + + if (saslClient.isComplete()) { + handleSuccess(context); + return null; + } else { + // server completed before client; so try once, fail otherwise + evaluateChallenge(context.ugi, saslClient, context.challenge.getData().toByteArray()); // discard response + + if (saslClient.isComplete()) { + handleSuccess(context); + return null; + } else { + throw new SaslException("Server allegedly succeeded authentication, but client did not. Suspicious?"); + } + } + } + } + + private static class SaslFailedProcessor implements SaslChallengeProcessor { + + @Override + public <CC extends ClientConnection> SaslMessage process(SaslChallengeContext<CC> context) throws Exception { + throw new SaslException(String.format("Authentication failed. Incorrect credentials? 
[Details: %s]", + context.connection.getEncryptionCtxtString())); + } + } + + private static byte[] evaluateChallenge(final UserGroupInformation ugi, final SaslClient saslClient, + final byte[] challengeBytes) throws SaslException { + try { + return ugi.doAs(new PrivilegedExceptionAction<byte[]>() { + @Override + public byte[] run() throws Exception { + return saslClient.evaluateChallenge(challengeBytes); + } + }); + } catch (final UndeclaredThrowableException e) { + throw new SaslException( + String.format("Unexpected failure (%s)", saslClient.getMechanismName()), e.getCause()); + } catch (final IOException | InterruptedException e) { + if (e instanceof SaslException) { + throw (SaslException) e; + } else { + throw new SaslException( + String.format("Unexpected failure (%s)", saslClient.getMechanismName()), e); + } + } + } + + + private static <CC extends ClientConnection> void handleSuccess(SaslChallengeContext<CC> context) throws + SaslException { + final CC connection = context.connection; + final SaslClient saslClient = connection.getSaslClient(); + + try { + // Check if connection was marked for being secure then verify for negotiated QOP value for + // correctness. + final String negotiatedQOP = saslClient.getNegotiatedProperty(Sasl.QOP).toString(); + final String expectedQOP = connection.isEncryptionEnabled() + ? SaslProperties.QualityOfProtection.PRIVACY.getSaslQop() + : SaslProperties.QualityOfProtection.AUTHENTICATION.getSaslQop(); + + if (!(negotiatedQOP.equals(expectedQOP))) { + throw new SaslException(String.format("Mismatch in negotiated QOP value: %s and Expected QOP value: %s", + negotiatedQOP, expectedQOP)); + } + + // Update the rawWrapChunkSize with the negotiated buffer size since we cannot call encode with more than + // negotiated size of buffer. 
+ if (connection.isEncryptionEnabled()) { + final int negotiatedRawSendSize = Integer.parseInt( + saslClient.getNegotiatedProperty(Sasl.RAW_SEND_SIZE).toString()); + if (negotiatedRawSendSize <= 0) { + throw new SaslException(String.format("Negotiated rawSendSize: %d is invalid. Please check the configured " + + "value of encryption.sasl.max_wrapped_size. It might be configured to a very small value.", + negotiatedRawSendSize)); + } + connection.setWrapSizeLimit(negotiatedRawSendSize); + } + } catch (Exception e) { + throw new SaslException(String.format("Unexpected failure while retrieving negotiated property values (%s)", + e.getMessage()), e); + } + + if (connection.isEncryptionEnabled()) { + connection.addSecurityHandlers(); + } else { + // Encryption is not required hence we don't need to hold on to saslClient object. + connection.disposeSaslClient(); + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java ---------------------------------------------------------------------- diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java new file mode 100644 index 0000000..307ae97 --- /dev/null +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/AuthenticatorFactory.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc.security; + +import org.apache.hadoop.security.UserGroupInformation; + +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; +import java.io.IOException; +import java.util.Map; + +/** + * An implementation of this factory will be initialized once at startup, if the authenticator is enabled + * (see {@link #getSimpleName}). For every request for this mechanism (i.e. after establishing a connection), + * {@link #createSaslServer} will be invoked on the server-side and {@link #createSaslClient} will be invoked + * on the client-side. + * + * Note: + * + Custom authenticators must have a default constructor. + * + * Examples: PlainFactory and KerberosFactory. + */ +public interface AuthenticatorFactory extends AutoCloseable { + + /** + * Name of the mechanism, in upper case. + * + * If this mechanism is present in the list of enabled mechanisms, an instance of this factory is loaded. Note + * that the simple name may be the same as its SASL name. + * + * @return mechanism name + */ + String getSimpleName(); + + /** + * Create and get the login user based on the given properties. + * + * @param properties config properties + * @return ugi + * @throws IOException + */ + UserGroupInformation createAndLoginUser(Map<String, ?> properties) throws IOException; + + /** + * The caller is responsible for {@link SaslServer#dispose disposing} the returned SaslServer.
+ * + * @param ugi ugi + * @param properties config properties + * @return sasl server + * @throws SaslException + */ + SaslServer createSaslServer(UserGroupInformation ugi, Map<String, ?> properties) throws SaslException; + + /** + * The caller is responsible for {@link SaslClient#dispose disposing} the returned SaslClient. + * + * @param ugi ugi + * @param properties config properties + * @return sasl client + * @throws SaslException + */ + SaslClient createSaslClient(UserGroupInformation ugi, Map<String, ?> properties) throws SaslException; + +} http://git-wip-us.apache.org/repos/asf/drill/blob/920a12a6/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java ---------------------------------------------------------------------- diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java new file mode 100644 index 0000000..9ed85ce --- /dev/null +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/security/SaslProperties.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.rpc.security; + +import javax.security.sasl.Sasl; +import java.util.HashMap; +import java.util.Map; + +public final class SaslProperties { + + /** + * All supported Quality of Protection values which can be negotiated + */ + enum QualityOfProtection { + AUTHENTICATION("auth"), + INTEGRITY("auth-int"), + PRIVACY("auth-conf"); + + public final String saslQop; + + QualityOfProtection(String saslQop) { + this.saslQop = saslQop; + } + + public String getSaslQop() { + return saslQop; + } + } + + /** + * Gets the map of the minimum set of SaslProperties required during the negotiation process, either for encryption + * or authentication + * @param encryptionEnabled - Flag to determine if property needed is for encryption or authentication + * @param wrappedChunkSize - Configured wrappedChunkSize to negotiate for. + * @return Map of SaslProperties which will be used in negotiation. + */ + public static Map<String, String> getSaslProperties(boolean encryptionEnabled, int wrappedChunkSize) { + Map<String, String> saslProps = new HashMap<>(); + + if (encryptionEnabled) { + saslProps.put(Sasl.STRENGTH, "high"); + saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop()); + saslProps.put(Sasl.MAX_BUFFER, Integer.toString(wrappedChunkSize)); + saslProps.put(Sasl.POLICY_NOPLAINTEXT, "true"); + } else { + saslProps.put(Sasl.QOP, QualityOfProtection.AUTHENTICATION.getSaslQop()); + } + + return saslProps; + } + + private SaslProperties() { + + } +} \ No newline at end of file
