DRILL-6126: Allocate memory for value vectors upfront in flatten operator

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/31e0f29a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/31e0f29a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/31e0f29a

Branch: refs/heads/master
Commit: 31e0f29a6140a19eda8de615e615208f51f2cf96
Parents: 47c5d1f
Author: Padma Penumarthy <ppenuma...@yahoo.com>
Authored: Tue Mar 6 16:09:13 2018 -0800
Committer: Ben-Zvi <bben-...@mapr.com>
Committed: Wed Mar 7 15:41:26 2018 -0800

----------------------------------------------------------------------
 .../impl/flatten/FlattenRecordBatch.java        | 34 +++++++++++---------
 .../AbstractRecordBatchMemoryManager.java       | 24 ++++++++++++--
 2 files changed, 40 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/31e0f29a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 4a910ef..9dd1770 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -104,25 +104,24 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
     @Override
     public void update() {
       // Get sizing information for the batch.
-      RecordBatchSizer sizer = new RecordBatchSizer(incoming);
+      setRecordBatchSizer(new RecordBatchSizer(incoming));
 
       final TypedFieldId typedFieldId = 
incoming.getValueVectorId(popConfig.getColumn());
       final MaterializedField field = 
incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
 
       // Get column size of flatten column.
-      RecordBatchSizer.ColumnSize columnSize = 
RecordBatchSizer.getColumn(incoming.getValueAccessorById(field.getValueClass(),
-        typedFieldId.getFieldIds()).getValueVector(), field.getName());
+      RecordBatchSizer.ColumnSize columnSize = 
getRecordBatchSizer().getColumn(field.getName());
 
       // Average rowWidth of flatten column
-      final int avgRowWidthFlattenColumn = 
RecordBatchSizer.safeDivide(columnSize.netSize, incoming.getRecordCount());
+      final int avgRowWidthFlattenColumn = columnSize.getNetSizePerEntry();
 
       // Average rowWidth excluding the flatten column.
-      final int avgRowWidthWithOutFlattenColumn = sizer.netRowWidth() - 
avgRowWidthFlattenColumn;
+      final int avgRowWidthWithOutFlattenColumn = 
getRecordBatchSizer().netRowWidth() - avgRowWidthFlattenColumn;
 
       // Average rowWidth of single element in the flatten list.
       // subtract the offset vector size from column data size.
       final int avgRowWidthSingleFlattenEntry =
-        RecordBatchSizer.safeDivide(columnSize.netSize - (OFFSET_VECTOR_WIDTH 
* columnSize.valueCount), columnSize.elementCount);
+        RecordBatchSizer.safeDivide(columnSize.getTotalNetSize() - 
(OFFSET_VECTOR_WIDTH * columnSize.getValueCount()), 
columnSize.getElementCount());
 
       // Average rowWidth of outgoing batch.
       final int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + 
avgRowWidthSingleFlattenEntry;
@@ -130,13 +129,16 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
       // Number of rows in outgoing batch
       setOutputRowCount(outputBatchSize, avgOutgoingRowWidth);
 
+      // Limit to lower bound of total number of rows possible for this batch
+      // i.e. all rows fit within memory budget.
+      setOutputRowCount(Math.min(columnSize.getElementCount(), 
getOutputRowCount()));
+
       logger.debug("flatten incoming batch sizer : {}, outputBatchSize : {}," +
-        "avgOutgoingRowWidth : {}, outputRowCount : {}", sizer, 
outputBatchSize, avgOutgoingRowWidth, getOutputRowCount());
+        "avgOutgoingRowWidth : {}, outputRowCount : {}", 
getRecordBatchSizer(), outputBatchSize,
+        avgOutgoingRowWidth, getOutputRowCount());
     }
-
   }
 
-
   public FlattenRecordBatch(FlattenPOP pop, RecordBatch incoming, 
FragmentContext context) throws OutOfMemoryException {
     super(pop, context, incoming);
 
@@ -199,7 +201,7 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
 
     int incomingRecordCount = incoming.getRecordCount();
 
-    if (!doAlloc()) {
+    if (!doAlloc(flattenMemoryManager.getOutputRowCount())) {
       outOfMemory = true;
       return IterOutcome.OUT_OF_MEMORY;
     }
@@ -235,7 +237,7 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
 
   private void handleRemainder() {
     int remainingRecordCount = 
flattener.getFlattenField().getAccessor().getInnerValueCount() - remainderIndex;
-    if (!doAlloc()) {
+    if (!doAlloc(remainingRecordCount)) {
       outOfMemory = true;
       return;
     }
@@ -266,12 +268,12 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
     complexWriters.add(writer);
   }
 
-  private boolean doAlloc() {
-    //Allocate vv in the allocationVectors.
+  private boolean doAlloc(int recordCount) {
+
     for (ValueVector v : this.allocationVectors) {
-      if (!v.allocateNewSafe()) {
-        return false;
-      }
+      // This will iteratively allocate memory for nested columns underneath.
+      RecordBatchSizer.ColumnSize colSize = 
flattenMemoryManager.getColumnSize(v.getField().getName());
+      colSize.allocateVector(v, recordCount);
     }
 
     //Allocate vv for complexWriters.

http://git-wip-us.apache.org/repos/asf/drill/blob/31e0f29a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
index b91ede0..1abd365 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java
@@ -27,6 +27,7 @@ public abstract class AbstractRecordBatchMemoryManager {
   protected static final int MIN_NUM_ROWS = 1;
   private int outputRowCount = MAX_NUM_ROWS;
   private int outgoingRowWidth;
+  private RecordBatchSizer sizer;
 
   public void update(int inputIndex) {};
 
@@ -41,14 +42,20 @@ public abstract class AbstractRecordBatchMemoryManager {
    * the min and max that is allowed.
    */
   public void setOutputRowCount(int targetBatchSize, int rowWidth) {
-    this.outputRowCount = 
adjustOutputRowCount(RecordBatchSizer.safeDivide(targetBatchSize/WORST_CASE_FRAGMENTATION_FACTOR,
 rowWidth));
+    this.outputRowCount = 
adjustOutputRowCount(RecordBatchSizer.safeDivide(targetBatchSize, rowWidth));
+  }
+
+  public void setOutputRowCount(int outputRowCount) {
+    this.outputRowCount = outputRowCount;
   }
 
   /**
    * This will adjust rowCount taking into account the min and max that is 
allowed.
+   * We will round down to nearest power of two - 1 for better memory 
utilization.
+   * -1 is done for adjusting accounting for offset vectors.
    */
   public static int adjustOutputRowCount(int rowCount) {
-    return (Math.min(MAX_NUM_ROWS, Math.max(rowCount, MIN_NUM_ROWS)));
+    return (Math.min(MAX_NUM_ROWS, Math.max(Integer.highestOneBit(rowCount) - 
1, MIN_NUM_ROWS)));
   }
 
   public void setOutgoingRowWidth(int outgoingRowWidth) {
@@ -58,4 +65,17 @@ public abstract class AbstractRecordBatchMemoryManager {
   public int getOutgoingRowWidth() {
     return outgoingRowWidth;
   }
+
+  public void setRecordBatchSizer(RecordBatchSizer sizer) {
+    this.sizer = sizer;
+  }
+
+  public RecordBatchSizer getRecordBatchSizer() {
+    return sizer;
+  }
+
+  public RecordBatchSizer.ColumnSize getColumnSize(String name) {
+    return sizer.getColumn(name);
+  }
+
 }

Reply via email to