Github user sohami commented on a diff in the pull request: https://github.com/apache/drill/pull/950#discussion_r140621394 --- Diff: exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc; + +import com.google.protobuf.MessageLite; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import org.apache.drill.common.exceptions.DrillException; +import org.slf4j.Logger; + +import java.net.SocketAddress; +import java.util.concurrent.TimeUnit; + +/** + * @param <CC> Client Connection Listener + * @param <HS> Outbound handshake message type + * @param <HR> Inbound handshake message type + * @param <BC> BasicClient type + * <p> + * Implements a wrapper class that allows a client connection to associate different behaviours after + * establishing a connection with the server. The client can choose to send an application handshake, or + * in the case of SSL, wait for a SSL handshake completion and then send an application handshake. 
+ */ + +public class ConnectionMultiListener<CC extends ClientConnection, HS extends MessageLite, HR extends MessageLite, BC extends BasicClient> { + + private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ConnectionMultiListener.class); + + private final RpcConnectionHandler<CC> connectionListener; + private final HS handshakeValue; + private final BC parent; + + private ConnectionMultiListener(RpcConnectionHandler<CC> connectionListener, HS handshakeValue, + BC basicClient) { + assert connectionListener != null; + assert handshakeValue != null; + + this.connectionListener = connectionListener; + this.handshakeValue = handshakeValue; + this.parent = basicClient; + } + + @SuppressWarnings("unchecked") + public static <CC extends ClientConnection, HS extends MessageLite, BC extends BasicClient> Builder + newBuilder(RpcConnectionHandler<CC> connectionListener, HS handshakeValue, + BC basicClient) { + return new Builder(connectionListener, handshakeValue, basicClient); + } + + public ConnectionHandler connectionHandler = null; + public HandshakeSendHandler handshakeSendHandler = null; + public SSLConnectionHandler sslConnectionHandler = null; + + /** + * Manages connection establishment outcomes. + */ + private class ConnectionHandler implements GenericFutureListener<ChannelFuture> { + + @Override public void operationComplete(ChannelFuture future) throws Exception { + boolean isInterrupted = false; + + // We want to wait for at least 120 secs when interrupts occur. Establishing a connection fails/succeeds quickly, + // So there is no point propagating the interruption as failure immediately. + long remainingWaitTimeMills = 120000; + long startTime = System.currentTimeMillis(); + // logger.debug("Connection operation finished. 
Success: {}", future.isSuccess()); + while (true) { + try { + future.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS); + if (future.isSuccess()) { + SocketAddress remote = future.channel().remoteAddress(); + SocketAddress local = future.channel().localAddress(); + parent.setAddresses(remote, local); + // if SSL is enabled send the handshake after the ssl handshake is completed, otherwise send it + // now + if(!parent.isSslEnabled()) { + // send a handshake on the current thread. This is the only time we will send from within the event thread. + // We can do this because the connection will not be backed up. + parent.send(handshakeSendHandler, handshakeValue, true); + } + } else { + connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION, + new RpcException("General connection failure.")); + } + // logger.debug("Handshake queued for send."); + break; + } catch (final InterruptedException interruptEx) { + remainingWaitTimeMills -= (System.currentTimeMillis() - startTime); + startTime = System.currentTimeMillis(); + isInterrupted = true; + if (remainingWaitTimeMills < 1) { + connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION, interruptEx); + break; + } + // Ignore the interrupt and continue to wait until we elapse remainingWaitTimeMills. + } catch (final Exception ex) { + logger.error("Failed to establish connection", ex); + connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION, ex); + break; + } + } + + if (isInterrupted) { + // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the + // interruption and respond to it if it wants to. 
+ Thread.currentThread().interrupt(); + } + } + } + + private class SSLConnectionHandler implements GenericFutureListener<Future<Channel>> { + @Override public void operationComplete(Future<Channel> future) throws Exception { + // send the handshake + parent.send(handshakeSendHandler, handshakeValue, true); + } + } + + /** + * manages handshake outcomes. + */ + private class HandshakeSendHandler implements RpcOutcomeListener<HR> { + + @Override public void failed(RpcException ex) { + logger.debug("Failure while initiating handshake", ex); + connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_COMMUNICATION, ex); + } + + @Override public void success(HR value, ByteBuf buffer) { + // logger.debug("Handshake received. {}", value); + try { + parent.validateHandshake(value); + parent.finalizeConnection(value, parent.connection); + connectionListener.connectionSucceeded((CC) parent.connection); + // logger.debug("Handshake completed succesfully."); + } catch (Exception ex) { + logger.debug("Failure while validating handshake", ex); + connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_VALIDATION, ex); + } + } + + @Override public void interrupted(final InterruptedException ex) { + logger.warn("Interrupted while waiting for handshake response", ex); + connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_COMMUNICATION, ex); + } + } + + /* + The SSL Handshake listener is special in that it is needed at the time of initializing an SSL + enabled pipeline and so is instantiated before the instance of the outer class may be needed. + We create an instance and set a reference back to the outer class instance when it is created + at the time of connection. 
+ */ + public static class SSLHandshakeListener implements GenericFutureListener<Future<Channel>> { + ConnectionMultiListener parent; + public SSLHandshakeListener() { + } + + public void setParent(ConnectionMultiListener cml){ + this.parent = cml; + } + + @Override public void operationComplete(Future<Channel> future) throws Exception { + if(parent != null){ + if(future.isSuccess()) { + Channel c = future.get(); + parent.sslConnectionHandler.operationComplete(future); + parent.parent.setSslChannel(c); + } else { + throw new DrillException("SSL handshake failed.", future.cause()); + } + } else { + throw new RpcException("RPC Setup error. SSL handshake complete handler is not set up."); + } + return; + } + } + + + public static class Builder<CC extends ClientConnection, HS extends MessageLite, HR extends MessageLite, BC extends BasicClient> { + + private RpcConnectionHandler<CC> connectionListener; + private HS handshakeValue; + private BC basicClient; + private ConnectionMultiListener cml; + + private Builder(RpcConnectionHandler<CC> connectionListener, HS handshakeValue, BC basicClient) { + this.connectionListener = connectionListener; + this.handshakeValue = handshakeValue; + this.basicClient = basicClient; + this.cml = new ConnectionMultiListener(connectionListener, handshakeValue, basicClient); + } + + public Builder enableSSL() { + cml.connectionHandler = cml.new ConnectionHandler(); + cml.sslConnectionHandler = cml.new SSLConnectionHandler(); + return this; + } + + public Builder enablePlain() { + cml.connectionHandler = cml.new ConnectionHandler(); + return this; + } + + public Builder enableHandshake() { + cml.handshakeSendHandler = cml.new HandshakeSendHandler(); + return this; + } + + public ConnectionMultiListener build() { + //always enable handshake + if (cml.handshakeSendHandler == null) { + enableHandshake(); + } + if (cml.connectionHandler == null && cml.sslConnectionHandler == null) { + enablePlain(); + } + return cml; --- End diff -- Couple of 
things here: 1) This builder is responsible for creating the ConnectionMultiListener, but the instance is created in the Builder constructor rather than in the build() method. 2) Since HandshakeSendHandler and ConnectionHandler are needed for both the SSL and Plain cases, why not leave their instantiation as it was earlier, i.e. in the ConnectionMultiListener class by default? So we can do something like below: 1) Accept ConnectionListener, HandshakeValue and BasicClient in the Builder constructor and assign them to members of the builder. 2) Add a new member to the builder to hold the SSLConnectionHandler too. 3) Then just have an enableSSL() or configureSSLHandler() method that creates a new SSLConnectionHandler and assigns it to that member variable. 4) Inside the build() method, create an instance of ConnectionMultiListener with all 4 members and return it.
---