DRILL-1329: External sort memory fixes

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/437366f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/437366f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/437366f0

Branch: refs/heads/master
Commit: 437366f03e5de4c4692703007bff2f5a134720dd
Parents: fa3c8d5
Author: Steven Phillips <sphill...@maprtech.com>
Authored: Mon Aug 18 17:23:23 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Fri Aug 29 17:51:51 2014 -0700

----------------------------------------------------------------------
 .../exec/store/hbase/HBaseRecordReader.java     |   1 +
 .../codegen/templates/NullableValueVectors.java |   8 +-
 .../templates/ParquetOutputRecordWriter.java    |   4 +-
 .../codegen/templates/RepeatedValueVectors.java |  10 +-
 .../templates/VariableLengthVectors.java        |   8 +-
 .../src/main/java/io/netty/buffer/DrillBuf.java |   4 +
 .../cache/VectorAccessibleSerializable.java     |  10 +-
 .../apache/drill/exec/client/DrillClient.java   |   3 +-
 .../drill/exec/memory/TopLevelAllocator.java    |   2 +-
 .../exec/physical/config/ExternalSort.java      |  13 +-
 .../drill/exec/physical/impl/ScanBatch.java     |  24 ++-
 .../impl/project/ProjectRecordBatch.java        |  12 +-
 .../exec/physical/impl/xsort/BatchGroup.java    | 161 ++++-----------
 .../physical/impl/xsort/ExternalSortBatch.java  | 206 +++++++++++--------
 .../impl/xsort/PriorityQueueCopier.java         |   5 +-
 .../impl/xsort/PriorityQueueCopierTemplate.java |  26 +--
 .../impl/xsort/PriorityQueueSelector.java       |  37 ----
 .../xsort/PriorityQueueSelectorTemplate.java    | 161 ---------------
 .../apache/drill/exec/record/WritableBatch.java |   2 +-
 .../exec/record/selection/SelectionVector2.java |  13 +-
 .../drill/exec/store/AbstractRecordReader.java  |  10 +
 .../apache/drill/exec/store/RecordReader.java   |   8 +
 .../exec/store/easy/json/JSONRecordReader2.java |   3 +-
 .../drill/exec/store/mock/MockRecordReader.java |  16 +-
 .../columnreaders/ParquetRecordReader.java      |  16 ++
 .../exec/store/parquet2/DrillParquetReader.java |  19 ++
 .../drill/exec/store/pojo/PojoRecordReader.java |  18 +-
 .../exec/store/text/DrillTextRecordReader.java  |   4 +-
 .../drill/exec/vector/BaseDataValueVector.java  |  17 +-
 .../drill/exec/vector/BaseValueVector.java      |   7 +-
 .../apache/drill/exec/vector/ObjectVector.java  |   4 +-
 .../apache/drill/exec/vector/ValueVector.java   |   8 +-
 .../drill/exec/vector/complex/MapVector.java    |   4 +-
 .../exec/vector/complex/RepeatedListVector.java |   4 +-
 .../exec/vector/complex/RepeatedMapVector.java  |   6 +-
 .../work/batch/UnlimitedRawBatchBuffer.java     |   5 +-
 .../apache/drill/exec/work/foreman/Foreman.java |  19 ++
 .../exec/work/fragment/FragmentExecutor.java    |   2 +-
 .../physical/impl/writer/TestParquetWriter.java |   3 +-
 39 files changed, 393 insertions(+), 490 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 51a3151..ce44627 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.NullableVarBinaryVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarBinaryVector;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
index cb52841..d9eae0f 100644
--- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
@@ -73,9 +73,11 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
   }
 
   @Override
-  public DrillBuf[] getBuffers() {
-    DrillBuf[] buffers = ObjectArrays.concat(bits.getBuffers(), values.getBuffers(), DrillBuf.class);
-    clear();
+  public DrillBuf[] getBuffers(boolean clear) {
+    DrillBuf[] buffers = ObjectArrays.concat(bits.getBuffers(clear), values.getBuffers(clear), DrillBuf.class);
+    if (clear) {
+      clear();
+    }
     return buffers;
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
index cb37a1b..2fbbc6f 100644
--- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
@@ -41,7 +41,7 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import parquet.io.api.RecordConsumer;
 import parquet.schema.MessageType;
 import parquet.io.api.Binary;
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
@@ -220,7 +220,7 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
       //consumer.endField(fieldName, fieldId);
     <#else>
     reader.read(holder);
-    ByteBuf buf = holder.buffer;
+    DrillBuf buf = holder.buffer;
     consumer.startField(fieldName, fieldId);
     consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start)));
     consumer.endField(fieldName, fieldId);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index 97117a4..195f182 100644
--- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -71,7 +71,7 @@ package org.apache.drill.exec.vector;
   }
 
   public void setCurrentValueCount(int count) {
-    values.setCurrentValueCount(count);
+    values.setCurrentValueCount(offsets.getAccessor().get(count));
   }
   
   public int getBufferSize(){
@@ -259,9 +259,11 @@ package org.apache.drill.exec.vector;
   </#if>
 
   @Override
-  public DrillBuf[] getBuffers() {
-    DrillBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(), values.getBuffers(), DrillBuf.class);
-    clear();
+  public DrillBuf[] getBuffers(boolean clear) {
+    DrillBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(clear), values.getBuffers(clear), DrillBuf.class);
+    if (clear) {
+      clear();
+    }
     return buffers;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 95cd3cc..fe255a8 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -134,9 +134,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
 
   
   @Override
-  public DrillBuf[] getBuffers() {
-    DrillBuf[] buffers = ObjectArrays.concat(offsetVector.getBuffers(), super.getBuffers(), DrillBuf.class);
-    clear();
+  public DrillBuf[] getBuffers(boolean clear) {
+    DrillBuf[] buffers = ObjectArrays.concat(offsetVector.getBuffers(clear), super.getBuffers(clear), DrillBuf.class);
+    if (clear) {
+      clear();
+    }
     return buffers;
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
index 6ad5da8..5399239 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
@@ -663,6 +663,10 @@ public final class DrillBuf extends AbstractByteBuf {
     return new DrillBuf(allocator, a);
   }
 
+  public boolean isRootBuffer(){
+    return rootBuffer;
+  }
+
   public static DrillBuf wrapByteBuffer(ByteBuffer b){
     if(!b.isDirect()){
       throw new IllegalStateException("DrillBufs can only refer to direct memory.");

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index e007bcc..83b3d5a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -27,6 +27,7 @@ import java.util.List;
 
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
@@ -114,6 +115,9 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
       int dataLength = metaData.getBufferLength();
       MaterializedField field = MaterializedField.create(metaData);
       DrillBuf buf = allocator.buffer(dataLength);
+      if (buf == null) {
+        throw new IOException(new OutOfMemoryException());
+      }
       buf.writeBytes(input, dataLength);
       ValueVector vector = TypeHelper.getNewVector(field, allocator);
       vector.load(metaData, buf);
@@ -143,10 +147,10 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
     Preconditions.checkNotNull(output);
     final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
 
-    ByteBuf[] incomingBuffers = batch.getBuffers();
+    DrillBuf[] incomingBuffers = batch.getBuffers();
     UserBitShared.RecordBatchDef batchDef = batch.getDef();
 
-        /* ByteBuf associated with the selection vector */
+    /* DrillBuf associated with the selection vector */
     DrillBuf svBuf = null;
     Integer svCount =  null;
 
@@ -171,7 +175,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
       }
 
             /* Dump the array of ByteBuf's associated with the value vectors */
-      for (ByteBuf buf : incomingBuffers)
+      for (DrillBuf buf : incomingBuffers)
       {
                 /* dump the buffer into the OutputStream */
         int bufLength = buf.readableBytes();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 36c162a..2d39b5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkState;
 import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL;
 import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -338,7 +339,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
     }
 
     @Override
-    public ByteBuf getBuffer() {
+    public DrillBuf getBuffer() {
       return null;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index 55f11a4..ae80f7b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -162,7 +162,7 @@ public class TopLevelAllocator implements BufferAllocator {
     public DrillBuf buffer(int size, int max) {
       if(size == 0) return empty;
       if(!childAcct.reserve(size)){
-        logger.warn("Unable to allocate buffer of size {} due to memory limit. Current allocation: {}", size, getAllocatedMemory());
+        logger.warn("Unable to allocate buffer of size {} due to memory limit. Current allocation: {}", size, getAllocatedMemory(), new Exception());
         return null;
       };
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
index 8af69fa..c196a96 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
@@ -32,6 +32,8 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 public class ExternalSort extends Sort {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSort.class);
 
+  private long initialAllocation = 20000000;
+
   @JsonCreator
   public ExternalSort(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("reverse") boolean reverse) {
     super(child, orderings, reverse);
@@ -52,7 +54,9 @@ public class ExternalSort extends Sort {
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new ExternalSort(child, orderings, reverse);
+    ExternalSort newSort = new ExternalSort(child, orderings, reverse);
+    newSort.setMaxAllocation(getMaxAllocation());
+    return newSort;
   }
 
   @Override
@@ -60,5 +64,12 @@ public class ExternalSort extends Sort {
     return CoreOperatorType.EXTERNAL_SORT_VALUE;
   }
 
+  public void setMaxAllocation(long maxAllocation) {
+    this.maxAllocation = Math.max(initialAllocation, maxAllocation);
+  }
+
+  public long getInitialAllocation() {
+    return initialAllocation;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 6216305..e56d883 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -60,9 +61,6 @@ import com.google.common.collect.Maps;
 public class ScanBatch implements RecordBatch {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
 
-  private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
-  private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
-
   private static final int MAX_RECORD_CNT = Character.MAX_VALUE;
 
   private final Map<MaterializedField.Key, ValueVector> fieldVectorMap = Maps.newHashMap();
@@ -147,7 +145,15 @@ public class ScanBatch implements RecordBatch {
     long t1 = System.nanoTime();
     oContext.getStats().startProcessing();
     try {
-      mutator.allocate(MAX_RECORD_CNT);
+      try {
+        currentReader.allocate(fieldVectorMap);
+      } catch (OutOfMemoryException e) {
+        logger.debug("Caught OutOfMemoryException");
+        for (ValueVector v : fieldVectorMap.values()) {
+          v.clear();
+        }
+        return IterOutcome.OUT_OF_MEMORY;
+      }
       while ((recordCount = currentReader.next()) == 0) {
         try {
           if (!readers.hasNext()) {
@@ -169,7 +175,15 @@ public class ScanBatch implements RecordBatch {
             currentReader = readers.next();
             partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
             currentReader.setup(mutator);
-            mutator.allocate(MAX_RECORD_CNT);
+            try {
+              currentReader.allocate(fieldVectorMap);
+            } catch (OutOfMemoryException e) {
+              logger.debug("Caught OutOfMemoryException");
+              for (ValueVector v : fieldVectorMap.values()) {
+                v.clear();
+              }
+              return IterOutcome.OUT_OF_MEMORY;
+            }
             addPartitionVectors();
           } finally {
             oContext.getStats().stopSetup();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index c8929d1..734088e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -80,7 +80,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
   private boolean hasRemainder = false;
   private int remainderIndex = 0;
   private int recordCount;
- 
+
   private static final String EMPTY_STRING = "";
   
   private class ClassifierResult { 
@@ -137,7 +137,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
 //    VectorUtil.showVectorAccessibleContent(incoming, ",");
     int incomingRecordCount = incoming.getRecordCount();
 
-    doAlloc();
+    if (!doAlloc()) {
+      outOfMemory = true;
+      return;
+    }
 
     int outputRecords = projector.projectRecords(0, incomingRecordCount, 0);
     if (outputRecords < incomingRecordCount) {
@@ -161,7 +164,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
 
   private void handleRemainder() {
     int remainingRecordCount = incoming.getRecordCount() - remainderIndex;
-    doAlloc();
+    if (!doAlloc()) {
+      outOfMemory = true;
+      return;
+    }
     int projRecords = projector.projectRecords(remainderIndex, remainingRecordCount, 0);
     if (projRecords < remainingRecordCount) {
       setValueCount(projRecords);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
index 5c2ab22..2370070 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.xsort;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.cache.VectorAccessibleSerializable;
@@ -41,36 +42,23 @@ import com.google.common.base.Stopwatch;
 public class BatchGroup implements VectorAccessible {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
 
-  private final VectorContainer firstContainer;
-  private final VectorContainer secondContainer;
   private VectorContainer currentContainer;
   private SelectionVector2 sv2;
   private int pointer = 0;
-  private int batchPointer = 0;
-  private boolean hasSecond = false;
   private FSDataInputStream inputStream;
   private FSDataOutputStream outputStream;
   private Path path;
   private FileSystem fs;
   private BufferAllocator allocator;
   private int spilledBatches = 0;
-  private boolean done = false;
 
   public BatchGroup(VectorContainer container, SelectionVector2 sv2) {
-    this.firstContainer = container;
-    this.secondContainer = null;
     this.sv2 = sv2;
-    this.currentContainer = firstContainer;
+    this.currentContainer = container;
   }
 
-  public BatchGroup(VectorContainer firstContainer, VectorContainer secondContainer, FileSystem fs, String path, BufferAllocator allocator) {
-    assert firstContainer.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.NONE;
-    assert secondContainer.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.NONE;
-
-    this.firstContainer = firstContainer;
-    this.secondContainer = secondContainer;
-    currentContainer = firstContainer;
-    this.hasSecond = true;
+  public BatchGroup(VectorContainer container, FileSystem fs, String path, BufferAllocator allocator) {
+    currentContainer = container;
     this.fs = fs;
     this.path = new Path(path);
     this.allocator = allocator;
@@ -93,7 +81,7 @@ public class BatchGroup implements VectorAccessible {
     watch.start();
     outputBatch.writeToStream(outputStream);
     newContainer.zeroVectors();
-//    logger.debug("Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), recordCount);
+    logger.debug("Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), recordCount);
     spilledBatches++;
   }
 
@@ -107,132 +95,51 @@ public class BatchGroup implements VectorAccessible {
     Stopwatch watch = new Stopwatch();
     watch.start();
     vas.readFromStream(inputStream);
-    VectorContainer c = (VectorContainer) vas.get();
+    VectorContainer c =  vas.get();
 //    logger.debug("Took {} us to read {} records", watch.elapsed(TimeUnit.MICROSECONDS), c.getRecordCount());
     spilledBatches--;
+    currentContainer.zeroVectors();
+    Iterator<VectorWrapper<?>> wrapperIterator = c.iterator();
+    for (VectorWrapper w : currentContainer) {
+      TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector());
+      pair.transfer();
+    }
+    currentContainer.setRecordCount(c.getRecordCount());
+    c.zeroVectors();
     return c;
   }
 
   public int getNextIndex() {
-    if (pointer == currentContainer.getRecordCount()) {
-      if (!hasSecond || (batchPointer == 1 && spilledBatches == 0)) {
+    int val;
+    if (pointer == getRecordCount()) {
+      if (spilledBatches == 0) {
         return -1;
-      } else if (batchPointer == 1 && spilledBatches > 0) {
-        return -2;
       }
-      batchPointer++;
-      currentContainer = secondContainer;
-      pointer = 0;
+      try {
+        currentContainer.zeroVectors();
+        getBatch();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      pointer = 1;
+      return 0;
     }
     if (sv2 == null) {
-      int val = pointer;
+      val = pointer;
       pointer++;
       assert val < currentContainer.getRecordCount();
-      return val;
     } else {
-      int val = pointer;
+      val = pointer;
       pointer++;
       assert val < currentContainer.getRecordCount();
-      return sv2.getIndex(val);
-    }
-  }
-
-  public VectorContainer getFirstContainer() {
-    return firstContainer;
-  }
-
-  public VectorContainer getSecondContainer() {
-    return secondContainer;
-  }
-
-  public boolean hasSecond() {
-    return hasSecond;
-  }
-
-  public void rotate() {
-    if (batchPointer == 0) {
-      return;
-    }
-    if (pointer == secondContainer.getRecordCount()) {
-      try {
-        getTwoBatches();
-        return;
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    firstContainer.zeroVectors();
-    Iterator<VectorWrapper<?>> wrapperIterator = secondContainer.iterator();
-    for (VectorWrapper w : firstContainer) {
-      TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector());
-      pair.transfer();
+      val = sv2.getIndex(val);
     }
-    firstContainer.setRecordCount(secondContainer.getRecordCount());
 
-    if (spilledBatches > 0) {
-      VectorContainer c = null;
-      try {
-        c = getBatch();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      secondContainer.zeroVectors();
-      wrapperIterator = c.iterator();
-      for (VectorWrapper w : secondContainer) {
-        TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector());
-        pair.transfer();
-      }
-      secondContainer.setRecordCount(c.getRecordCount());
-      c.zeroVectors();
-    } else {
-      secondContainer.zeroVectors();
-      hasSecond = false;
-    }
-    batchPointer = 0;
-    currentContainer = firstContainer;
+    return val;
   }
 
-  private void getTwoBatches() throws IOException {
-    firstContainer.zeroVectors();
-    if (spilledBatches > 0) {
-      VectorContainer c = getBatch();
-      Iterator<VectorWrapper<?>> wrapperIterator = c.iterator();
-      for (VectorWrapper w : firstContainer) {
-        TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector());
-        pair.transfer();
-      }
-      firstContainer.setRecordCount(c.getRecordCount());
-      c.zeroVectors();
-    } else {
-      batchPointer = -1;
-      pointer = -1;
-      firstContainer.zeroVectors();
-      secondContainer.zeroVectors();
-    }
-    if (spilledBatches > 0) {
-      VectorContainer c = getBatch();
-      Iterator<VectorWrapper<?>> wrapperIterator = c.iterator();
-      for (VectorWrapper w : secondContainer) {
-        TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector());
-        pair.transfer();
-      }
-      secondContainer.setRecordCount(c.getRecordCount());
-      c.zeroVectors();
-    } else {
-      secondContainer.zeroVectors();
-      hasSecond = false;
-    }
-    batchPointer = 0;
-    currentContainer = firstContainer;
-    pointer = 0;
-//    BatchPrinter.printBatch(firstContainer);
-//    BatchPrinter.printBatch(secondContainer);
-    return;
-  }
-
-  public int getBatchPointer() {
-    assert batchPointer < 2;
-    return batchPointer;
+  public VectorContainer getContainer() {
+    return currentContainer;
   }
 
   public void cleanup() throws IOException {
@@ -263,7 +170,11 @@ public class BatchGroup implements VectorAccessible {
 
   @Override
   public int getRecordCount() {
-    return currentContainer.getRecordCount();
+    if (sv2 != null) {
+      return sv2.getCount();
+    } else {
+      return currentContainer.getRecordCount();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index d4b1001..aa65272 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
+import io.netty.buffer.DrillBuf;
+
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -93,10 +95,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private SingleBatchSorter sorter;
   private SortRecordBatchBuilder builder;
   private MSorter mSorter;
-  private PriorityQueueSelector selector;
   private PriorityQueueCopier copier;
   private BufferAllocator copierAllocator;
   private LinkedList<BatchGroup> batchGroups = Lists.newLinkedList();
+  private LinkedList<BatchGroup> spilledBatchGroups = Lists.newLinkedList();
   private SelectionVector4 sv4;
   private FileSystem fs;
   private int spillCount = 0;
@@ -104,6 +106,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private long uid;//used for spill files to ensure multiple sorts within same fragment don't clobber each others' files
   private boolean useIncomingSchema = false;
   private boolean first = true;
+  private long totalSizeInMemory = 0;
+  private long highWaterMark = Long.MAX_VALUE;
+  private int targetRecordCount;
 
   public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context);
@@ -128,7 +133,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
   @Override
   public int getRecordCount() {
-    return sv4.getCount();
+    if (sv4 != null) {
+      return sv4.getCount();
+    } else {
+      return container.getRecordCount();
+    }
   }
 
   @Override
@@ -195,21 +204,22 @@ public class ExternalSortBatch extends 
AbstractRecordBatch<ExternalSort> {
       } else {
         Stopwatch w = new Stopwatch();
         w.start();
-        int count = selector.next();
+        // The copier now produces each merged output batch (replaces the removed selector).
+        int count = copier.next(targetRecordCount);
         if(count > 0){
           long t = w.elapsed(TimeUnit.MICROSECONDS);
           logger.debug("Took {} us to merge {} records", t, count);
           container.setRecordCount(count);
           return IterOutcome.OK;
         }else{
-          logger.debug("selector returned 0 records");
+          logger.debug("copier returned 0 records");
           return IterOutcome.NONE;
         }
       }
     }
 
     long totalcount = 0;
-    
+
     try{
       outer: while (true) {
         Stopwatch watch = new Stopwatch();
@@ -234,17 +244,21 @@ public class ExternalSortBatch extends 
AbstractRecordBatch<ExternalSort> {
           }
           // fall through.
         case OK:
+          totalSizeInMemory += getBufferSize(incoming);
           SelectionVector2 sv2;
 //          if (incoming.getRecordCount() == 0) {
 //            break outer;
 //          }
           if (incoming.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) {
             sv2 = incoming.getSelectionVector2();
+            if (sv2.getBuffer(false).isRootBuffer()) {
+              oContext.getAllocator().takeOwnership(sv2.getBuffer(false));
+            }
           } else {
             try {
               sv2 = newSV2();
             } catch (OutOfMemoryException e) {
-              throw new RuntimeException();
+              throw new RuntimeException(e);
             }
           }
           int count = sv2.getCount();
@@ -263,7 +277,9 @@ public class ExternalSortBatch extends 
AbstractRecordBatch<ExternalSort> {
           }
           batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2()));
           batchesSinceLastSpill++;
-          if (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= 
SPILL_BATCH_GROUP_SIZE) {
+          if ((spillCount > 0 && totalSizeInMemory > .75 * highWaterMark) ||
+                  (totalSizeInMemory > .95 * popConfig.getMaxAllocation()) ||
+                  (batchGroups.size() > SPILL_THRESHOLD && 
batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
             mergeAndSpill();
             batchesSinceLastSpill = 0;
           }
@@ -271,6 +287,7 @@ public class ExternalSortBatch extends 
AbstractRecordBatch<ExternalSort> {
 //          logger.debug("Took {} us to sort {} records", t, count);
           break;
         case OUT_OF_MEMORY:
+          highWaterMark = totalSizeInMemory;
           if (batchesSinceLastSpill > 2) mergeAndSpill();
           batchesSinceLastSpill = 0;
           break;
@@ -279,25 +296,14 @@ public class ExternalSortBatch extends 
AbstractRecordBatch<ExternalSort> {
         }
       }
 
-//      if (schema == null || totalcount == 0){
-//        builder may be null at this point if the first incoming batch is 
empty
-//        useIncomingSchema = true;
-//        return IterOutcome.NONE;
-//      }
-
       if (spillCount == 0) {
         Stopwatch watch = new Stopwatch();
         watch.start();
-//        if (schema == null){
-          // builder may be null at this point if the first incoming batch is 
empty
-//          useIncomingSchema = true;
-//          return IterOutcome.NONE;
-//        }
 
         builder = new SortRecordBatchBuilder(oContext.getAllocator(), 
MAX_SORT_BYTES);
 
         for (BatchGroup group : batchGroups) {
-          RecordBatchData rbd = new RecordBatchData(group.getFirstContainer());
+          RecordBatchData rbd = new RecordBatchData(group.getContainer());
           rbd.setSv2(group.getSv2());
           builder.add(rbd);
         }
@@ -311,16 +317,32 @@ public class ExternalSortBatch extends 
AbstractRecordBatch<ExternalSort> {
         sv4 = mSorter.getSV4();
 
         long t = watch.elapsed(TimeUnit.MICROSECONDS);
-        logger.debug("Took {} us to sort {} records", t, sv4.getTotalCount());
+//        logger.debug("Took {} us to sort {} records", t, 
sv4.getTotalCount());
+        container.buildSchema(SelectionVectorMode.FOUR_BYTE);
       } else {
-        constructHyperBatch(batchGroups, this.container);
-        constructSV4();
-        selector = createSelector();
-        selector.setup(context, oContext.getAllocator(), this, sv4, 
batchGroups);
-        selector.next();
+        mergeAndSpill();
+        batchGroups.addAll(spilledBatchGroups);
+        logger.warn("Starting to merge. {} batch groups. Current allocated 
memory: {}", batchGroups.size(), oContext.getAllocator().getAllocatedMemory());
+        VectorContainer hyperBatch = constructHyperBatch(batchGroups);
+        createCopier(hyperBatch, batchGroups, container);
+        int inMemoryRecordCount = 0;
+        for (BatchGroup g : batchGroups) {
+          inMemoryRecordCount += g.getRecordCount();
+        }
+        int estimatedRecordSize = 0;
+        for (VectorWrapper w : batchGroups.get(0)) {
+          try {
+            estimatedRecordSize += TypeHelper.getSize(w.getField().getType());
+          } catch (UnsupportedOperationException e) {
+            estimatedRecordSize += 50;
+          }
+        }
+        targetRecordCount = Math.max(1, 250 * 1000 / Math.max(1, estimatedRecordSize)); // guard against a zero-size estimate
+        int count = copier.next(targetRecordCount);
+        container.buildSchema(SelectionVectorMode.NONE);
+        container.setRecordCount(count);
       }
 
-      container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
       return IterOutcome.OK_NEW_SCHEMA;
 
     }catch(SchemaChangeException | ClassTransformationException | IOException 
ex){
@@ -329,67 +351,93 @@ public class ExternalSortBatch extends 
AbstractRecordBatch<ExternalSort> {
       context.fail(ex);
       return IterOutcome.STOP;
     } catch (UnsupportedOperationException e) {
-      logger.error(e.getMessage());
       throw new RuntimeException(e);
     }
   }
 
   public void mergeAndSpill() throws SchemaChangeException {
     logger.debug("Copier allocator current allocation {}", 
copierAllocator.getAllocatedMemory());
-    VectorContainer hyperBatch = new VectorContainer();
     VectorContainer outputContainer = new VectorContainer();
     List<BatchGroup> batchGroupList = Lists.newArrayList();
-    int recordCount = 0;
-    for (int i = 0; i < SPILL_BATCH_GROUP_SIZE; i++) {
+    int batchCount = batchGroups.size();
+    for (int i = 0; i < batchCount / 2; i++) {
       if (batchGroups.size() == 0) {
         break;
       }
-      if (batchGroups.peekLast().getSecondContainer() != null) {
+      if (batchGroups.peekLast().getSv2() == null) {
         break;
       }
       BatchGroup batch = batchGroups.pollLast();
-      recordCount += batch.getSv2().getCount();
       batchGroupList.add(batch);
+      long bufferSize = getBufferSize(batch);
+      totalSizeInMemory -= bufferSize;
     }
     if (batchGroupList.size() == 0) {
       return;
     }
-    constructHyperBatch(batchGroupList, hyperBatch);
-    createCopier(hyperBatch, batchGroupList, outputContainer, recordCount);
+    int estimatedRecordSize = 0; // estimated from the groups being merged; 50 bytes when TypeHelper cannot size a type
+    for (VectorWrapper w : batchGroupList.get(0)) {
+      try {
+        estimatedRecordSize += TypeHelper.getSize(w.getField().getType());
+      } catch (UnsupportedOperationException e) {
+        estimatedRecordSize += 50;
+      }
+    }
+    int targetRecordCount = Math.max(1, 250 * 1000 / Math.max(1, estimatedRecordSize)); // guard against a zero-size estimate
+    VectorContainer hyperBatch = constructHyperBatch(batchGroupList);
+    createCopier(hyperBatch, batchGroupList, outputContainer);
 
-    int count = copier.next();
+    int count = copier.next(targetRecordCount);
     assert count > 0;
 
     VectorContainer c1 = VectorContainer.getTransferClone(outputContainer);
     c1.buildSchema(BatchSchema.SelectionVectorMode.NONE);
     c1.setRecordCount(count);
 
-    count = copier.next();
-    assert count > 0;
-
-
-    VectorContainer c2 = VectorContainer.getTransferClone(outputContainer);
-    c2.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-    c2.setRecordCount(count);
-
     String outputFile = 
String.format(Utilities.getFileNameForQueryFragment(context, dirs.next(), 
"spill" + uid + "_" + spillCount++));
-    BatchGroup newGroup = new BatchGroup(c1, c2, fs, outputFile, 
oContext.getAllocator());
+    BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, 
oContext.getAllocator());
 
     try {
-      while ((count = copier.next()) > 0) {
+      while ((count = copier.next(targetRecordCount)) > 0) {
         outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
         outputContainer.setRecordCount(count);
         newGroup.addBatch(outputContainer);
       }
       newGroup.closeOutputStream();
-      batchGroups.add(newGroup);
+      spilledBatchGroups.add(newGroup);
       for (BatchGroup group : batchGroupList) {
-          group.cleanup();
+        group.cleanup();
       }
       hyperBatch.clear();
     } catch (IOException e) {
-        throw new RuntimeException(e);
+      throw new RuntimeException(e);
+    }
+    takeOwnership(c1);
+    totalSizeInMemory += getBufferSize(c1);
+  }
+
+  private void takeOwnership(VectorAccessible batch) {
+    for (VectorWrapper w : batch) {
+      DrillBuf[] bufs = w.getValueVector().getBuffers(false);
+      for (DrillBuf buf : bufs) {
+        if (buf.isRootBuffer()) {
+          oContext.getAllocator().takeOwnership(buf);
+        }
       }
+    }
+  }
+
+  private long getBufferSize(VectorAccessible batch) {
+    long size = 0;
+    for (VectorWrapper w : batch) {
+      DrillBuf[] bufs = w.getValueVector().getBuffers(false);
+      for (DrillBuf buf : bufs) {
+        if (buf.isRootBuffer()) {
+          size += buf.capacity();
+        }
+      }
+    }
+    return size;
   }
 
   private SelectionVector2 newSV2() throws OutOfMemoryException {
@@ -401,8 +449,20 @@ public class ExternalSortBatch extends 
AbstractRecordBatch<ExternalSort> {
         throw new RuntimeException();
       }
       batchesSinceLastSpill = 0;
-      if (!sv2.allocateNew(incoming.getRecordCount())) {
-        throw new OutOfMemoryException();
+      // Retry after backing off 1, 2, 4, 8 and 16 seconds; give up after the 16 s attempt.
+      for (int waitTime = 1; ; waitTime *= 2) {
+        try {
+          Thread.sleep(waitTime * 1000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt(); // preserve interrupt status for callers
+          throw new OutOfMemoryException(e);
+        }
+        if (sv2.allocateNew(incoming.getRecordCount())) {
+          break;
+        }
+        if (waitTime >= 16) {
+          throw new OutOfMemoryException("Unable to allocate sv2 buffer after repeated attempts");
+        }
       }
     }
     for (int i = 0; i < incoming.getRecordCount(); i++) {
@@ -412,35 +472,21 @@ public class ExternalSortBatch extends 
AbstractRecordBatch<ExternalSort> {
     return sv2;
   }
 
-  private void constructHyperBatch(List<BatchGroup> batchGroupList, 
VectorContainer cont) {
+  private VectorContainer constructHyperBatch(List<BatchGroup> batchGroupList) 
{
+    VectorContainer cont = new VectorContainer();
     for (MaterializedField field : schema) {
-      ValueVector[] vectors = new ValueVector[batchGroupList.size() * 2];
+      ValueVector[] vectors = new ValueVector[batchGroupList.size()];
       int i = 0;
       for (BatchGroup group : batchGroupList) {
         vectors[i++] = group.getValueAccessorById(
             field.getValueClass(),
-            group.getValueVectorId(field.getPath()).getFieldIds()
-                ).getValueVector();
-        if (group.hasSecond()) {
-          VectorContainer c = group.getSecondContainer();
-          vectors[i++] = c.getValueAccessorById(
-              field.getValueClass(),
-              c.getValueVectorId(field.getPath()).getFieldIds()
-                  ).getValueVector();
-        } else {
-          vectors[i] = vectors[i - 1].getTransferPair().getTo(); //this vector 
should never be used. Just want to avoid having null elements in the hyper 
vector
-          i++;
-        }
+            group.getValueVectorId(field.getPath()).getFieldIds())
+            .getValueVector();
       }
       cont.add(vectors);
     }
     cont.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
-  }
-
-  private void constructSV4() throws SchemaChangeException {
-    BufferAllocator.PreAllocator preAlloc = 
oContext.getAllocator().getNewPreAllocator();
-    preAlloc.preAllocate(4 * TARGET_RECORD_COUNT);
-    sv4 = new SelectionVector4(preAlloc.getAllocation(), TARGET_RECORD_COUNT, 
TARGET_RECORD_COUNT);
+    return cont;
   }
 
   private MSorter createNewMSorter() throws ClassTransformationException, 
IOException, SchemaChangeException {
@@ -526,23 +572,7 @@ public class ExternalSortBatch extends 
AbstractRecordBatch<ExternalSort> {
     g.getEvalBlock()._return(JExpr.lit(0));
   }
 
-  private PriorityQueueSelector createSelector() throws SchemaChangeException {
-    CodeGenerator<PriorityQueueSelector> cg = 
CodeGenerator.get(PriorityQueueSelector.TEMPLATE_DEFINITION, 
context.getFunctionRegistry());
-    ClassGenerator<PriorityQueueSelector> g = cg.getRoot();
-
-    generateComparisons(g, this);
-
-    try {
-      PriorityQueueSelector c = context.getImplementationClass(cg);
-      return c;
-    } catch (ClassTransformationException e) {
-      throw new RuntimeException(e);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private void createCopier(VectorAccessible batch, List<BatchGroup> 
batchGroupList, VectorContainer outputContainer, int recordCount) throws 
SchemaChangeException {
+  private void createCopier(VectorAccessible batch, List<BatchGroup> 
batchGroupList, VectorContainer outputContainer) throws SchemaChangeException {
     try {
       if (copier == null) {
         CodeGenerator<PriorityQueueCopier> cg = 
CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, 
context.getFunctionRegistry());
@@ -562,7 +592,7 @@ public class ExternalSortBatch extends 
AbstractRecordBatch<ExternalSort> {
         outputContainer.add(v);
         allocators.add(VectorAllocator.getAllocator(v, 110));
       }
-      copier.setup(context, copierAllocator, batch, batchGroupList, 
outputContainer, allocators, recordCount);
+      copier.setup(context, copierAllocator, batch, batchGroupList, 
outputContainer, allocators);
     } catch (ClassTransformationException e) {
       throw new RuntimeException(e);
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
index f2da717..0eda0a6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
@@ -32,8 +32,9 @@ import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import java.util.List;
 
 public interface PriorityQueueCopier {
-  public void setup(FragmentContext context, BufferAllocator allocator, 
VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible 
outgoing, List<VectorAllocator> allocators, int recordCnt) throws 
SchemaChangeException;
-  public int next();
+  public void setup(FragmentContext context, BufferAllocator allocator, 
VectorAccessible hyperBatch, List<BatchGroup> batchGroups,
+                    VectorAccessible outgoing, List<VectorAllocator> 
allocators) throws SchemaChangeException;
+  public int next(int targetRecordCount);
   public List<VectorAllocator> getAllocators();
   public void cleanup();
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
index 6e9c355..b1c316c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
@@ -44,7 +44,8 @@ public abstract class PriorityQueueCopierTemplate implements 
PriorityQueueCopier
   private int targetRecordCount = ExternalSortBatch.SPILL_TARGET_RECORD_COUNT;
 
   @Override
-  public void setup(FragmentContext context, BufferAllocator allocator, 
VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible 
outgoing, List<VectorAllocator> allocators, int recordCnt) throws 
SchemaChangeException {
+  public void setup(FragmentContext context, BufferAllocator allocator, 
VectorAccessible hyperBatch, List<BatchGroup> batchGroups,
+                    VectorAccessible outgoing, List<VectorAllocator> 
allocators) throws SchemaChangeException {
     this.context = context;
     this.allocator = allocator;
     this.hyperBatch = hyperBatch;
@@ -60,20 +61,15 @@ public abstract class PriorityQueueCopierTemplate 
implements PriorityQueueCopier
 
     queueSize = 0;
     for (int i = 0; i < size; i++) {
-      vector4.set(i, i * 2, batchGroups.get(i).getNextIndex());
+      vector4.set(i, i, batchGroups.get(i).getNextIndex());
       siftUp();
       queueSize++;
     }
-
-    // Check if the we have enough records to create BatchData with two 
containers.
-    if (recordCnt < (2 * targetRecordCount)) {
-      targetRecordCount = (recordCnt / 2);
-    }
   }
 
   @Override
-  public int next() {
-    allocateVectors();
+  public int next(int targetRecordCount) {
+    allocateVectors(targetRecordCount);
     for (int outgoingIndex = 0; outgoingIndex < targetRecordCount; 
outgoingIndex++) {
       if (queueSize == 0) {
         cleanup();
@@ -81,15 +77,12 @@ public abstract class PriorityQueueCopierTemplate 
implements PriorityQueueCopier
       }
       int compoundIndex = vector4.get(0);
       int batch = compoundIndex >>> 16;
-      assert batch < batchGroups.size() * 2 : String.format("batch: %d 
batchGroups: %d", batch, batchGroups.size());
-      int batchGroup = batch / 2;
+      assert batch < batchGroups.size() : String.format("batch: %d 
batchGroups: %d", batch, batchGroups.size());
       if (!doCopy(compoundIndex, outgoingIndex)) {
         setValueCount(outgoingIndex);
         return outgoingIndex;
       }
-      int nextIndex = batchGroups.get(batchGroup).getNextIndex();
-      batch = batch & 0xFFFE;
-      batch += batchGroups.get(batchGroup).getBatchPointer();
+      int nextIndex = batchGroups.get(batch).getNextIndex();
       if (nextIndex < 0) {
         vector4.set(0, vector4.get(--queueSize));
       } else {
@@ -117,6 +110,9 @@ public abstract class PriorityQueueCopierTemplate 
implements PriorityQueueCopier
     for (VectorWrapper w: outgoing) {
       w.getValueVector().clear();
     }
+    for (VectorWrapper w : hyperBatch) {
+      w.clear();
+    }
   }
 
   @Override
@@ -136,7 +132,7 @@ public abstract class PriorityQueueCopierTemplate 
implements PriorityQueueCopier
     }
   }
 
-  private void allocateVectors() {
+  private void allocateVectors(int targetRecordCount) {
     for(VectorAllocator a : allocators){
       a.alloc(targetRecordCount);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java
deleted file mode 100644
index 786667a..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.xsort;
-
-import org.apache.drill.exec.compile.TemplateClassDefinition;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.allocator.VectorAllocator;
-
-import java.util.List;
-
-public interface PriorityQueueSelector {
-  public void setup(FragmentContext context, BufferAllocator allocator, 
VectorAccessible hyperBatch, SelectionVector4 sv4, List<BatchGroup> 
batchGroups) throws SchemaChangeException;
-  public int next();
-  public void cleanup();
-
-  public static TemplateClassDefinition<PriorityQueueSelector> 
TEMPLATE_DEFINITION = new 
TemplateClassDefinition<PriorityQueueSelector>(PriorityQueueSelector.class, 
PriorityQueueSelectorTemplate.class);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java
deleted file mode 100644
index 65a072b..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.xsort;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.apache.drill.exec.vector.allocator.VectorAllocator;
-
-import javax.inject.Named;
-import java.util.List;
-
-public abstract class PriorityQueueSelectorTemplate implements 
PriorityQueueSelector {
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PriorityQueueSelectorTemplate.class);
-
-  private SelectionVector4 sv4;
-  private SelectionVector4 vector4;
-  private List<BatchGroup> batchGroups;
-  private FragmentContext context;
-  private BufferAllocator allocator;
-  private int size;
-  private int queueSize = 0;
-  private int targetRecordCount = ExternalSortBatch.TARGET_RECORD_COUNT;
-  private VectorAccessible hyperBatch;
-
-  @Override
-  public void setup(FragmentContext context, BufferAllocator allocator, 
VectorAccessible hyperBatch, SelectionVector4 sv4, List<BatchGroup> 
batchGroups) throws SchemaChangeException {
-    this.context = context;
-    this.allocator = allocator;
-    this.sv4 = sv4;
-    this.batchGroups = batchGroups;
-    this.size = batchGroups.size();
-    this.hyperBatch = hyperBatch;
-
-    BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
-    preAlloc.preAllocate(4 * size);
-    vector4 = new SelectionVector4(preAlloc.getAllocation(), size, 
Character.MAX_VALUE);
-    doSetup(context, hyperBatch, null);
-
-    for (int i = 0; i < size; i++, queueSize++) {
-      vector4.set(i, i * 2, batchGroups.get(i).getNextIndex());
-      siftUp();
-    }
-  }
-
-  @Override
-  public int next() {
-    if (queueSize == 0) {
-      cleanup();
-      return 0;
-    }
-    rotate();
-    for (int outgoingIndex = 0; outgoingIndex < targetRecordCount; 
outgoingIndex++) {
-      int compoundIndex = vector4.get(0);
-      int batch = compoundIndex >> 16;
-      int batchGroup = batch / 2; //two containers per batch group
-      sv4.set(outgoingIndex, compoundIndex);
-      int nextIndex = batchGroups.get(batchGroup).getNextIndex();
-      batch = batchGroup * 2 + batchGroups.get(batchGroup).getBatchPointer(); 
// increment batch pointer if batchGroup is currently using second container
-      if (nextIndex == -1) {
-        vector4.set(0, vector4.get(--queueSize));
-      } else if (nextIndex == -2) {
-        vector4.set(0, batch - 1, 0);
-        sv4.setCount(outgoingIndex);
-        assert outgoingIndex != 0;
-        return outgoingIndex;
-      } else {
-        vector4.set(0, batch, nextIndex);
-      }
-      if (queueSize == 0) {
-        sv4.setCount(++outgoingIndex);
-        return outgoingIndex;
-      }
-      siftDown();
-    }
-    sv4.setCount(targetRecordCount);
-    return targetRecordCount;
-  }
-
-  private void rotate() {
-    for (BatchGroup group : batchGroups) {
-      group.rotate();
-    }
-    for (int i = 0; i < vector4.getTotalCount(); i++) {
-      vector4.set(i, vector4.get(i) & 0xFFFEFFFF);
-    }
-  }
-
-  @Override
-  public void cleanup() {
-    vector4.clear();
-  }
-
-  private final void siftUp() {
-    int p = queueSize;
-    while (p > 0) {
-      if (compare(p, (p - 1) / 2) < 0) {
-        swap(p, (p - 1) / 2);
-        p = (p - 1) / 2;
-      } else {
-        break;
-      }
-    }
-  }
-
-  private final void siftDown() {
-    int p = 0;
-    int next;
-    while (p * 2 + 1 < queueSize) {
-      if (p * 2 + 2 >= queueSize) {
-        next = p * 2 + 1;
-      } else {
-        if (compare(p * 2 + 1, p * 2 + 2) <= 0) {
-          next = p * 2 + 1;
-        } else {
-          next = p * 2 + 2;
-        }
-      }
-      if (compare(p, next) > 0) {
-        swap(p, next);
-        p = next;
-      } else {
-        break;
-      }
-    }
-  }
-
-
-  public final void swap(int sv0, int sv1) {
-    int tmp = vector4.get(sv0);
-    vector4.set(sv0, vector4.get(sv1));
-    vector4.set(sv1, tmp);
-  }
-  
-  public final int compare(int leftIndex, int rightIndex) {
-    int sv1 = vector4.get(leftIndex);
-    int sv2 = vector4.get(rightIndex);
-    return doEval(sv1, sv2);
-  }
-
-  public abstract void doSetup(@Named("context") FragmentContext context, 
@Named("incoming") VectorAccessible incoming, @Named("outgoing") 
VectorAccessible outgoing);
-  public abstract int doEval(@Named("leftIndex") int leftIndex, 
@Named("rightIndex") int rightIndex);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 3810115..b9690a6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -143,7 +143,7 @@ public class WritableBatch {
         continue;
       }
 
-      for (DrillBuf b : vv.getBuffers()) {
+      for (DrillBuf b : vv.getBuffers(true)) {
         buffers.add(b);
       }
       // remove vv access to buffers.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index 038bb2f..1043011 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -45,10 +45,14 @@ public class SelectionVector2 implements Closeable{
     return recordCount;
   }
 
-  public DrillBuf getBuffer()
-  {
-      DrillBuf bufferHandle = this.buffer;
+  public DrillBuf getBuffer() {
+    return getBuffer(true);
+  }
 
+  public DrillBuf getBuffer(boolean clear) {
+    DrillBuf bufferHandle = this.buffer;
+
+    if (clear) {
       /* Increment the ref count for this buffer */
       bufferHandle.retain();
 
@@ -56,8 +60,9 @@ public class SelectionVector2 implements Closeable{
        * caller. clear the buffer from within our selection vector
        */
       clear();
+    }
 
-      return bufferHandle;
+    return bufferHandle;
   }
 
   public void setBuffer(DrillBuf bufferHandle)

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
index 4cc06c8..209ec8f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
@@ -18,11 +18,15 @@
 package org.apache.drill.exec.store;
 
 import java.util.Collection;
+import java.util.Map;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.record.MaterializedField.Key;
+import org.apache.drill.exec.vector.ValueVector;
 
 public abstract class AbstractRecordReader implements RecordReader {
   private static final String COL_NULL_ERROR = "Columns cannot be null. Use 
star column to select all fields.";
@@ -59,4 +63,10 @@ public abstract class AbstractRecordReader implements 
RecordReader {
     }).isPresent();
   }
 
+  @Override
+  public void allocate(Map<Key, ValueVector> vectorMap) throws 
OutOfMemoryException {
+    for (ValueVector v : vectorMap.values()) {
+      v.allocateNew(); // no explicit sizes here; subclasses may override allocate() to pass sizing arguments
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
index 1745421..42cdcc3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -19,7 +19,13 @@ package org.apache.drill.exec.store;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.MaterializedField.Key;
+import org.apache.drill.exec.vector.ValueVector;
+
+import java.util.Map;
 
 public interface RecordReader {
 
@@ -36,6 +42,8 @@ public interface RecordReader {
    */
   public abstract void setup(OutputMutator output) throws 
ExecutionSetupException;
 
+  public abstract void allocate(Map<Key, ValueVector> vectorMap) throws 
OutOfMemoryException; // allocate memory for each vector in vectorMap; AbstractRecordReader supplies a default
+
   /**
    * Set the operator context. The Reader can use this to access the operator 
context and allocate direct memory
    * if needed

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
index ee78c39..2bd9df5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.complex.fn.JsonReader;
@@ -45,7 +46,7 @@ import org.apache.hadoop.fs.Path;
 
 import com.google.common.collect.Lists;
 
-public class JSONRecordReader2 implements RecordReader{
+public class JSONRecordReader2 extends AbstractRecordReader {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(JSONRecordReader2.class);
 
   private OutputMutator mutator;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index 81505ae..0714ab8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -30,6 +30,8 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.MaterializedField.Key;
+import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockColumn;
 import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
@@ -37,8 +39,9 @@ import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
 import java.util.List;
+import java.util.Map;
 
-public class MockRecordReader implements RecordReader {
+public class MockRecordReader extends AbstractRecordReader {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
 
   private OutputMutator output;
@@ -120,6 +123,26 @@ public class MockRecordReader implements RecordReader {
   }
 
+  /**
+   * Allocates each vector in {@code vectorMap} via {@link AllocationHelper} using the fixed
+   * sizing arguments {@code (Character.MAX_VALUE, 50, 10)}.
+   *
+   * @throws OutOfMemoryException if allocation fails with a NullPointerException, which is
+   *           attached as the cause
+   */
   @Override
+  public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
+    try {
+      for (ValueVector v : vectorMap.values()) {
+        AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
+      }
+    } catch (NullPointerException e) {
+      OutOfMemoryException oom = new OutOfMemoryException();
+      oom.initCause(e);
+      throw oom;
+    }
+  }
+
+  @Override
   public void cleanup() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 6c2d44c..c72e750 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
@@ -36,14 +37,17 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.MaterializedField.Key;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.NullableBitVector;
+import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
 import org.apache.hadoop.fs.FileSystem;
@@ -320,6 +324,26 @@ public class ParquetRecordReader extends AbstractRecordReader {
     }
   }
 
+  /**
+   * Allocates each vector in {@code vectorMap} via {@link AllocationHelper} using
+   * {@code recordsPerBatch} and the fixed sizing arguments {@code 50} and {@code 10}.
+   *
+   * @throws OutOfMemoryException if allocation fails with a NullPointerException, which is
+   *           attached as the cause
+   */
+  @Override
+  public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
+    try {
+      for (ValueVector v : vectorMap.values()) {
+        AllocationHelper.allocate(v, recordsPerBatch, 50, 10);
+      }
+    } catch (NullPointerException e) {
+      OutOfMemoryException oom = new OutOfMemoryException();
+      oom.initCause(e);
+      throw oom;
+    }
+  }
+
   private SchemaPath toFieldName(String[] paths) {
     return SchemaPath.getCompoundPath(paths);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 7a864f0..14075f3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -24,12 +24,16 @@ import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField.Key;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
@@ -114,6 +118,26 @@ public class DrillParquetReader extends AbstractRecordReader {
   }
 
+  /**
+   * Allocates each vector in {@code vectorMap} via {@link AllocationHelper} using the fixed
+   * sizing arguments {@code (Character.MAX_VALUE, 50, 10)}.
+   *
+   * @throws OutOfMemoryException if allocation fails with a NullPointerException, which is
+   *           attached as the cause
+   */
   @Override
+  public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
+    try {
+      for (ValueVector v : vectorMap.values()) {
+        AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
+      }
+    } catch (NullPointerException e) {
+      OutOfMemoryException oom = new OutOfMemoryException();
+      oom.initCause(e);
+      throw oom;
+    }
+  }
+
+  @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
 
     try {
@@ -202,7 +226,8 @@ public class DrillParquetReader extends AbstractRecordReader {
       if (v instanceof VariableWidthVector) {
         filled = Math.max(filled, ((VariableWidthVector) v).getCurrentSizeInBytes() * 100 / ((VariableWidthVector) v).getByteCapacity());
       }
     }
+    logger.debug("Percent filled: {}", filled);
     return filled;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index 48e09aa..38160df 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -22,12 +22,15 @@ import java.lang.reflect.Modifier;
 import java.sql.Timestamp;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.record.MaterializedField.Key;
+import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.pojo.Writers.BitWriter;
 import org.apache.drill.exec.store.pojo.Writers.DoubleWriter;
 import org.apache.drill.exec.store.pojo.Writers.EnumWriter;
@@ -39,12 +42,12 @@ import 
org.apache.drill.exec.store.pojo.Writers.NDoubleWriter;
 import org.apache.drill.exec.store.pojo.Writers.NIntWriter;
 import org.apache.drill.exec.store.pojo.Writers.NTimeStampWriter;
 import org.apache.drill.exec.store.pojo.Writers.StringWriter;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.collect.Lists;
 
-
-
-public class PojoRecordReader<T> implements RecordReader{
+public class PojoRecordReader<T> extends AbstractRecordReader {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PojoRecordReader.class);
 
   public final int forJsonIgnore = 1;
@@ -120,6 +123,13 @@ public class PojoRecordReader<T> implements RecordReader{
 
   }
 
+  @Override
+  public void allocate(Map<Key, ValueVector> vectorMap) throws 
OutOfMemoryException {
+    for (ValueVector v : vectorMap.values()) {
+      AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10); // sizes the vectors; the private allocate() below allocates the PojoWriters
+    }
+  }
+
   private void allocate(){
     for(PojoWriter writer : writers){
       writer.allocate();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index 2031aee..ef65f2a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -127,8 +127,6 @@ public class DrillTextRecordReader extends AbstractRecordReader {
 
   @Override
   public int next() {
-    logger.debug("vector value capacity {}", vector.getValueCapacity());
-    logger.debug("vector byte capacity {}", vector.getByteCapacity());
     int batchSize = 0;
     try {
       int recordCount = 0;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index b711c66..5db0299 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -45,7 +45,10 @@ public abstract class BaseDataValueVector extends 
BaseValueVector{
    */
   @Override
   public void clear() {
-    if (data != null) {
+    if (data == null) {
+      data = DeadBuf.DEAD_BUFFER;
+    }
+    if (data != DeadBuf.DEAD_BUFFER) {
       data.release();
       data = data.getAllocator().getEmpty();
       valueCount = 0;
@@ -62,16 +65,20 @@ public abstract class BaseDataValueVector extends 
BaseValueVector{
 
 
   @Override
-  public DrillBuf[] getBuffers(){
+  public DrillBuf[] getBuffers(boolean clear){
     DrillBuf[] out;
     if(valueCount == 0){
       out = new DrillBuf[0];
     }else{
       out = new DrillBuf[]{data};
-      data.readerIndex(0);
-      data.retain();
+      if (clear) { // rewind and retain so the returned reference outlives the clear() below
+        data.readerIndex(0);
+        data.retain();
+      }
+    }
+    if (clear) { // when clear is false the vector keeps its buffer and the returned reference is not retained
+      clear();
     }
-    clear();
     return out;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
index 1b3705e..18da67d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
@@ -17,12 +17,11 @@
  */
 package org.apache.drill.exec.vector;
 
+import io.netty.buffer.DrillBuf;
+
 import java.util.Iterator;
 
 import org.apache.drill.common.expression.FieldReference;
-
-import io.netty.buffer.ByteBuf;
-
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
@@ -62,7 +61,7 @@ public abstract class BaseValueVector implements ValueVector{
   public abstract int getCurrentValueCount();
   public abstract void setCurrentValueCount(int count);
 
-  abstract public ByteBuf getData();
+  abstract public DrillBuf getData();
 
   abstract static class BaseAccessor implements ValueVector.Accessor{
     public abstract int getValueCount();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
index 2c215ff..032ccc2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
@@ -136,7 +136,7 @@ public class ObjectVector extends BaseValueVector{
   }
 
   @Override
-  public ByteBuf getData() {
+  public DrillBuf getData() {
     throw new UnsupportedOperationException("ObjectVector does not support 
this");
   }
 
@@ -166,7 +166,7 @@ public class ObjectVector extends BaseValueVector{
   }
 
   @Override
-  public DrillBuf[] getBuffers() {
+  public DrillBuf[] getBuffers(boolean clear) {
     throw new UnsupportedOperationException("ObjectVector does not support 
this");
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index f7f010a..3433537 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -22,6 +22,7 @@ import io.netty.buffer.DrillBuf;
 import java.io.Closeable;
 
 import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
@@ -99,9 +100,12 @@ public interface ValueVector extends Closeable, 
Iterable<ValueVector> {
    * this buffer so it only should be used for in-context access. Also note 
that this buffer changes regularly thus
    * external classes shouldn't hold a reference to it (unless they change it).
    *
-   * @return The underlying DrillBuf.
+   * @param clear
+   *          if true, the returned buffers are retained for the caller and this vector is
+   *          cleared; if false, they are returned without clearing this vector
+   * @return The underlying DrillBufs.
    */
-  public abstract DrillBuf[] getBuffers();
+  public abstract DrillBuf[] getBuffers(boolean clear);
 
   /**
    * Load the data provided in the buffer. Typically used when deserializing 
from the wire.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
index c91c397..834719c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -276,10 +276,10 @@ public class MapVector extends AbstractContainerVector {
   }
 
   @Override
-  public DrillBuf[] getBuffers() {
+  public DrillBuf[] getBuffers(boolean clear) {
     List<DrillBuf> bufs = Lists.newArrayList();
     for(ValueVector v : vectors.values()){
-      for(DrillBuf b : v.getBuffers()){
+      for(DrillBuf b : v.getBuffers(clear)){
         bufs.add(b);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index f903b0c..fef416f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -355,8 +355,8 @@ public class RepeatedListVector extends 
AbstractContainerVector implements Repea
   }
 
   @Override
-  public DrillBuf[] getBuffers() {
-    return ArrayUtils.addAll(offsets.getBuffers(), vector.getBuffers());
+  public DrillBuf[] getBuffers(boolean clear) {
+    return ArrayUtils.addAll(offsets.getBuffers(clear), 
vector.getBuffers(clear)); // offsets' buffers first, then the inner vector's; clear is forwarded to both
   }
 
   private void setVector(ValueVector v){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index 3fd1c12..678439b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -307,11 +307,11 @@ public class RepeatedMapVector extends 
AbstractContainerVector implements Repeat
   }
 
   @Override
-  public DrillBuf[] getBuffers() {
-    List<DrillBuf> bufs = Lists.newArrayList(offsets.getBuffers());
+  public DrillBuf[] getBuffers(boolean clear) {
+    List<DrillBuf> bufs = Lists.newArrayList(offsets.getBuffers(clear));
 
     for(ValueVector v : vectors.values()){
-      for(DrillBuf b : v.getBuffers()){
+      for(DrillBuf b : v.getBuffers(clear)){
         bufs.add(b);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index bb56e10..912f956 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
 import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.rpc.ResponseSender;
 import org.apache.drill.exec.rpc.data.DataRpcConfig;
@@ -59,7 +60,11 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
     }
     if (batch.getHeader().getIsOutOfMemory()) {
       logger.debug("Setting autoread false");
-      if (!outOfMemory.get() && !buffer.peekFirst().getHeader().getIsOutOfMemory()) {
+      RawFragmentBatch firstBatch = buffer.peekFirst();
+      FragmentRecordBatch header = firstBatch == null ? null : firstBatch.getHeader();
+      // Queue this out-of-memory marker at the head unless we are already out of memory or the
+      // head is already such a marker; an empty buffer (header == null) also receives the marker.
+      if (!outOfMemory.get() && (header == null || !header.getIsOutOfMemory())) {
         buffer.addFirst(batch);
       }
       outOfMemory.set(true);

Reply via email to