Repository: hive
Updated Branches:
  refs/heads/master 9d78fac36 -> 2820fc4c6
HIVE-20203: Arrow SerDe leaks a DirectByteBuffer (Eric Wohlstadter, reviewed by Teddy Choi) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2820fc4c Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2820fc4c Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2820fc4c Branch: refs/heads/master Commit: 2820fc4c6d576bb9543bb627ab6f182f17a5c771 Parents: 9d78fac Author: Teddy Choi <[email protected]> Authored: Thu Jul 26 12:03:03 2018 +0900 Committer: Teddy Choi <[email protected]> Committed: Thu Jul 26 12:03:03 2018 +0900 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 2 ++ .../hadoop/hive/llap/LlapArrowRecordWriter.java | 25 ++++++++++++++++---- .../hive/llap/LlapOutputFormatService.java | 4 +--- .../hive/llap/WritableByteChannelAdapter.java | 13 ++++++---- .../hive/ql/io/arrow/ArrowWrapperWritable.java | 19 +++++++++++++++ .../hadoop/hive/ql/io/arrow/Serializer.java | 17 ++++++++++--- 6 files changed, 65 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/2820fc4c/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 18696ad..15217e7 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2657,6 +2657,8 @@ public class HiveConf extends Configuration { // For Arrow SerDe HIVE_ARROW_ROOT_ALLOCATOR_LIMIT("hive.arrow.root.allocator.limit", Long.MAX_VALUE, "Arrow root allocator memory size limitation in bytes."), + HIVE_ARROW_BATCH_ALLOCATOR_LIMIT("hive.arrow.batch.allocator.limit", 10_000_000_000L, + "Max bytes per arrow batch. 
This is a threshold, the memory is not pre-allocated."), HIVE_ARROW_BATCH_SIZE("hive.arrow.batch.size", 1000, "The number of rows sent in one Arrow batch."), // For Druid storage handler http://git-wip-us.apache.org/repos/asf/hive/blob/2820fc4c/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java index 1b3a3eb..9ee1048 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java @@ -20,11 +20,12 @@ package org.apache.hadoop.hive.llap; import java.io.IOException; +import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.ArrowStreamWriter; import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable; +import org.apache.arrow.vector.complex.NullableMapVector; import org.apache.hadoop.io.Writable; -import java.nio.channels.WritableByteChannel; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reporter; import org.slf4j.Logger; @@ -47,15 +48,28 @@ public class LlapArrowRecordWriter<K extends Writable, V extends Writable> public static final Logger LOG = LoggerFactory.getLogger(LlapArrowRecordWriter.class); ArrowStreamWriter arrowStreamWriter; - WritableByteChannel out; + WritableByteChannelAdapter out; + BufferAllocator allocator; + NullableMapVector rootVector; - public LlapArrowRecordWriter(WritableByteChannel out) { + public LlapArrowRecordWriter(WritableByteChannelAdapter out) { this.out = out; } @Override public void close(Reporter reporter) throws IOException { - arrowStreamWriter.close(); + try { + arrowStreamWriter.close(); + } finally { + rootVector.close(); + //bytesLeaked should always be 0 + long bytesLeaked = 
allocator.getAllocatedMemory(); + if(bytesLeaked != 0) { + LOG.error("Arrow memory leaked bytes: {}", bytesLeaked); + throw new IllegalStateException("Arrow memory leaked bytes:" + bytesLeaked); + } + allocator.close(); + } } @Override @@ -64,6 +78,9 @@ public class LlapArrowRecordWriter<K extends Writable, V extends Writable> if (arrowStreamWriter == null) { VectorSchemaRoot vectorSchemaRoot = arrowWrapperWritable.getVectorSchemaRoot(); arrowStreamWriter = new ArrowStreamWriter(vectorSchemaRoot, null, out); + allocator = arrowWrapperWritable.getAllocator(); + this.out.setAllocator(allocator); + rootVector = arrowWrapperWritable.getRootVector(); } arrowStreamWriter.writeBatch(); } http://git-wip-us.apache.org/repos/asf/hive/blob/2820fc4c/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java index 996f8b3..c71c637 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java @@ -199,12 +199,10 @@ public class LlapOutputFormatService { int maxPendingWrites = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES); boolean useArrow = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW); - long allocatorMax = HiveConf.getLongVar(conf, - HiveConf.ConfVars.HIVE_ARROW_ROOT_ALLOCATOR_LIMIT); @SuppressWarnings("rawtypes") RecordWriter writer = null; if(useArrow) { - writer = new LlapArrowRecordWriter(new WritableByteChannelAdapter(ctx, maxPendingWrites, id, allocatorMax)); + writer = new LlapArrowRecordWriter(new WritableByteChannelAdapter(ctx, maxPendingWrites, id)); } else { writer = new LlapRecordWriter(id, new ChunkedOutputStream( 
http://git-wip-us.apache.org/repos/asf/hive/blob/2820fc4c/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java index 753da22..b07ce5b 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.llap; -import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @@ -26,6 +25,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import java.util.concurrent.Semaphore; +import org.apache.arrow.memory.BufferAllocator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +49,7 @@ public class WritableByteChannelAdapter implements WritableByteChannel { private final Semaphore writeResources; private boolean closed = false; private final String id; - private long allocatorMax; + private BufferAllocator allocator; private ChannelFutureListener writeListener = new ChannelFutureListener() { @Override @@ -77,12 +77,15 @@ public class WritableByteChannelAdapter implements WritableByteChannel { } }; - public WritableByteChannelAdapter(ChannelHandlerContext chc, int maxPendingWrites, String id, long allocatorMax) { + public WritableByteChannelAdapter(ChannelHandlerContext chc, int maxPendingWrites, String id) { this.chc = chc; this.maxPendingWrites = maxPendingWrites; this.writeResources = new Semaphore(maxPendingWrites); this.id = id; - this.allocatorMax = allocatorMax; + } + + public void setAllocator(BufferAllocator allocator) { + this.allocator = allocator; } @Override @@ -90,7 +93,7 @@ public class WritableByteChannelAdapter implements 
WritableByteChannel { int size = src.remaining(); //Down the semaphore or block until available takeWriteResources(1); - ByteBuf buf = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(allocatorMax).buffer(size); + ByteBuf buf = allocator.buffer(size); buf.writeBytes(src); chc.writeAndFlush(buf).addListener(writeListener); return size; http://git-wip-us.apache.org/repos/asf/hive/blob/2820fc4c/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java index dd490b1..40813fa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hive.ql.io.arrow; +import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.hadoop.io.WritableComparable; +import org.apache.arrow.vector.complex.NullableMapVector; import java.io.DataInput; import java.io.DataOutput; @@ -27,10 +29,19 @@ import java.io.IOException; public class ArrowWrapperWritable implements WritableComparable { private VectorSchemaRoot vectorSchemaRoot; + private BufferAllocator allocator; + private NullableMapVector rootVector; public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot) { this.vectorSchemaRoot = vectorSchemaRoot; } + + public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot, BufferAllocator allocator, NullableMapVector rootVector) { + this.vectorSchemaRoot = vectorSchemaRoot; + this.allocator = allocator; + this.rootVector = rootVector; + } + public ArrowWrapperWritable() {} public VectorSchemaRoot getVectorSchemaRoot() { @@ -41,6 +52,14 @@ public class ArrowWrapperWritable implements WritableComparable { this.vectorSchemaRoot = 
vectorSchemaRoot; } + public BufferAllocator getAllocator() { + return allocator; + } + + public NullableMapVector getRootVector() { + return rootVector; + } + @Override public void write(DataOutput dataOutput) throws IOException { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/hive/blob/2820fc4c/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java index 2961050..65a889e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java @@ -69,11 +69,13 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; +import org.apache.arrow.memory.BufferAllocator; import java.util.ArrayList; import java.util.List; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_ALLOCATOR_LIMIT; import static org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector; import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MICROS_PER_MILLIS; import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MILLIS_PER_SECOND; @@ -96,20 +98,29 @@ class Serializer { private final VectorizedRowBatch vectorizedRowBatch; private final VectorAssignRow vectorAssignRow; private int batchSize; + private BufferAllocator allocator; private final NullableMapVector rootVector; Serializer(ArrowColumnarBatchSerDe serDe) throws SerDeException { MAX_BUFFERED_ROWS = HiveConf.getIntVar(serDe.conf, HIVE_ARROW_BATCH_SIZE); + long 
childAllocatorLimit = HiveConf.getLongVar(serDe.conf, HIVE_ARROW_BATCH_ALLOCATOR_LIMIT); ArrowColumnarBatchSerDe.LOG.info("ArrowColumnarBatchSerDe max number of buffered columns: " + MAX_BUFFERED_ROWS); + String childAllocatorName = Thread.currentThread().getName(); + //Use per-task allocator for accounting only, no need to reserve per-task memory + long childAllocatorReservation = 0L; + //Break out accounting of direct memory per-task, so we can check no memory is leaked when task is completed + allocator = serDe.rootAllocator.newChildAllocator( + childAllocatorName, + childAllocatorReservation, + childAllocatorLimit); // Schema structTypeInfo = (StructTypeInfo) getTypeInfoFromObjectInspector(serDe.rowObjectInspector); List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos(); fieldSize = fieldTypeInfos.size(); - // Init Arrow stuffs - rootVector = NullableMapVector.empty(null, serDe.rootAllocator); + rootVector = NullableMapVector.empty(null, allocator); // Init Hive stuffs vectorizedRowBatch = new VectorizedRowBatch(fieldSize); @@ -146,7 +157,7 @@ class Serializer { batchSize = 0; VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(rootVector); - return new ArrowWrapperWritable(vectorSchemaRoot); + return new ArrowWrapperWritable(vectorSchemaRoot, allocator, rootVector); } private FieldType toFieldType(TypeInfo typeInfo) {
