Repository: hive Updated Branches: refs/heads/master a55f4a3a4 -> 57d1f3d85
HIVE-13751: LlapOutputFormatService should have a configurable send buffer size (Prasanth Jayachandran reviewed by Jason Dere) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/57d1f3d8 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/57d1f3d8 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/57d1f3d8 Branch: refs/heads/master Commit: 57d1f3d85e0fe04e2c84c907d713593958c79a1f Parents: a55f4a3 Author: Prasanth Jayachandran <[email protected]> Authored: Tue May 31 13:54:55 2016 -0700 Committer: Prasanth Jayachandran <[email protected]> Committed: Tue May 31 13:54:55 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 2 ++ .../hive/llap/LlapOutputFormatService.java | 24 +++++++++++++++----- 2 files changed, 20 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/57d1f3d8/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 8706665..b2e6b6f 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2874,6 +2874,8 @@ public class HiveConf extends Configuration { "protocol or ZK paths), similar to how ssh refuses a key with bad access permissions."), LLAP_DAEMON_OUTPUT_SERVICE_PORT("hive.llap.daemon.output.service.port", 15003, "LLAP daemon output service port"), + LLAP_DAEMON_OUTPUT_SERVICE_SEND_BUFFER_SIZE("hive.llap.daemon.output.service.send.buffer.size", + 128 * 1024, "Send buffer size to be used by LLAP daemon output service"), LLAP_ENABLE_GRACE_JOIN_IN_LLAP("hive.llap.enable.grace.join.in.llap", false, "Override if grace join should be allowed to run in llap."), http://git-wip-us.apache.org/repos/asf/hive/blob/57d1f3d8/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java index f852041..06660b3 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java @@ -109,15 +109,18 @@ public class LlapOutputFormatService { LOG.info("Starting LlapOutputFormatService"); int portFromConf = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT); + int sendBufferSize = HiveConf.getIntVar(conf, + HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_SEND_BUFFER_SIZE); eventLoopGroup = new NioEventLoopGroup(1); serverBootstrap = new ServerBootstrap(); serverBootstrap.group(eventLoopGroup); serverBootstrap.channel(NioServerSocketChannel.class); - serverBootstrap.childHandler(new LlapOutputFormatServiceChannelHandler()); + serverBootstrap.childHandler(new LlapOutputFormatServiceChannelHandler(sendBufferSize)); try { listeningChannelFuture = serverBootstrap.bind(portFromConf).sync(); this.port = ((InetSocketAddress) listeningChannelFuture.channel().localAddress()).getPort(); - LOG.info("LlapOutputFormatService: Binding to port " + this.port); + LOG.info("LlapOutputFormatService: Binding to port: {} with send buffer size: {} ", this.port, + sendBufferSize); } catch (InterruptedException err) { throw new IOException("LlapOutputFormatService: Error binding to port " + portFromConf, err); } @@ -154,6 +157,11 @@ public class LlapOutputFormatService { } protected class LlapOutputFormatServiceHandler extends SimpleChannelInboundHandler<String> { + private final int sendBufferSize; + public LlapOutputFormatServiceHandler(final int sendBufferSize) { + this.sendBufferSize = sendBufferSize; + } + @Override public void channelRead0(ChannelHandlerContext ctx, String msg) { String id = msg; @@ -162,9 +170,8 @@ public class LlapOutputFormatService { private void registerReader(ChannelHandlerContext ctx, String id) { synchronized(INSTANCE) { - LOG.debug("registering socket for: "+id); - int bufSize = 128 * 1024; // configable? - OutputStream stream = new ChannelOutputStream(ctx, id, bufSize); + LOG.debug("registering socket for: " + id); + OutputStream stream = new ChannelOutputStream(ctx, id, sendBufferSize); LlapRecordWriter writer = new LlapRecordWriter(stream); writers.put(id, writer); @@ -198,13 +205,18 @@ public class LlapOutputFormatService { } protected class LlapOutputFormatServiceChannelHandler extends ChannelInitializer<SocketChannel> { + private final int sendBufferSize; + public LlapOutputFormatServiceChannelHandler(final int sendBufferSize) { + this.sendBufferSize = sendBufferSize; + } + @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new DelimiterBasedFrameDecoder(MAX_QUERY_ID_LENGTH, Delimiters.nulDelimiter()), new StringDecoder(), new StringEncoder(), - new LlapOutputFormatServiceHandler()); + new LlapOutputFormatServiceHandler(sendBufferSize)); } } }
