Github user laurentgo commented on a diff in the pull request:

    https://github.com/apache/drill/pull/578#discussion_r102344639
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/rpc/security/ServerAuthenticationHandler.java
 ---
    @@ -0,0 +1,269 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.rpc.security;
    +
    +import com.google.common.collect.ImmutableMap;
    +import com.google.common.collect.Maps;
    +import com.google.protobuf.ByteString;
    +import com.google.protobuf.Internal.EnumLite;
    +import com.google.protobuf.InvalidProtocolBufferException;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.ByteBufInputStream;
    +import org.apache.drill.exec.proto.UserBitShared.SaslMessage;
    +import org.apache.drill.exec.proto.UserBitShared.SaslStatus;
    +import org.apache.drill.exec.rpc.RequestHandler;
    +import org.apache.drill.exec.rpc.Response;
    +import org.apache.drill.exec.rpc.ResponseSender;
    +import org.apache.drill.exec.rpc.RpcException;
    +import org.apache.drill.exec.rpc.ServerConnection;
    +import org.apache.hadoop.security.UserGroupInformation;
    +
    +import javax.security.sasl.SaslException;
    +import javax.security.sasl.SaslServer;
    +import java.io.IOException;
    +import java.lang.reflect.UndeclaredThrowableException;
    +import java.security.PrivilegedExceptionAction;
    +import java.util.EnumMap;
    +import java.util.Map;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +public class ServerAuthenticationHandler<C extends ServerConnection, T 
