Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/950#discussion_r141498641
  
    --- Diff: 
exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java 
---
    @@ -0,0 +1,240 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.rpc;
    +
    +import com.google.protobuf.MessageLite;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.channel.Channel;
    +import io.netty.channel.ChannelFuture;
    +import io.netty.util.concurrent.Future;
    +import io.netty.util.concurrent.GenericFutureListener;
    +import org.apache.drill.common.exceptions.DrillException;
    +import org.slf4j.Logger;
    +
    +import java.net.SocketAddress;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * @param <CC> Client Connection Listener
    + * @param <HS> Outbound handshake message type
    + * @param <HR> Inbound handshake message type
    + * @param <BC> BasicClient type
    + *             <p>
    + *             Implements a wrapper class that allows a client connection 
to associate different behaviours after
    + *             establishing a connection with the server. The client can 
choose to send an application handshake, or
    + *             in the case of SSL, wait for a SSL handshake completion and 
then send an application handshake.
    + */
    +
    +public class ConnectionMultiListener<CC extends ClientConnection, HS 
extends MessageLite, HR extends MessageLite, BC extends BasicClient> {
    +
    +  private static final Logger logger = 
org.slf4j.LoggerFactory.getLogger(ConnectionMultiListener.class);
    +
    +  private final RpcConnectionHandler<CC> connectionListener;
    +  private final HS handshakeValue;
    +  private final BC parent;
    +
    +  private ConnectionMultiListener(RpcConnectionHandler<CC> 
connectionListener, HS handshakeValue,
    +      BC basicClient) {
    +    assert connectionListener != null;
    +    assert handshakeValue != null;
    +
    +    this.connectionListener = connectionListener;
    +    this.handshakeValue = handshakeValue;
    +    this.parent = basicClient;
    +  }
    +
    +  @SuppressWarnings("unchecked")
    +  public static <CC extends ClientConnection, HS extends MessageLite, BC 
extends BasicClient> Builder
    +  newBuilder(RpcConnectionHandler<CC> connectionListener, HS 
handshakeValue,
    +      BC basicClient) {
    +    return new Builder(connectionListener, handshakeValue, basicClient);
    +  }
    +
    +  public ConnectionHandler connectionHandler = null;
    +  public HandshakeSendHandler handshakeSendHandler = null;
    +  public SSLConnectionHandler sslConnectionHandler = null;
    +
    +  /**
    +   * Manages connection establishment outcomes.
    +   */
    +  private class ConnectionHandler implements 
GenericFutureListener<ChannelFuture> {
    +
    +    @Override public void operationComplete(ChannelFuture future) throws 
Exception {
    +      boolean isInterrupted = false;
    +
    +      // We want to wait for at least 120 secs when interrupts occur. 
Establishing a connection fails/succeeds quickly,
    +      // So there is no point propagating the interruption as failure 
immediately.
    +      long remainingWaitTimeMills = 120000;
    +      long startTime = System.currentTimeMillis();
    +      // logger.debug("Connection operation finished.  Success: {}", 
future.isSuccess());
    +      while (true) {
    +        try {
    +          future.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS);
    +          if (future.isSuccess()) {
    +            SocketAddress remote = future.channel().remoteAddress();
    +            SocketAddress local = future.channel().localAddress();
    +            parent.setAddresses(remote, local);
    +            // if SSL is enabled send the handshake after the ssl 
handshake is completed, otherwise send it
    +            // now
    +            if(!parent.isSslEnabled()) {
    +              // send a handshake on the current thread. This is the only 
time we will send from within the event thread.
    +              // We can do this because the connection will not be backed 
up.
    +              parent.send(handshakeSendHandler, handshakeValue, true);
    +            }
    +          } else {
    +            
connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION,
    +                new RpcException("General connection failure."));
    +          }
    +          // logger.debug("Handshake queued for send.");
    +          break;
    +        } catch (final InterruptedException interruptEx) {
    +          remainingWaitTimeMills -= (System.currentTimeMillis() - 
startTime);
    +          startTime = System.currentTimeMillis();
    +          isInterrupted = true;
    +          if (remainingWaitTimeMills < 1) {
    +            
connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION,
 interruptEx);
    +            break;
    +          }
    +          // Ignore the interrupt and continue to wait until we elapse 
remainingWaitTimeMills.
    +        } catch (final Exception ex) {
    +          logger.error("Failed to establish connection", ex);
    +          
connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION,
 ex);
    +          break;
    +        }
    +      }
    +
    +      if (isInterrupted) {
    +        // Preserve evidence that the interruption occurred so that code 
higher up on the call stack can learn of the
    +        // interruption and respond to it if it wants to.
    +        Thread.currentThread().interrupt();
    +      }
    +    }
    +  }
    +
    +  private class SSLConnectionHandler implements 
