DRILL-4041 & DRILL-4057: Disable RPC thread offload until concurrency bug is found.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e465cd90 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e465cd90 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e465cd90 Branch: refs/heads/1.3.0 Commit: e465cd90c8915aa05f1f4cf0824bb1c3bb5eb1e9 Parents: ba4baea Author: Jacques Nadeau <[email protected]> Authored: Fri Nov 6 14:25:05 2015 -0800 Committer: Jacques Nadeau <[email protected]> Committed: Tue Nov 10 16:05:52 2015 -0800 ---------------------------------------------------------------------- .../main/java/org/apache/drill/exec/rpc/RpcBus.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/e465cd90/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java index 61922a1..b6f3032 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java @@ -59,6 +59,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass()); private static final OutboundRpcMessage PONG = new OutboundRpcMessage(RpcMode.PONG, 0, 0, Acks.OK); + private static final boolean ENABLE_SEPARATE_THREADS = "true".equals(System.getProperty("drill.enable_rpc_offload")); protected final CoordinationQueue queue = new CoordinationQueue(16, 16); @@ -256,16 +257,25 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp } + private class SameExecutor implements Executor { + + @Override + public void execute(Runnable command) { + command.run(); + } + + } protected class InboundHandler extends MessageToMessageDecoder<InboundRpcMessage> { - private final RpcEventHandler exec; + private final Executor exec; private final C connection; public InboundHandler(C connection) { super(); this.connection = connection; - this.exec = new RpcEventHandler(rpcConfig.getExecutor()); + final Executor underlyingExecutor = ENABLE_SEPARATE_THREADS ? rpcConfig.getExecutor() : new SameExecutor(); + this.exec = new RpcEventHandler(underlyingExecutor); } @Override
