DRILL-969: Improvements to ValueVector allocations

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2712c3c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2712c3c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2712c3c3

Branch: refs/heads/master
Commit: 2712c3c35f8fc1e5bd7443c7e40d6b76fad9d49d
Parents: 734f9a8
Author: Steven Phillips <[email protected]>
Authored: Mon Jun 9 16:39:56 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Wed Jun 11 21:24:55 2014 -0700

----------------------------------------------------------------------
 .../codegen/templates/FixedValueVectors.java    | 15 ++++---
 .../codegen/templates/RepeatedValueVectors.java |  9 ++--
 .../templates/VariableLengthVectors.java        | 23 +++++-----
 .../impl/svremover/RemovingRecordBatch.java     | 46 +++++++++++++++-----
 .../exec/record/AbstractSingleRecordBatch.java  |  7 +++
 .../exec/store/text/DrillTextRecordReader.java  | 13 ++++--
 6 files changed, 78 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2712c3c3/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
index af31f64..a83ec97 100644
--- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
@@ -53,7 +53,7 @@ public final class ${minor.class}Vector extends 
BaseDataValueVector implements F
   private final Accessor accessor = new Accessor();
   private final Mutator mutator = new Mutator();
 
-  private int allocationValueCount = 4000;
+  private int allocationValueCount = 4096;
   private int allocationMonitor = 0;
   
   public ${minor.class}Vector(MaterializedField field, BufferAllocator 
allocator) {
@@ -81,11 +81,11 @@ public final class ${minor.class}Vector extends 
BaseDataValueVector implements F
   
   public boolean allocateNewSafe() {
     clear();
-    if (allocationMonitor > 5) {
-      allocationValueCount = Math.max(2, (int) (allocationValueCount * 0.9));
+    if (allocationMonitor > 10) {
+      allocationValueCount = Math.max(8, (int) (allocationValueCount / 2));
       allocationMonitor = 0;
-    } else if (allocationMonitor < -5) {
-      allocationValueCount = (int) (allocationValueCount * 1.1);
+    } else if (allocationMonitor < -2) {
+      allocationValueCount = (int) (allocationValueCount * 2);
       allocationMonitor = 0;
     }
     this.data = allocator.buffer(allocationValueCount * ${type.width});
@@ -102,6 +102,7 @@ public final class ${minor.class}Vector extends 
BaseDataValueVector implements F
     clear();
     this.data = allocator.buffer(valueCount * ${type.width});
     this.data.readerIndex(0);
+    this.allocationValueCount = valueCount;
   }
 
   /**
@@ -815,8 +816,10 @@ public final class ${minor.class}Vector extends 
BaseDataValueVector implements F
      int currentValueCapacity = getValueCapacity();
      ${minor.class}Vector.this.valueCount = valueCount;
      int idx = (${type.width} * valueCount);
-     if (((float) currentValueCapacity) / idx > 1.1) {
+     if (valueCount > 0 && currentValueCapacity > idx * 2) {
        allocationMonitor++;
+     } else if (allocationMonitor > 0) {
+       allocationMonitor--;
      }
      data.writerIndex(idx);
      if (data instanceof AccountingByteBuf) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2712c3c3/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index 48efc16..7bf84f2 100644
--- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -98,6 +98,8 @@ package org.apache.drill.exec.vector;
     int endValue = offsets.getAccessor().get(startIndex + length);
     values.splitAndTransferTo(startValue, endValue - startValue, 
target.values);
     offsets.splitAndTransferTo(startIndex, length, target.offsets);
+    target.parentValueCount = parentValueCount;
+    target.childValueCount = childValueCount;
     sliceOffset = startIndex;
   }
   
@@ -369,8 +371,7 @@ package org.apache.drill.exec.vector;
       if(getValueCapacity() <= index){
         return false;
       }
-      offsets.getMutator().set(index+1, offsets.getAccessor().get(index));
-      return true;
+      return offsets.getMutator().setSafe(index+1, 
offsets.getAccessor().get(index));
     }
 
     /**
@@ -392,7 +393,9 @@ package org.apache.drill.exec.vector;
     }
 
     public boolean addSafe(int index, byte[] bytes, int start, int length) {
-      if(offsets.getValueCapacity() <= index+1) return false;
+      if(offsets.getValueCapacity() <= index+1) {
+        return false;
+      }
       int nextOffset = offsets.getAccessor().get(index+1);
       boolean b1 = values.getMutator().setSafe(nextOffset, bytes, start, 
length);
       boolean b2 = offsets.getMutator().setSafe(index+1, nextOffset+1);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2712c3c3/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 6a2dfd3..22a668d 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -49,7 +49,7 @@ public final class ${minor.class}Vector extends 
BaseDataValueVector implements V
   private final Accessor accessor = new Accessor();
   private final Mutator mutator = new Mutator();
 
-  private int allocationTotalByteCount = 40000;
+  private int allocationTotalByteCount = 32768;
   private int allocationMonitor = 0;
 
   public ${minor.class}Vector(MaterializedField field, BufferAllocator 
allocator) {
@@ -229,11 +229,11 @@ public final class ${minor.class}Vector extends 
BaseDataValueVector implements V
   @Override
   public boolean allocateNewSafe() {
     clear();
-    if (allocationMonitor > 5) {
-      allocationTotalByteCount = Math.max(1, (int) (allocationTotalByteCount * 
0.9));
+    if (allocationMonitor > 10) {
+      allocationTotalByteCount = Math.max(8, (int) (allocationTotalByteCount / 
2));
       allocationMonitor = 0;
-    } else if (allocationMonitor < -5) {
-      allocationTotalByteCount = (int) (allocationTotalByteCount * 1.1);
+    } else if (allocationMonitor < -2) {
+      allocationTotalByteCount = (int) (allocationTotalByteCount * 2);
       allocationMonitor = 0;
     }
     data = allocator.buffer(allocationTotalByteCount);
@@ -254,6 +254,7 @@ public final class ${minor.class}Vector extends 
BaseDataValueVector implements V
     assert totalBytes >= 0;
     data = allocator.buffer(totalBytes);
     data.readerIndex(0);
+    allocationTotalByteCount = totalBytes;
     offsetVector.allocateNew(valueCount+1);
     offsetVector.zeroVector();
   }
@@ -359,7 +360,6 @@ public final class ${minor.class}Vector extends 
BaseDataValueVector implements V
 
     public boolean setSafe(int index, byte[] bytes) {
       assert index >= 0;
-      if(index >= getValueCapacity()) return false;
 
       int currentOffset = offsetVector.getAccessor().get(index);
       if (data.capacity() < currentOffset + bytes.length) {
@@ -391,7 +391,6 @@ public final class ${minor.class}Vector extends 
BaseDataValueVector implements V
 
     public boolean setSafe(int index, byte[] bytes, int start, int length) {
       assert index >= 0;
-      if(index >= getValueCapacity()) return false;
 
       int currentOffset = offsetVector.getAccessor().get(index);
 
@@ -409,8 +408,7 @@ public final class ${minor.class}Vector extends 
BaseDataValueVector implements V
    
     public boolean setSafe(int index, Nullable${minor.class}Holder holder){
       assert holder.isSet == 1;
-      if(index >= getValueCapacity()) return false;
-      
+
       int start = holder.start;
       int end =   holder.end;
       int len = end - start;
@@ -433,8 +431,7 @@ public final class ${minor.class}Vector extends 
BaseDataValueVector implements V
     }
     
     public boolean setSafe(int index, ${minor.class}Holder holder){
-      if(index >= getValueCapacity()) return false;
-      
+
       int start = holder.start;
       int end =   holder.end;
       int len = end - start;
@@ -483,8 +480,10 @@ public final class ${minor.class}Vector extends 
BaseDataValueVector implements V
       ${minor.class}Vector.this.valueCount = valueCount;
       int idx = offsetVector.getAccessor().get(valueCount);
       data.writerIndex(idx);
-      if (((float) currentByteCapacity) / idx > 1.1) {
+      if (valueCount > 0 && currentByteCapacity > idx * 2) {
         allocationMonitor++;
+      } else if (allocationMonitor > 0) {
+        allocationMonitor--;
       }
       if (data instanceof AccountingByteBuf) {
         data.capacity(idx);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2712c3c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index f3388dc..77582c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -49,6 +49,7 @@ public class RemovingRecordBatch extends 
AbstractSingleRecordBatch<SelectionVect
   private int recordCount;
   private boolean hasRemainder;
   private int remainderIndex;
+  private boolean first;
 
   public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext 
context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context, incoming);
@@ -93,9 +94,10 @@ public class RemovingRecordBatch extends 
AbstractSingleRecordBatch<SelectionVect
 
   @Override
   protected void doWork() {
-    recordCount = incoming.getRecordCount();
-    int copiedRecords = copier.copyRecords(0, recordCount);
-    if (copiedRecords < recordCount) {
+    int incomingRecordCount = incoming.getRecordCount();
+    int copiedRecords = copier.copyRecords(0, incomingRecordCount);
+
+    if (copiedRecords < incomingRecordCount) {
       for(VectorWrapper<?> v : container){
         ValueVector.Mutator m = v.getValueVector().getMutator();
         m.setValueCount(copiedRecords);
@@ -104,6 +106,7 @@ public class RemovingRecordBatch extends 
AbstractSingleRecordBatch<SelectionVect
       remainderIndex = copiedRecords;
       this.recordCount = remainderIndex;
     } else {
+      recordCount = copiedRecords;
       for(VectorWrapper<?> v : container){
         ValueVector.Mutator m = v.getValueVector().getMutator();
         m.setValueCount(recordCount);
@@ -118,16 +121,37 @@ public class RemovingRecordBatch extends 
AbstractSingleRecordBatch<SelectionVect
       }
     }
 
-    logger.debug(String.format("doWork(): %s records copied for out of %s, 
remaining: %s, incoming schema %s ",
+    assert recordCount >= copiedRecords;
+    logger.debug("doWork(): {} records copied out of {}, remaining: {}, 
incoming schema {} ",
         copiedRecords,
-        incoming.getRecordCount(),
-        incoming.getRecordCount() - remainderIndex,
-        incoming.getSchema()));
+        incomingRecordCount,
+        incomingRecordCount - remainderIndex,
+        incoming.getSchema());
   }
 
   private void handleRemainder() {
+    int recordCount = incoming.getRecordCount();
     int remainingRecordCount = incoming.getRecordCount() - remainderIndex;
-    int copiedRecords = copier.copyRecords(remainderIndex, 
remainingRecordCount);
+    int copiedRecords;
+    while((copiedRecords = copier.copyRecords(remainderIndex, 
remainingRecordCount)) == 0) {
+      logger.debug("Copied zero records. Retrying");
+      container.zeroVectors();
+    }
+
+    /*
+    StringBuilder builder = new StringBuilder();
+    for (VectorWrapper w : container) {
+      builder.append(w.getField().getPath());
+      builder.append(" Value capacity: ");
+      builder.append(w.getValueVector().getValueCapacity());
+      if (w.getValueVector() instanceof VariableWidthVector) {
+        builder.append(" Byte capacity: ");
+        builder.append(((VariableWidthVector) 
w.getValueVector()).getByteCapacity());
+        builder.append("\n");
+      }
+    }
+    logger.debug(builder.toString());
+    */
 
     if (copiedRecords < remainingRecordCount) {
       for(VectorWrapper<?> v : container){
@@ -150,10 +174,10 @@ public class RemovingRecordBatch extends 
AbstractSingleRecordBatch<SelectionVect
       remainderIndex = 0;
       hasRemainder = false;
     }
-    if(logger.isDebugEnabled()) logger.debug(String.format("handleRemainder(): 
%s records copied for out of %s, remaining: %s, incoming schema %s ",
+    logger.debug(String.format("handleRemainder(): %s records copied out of 
%s, remaining: %s, incoming schema %s ",
         copiedRecords,
-        incoming.getRecordCount(),
-        incoming.getRecordCount() - remainderIndex,
+        recordCount,
+        recordCount - remainderIndex,
         incoming.getSchema()));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2712c3c3/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index c5fdaeb..9473945 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -44,6 +44,13 @@ public abstract class AbstractSingleRecordBatch<T extends 
PhysicalOperator> exte
   public IterOutcome innerNext() {
     IterOutcome upstream = next(incoming);
     if(first && upstream == IterOutcome.OK) upstream = 
IterOutcome.OK_NEW_SCHEMA;
+    if (!first && upstream == IterOutcome.OK && incoming.getRecordCount() == 
0) {
+      do {
+        for (VectorWrapper w : incoming) {
+          w.clear();
+        }
+      } while ((upstream = next(incoming)) == IterOutcome.OK && 
incoming.getRecordCount() == 0);
+    }
     first = false;
     switch(upstream){
     case NONE:

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2712c3c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index 5c3d381..20f458f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -63,6 +63,7 @@ public class DrillTextRecordReader implements RecordReader {
   private Text value;
   private int numCols = 0;
   private boolean redoRecord = false;
+  private boolean first = true;
 
   public DrillTextRecordReader(FileSplit split, FragmentContext context, char 
delimiter, List<SchemaPath> columns) {
     this.context = context;
@@ -106,10 +107,12 @@ public class DrillTextRecordReader implements 
RecordReader {
 
   @Override
   public int next() {
-    AllocationHelper.allocate(vector, targetRecordCount, 50);
+    logger.debug("vector value capacity {}", vector.getValueCapacity());
+    logger.debug("vector byte capacity {}", vector.getByteCapacity());
+    int batchSize = 0;
     try {
       int recordCount = 0;
-      while (redoRecord || (recordCount < targetRecordCount && 
reader.next(key, value))) {
+      while (redoRecord || (batchSize < 200*1000 && reader.next(key, value))) {
         redoRecord = false;
         int start;
         int end = -1;
@@ -126,9 +129,10 @@ public class DrillTextRecordReader implements RecordReader 
{
             end = value.getLength();
           }
           if (numCols > 0 && i++ < columnIds.get(p)) {
-            if (!vector.getMutator().addSafe(recordCount, value.getBytes(), 
start + 1, start + 1)) {
+            if (!vector.getMutator().addSafe(recordCount, value.getBytes(), 
start + 1, 0)) {
               redoRecord = true;
               vector.getMutator().setValueCount(recordCount);
+              logger.debug("text scan batch size {}", batchSize);
               return recordCount;
             }
             continue;
@@ -137,8 +141,10 @@ public class DrillTextRecordReader implements RecordReader 
{
           if (!vector.getMutator().addSafe(recordCount, value.getBytes(), 
start + 1, end - start - 1)) {
             redoRecord = true;
             vector.getMutator().setValueCount(recordCount);
+            logger.debug("text scan batch size {}", batchSize);
             return recordCount;
           }
+          batchSize += end - start;
         }
         recordCount++;
       }
@@ -146,6 +152,7 @@ public class DrillTextRecordReader implements RecordReader {
         v.getMutator().setValueCount(recordCount);
       }
       vector.getMutator().setValueCount(recordCount);
+      logger.debug("text scan batch size {}", batchSize);
       return recordCount;
     } catch (IOException e) {
       cleanup();

Reply via email to