Repository: hive Updated Branches: refs/heads/master dc4c66f6b -> 9bf23f6c6
HIVE-13956: LLAP: external client output is writing to channel before it is writable again (Jason Dere, reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9bf23f6c Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9bf23f6c Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9bf23f6c Branch: refs/heads/master Commit: 9bf23f6c684fab4308fcdc4c042dbef1618a0e06 Parents: dc4c66f Author: Jason Dere <[email protected]> Authored: Wed Jun 15 11:05:41 2016 -0700 Committer: Jason Dere <[email protected]> Committed: Wed Jun 15 11:05:41 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hive/llap/ChannelOutputStream.java | 19 +++++++++++++++++-- .../hive/llap/LlapOutputFormatService.java | 11 ++++++++++- 2 files changed, 27 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/9bf23f6c/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java index e861791..239e061 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java @@ -40,6 +40,7 @@ public class ChannelOutputStream extends OutputStream { private ByteBuf buf; private byte[] singleByte = new byte[1]; private boolean closed = false; + private final Object channelWritabilityMonitor; private ChannelFutureListener listener = new ChannelFutureListener() { @Override @@ -52,11 +53,12 @@ public class ChannelOutputStream extends OutputStream { } }; - public ChannelOutputStream(ChannelHandlerContext chc, String id, int bufSize) { + public ChannelOutputStream(ChannelHandlerContext chc, String id, int bufSize, final Object monitor) { this.chc = chc; this.id = id; this.bufSize = bufSize; this.buf = chc.alloc().buffer(bufSize); + this.channelWritabilityMonitor = monitor; } @Override @@ -124,8 +126,21 @@ public class ChannelOutputStream extends OutputStream { throw new IOException("Already closed: " + id); } - chc.write(buf.copy()).addListener(listener); + chc.writeAndFlush(buf.copy()).addListener(listener); buf.clear(); + + // if underlying channel is not writable (perhaps because of slow consumer) wait for + // notification about writable state change + synchronized (channelWritabilityMonitor) { + // to prevent spurious wake up + while (!chc.channel().isWritable()) { + try { + channelWritabilityMonitor.wait(); + } catch (InterruptedException e) { + throw new IOException("Interrupted when waiting for channel writability state change", e); + } + } + } } private void writeInternal(byte[] b, int off, int len) throws IOException { http://git-wip-us.apache.org/repos/asf/hive/blob/9bf23f6c/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java index 151a31f..825488f 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java @@ -166,6 +166,7 @@ public class LlapOutputFormatService { protected class LlapOutputFormatServiceHandler extends SimpleChannelInboundHandler<LlapOutputSocketInitMessage> { private final int sendBufferSize; + private final Object channelWritabilityMonitor = new Object(); public LlapOutputFormatServiceHandler(final int sendBufferSize) { this.sendBufferSize = sendBufferSize; } @@ -195,7 +196,7 @@ public class LlapOutputFormatService { LOG.debug("registering socket for: " + id); @SuppressWarnings("rawtypes") LlapRecordWriter writer = new LlapRecordWriter( - new ChannelOutputStream(ctx, id, sendBufferSize)); + new ChannelOutputStream(ctx, id, sendBufferSize, channelWritabilityMonitor)); boolean isFailed = true; synchronized (lock) { if (!writers.containsKey(id)) { @@ -221,6 +222,14 @@ public class LlapOutputFormatService { } LOG.error(error); } + + @Override + public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception { + super.channelWritabilityChanged(ctx); + synchronized (channelWritabilityMonitor) { + channelWritabilityMonitor.notifyAll(); + } + } } protected class LlapOutputFormatChannelCloseListener implements ChannelFutureListener {
