Repository: hive
Updated Branches:
  refs/heads/master dc4c66f6b -> 9bf23f6c6


HIVE-13956: LLAP: external client output is writing to channel before it is 
writable again (Jason Dere, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9bf23f6c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9bf23f6c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9bf23f6c

Branch: refs/heads/master
Commit: 9bf23f6c684fab4308fcdc4c042dbef1618a0e06
Parents: dc4c66f
Author: Jason Dere <[email protected]>
Authored: Wed Jun 15 11:05:41 2016 -0700
Committer: Jason Dere <[email protected]>
Committed: Wed Jun 15 11:05:41 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/ChannelOutputStream.java    | 19 +++++++++++++++++--
 .../hive/llap/LlapOutputFormatService.java       | 11 ++++++++++-
 2 files changed, 27 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9bf23f6c/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
index e861791..239e061 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
@@ -40,6 +40,7 @@ public class ChannelOutputStream extends OutputStream {
   private ByteBuf buf;
   private byte[] singleByte = new byte[1];
   private boolean closed = false;
+  private final Object channelWritabilityMonitor;
 
   private ChannelFutureListener listener = new ChannelFutureListener() {
     @Override
@@ -52,11 +53,12 @@ public class ChannelOutputStream extends OutputStream {
     }
   };
 
-  public ChannelOutputStream(ChannelHandlerContext chc, String id, int bufSize) {
+  public ChannelOutputStream(ChannelHandlerContext chc, String id, int bufSize, final Object monitor) {
     this.chc = chc;
     this.id = id;
     this.bufSize = bufSize;
     this.buf = chc.alloc().buffer(bufSize);
+    this.channelWritabilityMonitor = monitor;
   }
 
   @Override
@@ -124,8 +126,21 @@ public class ChannelOutputStream extends OutputStream {
       throw new IOException("Already closed: " + id);
     }
 
-    chc.write(buf.copy()).addListener(listener);
+    chc.writeAndFlush(buf.copy()).addListener(listener);
     buf.clear();
+
+    // if underlying channel is not writable (perhaps because of slow consumer) wait for
+    // notification about writable state change
+    synchronized (channelWritabilityMonitor) {
+      // to prevent spurious wake up
+      while (!chc.channel().isWritable()) {
+        try {
+          channelWritabilityMonitor.wait();
+        } catch (InterruptedException e) {
+          throw new IOException("Interrupted when waiting for channel writability state change", e);
+        }
+      }
+    }
   }
 
   private void writeInternal(byte[] b, int off, int len) throws IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/9bf23f6c/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index 151a31f..825488f 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -166,6 +166,7 @@ public class LlapOutputFormatService {
   protected class LlapOutputFormatServiceHandler
     extends SimpleChannelInboundHandler<LlapOutputSocketInitMessage> {
     private final int sendBufferSize;
+    private final Object channelWritabilityMonitor = new Object();
     public LlapOutputFormatServiceHandler(final int sendBufferSize) {
       this.sendBufferSize = sendBufferSize;
     }
@@ -195,7 +196,7 @@ public class LlapOutputFormatService {
       LOG.debug("registering socket for: " + id);
       @SuppressWarnings("rawtypes")
       LlapRecordWriter writer = new LlapRecordWriter(
-          new ChannelOutputStream(ctx, id, sendBufferSize));
+          new ChannelOutputStream(ctx, id, sendBufferSize, channelWritabilityMonitor));
       boolean isFailed = true;
       synchronized (lock) {
         if (!writers.containsKey(id)) {
@@ -221,6 +222,14 @@ public class LlapOutputFormatService {
       }
       LOG.error(error);
     }
+
+    @Override
+    public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
+      super.channelWritabilityChanged(ctx);
+      synchronized (channelWritabilityMonitor) {
+        channelWritabilityMonitor.notifyAll();
+      }
+    }
   }
 
   protected class LlapOutputFormatChannelCloseListener implements ChannelFutureListener {

Reply via email to