GenericFutureListener<Future<Channel>> {
    +    @Override public void operationComplete(Future<Channel> future) throws 
Exception {
    +      // send the handshake
    +      parent.send(handshakeSendHandler, handshakeValue, true);
    +    }
    +  }
    +
    +  /**
    +   * manages handshake outcomes.
    +   */
    +  private class HandshakeSendHandler implements RpcOutcomeListener<HR> {
    +
    +    @Override public void failed(RpcException ex) {
    +      logger.debug("Failure while initiating handshake", ex);
    +      
connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_COMMUNICATION,
 ex);
    +    }
    +
    +    @Override public void success(HR value, ByteBuf buffer) {
    +      // logger.debug("Handshake received. {}", value);
    +      try {
    +        parent.validateHandshake(value);
    +        parent.finalizeConnection(value, parent.connection);
    +        connectionListener.connectionSucceeded((CC) parent.connection);
    +        // logger.debug("Handshake completed succesfully.");
    +      } catch (Exception ex) {
    +        logger.debug("Failure while validating handshake", ex);
    +        
connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_VALIDATION,
 ex);
    +      }
    +    }
    +
    +    @Override public void interrupted(final InterruptedException ex) {
    +      logger.warn("Interrupted while waiting for handshake response", ex);
    +      
connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_COMMUNICATION,
 ex);
    +    }
    +  }
    +
    +  /*
    +    The SSL Handshake listener is special in that it is needed at the time 
of initializing an SSL
    +    enabled pipeline and so is instantiated before the instance of the 
outer class may be needed.
    +    We create an instance and set a reference back to the outer class 
instance when it is created
    +    at the time of connection.
    +   */
    +  public static class SSLHandshakeListener implements 
GenericFutureListener<Future<Channel>> {
    +    ConnectionMultiListener parent;
    +    public SSLHandshakeListener() {
    +    }
    +
    +    public void setParent(ConnectionMultiListener cml){
    +      this.parent = cml;
    +    }
    +
    +    @Override public void operationComplete(Future<Channel> future) throws 
Exception {
    +      if(parent != null){
    +        if(future.isSuccess()) {
    +          Channel c = future.get();
    +          parent.sslConnectionHandler.operationComplete(future);
    +          parent.parent.setSslChannel(c);
    +        } else {
    +          throw new DrillException("SSL handshake failed.", 
future.cause());
    +        }
    +      } else {
    +        throw new RpcException("RPC Setup error. SSL handshake complete 
handler is not set up.");
    +      }
    +      return;
    +    }
    +  }
    +
    +
    +  public static class Builder<CC extends ClientConnection, HS extends 
MessageLite, HR extends MessageLite, BC extends BasicClient> {
    +
    +    private RpcConnectionHandler<CC> connectionListener;
    +    private HS handshakeValue;
    +    private BC basicClient;
    +    private ConnectionMultiListener cml;
    +
    +    private Builder(RpcConnectionHandler<CC> connectionListener, HS 
handshakeValue, BC basicClient) {
    +      this.connectionListener = connectionListener;
    +      this.handshakeValue = handshakeValue;
    +      this.basicClient = basicClient;
    +      this.cml = new ConnectionMultiListener(connectionListener, 
handshakeValue, basicClient);
    +    }
    +
    +    public Builder enableSSL() {
    +      cml.connectionHandler = cml.new ConnectionHandler();
    +      cml.sslConnectionHandler = cml.new SSLConnectionHandler();
    +      return this;
    +    }
    +
    +    public Builder enablePlain() {
    +      cml.connectionHandler = cml.new ConnectionHandler();
    +      return this;
    +    }
    +
    +    public Builder enableHandshake() {
    +      cml.handshakeSendHandler = cml.new HandshakeSendHandler();
    +      return this;
    +    }
    +
    +    public ConnectionMultiListener build() {
    +      //always enable handshake
    +      if (cml.handshakeSendHandler == null) {
    +        enableHandshake();
    +      }
    +      if (cml.connectionHandler == null && cml.sslConnectionHandler == 
null) {
    +        enablePlain();
    +      }
    +      return cml;
    --- End diff --
    
    I agree that I can defer creation of the ConnectionMultiListener to the 
build() method. 
    I thought that we should let the caller decide which handlers it wants to 
enable. However, I'm clearly overriding that choice in the build() method, so 
perhaps you're right.


---

Reply via email to