extends EnumLite> implements RequestHandler<C> {
    +  private static final org.slf4j.Logger logger =
    +      org.slf4j.LoggerFactory.getLogger(ServerAuthenticationHandler.class);
    +
    +  private static final ImmutableMap<SaslStatus, SaslResponseProcessor> 
RESPONSE_PROCESSORS;
    +
    +  static {
    +    final Map<SaslStatus, SaslResponseProcessor> map = new 
EnumMap<>(SaslStatus.class);
    +    map.put(SaslStatus.SASL_START, new SaslStartProcessor());
    +    map.put(SaslStatus.SASL_IN_PROGRESS, new SaslInProgressProcessor());
    +    map.put(SaslStatus.SASL_SUCCESS, new SaslSuccessProcessor());
    +    map.put(SaslStatus.SASL_FAILED, new SaslFailedProcessor());
    +    RESPONSE_PROCESSORS = Maps.immutableEnumMap(map);
    +  }
    +
    +  private final RequestHandler<C> requestHandler;
    +  private final int saslRequestTypeValue;
    +  private final T saslResponseType;
    +
    +  public ServerAuthenticationHandler(final RequestHandler<C> 
requestHandler, final int saslRequestTypeValue,
    +                                     final T saslResponseType) {
    +    this.requestHandler = requestHandler;
    +    this.saslRequestTypeValue = saslRequestTypeValue;
    +    this.saslResponseType = saslResponseType;
    +  }
    +
    +  @Override
    +  public void handle(C connection, int rpcType, ByteBuf pBody, ByteBuf 
dBody, ResponseSender sender)
    +      throws RpcException {
    +    final String remoteAddress = connection.getRemoteAddress().toString();
    +
    +    // exchange involves server "challenges" and client "responses" 
(initiated by client)
    +    if (saslRequestTypeValue == rpcType) {
    +      final SaslMessage saslResponse;
    +      try {
    +        saslResponse = SaslMessage.PARSER.parseFrom(new 
ByteBufInputStream(pBody));
    +      } catch (final InvalidProtocolBufferException e) {
    +        handleAuthFailure(connection, remoteAddress, sender, e, 
saslResponseType);
    +        return;
    +      }
    +
    +      logger.trace("Received SASL message {} from {}", 
saslResponse.getStatus(), remoteAddress);
    +      final SaslResponseProcessor processor = 
RESPONSE_PROCESSORS.get(saslResponse.getStatus());
    +      if (processor == null) {
    +        logger.info("Unknown message type from client from {}. Will stop 
authentication.", remoteAddress);
    +        handleAuthFailure(connection, remoteAddress, sender, new 
SaslException("Received unexpected message"),
    +            saslResponseType);
    +        return;
    +      }
    +
    +      final SaslResponseContext<C, T> context = new 
SaslResponseContext<>(saslResponse, connection, remoteAddress,
    +          sender, requestHandler, saslResponseType);
    +      try {
    +        processor.process(context);
    +      } catch (final Exception e) {
    +        handleAuthFailure(connection, remoteAddress, sender, e, 
saslResponseType);
    +      }
    +    } else {
    +
    +      // this handler only handles messages of SASL_MESSAGE_VALUE type
    +
    +      // drop connection
    +      connection.close();
    +
    +      // the response type for this request type is likely known from 
UserRpcConfig,
    +      // but the client should not be making any requests before 
authenticating.
    +      throw new UnsupportedOperationException(
    +          String.format("Request of type %d is not allowed without 
authentication. " +
    +                  "Client on %s must authenticate before making requests. 
Connection dropped.",
    +              rpcType, remoteAddress));
    +    }
    +  }
    +
    +  private static class SaslResponseContext<C extends ServerConnection, T 
extends EnumLite> {
    +
    +    final SaslMessage saslResponse;
    +    final C connection;
    +    final String remoteAddress;
    +    final ResponseSender sender;
    +    final RequestHandler<C> requestHandler;
    +    final T saslResponseType;
    +
    +    SaslResponseContext(SaslMessage saslResponse, C connection, String 
remoteAddress, ResponseSender sender,
    +                        RequestHandler<C> requestHandler, T 
saslResponseType) {
    +      this.saslResponse = checkNotNull(saslResponse);
    +      this.connection = checkNotNull(connection);
    +      this.remoteAddress = checkNotNull(remoteAddress);
    +      this.sender = checkNotNull(sender);
    +      this.requestHandler = checkNotNull(requestHandler);
    +      this.saslResponseType = checkNotNull(saslResponseType);
    +    }
    +  }
    +
    +  private interface SaslResponseProcessor {
    +
    +    /**
    +     * Process response from client, and if there are no exceptions, send 
response using
    +     * {@link SaslResponseContext#sender}. Otherwise, throw the exception.
    +     *
    +     * @param context response context
    +     */
    +    void process(SaslResponseContext context) throws Exception;
    +
    +  }
    +
    +  private static class SaslStartProcessor implements SaslResponseProcessor 
{
    +
    +    @Override
    +    public void process(final SaslResponseContext context) throws 
Exception {
    +      
context.connection.initSaslServer(context.saslResponse.getMechanism());
    +
    +      // assume #evaluateResponse must be called at least once
    +      
RESPONSE_PROCESSORS.get(SaslStatus.SASL_IN_PROGRESS).process(context);
    +    }
    +  }
    +
    +  private static class SaslInProgressProcessor implements 
SaslResponseProcessor {
    +
    +    @Override
    +    public void process(final SaslResponseContext context) throws 
Exception {
    +      final SaslMessage.Builder challenge = SaslMessage.newBuilder();
    +      final SaslServer saslServer = context.connection.getSaslServer();
    +
    +      final byte[] challengeBytes = evaluateResponse(saslServer, 
context.saslResponse.getData().toByteArray());
    +
    +      if (saslServer.isComplete()) {
    +        challenge.setStatus(SaslStatus.SASL_SUCCESS);
    +        if (challengeBytes != null) {
    +          challenge.setData(ByteString.copyFrom(challengeBytes));
    +        }
    +
    +        handleSuccess(context, challenge, saslServer);
    +      } else {
    +        challenge.setStatus(SaslStatus.SASL_IN_PROGRESS)
    +            .setData(ByteString.copyFrom(challengeBytes));
    +        context.sender.send(new Response(context.saslResponseType, 
challenge.build()));
    +      }
    +    }
    +  }
    +
    +  // only when client succeeds first
    +  private static class SaslSuccessProcessor implements 
SaslResponseProcessor {
    +
    +    @Override
    +    public void process(final SaslResponseContext context) throws 
Exception {
    +      // at this point, #isComplete must be false; so try once, fail 
otherwise
    +      final SaslServer saslServer = context.connection.getSaslServer();
    +
    +      evaluateResponse(saslServer, 
context.saslResponse.getData().toByteArray()); // discard challenge
    +
    +      if (saslServer.isComplete()) {
    +        final SaslMessage.Builder challenge = SaslMessage.newBuilder();
    +        challenge.setStatus(SaslStatus.SASL_SUCCESS);
    +
    +        handleSuccess(context, challenge, saslServer);
    +      } else {
    +        logger.info("Failed to authenticate client from {}", 
context.remoteAddress);
    +        throw new SaslException("Client allegedly succeeded 
authentication, but server did not. Suspicious?");
    +      }
    +    }
    +  }
    +
    +  private static class SaslFailedProcessor implements 
SaslResponseProcessor {
    +
    +    @Override
    +    public void process(final SaslResponseContext context) throws 
Exception {
    +      logger.info("Client from {} failed authentication graciously, and 
does not want to continue.",
    +          context.remoteAddress);
    +      throw new SaslException("Client graciously failed authentication");
    +    }
    +  }
    +
    +  private static byte[] evaluateResponse(final SaslServer saslServer,
    +                                         final byte[] responseBytes) 
throws SaslException {
    +    try {
    +      return UserGroupInformation.getLoginUser().doAs(new 
PrivilegedExceptionAction<byte[]>() {
    +        @Override
    +        public byte[] run() throws Exception {
    +          return saslServer.evaluateResponse(responseBytes);
    +        }
    +      });
    +    } catch (final UndeclaredThrowableException e) {
    +      throw new SaslException(String.format("Unexpected failure trying to 
authenticate using %s",
    +          saslServer.getMechanismName()), e.getCause());
    +    } catch (final IOException | InterruptedException e) {
    +      if (e instanceof SaslException) {
    +        throw (SaslException) e;
    +      } else {
    +        throw new SaslException(String.format("Unexpected failure trying 
to authenticate using %s",
    +            saslServer.getMechanismName()), e);
    +      }
    +    }
    +  }
    +
    +  @SuppressWarnings("unchecked")
    +  private static void handleSuccess(final SaslResponseContext context, 
final SaslMessage.Builder challenge,
    +                                    final SaslServer saslServer) throws 
IOException {
    +    context.connection.changeHandlerTo(context.requestHandler);
    +    context.connection.finalizeSaslSession();
    +    context.sender.send(new Response(context.saslResponseType, 
challenge.build()));
    +
    +    // setup security layers here..
    --- End diff --
    
    we should probably dispose of the saslServer (`saslServer#dispose()`)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to