DRILL-3081: Populate connection name as late as possible so RPC error messages are reported correctly.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4b0b3a67 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4b0b3a67 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4b0b3a67 Branch: refs/heads/master Commit: 4b0b3a67ab5e2db2baf34250bdedb174fce648ad Parents: f0b3671 Author: Parth Chandra <[email protected]> Authored: Thu May 14 12:27:40 2015 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu May 14 21:58:53 2015 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/rpc/BasicClient.java | 15 +++++++-- .../exec/rpc/BasicClientWithConnection.java | 1 + .../org/apache/drill/exec/rpc/BasicServer.java | 16 +++++----- .../apache/drill/exec/rpc/RemoteConnection.java | 8 +++-- .../java/org/apache/drill/exec/rpc/RpcBus.java | 32 +++++++++++++++----- .../drill/exec/rpc/RpcExceptionHandler.java | 13 ++++---- .../drill/exec/rpc/control/ControlClient.java | 3 +- .../drill/exec/rpc/control/ControlServer.java | 1 + .../apache/drill/exec/rpc/data/DataClient.java | 1 + .../apache/drill/exec/rpc/data/DataServer.java | 1 + .../apache/drill/exec/rpc/user/UserServer.java | 1 + 11 files changed, 65 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java index a33b370..cf09be3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java @@ -33,6 +33,7 @@ import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; +import java.net.SocketAddress; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -46,7 +47,7 @@ import com.google.protobuf.Parser; public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite> extends RpcBus<T, R> { - final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(getClass()); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class); // The percentage of time that should pass before sending a ping message to ensure server doesn't time us out. For // example, if timeout is set to 30 seconds and we set percentage to 0.5, then if no write has happened within 15 @@ -101,7 +102,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection } pipe.addLast("message-handler", new InboundHandler(connection)); - pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName())); + pipe.addLast("exception-handler", new RpcExceptionHandler(connection)); } }); // @@ -110,6 +111,12 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection // } } + public R initRemoteConnection(SocketChannel channel){ + local=channel.localAddress(); + remote=channel.remoteAddress(); + return null; + }; + private static final OutboundRpcMessage PING_MESSAGE = new OutboundRpcMessage(RpcMode.PING, 0, 0, Acks.OK); /** @@ -200,12 +207,14 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection // So there is no point propagating the interruption as failure immediately. long remainingWaitTimeMills = 120000; long startTime = System.currentTimeMillis(); - // logger.debug("Connection operation finished. Success: {}", future.isSuccess()); while(true) { try { future.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS); if (future.isSuccess()) { + SocketAddress remote = future.channel().remoteAddress(); + SocketAddress local = future.channel().localAddress(); + setAddresses(remote, local); // send a handshake on the current thread. This is the only time we will send from within the event thread. // We can do this because the connection will not be backed up. send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true); http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java index ab54fa1..c194b5e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java @@ -51,6 +51,7 @@ public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SE @Override public ServerConnection initRemoteConnection(SocketChannel channel) { + super.initRemoteConnection(channel); return new ServerConnection(connectionName, channel, alloc); } http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java index 6a7bc65..5c04264 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java @@ -85,11 +85,11 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection if (rpcMapping.hasTimeout()) { pipe.addLast(TIMEOUT_HANDLER, - new LogggingReadTimeoutHandler(connection.getName(), rpcMapping.getTimeout())); + new LogggingReadTimeoutHandler(connection, rpcMapping.getTimeout())); } pipe.addLast("message-handler", new InboundHandler(connection)); - pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName())); + pipe.addLast("exception-handler", new RpcExceptionHandler(connection)); connect = true; // logger.debug("Server connection initialization completed."); @@ -101,19 +101,19 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection // } } - private class LogggingReadTimeoutHandler extends ReadTimeoutHandler { + private class LogggingReadTimeoutHandler<C extends RemoteConnection> extends ReadTimeoutHandler { - private final String name; + private final C connection; private final int timeoutSeconds; - public LogggingReadTimeoutHandler(String name, int timeoutSeconds) { + public LogggingReadTimeoutHandler(C connection, int timeoutSeconds) { super(timeoutSeconds); - this.name = name; + this.connection = connection; this.timeoutSeconds = timeoutSeconds; } @Override protected void readTimedOut(ChannelHandlerContext ctx) throws Exception { - logger.info("RPC connection {} timed out. Timeout was set to {} seconds. Closing connection.", name, + logger.info("RPC connection {} timed out. Timeout was set to {} seconds. Closing connection.", connection.getName(), timeoutSeconds); super.readTimedOut(ctx); } @@ -178,6 +178,8 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection @Override public C initRemoteConnection(SocketChannel channel) { + local = channel.localAddress(); + remote = channel.remoteAddress(); return null; } http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java index 199569c..30abcc4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java @@ -33,7 +33,8 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class); private final Channel channel; private final WriteManager writeManager; - private final String name; + private String name; + private final String clientName; public boolean inEventLoop(){ return channel.eventLoop().inEventLoop(); @@ -42,7 +43,7 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea public RemoteConnection(SocketChannel channel, String name) { super(); this.channel = channel; - this.name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), name); + this.clientName = name; this.writeManager = new WriteManager(); channel.pipeline().addLast(new BackPressureHandler()); channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() { @@ -57,6 +58,9 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea } public String getName() { + if(name == null){ + name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), clientName); + } return name; } http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java index 1a23724..812b2fd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java @@ -29,6 +29,7 @@ import io.netty.util.concurrent.GenericFutureListener; import java.io.Closeable; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; @@ -67,10 +68,19 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp protected final RpcConfig rpcConfig; + protected volatile SocketAddress local; + protected volatile SocketAddress remote; + + public RpcBus(RpcConfig rpcConfig) { this.rpcConfig = rpcConfig; } + protected void setAddresses(SocketAddress remote, SocketAddress local){ + this.remote = remote; + this.local = local; + } + <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(C connection, T rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) { DrillRpcFutureImpl<RECEIVE> rpcFuture = new DrillRpcFutureImpl<RECEIVE>(); @@ -133,21 +143,27 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp public class ChannelClosedHandler implements GenericFutureListener<ChannelFuture> { - final InetSocketAddress local; - final InetSocketAddress remote; final C clientConnection; + private final Channel channel; - public ChannelClosedHandler(C clientConnection, InetSocketAddress local, InetSocketAddress remote) { - this.local = local; - this.remote = remote; + public ChannelClosedHandler(C clientConnection, Channel channel) { + this.channel = channel; this.clientConnection = clientConnection; } @Override public void operationComplete(ChannelFuture future) throws Exception { - String msg = String.format("Channel closed %s <--> %s.", local, remote); + String msg; + if(local!=null) { + msg = String.format("Channel closed %s <--> %s.", local, remote); + }else{ + msg = String.format("Channel closed %s <--> %s.", future.channel().localAddress(), future.channel().remoteAddress()); + } + if (RpcBus.this.isClient()) { - logger.info(String.format(msg)); + if(local != null) { + logger.info(String.format(msg)); + } } else { queue.channelClosed(new ChannelClosedException(msg)); } @@ -158,7 +174,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp } protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel channel, C clientConnection) { - return new ChannelClosedHandler(clientConnection, channel.localAddress(), channel.remoteAddress()); + return new ChannelClosedHandler(clientConnection, channel); } private class ResponseSenderImpl implements ResponseSender { http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java index c12ff7b..46b7702 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java @@ -19,23 +19,24 @@ package org.apache.drill.exec.rpc; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import org.eclipse.jetty.io.Connection; -public class RpcExceptionHandler implements ChannelHandler{ +public class RpcExceptionHandler<C extends RemoteConnection> implements ChannelHandler{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcExceptionHandler.class); - private final String name; + private final C connection; - public RpcExceptionHandler(String name) { - this.name = name; + public RpcExceptionHandler(C connection){ + this.connection = connection; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if(!ctx.channel().isOpen() || cause.getMessage().equals("Connection reset by peer")){ - logger.warn("Exception occurred with closed channel. Connection: {}", name, cause); + logger.warn("Exception occurred with closed channel. Connection: {}", connection.getName(), cause); return; }else{ - logger.error("Exception in RPC communication. Connection: {}. Closing connection.", name, cause); + logger.error("Exception in RPC communication. Connection: {}. Closing connection.", connection.getName(), cause); ctx.close(); } } http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java index f191271..159f1df 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java @@ -40,7 +40,7 @@ import com.google.protobuf.MessageLite; public class ControlClient extends BasicClient<RpcType, ControlConnection, BitControlHandshake, BitControlHandshake>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlClient.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlClient.class); private final ControlMessageHandler handler; private final DrillbitEndpoint remoteEndpoint; @@ -66,6 +66,7 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo @SuppressWarnings("unchecked") @Override public ControlConnection initRemoteConnection(SocketChannel channel) { + super.initRemoteConnection(channel); this.connection = new ControlConnection("control client", channel, (RpcBus<RpcType, ControlConnection>) (RpcBus<?, ?>) this, allocator); return connection; http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java index 5e405ab..98ce9e1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java @@ -69,6 +69,7 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{ @Override public ControlConnection initRemoteConnection(SocketChannel channel) { + super.initRemoteConnection(channel); return new ControlConnection("control server", channel, this, allocator); } http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java index b8a07c7..544bab9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java @@ -56,6 +56,7 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl @Override public DataClientConnection initRemoteConnection(SocketChannel channel) { + super.initRemoteConnection(channel); this.connection = new DataClientConnection(channel, this); return connection; } http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java index 061ddcb..80d2d6e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java @@ -77,6 +77,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { @Override public BitServerConnection initRemoteConnection(SocketChannel channel) { + super.initRemoteConnection(channel); return new BitServerConnection(channel, context.getAllocator()); } http://git-wip-us.apache.org/repos/asf/drill/blob/4b0b3a67/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java index 72b07ba..a197356 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java @@ -175,6 +175,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec @Override public UserClientConnection initRemoteConnection(SocketChannel channel) { + super.initRemoteConnection(channel); return new UserClientConnection(channel); }
