Repository: hive
Updated Branches:
  refs/heads/master 40635f7e3 -> 385a26ad8


HIVE-20093: LlapOutputFormatService: Use ArrowBuf with Netty for Accounting
(Eric Wohlstadter, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/385a26ad
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/385a26ad
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/385a26ad

Branch: refs/heads/master
Commit: 385a26ad89a29492ff55bcba32ec104af54bf139
Parents: 40635f7
Author: Jason Dere <jd...@hortonworks.com>
Authored: Wed Jul 11 10:47:01 2018 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Wed Jul 11 10:47:01 2018 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/llap/LlapOutputFormatService.java |  4 +++-
 .../hadoop/hive/llap/WritableByteChannelAdapter.java     | 11 ++++++++---
 2 files changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/385a26ad/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index c71c637..996f8b3 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -199,10 +199,12 @@ public class LlapOutputFormatService {
       int maxPendingWrites = HiveConf.getIntVar(conf,
           HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES);
       boolean useArrow = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW);
+      long allocatorMax = HiveConf.getLongVar(conf,
+          HiveConf.ConfVars.HIVE_ARROW_ROOT_ALLOCATOR_LIMIT);
       @SuppressWarnings("rawtypes")
       RecordWriter writer = null;
       if(useArrow) {
-        writer = new LlapArrowRecordWriter(new WritableByteChannelAdapter(ctx, maxPendingWrites, id));
+        writer = new LlapArrowRecordWriter(new WritableByteChannelAdapter(ctx, maxPendingWrites, id, allocatorMax));
       } else {
         writer = new LlapRecordWriter(id,
           new ChunkedOutputStream(

http://git-wip-us.apache.org/repos/asf/hive/blob/385a26ad/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
index 57da1d9..753da22 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
@@ -18,7 +18,8 @@
 
 package org.apache.hadoop.hive.llap;
 
-import io.netty.buffer.Unpooled;
+import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 
 import java.io.IOException;
@@ -48,6 +49,7 @@ public class WritableByteChannelAdapter implements WritableByteChannel {
   private final Semaphore writeResources;
   private boolean closed = false;
   private final String id;
+  private long allocatorMax;
 
   private ChannelFutureListener writeListener = new ChannelFutureListener() {
     @Override
@@ -75,11 +77,12 @@ public class WritableByteChannelAdapter implements WritableByteChannel {
     }
   };
 
-  public WritableByteChannelAdapter(ChannelHandlerContext chc, int maxPendingWrites, String id) {
+  public WritableByteChannelAdapter(ChannelHandlerContext chc, int maxPendingWrites, String id, long allocatorMax) {
     this.chc = chc;
     this.maxPendingWrites = maxPendingWrites;
     this.writeResources = new Semaphore(maxPendingWrites);
     this.id = id;
+    this.allocatorMax = allocatorMax;
   }
 
   @Override
@@ -87,7 +90,9 @@ public class WritableByteChannelAdapter implements WritableByteChannel {
     int size = src.remaining();
     //Down the semaphore or block until available
     takeWriteResources(1);
-    chc.writeAndFlush(Unpooled.wrappedBuffer(src)).addListener(writeListener);
+    ByteBuf buf = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(allocatorMax).buffer(size);
+    buf.writeBytes(src);
+    chc.writeAndFlush(buf).addListener(writeListener);
     return size;
   }
 

Reply via email to