Repository: hive
Updated Branches:
  refs/heads/master 9d78fac36 -> 2820fc4c6


HIVE-20203: Arrow SerDe leaks a DirectByteBuffer (Eric Wohlstadter, reviewed by 
Teddy Choi)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2820fc4c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2820fc4c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2820fc4c

Branch: refs/heads/master
Commit: 2820fc4c6d576bb9543bb627ab6f182f17a5c771
Parents: 9d78fac
Author: Teddy Choi <[email protected]>
Authored: Thu Jul 26 12:03:03 2018 +0900
Committer: Teddy Choi <[email protected]>
Committed: Thu Jul 26 12:03:03 2018 +0900

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  2 ++
 .../hadoop/hive/llap/LlapArrowRecordWriter.java | 25 ++++++++++++++++----
 .../hive/llap/LlapOutputFormatService.java      |  4 +---
 .../hive/llap/WritableByteChannelAdapter.java   | 13 ++++++----
 .../hive/ql/io/arrow/ArrowWrapperWritable.java  | 19 +++++++++++++++
 .../hadoop/hive/ql/io/arrow/Serializer.java     | 17 ++++++++++---
 6 files changed, 65 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2820fc4c/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 18696ad..15217e7 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2657,6 +2657,8 @@ public class HiveConf extends Configuration {
     // For Arrow SerDe
     HIVE_ARROW_ROOT_ALLOCATOR_LIMIT("hive.arrow.root.allocator.limit", 
Long.MAX_VALUE,
         "Arrow root allocator memory size limitation in bytes."),
+    HIVE_ARROW_BATCH_ALLOCATOR_LIMIT("hive.arrow.batch.allocator.limit", 
10_000_000_000L,
+        "Max bytes per arrow batch. This is a threshold, the memory is not 
pre-allocated."),
     HIVE_ARROW_BATCH_SIZE("hive.arrow.batch.size", 1000, "The number of rows 
sent in one Arrow batch."),
 
     // For Druid storage handler

http://git-wip-us.apache.org/repos/asf/hive/blob/2820fc4c/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java 
b/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
index 1b3a3eb..9ee1048 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
@@ -20,11 +20,12 @@ package org.apache.hadoop.hive.llap;
 
 import java.io.IOException;
 
+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.ipc.ArrowStreamWriter;
 import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.arrow.vector.complex.NullableMapVector;
 import org.apache.hadoop.io.Writable;
-import java.nio.channels.WritableByteChannel;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
 import org.slf4j.Logger;
@@ -47,15 +48,28 @@ public class LlapArrowRecordWriter<K extends Writable, V 
extends Writable>
   public static final Logger LOG = 
LoggerFactory.getLogger(LlapArrowRecordWriter.class);
 
   ArrowStreamWriter arrowStreamWriter;
-  WritableByteChannel out;
+  WritableByteChannelAdapter out;
+  BufferAllocator allocator;
+  NullableMapVector rootVector;
 
-  public LlapArrowRecordWriter(WritableByteChannel out) {
+  public LlapArrowRecordWriter(WritableByteChannelAdapter out) {
     this.out = out;
   }
 
   @Override
   public void close(Reporter reporter) throws IOException {
-    arrowStreamWriter.close();
+    try {
+      arrowStreamWriter.close();
+    } finally {
+      rootVector.close();
+      //bytesLeaked should always be 0
+      long bytesLeaked = allocator.getAllocatedMemory();
+      if(bytesLeaked != 0) {
+        LOG.error("Arrow memory leaked bytes: {}", bytesLeaked);
+        throw new IllegalStateException("Arrow memory leaked bytes:" + 
bytesLeaked);
+      }
+      allocator.close();
+    }
   }
 
   @Override
@@ -64,6 +78,9 @@ public class LlapArrowRecordWriter<K extends Writable, V 
extends Writable>
     if (arrowStreamWriter == null) {
       VectorSchemaRoot vectorSchemaRoot = 
arrowWrapperWritable.getVectorSchemaRoot();
       arrowStreamWriter = new ArrowStreamWriter(vectorSchemaRoot, null, out);
+      allocator = arrowWrapperWritable.getAllocator();
+      this.out.setAllocator(allocator);
+      rootVector = arrowWrapperWritable.getRootVector();
     }
     arrowStreamWriter.writeBatch();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/2820fc4c/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java 
b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index 996f8b3..c71c637 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -199,12 +199,10 @@ public class LlapOutputFormatService {
       int maxPendingWrites = HiveConf.getIntVar(conf,
           HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES);
       boolean useArrow = HiveConf.getBoolVar(conf, 
HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW);
-      long allocatorMax = HiveConf.getLongVar(conf,
-          HiveConf.ConfVars.HIVE_ARROW_ROOT_ALLOCATOR_LIMIT);
       @SuppressWarnings("rawtypes")
       RecordWriter writer = null;
       if(useArrow) {
-        writer = new LlapArrowRecordWriter(new WritableByteChannelAdapter(ctx, 
maxPendingWrites, id, allocatorMax));
+        writer = new LlapArrowRecordWriter(new WritableByteChannelAdapter(ctx, 
maxPendingWrites, id));
       } else {
         writer = new LlapRecordWriter(id,
           new ChunkedOutputStream(

http://git-wip-us.apache.org/repos/asf/hive/blob/2820fc4c/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java 
b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
index 753da22..b07ce5b 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.llap;
 
-import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 
@@ -26,6 +25,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 import java.util.concurrent.Semaphore;
+import org.apache.arrow.memory.BufferAllocator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +49,7 @@ public class WritableByteChannelAdapter implements 
WritableByteChannel {
   private final Semaphore writeResources;
   private boolean closed = false;
   private final String id;
-  private long allocatorMax;
+  private BufferAllocator allocator;
 
   private ChannelFutureListener writeListener = new ChannelFutureListener() {
     @Override
@@ -77,12 +77,15 @@ public class WritableByteChannelAdapter implements 
WritableByteChannel {
     }
   };
 
-  public WritableByteChannelAdapter(ChannelHandlerContext chc, int 
maxPendingWrites, String id, long allocatorMax) {
+  public WritableByteChannelAdapter(ChannelHandlerContext chc, int 
maxPendingWrites, String id) {
     this.chc = chc;
     this.maxPendingWrites = maxPendingWrites;
     this.writeResources = new Semaphore(maxPendingWrites);
     this.id = id;
-    this.allocatorMax = allocatorMax;
+  }
+
+  public void setAllocator(BufferAllocator allocator) {
+    this.allocator = allocator;
   }
 
   @Override
@@ -90,7 +93,7 @@ public class WritableByteChannelAdapter implements 
WritableByteChannel {
     int size = src.remaining();
     //Down the semaphore or block until available
     takeWriteResources(1);
-    ByteBuf buf = 
RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(allocatorMax).buffer(size);
+    ByteBuf buf = allocator.buffer(size);
     buf.writeBytes(src);
     chc.writeAndFlush(buf).addListener(writeListener);
     return size;

http://git-wip-us.apache.org/repos/asf/hive/blob/2820fc4c/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
index dd490b1..40813fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.hive.ql.io.arrow;
 
+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.arrow.vector.complex.NullableMapVector;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -27,10 +29,19 @@ import java.io.IOException;
 
 public class ArrowWrapperWritable implements WritableComparable {
   private VectorSchemaRoot vectorSchemaRoot;
+  private BufferAllocator allocator;
+  private NullableMapVector rootVector;
 
   public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot) {
     this.vectorSchemaRoot = vectorSchemaRoot;
   }
+
+  public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot, 
BufferAllocator allocator, NullableMapVector rootVector) {
+    this.vectorSchemaRoot = vectorSchemaRoot;
+    this.allocator = allocator;
+    this.rootVector = rootVector;
+  }
+
   public ArrowWrapperWritable() {}
 
   public VectorSchemaRoot getVectorSchemaRoot() {
@@ -41,6 +52,14 @@ public class ArrowWrapperWritable implements 
WritableComparable {
     this.vectorSchemaRoot = vectorSchemaRoot;
   }
 
+  public BufferAllocator getAllocator() {
+    return allocator;
+  }
+
+  public NullableMapVector getRootVector() {
+    return rootVector;
+  }
+
   @Override
   public void write(DataOutput dataOutput) throws IOException {
     throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/hive/blob/2820fc4c/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
index 2961050..65a889e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
@@ -69,11 +69,13 @@ import 
org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.arrow.memory.BufferAllocator;
 
 import java.util.ArrayList;
 import java.util.List;
 
 import static 
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE;
+import static 
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_ALLOCATOR_LIMIT;
 import static 
org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector;
 import static 
org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MICROS_PER_MILLIS;
 import static 
org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MILLIS_PER_SECOND;
@@ -96,20 +98,29 @@ class Serializer {
   private final VectorizedRowBatch vectorizedRowBatch;
   private final VectorAssignRow vectorAssignRow;
   private int batchSize;
+  private BufferAllocator allocator;
 
   private final NullableMapVector rootVector;
 
   Serializer(ArrowColumnarBatchSerDe serDe) throws SerDeException {
     MAX_BUFFERED_ROWS = HiveConf.getIntVar(serDe.conf, HIVE_ARROW_BATCH_SIZE);
+    long childAllocatorLimit = HiveConf.getLongVar(serDe.conf, 
HIVE_ARROW_BATCH_ALLOCATOR_LIMIT);
     ArrowColumnarBatchSerDe.LOG.info("ArrowColumnarBatchSerDe max number of 
buffered columns: " + MAX_BUFFERED_ROWS);
+    String childAllocatorName = Thread.currentThread().getName();
+    //Use per-task allocator for accounting only, no need to reserve per-task 
memory
+    long childAllocatorReservation = 0L;
+    //Break out accounting of direct memory per-task, so we can check no 
memory is leaked when task is completed
+    allocator = serDe.rootAllocator.newChildAllocator(
+       childAllocatorName,
+       childAllocatorReservation,
+       childAllocatorLimit);
 
     // Schema
     structTypeInfo = (StructTypeInfo) 
getTypeInfoFromObjectInspector(serDe.rowObjectInspector);
     List<TypeInfo> fieldTypeInfos = 
structTypeInfo.getAllStructFieldTypeInfos();
     fieldSize = fieldTypeInfos.size();
-
     // Init Arrow stuffs
-    rootVector = NullableMapVector.empty(null, serDe.rootAllocator);
+    rootVector = NullableMapVector.empty(null, allocator);
 
     // Init Hive stuffs
     vectorizedRowBatch = new VectorizedRowBatch(fieldSize);
@@ -146,7 +157,7 @@ class Serializer {
 
     batchSize = 0;
     VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(rootVector);
-    return new ArrowWrapperWritable(vectorSchemaRoot);
+    return new ArrowWrapperWritable(vectorSchemaRoot, allocator, rootVector);
   }
 
   private FieldType toFieldType(TypeInfo typeInfo) {

Reply via email to