Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/950#discussion_r141498641 --- Diff: exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc; + +import com.google.protobuf.MessageLite; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import org.apache.drill.common.exceptions.DrillException; +import org.slf4j.Logger; + +import java.net.SocketAddress; +import java.util.concurrent.TimeUnit; + +/** + * @param <CC> Client Connection Listener + * @param <HS> Outbound handshake message type + * @param <HR> Inbound handshake message type + * @param <BC> BasicClient type + * <p> + * Implements a wrapper class that allows a client connection to associate different behaviours after + * establishing a connection with the server. The client can choose to send an application handshake, or + * in the case of SSL, wait for a SSL handshake completion and then send an application handshake. 
+ */ + +public class ConnectionMultiListener<CC extends ClientConnection, HS extends MessageLite, HR extends MessageLite, BC extends BasicClient> { + + private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ConnectionMultiListener.class); + + private final RpcConnectionHandler<CC> connectionListener; + private final HS handshakeValue; + private final BC parent; + + private ConnectionMultiListener(RpcConnectionHandler<CC> connectionListener, HS handshakeValue, + BC basicClient) { + assert connectionListener != null; + assert handshakeValue != null; + + this.connectionListener = connectionListener; + this.handshakeValue = handshakeValue; + this.parent = basicClient; + } + + @SuppressWarnings("unchecked") + public static <CC extends ClientConnection, HS extends MessageLite, BC extends BasicClient> Builder + newBuilder(RpcConnectionHandler<CC> connectionListener, HS handshakeValue, + BC basicClient) { + return new Builder(connectionListener, handshakeValue, basicClient); + } + + public ConnectionHandler connectionHandler = null; + public HandshakeSendHandler handshakeSendHandler = null; + public SSLConnectionHandler sslConnectionHandler = null; + + /** + * Manages connection establishment outcomes. + */ + private class ConnectionHandler implements GenericFutureListener<ChannelFuture> { + + @Override public void operationComplete(ChannelFuture future) throws Exception { + boolean isInterrupted = false; + + // We want to wait for at least 120 secs when interrupts occur. Establishing a connection fails/succeeds quickly, + // So there is no point propagating the interruption as failure immediately. + long remainingWaitTimeMills = 120000; + long startTime = System.currentTimeMillis(); + // logger.debug("Connection operation finished. 
Success: {}", future.isSuccess()); + while (true) { + try { + future.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS); + if (future.isSuccess()) { + SocketAddress remote = future.channel().remoteAddress(); + SocketAddress local = future.channel().localAddress(); + parent.setAddresses(remote, local); + // if SSL is enabled send the handshake after the ssl handshake is completed, otherwise send it + // now + if(!parent.isSslEnabled()) { + // send a handshake on the current thread. This is the only time we will send from within the event thread. + // We can do this because the connection will not be backed up. + parent.send(handshakeSendHandler, handshakeValue, true); + } + } else { + connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION, + new RpcException("General connection failure.")); + } + // logger.debug("Handshake queued for send."); + break; + } catch (final InterruptedException interruptEx) { + remainingWaitTimeMills -= (System.currentTimeMillis() - startTime); + startTime = System.currentTimeMillis(); + isInterrupted = true; + if (remainingWaitTimeMills < 1) { + connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION, interruptEx); + break; + } + // Ignore the interrupt and continue to wait until we elapse remainingWaitTimeMills. + } catch (final Exception ex) { + logger.error("Failed to establish connection", ex); + connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION, ex); + break; + } + } + + if (isInterrupted) { + // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the + // interruption and respond to it if it wants to. 
+ Thread.currentThread().interrupt(); + } + } + } + + private class SSLConnectionHandler implements GenericFutureListener<Future<Channel>> { + @Override public void operationComplete(Future<Channel> future) throws Exception { + // send the handshake + parent.send(handshakeSendHandler, handshakeValue, true); + } + } + + /** + * manages handshake outcomes. + */ + private class HandshakeSendHandler implements RpcOutcomeListener<HR> { + + @Override public void failed(RpcException ex) { + logger.debug("Failure while initiating handshake", ex); + connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_COMMUNICATION, ex); + } + + @Override public void success(HR value, ByteBuf buffer) { + // logger.debug("Handshake received. {}", value); + try { + parent.validateHandshake(value); + parent.finalizeConnection(value, parent.connection); + connectionListener.connectionSucceeded((CC) parent.connection); + // logger.debug("Handshake completed succesfully."); + } catch (Exception ex) { + logger.debug("Failure while validating handshake", ex); + connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_VALIDATION, ex); + } + } + + @Override public void interrupted(final InterruptedException ex) { + logger.warn("Interrupted while waiting for handshake response", ex); + connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_COMMUNICATION, ex); + } + } + + /* + The SSL Handshake listener is special in that it is needed at the time of initializing an SSL + enabled pipeline and so is instantiated before the instance of the outer class may be needed. + We create an instance and set a reference back to the outer class instance when it is created + at the time of connection. 
+ */ + public static class SSLHandshakeListener implements GenericFutureListener<Future<Channel>> { + ConnectionMultiListener parent; + public SSLHandshakeListener() { + } + + public void setParent(ConnectionMultiListener cml){ + this.parent = cml; + } + + @Override public void operationComplete(Future<Channel> future) throws Exception { + if(parent != null){ + if(future.isSuccess()) { + Channel c = future.get(); + parent.sslConnectionHandler.operationComplete(future); + parent.parent.setSslChannel(c); + } else { + throw new DrillException("SSL handshake failed.", future.cause()); + } + } else { + throw new RpcException("RPC Setup error. SSL handshake complete handler is not set up."); + } + return; + } + } + + + public static class Builder<CC extends ClientConnection, HS extends MessageLite, HR extends MessageLite, BC extends BasicClient> { + + private RpcConnectionHandler<CC> connectionListener; + private HS handshakeValue; + private BC basicClient; + private ConnectionMultiListener cml; + + private Builder(RpcConnectionHandler<CC> connectionListener, HS handshakeValue, BC basicClient) { + this.connectionListener = connectionListener; + this.handshakeValue = handshakeValue; + this.basicClient = basicClient; + this.cml = new ConnectionMultiListener(connectionListener, handshakeValue, basicClient); + } + + public Builder enableSSL() { + cml.connectionHandler = cml.new ConnectionHandler(); + cml.sslConnectionHandler = cml.new SSLConnectionHandler(); + return this; + } + + public Builder enablePlain() { + cml.connectionHandler = cml.new ConnectionHandler(); + return this; + } + + public Builder enableHandshake() { + cml.handshakeSendHandler = cml.new HandshakeSendHandler(); + return this; + } + + public ConnectionMultiListener build() { + //always enable handshake + if (cml.handshakeSendHandler == null) { + enableHandshake(); + } + if (cml.connectionHandler == null && cml.sslConnectionHandler == null) { + enablePlain(); + } + return cml; --- End diff -- Agree that I 
can defer creation of ConnectionMultiListener to the build() method. I thought that we should let the caller decide what handler it wants to enable. However, I'm clearly overriding that in the build() method, so perhaps you're right.
---