When an interrupt terminates the RPC bus's waitForSendComplete, make sure to release buffers.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/69582463 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/69582463 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/69582463 Branch: refs/heads/master Commit: 69582463c12ecb4d9ed987ceca1e13d17b75d7e0 Parents: 62a73bc Author: Jacques Nadeau <[email protected]> Authored: Thu May 14 21:56:14 2015 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu May 14 22:17:59 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/drill/exec/rpc/RpcBus.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/69582463/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java index 812b2fd..9ca09a1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java @@ -28,7 +28,6 @@ import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.util.concurrent.GenericFutureListener; import java.io.Closeable; -import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Arrays; import java.util.List; @@ -96,21 +95,21 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType, SEND protobufBody, Class<RECEIVE> clazz, boolean allowInEventLoop, ByteBuf... 
dataBodies) { - if (!allowInEventLoop) { - if (connection.inEventLoop()) { - throw new IllegalStateException("You attempted to send while inside the rpc event thread. This isn't allowed because sending will block if the channel is backed up."); - } - - if (!connection.blockOnNotWritable(listener)) { - return; - } - } + Preconditions + .checkArgument( + allowInEventLoop || !connection.inEventLoop(), + "You attempted to send while inside the rpc event thread. This isn't allowed because sending will block if the channel is backed up."); ByteBuf pBuffer = null; boolean completed = false; try { + if (!allowInEventLoop && !connection.blockOnNotWritable(listener)) { + // if we're in not in the event loop and we're interrupted while blocking, skip sending this message. + return; + } + assert !Arrays.asList(dataBodies).contains(null); assert rpcConfig.checkSend(rpcType, protobufBody.getClass(), clazz);
