Use Direct Memory in Parquet Writer

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/fab96c34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/fab96c34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/fab96c34

Branch: refs/heads/master
Commit: fab96c34cfc392b60fffc75b09c5d1927a72f33a
Parents: a0a8256
Author: Parth Chandra <pchan...@maprtech.com>
Authored: Wed Aug 20 13:37:27 2014 -0700
Committer: Parth Chandra <pchan...@maprtech.com>
Committed: Wed Aug 20 14:16:15 2014 -0700

----------------------------------------------------------------------
 exec/java-exec/pom.xml                          | 12 ++--
 .../ParquetDirectByteBufferAllocator.java       | 73 ++++++++++++++++++++
 .../exec/store/parquet/ParquetFormatPlugin.java |  8 ++-
 .../exec/store/parquet/ParquetRecordWriter.java | 47 +++++++++----
 .../ColumnChunkPageWriteStoreExposer.java       | 16 ++++-
 .../exec/store/parquet/TestFileGenerator.java   | 11 ++-
 6 files changed, 142 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fab96c34/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 685f2fe..903f953 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -125,7 +125,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-column</artifactId>
-      <version>1.5.1-drill-r2</version>
+      <version>1.5.1-drill-r3</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
@@ -140,7 +140,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-hadoop</artifactId>
-      <version>1.5.1-drill-r2</version>
+      <version>1.5.1-drill-r3</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
@@ -170,7 +170,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-common</artifactId>
-      <version>1.5.1-drill-r2</version>
+      <version>1.5.1-drill-r3</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
@@ -185,7 +185,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-jackson</artifactId>
-      <version>1.5.1-drill-r2</version>
+      <version>1.5.1-drill-r3</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
@@ -200,7 +200,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-encoding</artifactId>
-      <version>1.5.1-drill-r2</version>
+      <version>1.5.1-drill-r3</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
@@ -215,7 +215,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-generator</artifactId>
-      <version>1.5.1-drill-r2</version>
+      <version>1.5.1-drill-r3</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fab96c34/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
new file mode 100644
index 0000000..1a49dcd
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.parquet;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.ops.OperatorContext;
+import parquet.bytes.ByteBufferAllocator;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link ByteBufferAllocator} for the Parquet writer that backs every requested
+ * {@link ByteBuffer} with a {@link ByteBuf} obtained from the operator's allocator.
+ *
+ * <p>Each buffer handed out is remembered together with its backing {@link ByteBuf}, so the
+ * backing memory can be released when the buffer is returned through
+ * {@link #release(ByteBuffer)}, or in bulk through {@link #clear()}.
+ *
+ * <p>This class performs no synchronization of its own.
+ */
+public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetDirectByteBufferAllocator.class);
+
+  private final OperatorContext oContext;
+
+  // Keyed by the ByteBuffer reference itself. System.identityHashCode values are not
+  // guaranteed to be unique, so using them as keys could let one buffer overwrite another
+  // buffer's entry, leaking the first ByteBuf and later releasing the wrong one.
+  private final Map<ByteBuffer, ByteBuf> allocatedBuffers =
+      new java.util.IdentityHashMap<ByteBuffer, ByteBuf>();
+
+  /**
+   * @param o operator context whose allocator supplies the backing buffers
+   */
+  public ParquetDirectByteBufferAllocator(OperatorContext o) {
+    oContext = o;
+  }
+
+  /**
+   * Allocates {@code sz} bytes from the operator's allocator and exposes them as an NIO buffer.
+   *
+   * @param sz number of bytes requested
+   * @return an NIO view covering bytes {@code [0, sz)} of the newly allocated {@link ByteBuf}
+   */
+  @Override
+  public ByteBuffer allocate(int sz) {
+    ByteBuf bb = oContext.getAllocator().buffer(sz);
+    ByteBuffer b = bb.nioBuffer(0, sz);
+    allocatedBuffers.put(b, bb);
+    logger.debug("ParquetDirectByteBufferAllocator: Allocated {} bytes. Allocated ByteBuffer id: {}",
+        sz, System.identityHashCode(b));
+    return b;
+  }
+
+  /**
+   * Releases the {@link ByteBuf} backing {@code b}.
+   *
+   * <p>The buffer passed in may already have been freed, or may not have been allocated by
+   * this allocator; in either case it is not tracked and the call does nothing.
+   *
+   * @param b a buffer previously returned by {@link #allocate(int)}
+   */
+  @Override
+  public void release(ByteBuffer b) {
+    // remove() both looks up and untracks the entry, so a second release of the same
+    // buffer finds nothing and is a no-op.
+    ByteBuf bb = allocatedBuffers.remove(b);
+    if (bb != null) {
+      logger.debug("ParquetDirectByteBufferAllocator: Freed byte buffer. Allocated ByteBuffer id: {}",
+          System.identityHashCode(b));
+      bb.release();
+    }
+  }
+
+  /**
+   * Releases every {@link ByteBuf} that is still tracked and forgets all of them.
+   */
+  public void clear() {
+    // Untrack each buffer right after releasing it, so a failure part-way through
+    // does not leave already-released buffers in the map.
+    for (Iterator<ByteBuf> it = allocatedBuffers.values().iterator(); it.hasNext(); ) {
+      it.next().release();
+      it.remove();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fab96c34/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 37d6403..eb07d79 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -27,7 +27,9 @@ import 
org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.impl.WriterRecordBatch;
@@ -71,7 +73,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
   private final ParquetFormatConfig config;
   private final StoragePluginConfig storageConfig;
   private final String name;
-  
+
   public ParquetFormatPlugin(String name, DrillbitContext context, 
DrillFileSystem fs, StoragePluginConfig storageConfig){
     this(name, context, fs, storageConfig, new ParquetFormatConfig());
   }
@@ -118,7 +120,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
     return new ParquetWriter(child, location, this);
   }
 
-  public RecordWriter getRecordWriter(FragmentContext context, ParquetWriter 
writer) throws IOException {
+  public RecordWriter getRecordWriter(FragmentContext context, ParquetWriter 
writer) throws IOException, OutOfMemoryException {
     Map<String, String> options = Maps.newHashMap();
 
     options.put("location", writer.getLocation());
@@ -131,7 +133,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
 
     options.put(ExecConstants.PARQUET_BLOCK_SIZE, 
context.getOptions().getOption(ExecConstants.PARQUET_BLOCK_SIZE).num_val.toString());
 
-    RecordWriter recordWriter = new ParquetRecordWriter();
+    RecordWriter recordWriter = new ParquetRecordWriter(context, writer);
     recordWriter.init(options);
 
     return recordWriter;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fab96c34/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 94ccc13..2c5f232 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -21,14 +21,14 @@ import com.google.common.collect.Lists;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.expr.holders.ComplexHolder;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.EventBasedRecordWriter;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.store.ParquetOutputRecordWriter;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -67,7 +67,7 @@ public class ParquetRecordWriter extends 
ParquetOutputRecordWriter {
   private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
   private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
 
-  private ParquetFileWriter w;
+  private ParquetFileWriter parquetFileWriter;
   private MessageType schema;
   private Map<String, String> extraMetaData = new HashMap();
   private int blockSize;
@@ -91,6 +91,13 @@ public class ParquetRecordWriter extends 
ParquetOutputRecordWriter {
   private String location;
   private String prefix;
   private int index = 0;
+  private OperatorContext oContext;
+  private ParquetDirectByteBufferAllocator allocator;
+
+  public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) 
throws OutOfMemoryException{
+    super();
+    this.oContext=new OperatorContext(writer, context);
+  }
 
   @Override
   public void init(Map<String, String> writerOptions) throws IOException {
@@ -121,11 +128,15 @@ public class ParquetRecordWriter extends 
ParquetOutputRecordWriter {
     schema = new MessageType("root", types);
 
     Path fileName = new Path(location, prefix + "_" + index + ".parquet");
-    w = new ParquetFileWriter(conf, schema, fileName);
-    w.start();
+    parquetFileWriter = new ParquetFileWriter(conf, schema, fileName);
+    parquetFileWriter.start();
 
     int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / 
this.schema.getColumns().size() / 5);
-    pageStore = 
ColumnChunkPageWriteStoreExposer.newColumnChunkPageWriteStore(codec, pageSize, 
this.schema, initialBlockBufferSize);
+    pageStore = 
ColumnChunkPageWriteStoreExposer.newColumnChunkPageWriteStore(this.oContext,
+      codec,
+      pageSize,
+      this.schema,
+      initialBlockBufferSize);
     int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + 
pageSize / 10, initialBlockBufferSize));
     store = new ColumnWriteStoreImpl(pageStore, pageSize, 
initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
     MessageColumnIO columnIO = new 
ColumnIOFactory(validating).getColumnIO(this.schema);
@@ -162,12 +173,14 @@ public class ParquetRecordWriter extends 
ParquetOutputRecordWriter {
   }
 
   private void flush() throws IOException {
-    w.startBlock(recordCount);
+    parquetFileWriter.startBlock(recordCount);
     store.flush();
-    ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, w);
+    ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, 
parquetFileWriter);
     recordCount = 0;
-    w.endBlock();
-    w.end(extraMetaData);
+    parquetFileWriter.endBlock();
+    parquetFileWriter.end(extraMetaData);
+    store.close();
+    ColumnChunkPageWriteStoreExposer.close(pageStore);
     store = null;
     pageStore = null;
     index++;
@@ -274,7 +287,17 @@ public class ParquetRecordWriter extends 
ParquetOutputRecordWriter {
   @Override
   public void cleanup() throws IOException {
     if (recordCount > 0) {
-      flush();
+      parquetFileWriter.startBlock(recordCount);
+      store.flush();
+      ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, 
parquetFileWriter);
+      recordCount = 0;
+      parquetFileWriter.endBlock();
+      parquetFileWriter.end(extraMetaData);
+    }
+    store.close();
+    ColumnChunkPageWriteStoreExposer.close(pageStore);
+    if(oContext!=null){
+      oContext.close();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fab96c34/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
 
b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
index 54f647a..0ffc3ba 100644
--- 
a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
+++ 
b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
@@ -17,6 +17,8 @@
  */
 package parquet.hadoop;
 
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
 import org.apache.hadoop.conf.Configuration;
 import parquet.column.page.PageWriteStore;
 import parquet.hadoop.CodecFactory.BytesCompressor;
@@ -27,13 +29,23 @@ import java.io.IOException;
 
 public class ColumnChunkPageWriteStoreExposer {
 
-  public static ColumnChunkPageWriteStore 
newColumnChunkPageWriteStore(CompressionCodecName codec, int pageSize, 
MessageType schema, int initialSize) {
+  public static ColumnChunkPageWriteStore 
newColumnChunkPageWriteStore(OperatorContext oContext,
+                                                                       
CompressionCodecName codec,
+                                                                       int 
pageSize,
+                                                                       
MessageType schema,
+                                                                       int 
initialSize) {
     BytesCompressor compressor = new CodecFactory(new 
Configuration()).getCompressor(codec, pageSize);
-    return new ColumnChunkPageWriteStore(compressor, schema, initialSize);
+    return new ColumnChunkPageWriteStore(compressor, schema, initialSize, new 
ParquetDirectByteBufferAllocator(oContext));
   }
 
   public static void flushPageStore(PageWriteStore pageStore, 
ParquetFileWriter w) throws IOException {
     ((ColumnChunkPageWriteStore) pageStore).flushToFileWriter(w);
   }
 
+  public static void close(PageWriteStore pageStore) throws IOException {
+    ((ColumnChunkPageWriteStore) pageStore).close();
+
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fab96c34/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
index 3c0287d..fb1ea60 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import parquet.bytes.BytesInput;
+import parquet.bytes.DirectByteBufferAllocator;
 import parquet.column.ColumnDescriptor;
 import parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
 import parquet.hadoop.ParquetFileWriter;
@@ -183,8 +184,14 @@ public class TestFileGenerator {
         w.startColumn(c1, props.recordsPerRowGroup, codec);
         int valsPerPage = (int) Math.ceil(props.recordsPerRowGroup / (float) 
fieldInfo.numberOfPages);
         byte[] bytes;
-        RunLengthBitPackingHybridValuesWriter defLevels = new 
RunLengthBitPackingHybridValuesWriter(MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS,
 valsPerPage);
-        RunLengthBitPackingHybridValuesWriter repLevels = new 
RunLengthBitPackingHybridValuesWriter(MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS,
 valsPerPage);
+        RunLengthBitPackingHybridValuesWriter defLevels = new 
RunLengthBitPackingHybridValuesWriter(
+          MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS,
+          valsPerPage,
+          new DirectByteBufferAllocator());
+        RunLengthBitPackingHybridValuesWriter repLevels = new 
RunLengthBitPackingHybridValuesWriter(
+          MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS,
+          valsPerPage,
+          new DirectByteBufferAllocator());
         // for variable length binary fields
         int bytesNeededToEncodeLength = 4;
         if ((int) fieldInfo.bitLength > 0) {

Reply via email to