DRILL-6307: Handle empty batches in record batch sizer correctly

closes #1228


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f563f382
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f563f382
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f563f382

Branch: refs/heads/master
Commit: f563f38225d96872fd4bff237c1d70469b2664bf
Parents: 28a2903
Author: Padma Penumarthy <[email protected]>
Authored: Fri Apr 6 12:56:06 2018 -0700
Committer: Vitalii Diravka <[email protected]>
Committed: Sun Apr 29 23:20:55 2018 +0300

----------------------------------------------------------------------
 .../physical/impl/unnest/UnnestRecordBatch.java |   4 +-
 .../exec/record/JoinBatchMemoryManager.java     |   4 +-
 .../drill/exec/record/RecordBatchSizer.java     |  52 +-
 .../exec/physical/unit/TestOutputBatchSize.java |  12 +-
 .../drill/exec/record/TestRecordBatchSizer.java | 731 ++++++++++++++++++-
 .../drill/exec/vector/AllocationHelper.java     |   4 +-
 6 files changed, 762 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f563f382/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 576594e..668a897 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -108,9 +108,7 @@ public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch<UnnestPO
       final MaterializedField field = 
incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
 
       // Get column size of unnest column.
-      RecordBatchSizer.ColumnSize columnSize = RecordBatchSizer
-          .getColumn(incoming.getValueAccessorById(field.getValueClass(), 
typedFieldId.getFieldIds()).getValueVector(),
-              field.getName());
+      RecordBatchSizer.ColumnSize columnSize = 
getRecordBatchSizer().getColumn(field.getName());
 
       // Average rowWidth of single element in the unnest list.
       // subtract the offset vector size from column data size.

http://git-wip-us.apache.org/repos/asf/drill/blob/f563f382/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
index c0171eb..fbf8bb4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
@@ -47,12 +47,12 @@ public class JoinBatchMemoryManager extends 
RecordBatchMemoryManager {
     switch (inputIndex) {
       case LEFT_INDEX:
         setRecordBatchSizer(inputIndex, new RecordBatchSizer(leftIncoming));
-        leftRowWidth = getRecordBatchSizer(inputIndex).netRowWidth();
+        leftRowWidth = getRecordBatchSizer(inputIndex).getRowAllocSize();
         logger.debug("left incoming batch size : {}", 
getRecordBatchSizer(inputIndex));
         break;
       case RIGHT_INDEX:
         setRecordBatchSizer(inputIndex, new RecordBatchSizer(rightIncoming));
-        rightRowWidth = getRecordBatchSizer(inputIndex).netRowWidth();
+        rightRowWidth = getRecordBatchSizer(inputIndex).getRowAllocSize();
         logger.debug("right incoming batch size : {}", 
getRecordBatchSizer(inputIndex));
       default:
         break;

http://git-wip-us.apache.org/repos/asf/drill/blob/f563f382/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
index 1586ccc..d4da171 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.vector.VariableWidthVector;
 
 import com.google.common.collect.Sets;
 import org.apache.drill.exec.vector.complex.RepeatedVariableWidthVectorLike;
+import static 
org.apache.drill.exec.vector.AllocationHelper.STD_REPETITION_FACTOR;
 
 /**
  * Given a record batch or vector container, determines the actual memory
@@ -50,12 +51,11 @@ import 
org.apache.drill.exec.vector.complex.RepeatedVariableWidthVectorLike;
 public class RecordBatchSizer {
   private static final int OFFSET_VECTOR_WIDTH = UInt4Vector.VALUE_WIDTH;
   private static final int BIT_VECTOR_WIDTH = UInt1Vector.VALUE_WIDTH;
-  private static final int STD_REPETITION_FACTOR = 10;
 
   /**
    * Column size information.
    */
-  public static class ColumnSize {
+  public class ColumnSize {
     public final String prefix;
     public final MaterializedField metadata;
 
@@ -221,6 +221,15 @@ public class RecordBatchSizer {
     }
 
     /**
+     * This returns actual entry size if rowCount > 0 or standard size 
otherwise.
+     * Use this for the cases when you might get empty batches with schema
+     * and you still need to do memory calculations based on just schema.
+     */
+    public int getAllocSizePerEntry() {
+      return rowCount() == 0 ? getStdNetSizePerEntry() : getNetSizePerEntry();
+    }
+
+    /**
      * This is the total data size for the column, including children for map
      * columns. Does not include any overhead of metadata vectors.
      */
@@ -277,18 +286,29 @@ public class RecordBatchSizer {
     /**
      * This is the average per entry width, used for vector allocation.
      */
-    public int getEntryWidth() {
+    private int getEntryWidthForAlloc() {
       int width = 0;
       if (isVariableWidth) {
-        width = getNetSizePerEntry() - OFFSET_VECTOR_WIDTH;
+        width = getAllocSizePerEntry() - OFFSET_VECTOR_WIDTH;
 
         // Subtract out the bits (is-set) vector width
-        if (metadata.getDataMode() == DataMode.OPTIONAL) {
+        if (isOptional) {
           width -= BIT_VECTOR_WIDTH;
         }
+
+        if (isRepeated && rowCount() == 0) {
+          return (safeDivide(width, STD_REPETITION_FACTOR));
+        }
       }
 
-      return (safeDivide(width, cardinality));
+      return (safeDivide(width, getEntryCardinalityForAlloc()));
+    }
+
+    /**
+     * This is the average per entry cardinality, used for vector allocation.
+     */
+    private float getEntryCardinalityForAlloc() {
+      return getCardinality() == 0 ? (isRepeated ? STD_REPETITION_FACTOR : 1) 
:getCardinality();
     }
 
     public ColumnSize(ValueVector v, String prefix) {
@@ -297,7 +317,7 @@ public class RecordBatchSizer {
       metadata = v.getField();
       isVariableWidth = (v instanceof VariableWidthVector || v instanceof 
RepeatedVariableWidthVectorLike);
       elementCount = valueCount;
-      cardinality = 1;
+      cardinality = valueCount == 0 ? 0 : 1;
       totalNetSize = v.getPayloadByteCount(valueCount);
 
       // Special case. For union and list vectors, it is very complex
@@ -384,7 +404,7 @@ public class RecordBatchSizer {
     private void allocateMap(AbstractMapVector map, int recordCount) {
       if (map instanceof RepeatedMapVector) {
         ((RepeatedMapVector) map).allocateOffsetsNew(recordCount);
-          recordCount *= getCardinality();
+          recordCount *= getEntryCardinalityForAlloc();
         }
 
       for (ValueVector vector : map) {
@@ -394,7 +414,7 @@ public class RecordBatchSizer {
 
     private void allocateRepeatedList(RepeatedListVector vector, int 
recordCount) {
       vector.allocateOffsetsNew(recordCount);
-      recordCount *= getCardinality();
+      recordCount *= getEntryCardinalityForAlloc();
       ColumnSize child = children.get(vector.getField().getName());
       if (vector.getDataVector() != null) {
         child.allocateVector(vector.getDataVector(), recordCount);
@@ -412,7 +432,7 @@ public class RecordBatchSizer {
         return;
       }
 
-      AllocationHelper.allocate(vector, recordCount, getEntryWidth(), 
getCardinality());
+      AllocationHelper.allocate(vector, recordCount, getEntryWidthForAlloc(), 
getEntryCardinalityForAlloc());
     }
 
     @Override
@@ -498,10 +518,6 @@ public class RecordBatchSizer {
 
   }
 
-  public static ColumnSize getColumn(ValueVector v, String prefix) {
-    return new ColumnSize(v, prefix);
-  }
-
   public ColumnSize getColumn(String name) {
     return columnSizes.get(name);
   }
@@ -536,6 +552,11 @@ public class RecordBatchSizer {
    */
   private int netRowWidth;
   private int netRowWidthCap50;
+
+  /**
+   * actual row size if input is not empty. Otherwise, standard size.
+   */
+  private int rowAllocSize;
   private boolean hasSv2;
   private int sv2Size;
   private int avgDensity;
@@ -597,6 +618,7 @@ public class RecordBatchSizer {
         nullableCount++;
       }
       netRowWidth += colSize.getNetSizePerEntry();
+      rowAllocSize += colSize.getAllocSizePerEntry();
     }
 
     for (BufferLedger ledger : ledgers) {
@@ -618,7 +640,6 @@ public class RecordBatchSizer {
 
   private void computeEstimates() {
     grossRowWidth = safeDivide(accountedMemorySize, rowCount);
-    netRowWidth = safeDivide(netBatchSize, rowCount);
     avgDensity = safeDivide(netBatchSize * 100L, accountedMemorySize);
   }
 
@@ -718,6 +739,7 @@ public class RecordBatchSizer {
   public int stdRowWidth() { return stdRowWidth; }
   public int grossRowWidth() { return grossRowWidth; }
   public int netRowWidth() { return netRowWidth; }
+  public int getRowAllocSize() { return rowAllocSize; }
   public Map<String, ColumnSize> columns() { return columnSizes; }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/f563f382/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
index 0b4bffa..f4b9109 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
@@ -1166,13 +1166,13 @@ public class TestOutputBatchSize extends 
PhysicalOpUnitTestBase {
     assertNotNull(column);
 
     /**
-     * stdDataSize:8*10*10, stdNetSize:8*10*10 + 4*10 + 4*10 + 4,
+     * stdDataSize:8*5*5, stdNetSize:8*5*5 + 4*5 + 4*5 + 4,
      * dataSizePerEntry:8*8, netSizePerEntry:8*8 + 4*2 + 4,
      * totalDataSize:8*8*10, totalNetSize:netSizePerEntry*10, valueCount:10,
      * elementCount:10, estElementCountPerArray:1, isVariableWidth:false
      */
-    assertEquals(800, column.getStdDataSizePerEntry());
-    assertEquals(884, column.getStdNetSizePerEntry());
+    assertEquals(200, column.getStdDataSizePerEntry());
+    assertEquals(244, column.getStdNetSizePerEntry());
     assertEquals(64, column.getDataSizePerEntry());
     assertEquals(76, column.getNetSizePerEntry());
     assertEquals(640, column.getTotalDataSize());
@@ -1309,13 +1309,13 @@ public class TestOutputBatchSize extends 
PhysicalOpUnitTestBase {
     assertNotNull(column);
 
     /**
-     * stdDataSize:8*10*10*10, stdNetSize:8*10*10*10 + 8*10*10 + 8*10 + 4,
+     * stdDataSize:8*5*5*5, stdNetSize:8*5*5*5 + 8*5*5 + 8*5 + 4,
      * dataSizePerEntry:16*8, netSizePerEntry:16*8 + 16*4 + 4*2 + 4*2,
      * totalDataSize:16*8*10, totalNetSize:netSizePerEntry*10, valueCount:10,
      * elementCount:10, estElementCountPerArray:1, isVariableWidth:false
      */
-    assertEquals(8000, column.getStdDataSizePerEntry());
-    assertEquals(8884, column.getStdNetSizePerEntry());
+    assertEquals(1000, column.getStdDataSizePerEntry());
+    assertEquals(1244, column.getStdNetSizePerEntry());
     assertEquals(128, column.getDataSizePerEntry());
     assertEquals(156, column.getNetSizePerEntry());
     assertEquals(1280, column.getTotalDataSize());

http://git-wip-us.apache.org/repos/asf/drill/blob/f563f382/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordBatchSizer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordBatchSizer.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordBatchSizer.java
index ccb9c19..eb50519 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordBatchSizer.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordBatchSizer.java
@@ -36,6 +36,7 @@ import org.junit.Test;
 import static junit.framework.TestCase.fail;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static 
org.apache.drill.exec.vector.AllocationHelper.STD_REPETITION_FACTOR;
 
 public class TestRecordBatchSizer extends SubOperatorTest {
   private final int testRowCount = 1000;
@@ -45,7 +46,8 @@ public class TestRecordBatchSizer extends SubOperatorTest {
   private void verifyColumnValues(ColumnSize column, int stdDataSizePerEntry, 
int stdNetSizePerEntry,
                                   int dataSizePerEntry, int netSizePerEntry, 
int totalDataSize,
                                   int totalNetSize, int valueCount, int 
elementCount,
-                                  int estElementCountPerArray, boolean 
isVariableWidth) {
+                                  int cardinality, // Array cardinality: the 
number of values in an array. 1, if not an array.
+                                  boolean isVariableWidth) {
     assertNotNull(column);
 
     assertEquals(stdDataSizePerEntry, column.getStdDataSizePerEntry());
@@ -60,7 +62,7 @@ public class TestRecordBatchSizer extends SubOperatorTest {
     assertEquals(valueCount, column.getValueCount());
     assertEquals(elementCount, column.getElementCount());
 
-    assertEquals(estElementCountPerArray, column.getCardinality(), 0.01);
+    assertEquals(cardinality, column.getCardinality(), 0.01);
     assertEquals(isVariableWidth, column.isVariableWidth());
   }
 
@@ -83,7 +85,7 @@ public class TestRecordBatchSizer extends SubOperatorTest {
     /**
      * stdDataSize:8, stdNetSize:8, dataSizePerEntry:8, netSizePerEntry:8,
      * totalDataSize:8*10, totalNetSize:8*10, valueCount:10,
-     * elementCount:10, estElementCountPerArray:1, isVariableWidth:false
+     * elementCount:10, cardinality:1, isVariableWidth:false
      */
     verifyColumnValues(aColumn, 8, 8, 8, 8, 80, 80, 10, 10, 1, false);
 
@@ -138,15 +140,15 @@ public class TestRecordBatchSizer extends SubOperatorTest 
{
     assertEquals(2, sizer.columns().size());
 
     /**
-     * stdDataSize:8*10, stdNetSize:8*10+4, dataSizePerEntry:5*8, 
netSizePerEntry:5*8+4,
-     * totalDataSize:5*8*10, totalNetSize:5*8*10+5*8, valueCount:10,
-     * elementCount:50, estElementCountPerArray:5, isVariableWidth:false
+     * stdDataSize:8*5, stdNetSize:8*5+4, dataSizePerEntry:5*8, 
netSizePerEntry:5*8+4,
+     * totalDataSize:5*8*5, totalNetSize:5*8*10+5*8, valueCount:10,
+     * elementCount:50, cardinality:5, isVariableWidth:false
      */
     verifyColumnValues(sizer.columns().get("a"),
-      80, 84, 40, 44, 400, 440, 10, 50, 5, false);
+      40, 44, 40, 44, 400, 440, 10, 50, 5, false);
 
     verifyColumnValues(sizer.columns().get("b"),
-      80, 84, 40, 44, 400, 440, 10, 50, 5, false);
+      40, 44, 40, 44, 400, 440, 10, 50, 5, false);
 
     SingleRowSet empty = fixture.rowSet(schema);
     VectorAccessible accessible = empty.vectorAccessible();
@@ -216,8 +218,8 @@ public class TestRecordBatchSizer extends SubOperatorTest {
 
     /**
      * stdDataSize:8, stdNetSize:8+1, dataSizePerEntry:8, netSizePerEntry:8+1,
-     * totalDataSize:8*10, totalNetSize:(8+1)*10, valueCount:10,
-     * elementCount:10, estElementCountPerArray:1, isVariableWidth:false
+     * totalDataSize:8*5, totalNetSize:(8+1)*5, valueCount:10,
+     * elementCount:10, cardinality:1, isVariableWidth:false
      */
     verifyColumnValues(aColumn,
       8, 9, 8, 9, 80, 90, 10, 10, 1, false);
@@ -294,7 +296,7 @@ public class TestRecordBatchSizer extends SubOperatorTest {
     /**
      * stdDataSize:50, stdNetSize:50+4, dataSizePerEntry:8, netSizePerEntry:8,
      * totalDataSize:(10*11)/2, totalNetSize:(10*11)/2 + 4*10, valueCount:10,
-     * elementCount:10, estElementCountPerArray:1, isVariableWidth:true
+     * elementCount:10, cardinality:1, isVariableWidth:true
      */
     verifyColumnValues(aColumn,
       50, 54, 6, 10, 55, 95, 10, 10, 1, true);
@@ -363,11 +365,11 @@ public class TestRecordBatchSizer extends SubOperatorTest 
{
     ColumnSize bColumn = sizer.columns().get("b");
 
     /**
-     * stdDataSize:50*10, stdNetSize:50*10+4*10+4, dataSizePerEntry:(5*6)/2, 
netSizePerEntry:(5*6)/2+5*4+4,
+     * stdDataSize:50*5, stdNetSize:50*5+4*5+4, dataSizePerEntry:(5*6)/2, 
netSizePerEntry:(5*6)/2+5*4+4,
      * totalDataSize:(5*6)/2 * 10, totalNetSize: ((5*6)/2+5*4+4)*10, 
valueCount:10,
-     * elementCount:50, estElementCountPerArray:5, isVariableWidth:true
+     * elementCount:50, cardinality:5, isVariableWidth:true
      */
-    verifyColumnValues(bColumn, 500, 544, 15, 39, 150, 390, 10, 50, 5,true);
+    verifyColumnValues(bColumn, 250, 274, 15, 39, 150, 390, 10, 50, 5,true);
 
     SingleRowSet empty = fixture.rowSet(schema);
     VectorAccessible accessible = empty.vectorAccessible();
@@ -444,7 +446,7 @@ public class TestRecordBatchSizer extends SubOperatorTest {
      * netSizePerEntry: dataSizePerEntry+4+1,
      * totalDataSize:(10*11)/2, totalNetSize: (10*11)/2 + (4*10) + (1*10),
      * valueCount:10,
-     * elementCount:10, estElementCountPerArray:1, isVariableWidth:true
+     * elementCount:10, cardinality:1, isVariableWidth:true
      */
     verifyColumnValues(sizer.columns().get("b"),
       50, 55, 6, 11, 55, 105, 10, 10, 1,true);
@@ -527,7 +529,7 @@ public class TestRecordBatchSizer extends SubOperatorTest {
      * netSizePerEntry: 4+1+4,
      * totalDataSize:5*10, totalNetSize:4*10+4*10+1*10,
      * valueCount:10,
-     * elementCount:10, estElementCountPerArray:1, isVariableWidth:true
+     * elementCount:10, cardinality:1, isVariableWidth:true
      */
     verifyColumnValues(sizer.columns().get("map"), 54, 58, 5, 9, 50, 90, 10, 
10, 1, false);
 
@@ -611,7 +613,7 @@ public class TestRecordBatchSizer extends SubOperatorTest {
      * netSizePerEntry: 4*2+1*2+4*2+4,
      * totalDataSize:5*2*10, totalNetSize:netSizePerEntry*2,
      * valueCount:10,
-     * elementCount:20, estElementCountPerArray:2, isVariableWidth:true
+     * elementCount:20, cardinality:2, isVariableWidth:true
      */
     verifyColumnValues(sizer.columns().get("map"), 54,62, 10, 22, 100, 220, 
10, 20, 2, false);
 
@@ -716,7 +718,7 @@ public class TestRecordBatchSizer extends SubOperatorTest {
      * netSizePerEntry: 4*2+1*2+4*2,
      * totalDataSize:5*2*10, totalNetSize:netSizePerEntry*2,
      * valueCount:10,
-     * elementCount:10, estElementCountPerArray:1, isVariableWidth:true
+     * elementCount:10, cardinality:1, isVariableWidth:true
      */
     verifyColumnValues(sizer.columns().get("map"), 108, 116, 10, 18, 100, 180, 
10, 10, 1, false);
 
@@ -805,6 +807,699 @@ public class TestRecordBatchSizer extends SubOperatorTest 
{
 
   }
 
+  @Test
+  public void testEmptyBatchFixedWidth() {
+    BatchSchema schema = new SchemaBuilder().add("a", 
MinorType.BIGINT).add("b", MinorType.FLOAT8).build();
+    RowSetBuilder builder = fixture.rowSetBuilder(schema);
+    RowSet rows = builder.build();
+
+    // Run the record batch sizer on the resulting batch.
+    RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
+    assertEquals(2, sizer.columns().size());
+
+    ColumnSize aColumn = sizer.columns().get("a");
+
+    /**
+     * stdDataSize:8, stdNetSize:8, dataSizePerEntry:0, netSizePerEntry:0,
+     * totalDataSize:0, totalNetSize:0, valueCount:0,
+     * elementCount:0, cardinality:0, isVariableWidth:false
+     */
+    verifyColumnValues(aColumn, 8, 8, 0, 0, 0, 0, 0, 0, 0, false);
+
+    ColumnSize bColumn = sizer.columns().get("b");
+    verifyColumnValues(bColumn, 8, 8, 0, 0, 0, 0, 0, 0, 0,false);
+
+    // Verify memory allocation is done correctly based on std size for empty 
batch.
+    SingleRowSet empty = fixture.rowSet(schema);
+    VectorAccessible accessible = empty.vectorAccessible();
+
+    for (VectorWrapper<?> vw : accessible) {
+      ValueVector v = vw.getValueVector();
+      RecordBatchSizer.ColumnSize colSize = 
sizer.getColumn(v.getField().getName());
+
+      // Allocates to nearest power of two.
+      colSize.allocateVector(v, testRowCount);
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), 
v.getValueCapacity());
+      v.clear();
+
+      // Allocates the same as value passed since it is already power of two.
+      colSize.allocateVector(v, testRowCountPowerTwo);
+      assertEquals(testRowCountPowerTwo, v.getValueCapacity());
+      v.clear();
+
+      // Allocate for max rows.
+      colSize.allocateVector(v, ValueVector.MAX_ROW_COUNT);
+      assertEquals(ValueVector.MAX_ROW_COUNT, v.getValueCapacity());
+      v.clear();
+
+      // Allocate for 0 rows. should atleast do allocation for 1 row.
+      colSize.allocateVector(v,  0);
+      assertEquals(ValueVector.MIN_ROW_COUNT, v.getValueCapacity());
+      v.clear();
+    }
+
+    rows.clear();
+    empty.clear();
+
+  }
+
+  @Test
+  public void testEmptyBatchRepeatedFixedWidth() {
+    BatchSchema schema = new SchemaBuilder().addArray("a", 
MinorType.BIGINT).addArray("b", MinorType.FLOAT8).build();
+    RowSetBuilder builder = fixture.rowSetBuilder(schema);
+    RowSet rows = builder.build();
+
+    // Run the record batch sizer on the resulting batch.
+    RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
+    assertEquals(2, sizer.columns().size());
+
+    /**
+     * stdDataSize:8*5, stdNetSize:8*5+4, dataSizePerEntry:0, 
netSizePerEntry:0,
+     * totalDataSize:0, totalNetSize:0, valueCount:0,
+     * elementCount:0, cardinality:0, isVariableWidth:false
+     */
+    verifyColumnValues(sizer.columns().get("a"),
+      40, 44, 0, 0, 0, 0, 0, 0, 0, false);
+
+    verifyColumnValues(sizer.columns().get("b"),
+      40, 44, 0, 0, 0, 0, 0, 0, 0, false);
+
+    // Verify memory allocation is done correctly based on std size for empty 
batch.
+    SingleRowSet empty = fixture.rowSet(schema);
+    VectorAccessible accessible = empty.vectorAccessible();
+    UInt4Vector offsetVector;
+    ValueVector dataVector;
+    for (VectorWrapper<?> vw : accessible) {
+      ValueVector v = vw.getValueVector();
+      RecordBatchSizer.ColumnSize colSize = 
sizer.getColumn(v.getField().getName());
+
+      // Allocates to nearest power of two
+      colSize.allocateVector(v, testRowCount);
+      offsetVector = ((RepeatedValueVector) v).getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), 
offsetVector.getValueCapacity());
+      dataVector = ((RepeatedValueVector) v).getDataVector();
+      assertEquals(Integer.highestOneBit( testRowCount * STD_REPETITION_FACTOR 
 << 1), dataVector.getValueCapacity());
+      v.clear();
+
+      // Allocates the same as value passed since it is already power of two.
+      // -1 is done for adjustment needed for offset vector.
+      colSize.allocateVector(v, testRowCountPowerTwo - 1);
+      offsetVector = ((RepeatedValueVector) v).getOffsetVector();
+      assertEquals(testRowCountPowerTwo, offsetVector.getValueCapacity());
+      dataVector = ((RepeatedValueVector) v).getDataVector();
+      assertEquals(Integer.highestOneBit((testRowCountPowerTwo -1) * 
STD_REPETITION_FACTOR) << 1, dataVector.getValueCapacity());
+      v.clear();
+
+      // Allocate for max rows.
+      colSize.allocateVector(v, ValueVector.MAX_ROW_COUNT - 1);
+      offsetVector = ((RepeatedValueVector) v).getOffsetVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT, offsetVector.getValueCapacity());
+      dataVector = ((RepeatedValueVector) v).getDataVector();
+      assertEquals(Integer.highestOneBit(((ValueVector.MAX_ROW_COUNT - 1)* 
STD_REPETITION_FACTOR << 1)), dataVector.getValueCapacity());
+      v.clear();
+
+      // Allocate for 0 rows. should atleast do allocation for 1 row.
+      colSize.allocateVector(v, 0);
+      offsetVector = ((RepeatedValueVector) v).getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT + 1, 
offsetVector.getValueCapacity());
+      dataVector = ((RepeatedValueVector) v).getDataVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT, dataVector.getValueCapacity());
+      v.clear();
+    }
+
+    empty.clear();
+    rows.clear();
+  }
+
+  @Test
+  public void testEmptyBatchNullableFixedWidth() {
+    BatchSchema schema = new SchemaBuilder().addNullable("a", 
MinorType.BIGINT).addNullable("b", MinorType.FLOAT8).build();
+    RowSetBuilder builder = fixture.rowSetBuilder(schema);
+    RowSet rows = builder.build();
+
+    // Run the record batch sizer on the resulting batch.
+    RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
+    assertEquals(2, sizer.columns().size());
+
+    ColumnSize aColumn = sizer.columns().get("a");
+    ColumnSize bColumn = sizer.columns().get("b");
+
+    /**
+     * stdDataSize:8, stdNetSize:8+1, dataSizePerEntry:0, netSizePerEntry:0,
+     * totalDataSize:0, totalNetSize:0, valueCount:0,
+     * elementCount:0, cardinality:0, isVariableWidth:false
+     */
+    verifyColumnValues(aColumn,
+      8, 9, 0, 0, 0, 0, 0, 0, 0, false);
+
+    verifyColumnValues(bColumn,
+      8, 9, 0, 0, 0, 0, 0, 0, 0, false);
+
+    // Verify memory allocation is done correctly based on std size for empty 
batch.
+    SingleRowSet empty = fixture.rowSet(schema);
+    VectorAccessible accessible = empty.vectorAccessible();
+    ValueVector bitVector, valueVector;
+
+    for (VectorWrapper<?> vw : accessible) {
+      ValueVector v = vw.getValueVector();
+      RecordBatchSizer.ColumnSize colSize = 
sizer.getColumn(v.getField().getName());
+
+      // Allocates to nearest power of two
+      colSize.allocateVector(v, testRowCount);
+      bitVector = ((NullableVector) v).getBitsVector();
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), 
bitVector.getValueCapacity());
+      valueVector = ((NullableVector) v).getValuesVector();
+      assertEquals(Integer.highestOneBit(testRowCount << 1), 
valueVector.getValueCapacity());
+      v.clear();
+
+      // Allocates the same as value passed since it is already power of two.
+      colSize.allocateVector(v, testRowCountPowerTwo);
+      bitVector = ((NullableVector) v).getBitsVector();
+      assertEquals(testRowCountPowerTwo, bitVector.getValueCapacity());
+      valueVector = ((NullableVector) v).getValuesVector();
+      assertEquals(testRowCountPowerTwo, valueVector.getValueCapacity());
+      v.clear();
+
+      // Allocate for max rows.
+      colSize.allocateVector(v, ValueVector.MAX_ROW_COUNT);
+      bitVector = ((NullableVector) v).getBitsVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT, bitVector.getValueCapacity());
+      valueVector = ((NullableVector) v).getValuesVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT, valueVector.getValueCapacity());
+      v.clear();
+
+      // Allocate for 0 rows. should atleast do allocation for 1 row.
+      colSize.allocateVector(v, 0);
+      bitVector = ((NullableVector) v).getBitsVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT, bitVector.getValueCapacity());
+      valueVector = ((NullableVector) v).getValuesVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT, valueVector.getValueCapacity());
+      v.clear();
+    }
+
+    empty.clear();
+    rows.clear();
+  }
+
+  @Test
+  public void testEmptyBatchVariableWidth() {
+    BatchSchema schema = new SchemaBuilder().add("a", 
MinorType.VARCHAR).build();
+    RowSetBuilder builder = fixture.rowSetBuilder(schema);
+    RowSet rows = builder.build();
+
+    // Run the record batch sizer on the resulting batch.
+    RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
+    assertEquals(1, sizer.columns().size());
+
+    ColumnSize aColumn = sizer.columns().get("a");
+
+    /**
+     * stdDataSize:50, stdNetSize:50+4, dataSizePerEntry:0, netSizePerEntry:0,
+     * totalDataSize:0, totalNetSize:0, valueCount:0,
+     * elementCount:0, cardinality:0, isVariableWidth:true
+     */
+    verifyColumnValues(aColumn,
+      50, 54, 0, 0, 0, 0, 0, 0, 0, true);
+
+    // Verify memory allocation is done correctly based on std size for empty 
batch.
+    SingleRowSet empty = fixture.rowSet(schema);
+    VectorAccessible accessible = empty.vectorAccessible();
+    UInt4Vector offsetVector;
+
+    for (VectorWrapper<?> vw : accessible) {
+      ValueVector v = vw.getValueVector();
+      RecordBatchSizer.ColumnSize colSize = 
sizer.getColumn(v.getField().getName());
+
+      // Allocates to nearest power of two
+      colSize.allocateVector(v, testRowCount);
+      offsetVector = ((VariableWidthVector)v).getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), 
offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(testRowCount  << 1)-1, 
v.getValueCapacity());
+      v.clear();
+
+      // Allocates the same as value passed since it is already power of two.
+      // -1 is done for adjustment needed for offset vector.
+      colSize.allocateVector(v, testRowCountPowerTwo - 1);
+      offsetVector = ((VariableWidthVector)v).getOffsetVector();
+      assertEquals(testRowCountPowerTwo, offsetVector.getValueCapacity());
+      assertEquals(testRowCountPowerTwo - 1, v.getValueCapacity());
+      v.clear();
+
+      // Allocate for max rows.
+      colSize.allocateVector(v, ValueVector.MAX_ROW_COUNT - 1);
+      offsetVector = ((VariableWidthVector)v).getOffsetVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT, offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MAX_ROW_COUNT - 1, v.getValueCapacity());
+      v.clear();
+
+      // Allocate for 0 rows. should atleast do allocation for 1 row.
+      colSize.allocateVector(v, 0);
+      offsetVector = ((VariableWidthVector)v).getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT + 1, 
offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MIN_ROW_COUNT, v.getValueCapacity());
+      v.clear();
+    }
+
+    empty.clear();
+    rows.clear();
+  }
+
+  @Test
+  public void testEmptyBatchRepeatedVariableWidth() {
+    BatchSchema schema = new SchemaBuilder().addArray("b", 
MinorType.VARCHAR).build();
+    RowSetBuilder builder = fixture.rowSetBuilder(schema);
+    RowSet rows = builder.build();
+
+    // Run the record batch sizer on the resulting batch.
+    RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
+    assertEquals(1, sizer.columns().size());
+
+    ColumnSize bColumn = sizer.columns().get("b");
+
+    /**
+     * stdDataSize:50*5, stdNetSize:50*5+4*10+4, dataSizePerEntry:0, 
netSizePerEntry:0,
+     * totalDataSize:0, totalNetSize:0, valueCount:0,
+     * elementCount:0, cardinality:0, isVariableWidth:true
+     */
+    verifyColumnValues(bColumn, 250, 274, 0, 0, 0, 0, 0, 0, 0,true);
+
+    // Verify memory allocation is done correctly based on std size for empty 
batch.
+    SingleRowSet empty = fixture.rowSet(schema);
+    VectorAccessible accessible = empty.vectorAccessible();
+
+    for (VectorWrapper<?> vw : accessible) {
+      ValueVector v = vw.getValueVector();
+      RecordBatchSizer.ColumnSize colSize = 
sizer.getColumn(v.getField().getName());
+
+      // Allocates to nearest power of two
+      colSize.allocateVector(v, testRowCount - 1);
+      UInt4Vector offsetVector = ((RepeatedValueVector)v).getOffsetVector();
+      assertEquals(Integer.highestOneBit(testRowCount) << 1, 
offsetVector.getValueCapacity());
+      VariableWidthVector vwVector = ((VariableWidthVector) 
((RepeatedValueVector) v).getDataVector());
+      offsetVector = vwVector.getOffsetVector();
+      assertEquals((Integer.highestOneBit((testRowCount-1) * 
STD_REPETITION_FACTOR) << 1), offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit((testRowCount-1) * 
STD_REPETITION_FACTOR  << 1)-1, vwVector.getValueCapacity());
+      v.clear();
+
+      // Allocates the same as value passed since it is already power of two.
+      colSize.allocateVector(v, testRowCountPowerTwo);
+      offsetVector = ((RepeatedValueVector)v).getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCountPowerTwo) << 1), 
offsetVector.getValueCapacity());
+      vwVector = ((VariableWidthVector) ((RepeatedValueVector) 
v).getDataVector());
+      offsetVector = vwVector.getOffsetVector();
+      assertEquals((Integer.highestOneBit((int)(testRowCountPowerTwo * 
STD_REPETITION_FACTOR)) << 1), offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(testRowCountPowerTwo * 
STD_REPETITION_FACTOR  << 1)-1, vwVector.getValueCapacity());
+      v.clear();
+
+      // Allocate for max rows.
+      colSize.allocateVector(v, ValueVector.MAX_ROW_COUNT);
+      offsetVector = ((RepeatedValueVector)v).getOffsetVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT << 1, 
offsetVector.getValueCapacity());
+      vwVector = ((VariableWidthVector) ((RepeatedValueVector) 
v).getDataVector());
+      offsetVector = vwVector.getOffsetVector();
+      assertEquals((Integer.highestOneBit((int) (ValueVector.MAX_ROW_COUNT * 
STD_REPETITION_FACTOR)) << 1), offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(ValueVector.MAX_ROW_COUNT * 
STD_REPETITION_FACTOR  << 1)-1, vwVector.getValueCapacity());
+      v.clear();
+
+      // Allocate for 0 rows. should atleast do allocation for 1 row.
+      colSize.allocateVector(v,  0);
+      offsetVector = ((RepeatedValueVector)v).getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT + 1, 
offsetVector.getValueCapacity());
+      vwVector = ((VariableWidthVector) ((RepeatedValueVector) 
v).getDataVector());
+      offsetVector = vwVector.getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT + 1, 
offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MIN_ROW_COUNT, vwVector.getValueCapacity());
+      v.clear();
+    }
+
+    empty.clear();
+    rows.clear();
+  }
+
+  @Test
+  public void testEmptyBatchNullableVariableWidth() {
+    BatchSchema schema = new SchemaBuilder().addNullable("b", 
MinorType.VARCHAR).build();
+    RowSetBuilder builder = fixture.rowSetBuilder(schema);
+    RowSet rows = builder.build();
+
+    // Run the record batch sizer on the resulting batch.
+    RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
+    assertEquals(1, sizer.columns().size());
+
+    /**
+     * stdDataSize:50, stdNetSize:50+4+1, dataSizePerEntry:0,
+     * netSizePerEntry:0,
+     * totalDataSize:0, totalNetSize:0,
+     * valueCount:0,
+     * elementCount:0, cardinality:0, isVariableWidth:true
+     */
+    verifyColumnValues(sizer.columns().get("b"),
+      50, 55, 0, 0, 0, 0, 0, 0, 0,true);
+
+    // Verify memory allocation is done correctly based on std size for empty 
batch.
+    SingleRowSet empty = fixture.rowSet(schema);
+    VectorAccessible accessible = empty.vectorAccessible();
+    ValueVector bitVector;
+    VariableWidthVector vwVector;
+    UInt4Vector offsetVector;
+
+    for (VectorWrapper<?> vw : accessible) {
+      ValueVector v = vw.getValueVector();
+      RecordBatchSizer.ColumnSize colSize = 
sizer.getColumn(v.getField().getName());
+
+      // Allocates to nearest power of two
+      colSize.allocateVector(v, testRowCount);
+      bitVector = ((NullableVector) v).getBitsVector();
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), 
bitVector.getValueCapacity());
+      vwVector = (VariableWidthVector) ((NullableVector) v).getValuesVector();
+      offsetVector = vwVector.getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), 
offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(testRowCount  << 1)-1, 
vwVector.getValueCapacity());
+
+      // Allocates the same as value passed since it is already power of two.
+      colSize.allocateVector(v,  testRowCountPowerTwo-1);
+      bitVector = ((NullableVector) v).getBitsVector();
+      assertEquals(Integer.highestOneBit(testRowCountPowerTwo), 
bitVector.getValueCapacity());
+      vwVector = (VariableWidthVector) ((NullableVector) v).getValuesVector();
+      offsetVector = vwVector.getOffsetVector();
+      assertEquals(testRowCountPowerTwo, offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(testRowCountPowerTwo)-1, 
vwVector.getValueCapacity());
+
+      // Allocate for max rows.
+      colSize.allocateVector(v, ValueVector.MAX_ROW_COUNT-1);
+      bitVector = ((NullableVector) v).getBitsVector();
+      assertEquals((Integer.highestOneBit(ValueVector.MAX_ROW_COUNT)), 
bitVector.getValueCapacity());
+      vwVector = (VariableWidthVector) ((NullableVector) v).getValuesVector();
+      offsetVector = vwVector.getOffsetVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT, offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MAX_ROW_COUNT-1, vwVector.getValueCapacity());
+
+      // Allocate for 0 rows. should atleast do allocation for 1 row.
+      colSize.allocateVector(v,  0);
+      bitVector = ((NullableVector) v).getBitsVector();
+      assertEquals((Integer.highestOneBit(ValueVector.MIN_ROW_COUNT)), 
bitVector.getValueCapacity());
+      vwVector = (VariableWidthVector) ((NullableVector) v).getValuesVector();
+      offsetVector = vwVector.getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT+1, 
offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MIN_ROW_COUNT, vwVector.getValueCapacity());
+    }
+
+    empty.clear();
+    rows.clear();
+  }
+
+  @Test
+  public void testEmptyBatchMap() {
+    BatchSchema schema = new SchemaBuilder()
+      .addMap("map")
+      .add("key", MinorType.INT)
+      .add("value", MinorType.VARCHAR)
+      .resumeSchema()
+      .build();
+
+    RowSetBuilder builder = fixture.rowSetBuilder(schema);
+    RowSet rows = builder.build();
+
+    // Run the record batch sizer on the resulting batch.
+    RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
+    assertEquals(1, sizer.columns().size());
+
+    /**
+     * stdDataSize:50+4, stdNetSize:50+4+4, dataSizePerEntry:0,
+     * netSizePerEntry:0,
+     * totalDataSize:0, totalNetSize:0,
+     * valueCount:0,
+     * elementCount:0, cardinality:0, isVariableWidth:true
+     */
+    verifyColumnValues(sizer.columns().get("map"), 54, 58, 0, 0, 0, 0, 0, 0, 
0, false);
+
+    // Verify memory allocation is done correctly based on std size for empty 
batch.
+    SingleRowSet empty = fixture.rowSet(schema);
+    VectorAccessible accessible = empty.vectorAccessible();
+
+    for (VectorWrapper<?> vw : accessible) {
+      ValueVector v = vw.getValueVector();
+      RecordBatchSizer.ColumnSize colSize = 
sizer.getColumn(v.getField().getName());
+
+      // Allocates to nearest power of two
+      colSize.allocateVector(v, testRowCount);
+      MapVector mapVector = (MapVector)v;
+      ValueVector keyVector = mapVector.getChild("key");
+      ValueVector valueVector1 = mapVector.getChild("value");
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), 
keyVector.getValueCapacity());
+      UInt4Vector offsetVector = 
((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), 
offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(testRowCount  << 1)-1, 
valueVector1.getValueCapacity());
+
+      // Allocates the same as value passed since it is already power of two.
+      colSize.allocateVector(v, testRowCountPowerTwo-1);
+      mapVector = (MapVector)v;
+      keyVector = mapVector.getChild("key");
+      valueVector1 = mapVector.getChild("value");
+      assertEquals((Integer.highestOneBit(testRowCountPowerTwo -1) << 1), 
keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(testRowCountPowerTwo, offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(testRowCountPowerTwo)-1, 
valueVector1.getValueCapacity());
+
+      // Allocate for max rows.
+      colSize.allocateVector(v, ValueVector.MAX_ROW_COUNT -1);
+      mapVector = (MapVector)v;
+      keyVector = mapVector.getChild("key");
+      valueVector1 = mapVector.getChild("value");
+      assertEquals(ValueVector.MAX_ROW_COUNT, keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT, offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MAX_ROW_COUNT-1, 
valueVector1.getValueCapacity());
+
+      // Allocate for 0 rows. should atleast do allocation for 1 row.
+      colSize.allocateVector(v, 0);
+      mapVector = (MapVector)v;
+      keyVector = mapVector.getChild("key");
+      valueVector1 = mapVector.getChild("value");
+      assertEquals(ValueVector.MIN_ROW_COUNT, keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT+1, 
offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MIN_ROW_COUNT, valueVector1.getValueCapacity());
+
+      v.clear();
+    }
+
+    empty.clear();
+    rows.clear();
+
+  }
+
+  @Test
+  public void testEmptyBatchRepeatedMap() {
+    BatchSchema schema = new SchemaBuilder().addMapArray("map").
+      add("key", MinorType.INT).
+      add("value", MinorType.VARCHAR).
+      resumeSchema().build();
+
+    RowSetBuilder builder = fixture.rowSetBuilder(schema);
+    RowSet rows = builder.build();
+
+    // Run the record batch sizer on the resulting batch.
+    RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
+    assertEquals(1, sizer.columns().size());
+
+    /**
+     * stdDataSize:50+4, stdNetSize:50+4+4+4, dataSizePerEntry:0,
+     * netSizePerEntry: 0,
+     * totalDataSize:0, totalNetSize:0,
+     * valueCount:0,
+     * elementCount:0, cardinality:0, isVariableWidth:true
+     */
+    verifyColumnValues(sizer.columns().get("map"), 54,62, 0, 0, 0, 0, 0, 0, 0, 
false);
+
+    // Verify memory allocation is done correctly based on std size for empty 
batch.
+    SingleRowSet empty = fixture.rowSet(schema);
+    VectorAccessible accessible = empty.vectorAccessible();
+
+    UInt4Vector offsetVector;
+
+    for (VectorWrapper<?> vw : accessible) {
+      ValueVector v = vw.getValueVector();
+      RecordBatchSizer.ColumnSize colSize = 
sizer.getColumn(v.getField().getName());
+
+      // Allocates to nearest power of two
+      colSize.allocateVector(v, testRowCount);
+      RepeatedMapVector mapVector = (RepeatedMapVector)v;
+
+      offsetVector = ((RepeatedValueVector)mapVector).getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), 
offsetVector.getValueCapacity());
+
+      ValueVector keyVector = mapVector.getChild("key");
+      ValueVector valueVector1 = mapVector.getChild("value");
+      assertEquals(((Integer.highestOneBit(testRowCount * 
STD_REPETITION_FACTOR) << 1)), keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCount * 
STD_REPETITION_FACTOR) << 1) , offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(testRowCount * STD_REPETITION_FACTOR 
<< 1)  - 1, valueVector1.getValueCapacity());
+
+      // Allocates the same as value passed since it is already power of two.
+      colSize.allocateVector(v, testRowCountPowerTwo-1);
+      mapVector = (RepeatedMapVector)v;
+
+      offsetVector = ((RepeatedValueVector)mapVector).getOffsetVector();
+      assertEquals(testRowCountPowerTwo, offsetVector.getValueCapacity());
+
+      keyVector = mapVector.getChild("key");
+      valueVector1 = mapVector.getChild("value");
+      assertEquals(Integer.highestOneBit(testRowCountPowerTwo * 
STD_REPETITION_FACTOR) << 1, keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(Integer.highestOneBit((int)(testRowCountPowerTwo * 
STD_REPETITION_FACTOR)) << 1, offsetVector.getValueCapacity());
+      assertEquals((Integer.highestOneBit(testRowCountPowerTwo * 
STD_REPETITION_FACTOR << 1)) - 1, valueVector1.getValueCapacity());
+
+      // Allocate for max rows.
+      colSize.allocateVector(v,  ValueVector.MAX_ROW_COUNT -1);
+      mapVector = (RepeatedMapVector)v;
+
+      offsetVector = ((RepeatedValueVector)mapVector).getOffsetVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT, offsetVector.getValueCapacity());
+
+      keyVector = mapVector.getChild("key");
+      valueVector1 = mapVector.getChild("value");
+      assertEquals(Integer.highestOneBit(ValueVector.MAX_ROW_COUNT * 
STD_REPETITION_FACTOR) << 1, keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(Integer.highestOneBit(ValueVector.MAX_ROW_COUNT * 
STD_REPETITION_FACTOR) << 1, offsetVector.getValueCapacity());
+      assertEquals((Integer.highestOneBit(ValueVector.MAX_ROW_COUNT * 
STD_REPETITION_FACTOR) << 1) - 1, valueVector1.getValueCapacity());
+
+      // Allocate for 0 rows. should atleast do allocation for 1 row.
+      colSize.allocateVector(v, 0);
+      mapVector = (RepeatedMapVector)v;
+
+      offsetVector = ((RepeatedValueVector)mapVector).getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT, offsetVector.getValueCapacity());
+
+      keyVector = mapVector.getChild("key");
+      valueVector1 = mapVector.getChild("value");
+      assertEquals(ValueVector.MIN_ROW_COUNT, keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT+1, 
offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MIN_ROW_COUNT, valueVector1.getValueCapacity());
+
+      v.clear();
+    }
+
+    empty.clear();
+    rows.clear();
+  }
+
+  @Test
+  public void testEmptyBatchNestedMap() {
+    BatchSchema schema = new SchemaBuilder()
+      .addMap("map")
+      .add("key", MinorType.INT)
+      .add("value", MinorType.VARCHAR)
+      .addMap("childMap")
+      .add("childKey", MinorType.INT)
+      .add("childValue", MinorType.VARCHAR)
+      .resumeMap()
+      .resumeSchema()
+      .build();
+
+    RowSetBuilder builder = fixture.rowSetBuilder(schema);
+    RowSet rows = builder.build();
+
+    // Run the record batch sizer on the resulting batch.
+    RecordBatchSizer sizer = new RecordBatchSizer(rows.container());
+    assertEquals(1, sizer.columns().size());
+
+    /**
+     * stdDataSize:(50+4)*2, stdNetSize:(50+4)*2+4+4, dataSizePerEntry:0,
+     * netSizePerEntry: 0,
+     * totalDataSize:0, totalNetSize:0,
+     * valueCount:0,
+     * elementCount:0, cardinality:0, isVariableWidth:true
+     */
+    verifyColumnValues(sizer.columns().get("map"), 108, 116, 0, 0, 0, 0, 0, 0, 
0, false);
+
+    // Verify memory allocation is done correctly based on std size for empty 
batch.
+    SingleRowSet empty = fixture.rowSet(schema);
+    VectorAccessible accessible = empty.vectorAccessible();
+
+    UInt4Vector offsetVector;
+
+    for (VectorWrapper<?> vw : accessible) {
+      ValueVector v = vw.getValueVector();
+      RecordBatchSizer.ColumnSize colSize = 
sizer.getColumn(v.getField().getName());
+
+      // Allocates to nearest power of two
+      colSize.allocateVector(v, testRowCount);
+      MapVector mapVector = (MapVector)v;
+      ValueVector keyVector = mapVector.getChild("key");
+      ValueVector valueVector1 = mapVector.getChild("value");
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), 
keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), 
offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(testRowCount  << 1)-1, 
valueVector1.getValueCapacity());
+      MapVector childMapVector = (MapVector) mapVector.getChild("childMap");
+      ValueVector childKeyVector = childMapVector.getChild("childKey");
+      ValueVector childValueVector1 = childMapVector.getChild("childValue");
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), 
childKeyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals((Integer.highestOneBit(testRowCount) << 1), 
offsetVector.getValueCapacity());
+      assertEquals(Integer.highestOneBit(testRowCount  << 1)-1, 
childValueVector1.getValueCapacity());
+
+      // Allocates the same as value passed since it is already power of two.
+      colSize.allocateVector(v, testRowCountPowerTwo-1);
+      mapVector = (MapVector)v;
+      keyVector = mapVector.getChild("key");
+      valueVector1 = mapVector.getChild("value");
+      assertEquals(testRowCountPowerTwo, keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(testRowCountPowerTwo, offsetVector.getValueCapacity());
+      assertEquals(testRowCountPowerTwo-1, valueVector1.getValueCapacity());
+      childMapVector = (MapVector) mapVector.getChild("childMap");
+      childKeyVector = childMapVector.getChild("childKey");
+      childValueVector1 = childMapVector.getChild("childValue");
+      assertEquals(testRowCountPowerTwo, childKeyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(testRowCountPowerTwo, offsetVector.getValueCapacity());
+      assertEquals(testRowCountPowerTwo-1, 
childValueVector1.getValueCapacity());
+
+      // Allocate for max rows.
+      colSize.allocateVector(v, ValueVector.MAX_ROW_COUNT-1);
+      mapVector = (MapVector)v;
+      keyVector = mapVector.getChild("key");
+      valueVector1 = mapVector.getChild("value");
+      assertEquals(ValueVector.MAX_ROW_COUNT, keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT, offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MAX_ROW_COUNT-1, 
valueVector1.getValueCapacity());
+      childMapVector = (MapVector) mapVector.getChild("childMap");
+      childKeyVector = childMapVector.getChild("childKey");
+      childValueVector1 = childMapVector.getChild("childValue");
+      assertEquals(ValueVector.MAX_ROW_COUNT, 
childKeyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(ValueVector.MAX_ROW_COUNT, offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MAX_ROW_COUNT-1, 
childValueVector1.getValueCapacity());
+
+      // Allocate for 0 rows. should atleast do allocation for 1 row.
+      colSize.allocateVector(v,  0);
+      mapVector = (MapVector)v;
+      keyVector = mapVector.getChild("key");
+      valueVector1 = mapVector.getChild("value");
+      assertEquals(ValueVector.MIN_ROW_COUNT, keyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT+1, 
offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MIN_ROW_COUNT, valueVector1.getValueCapacity());
+      childMapVector = (MapVector) mapVector.getChild("childMap");
+      childKeyVector = childMapVector.getChild("childKey");
+      childValueVector1 = childMapVector.getChild("childValue");
+      assertEquals(ValueVector.MIN_ROW_COUNT, 
childKeyVector.getValueCapacity());
+      offsetVector = ((VariableWidthVector)valueVector1).getOffsetVector();
+      assertEquals(ValueVector.MIN_ROW_COUNT+1, 
offsetVector.getValueCapacity());
+      assertEquals(ValueVector.MIN_ROW_COUNT, 
childValueVector1.getValueCapacity());
+
+      v.clear();
+    }
+
+    empty.clear();
+    rows.clear();
+
+  }
+
   /**
    * Test to verify that record batch sizer handles the actual empty vectors 
correctly. RowSetBuilder by default
    * allocates Drillbuf of 10bytes for each vector type which makes their 
capacity >0 and not ==0 which will be in

http://git-wip-us.apache.org/repos/asf/drill/blob/f563f382/exec/vector/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
----------------------------------------------------------------------
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
index 4fd0cbd..b0687d2 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
@@ -22,8 +22,10 @@ import 
org.apache.drill.exec.vector.complex.RepeatedVariableWidthVectorLike;
 
 public class AllocationHelper {
 
+  public static final int STD_REPETITION_FACTOR = 5;
+
   public static void allocate(ValueVector vector, int valueCount, int 
bytesPerValue) {
-    allocate(vector, valueCount, bytesPerValue, 5);
+    allocate(vector, valueCount, bytesPerValue, STD_REPETITION_FACTOR);
   }
 
   public static void allocatePrecomputedChildCount(ValueVector vector, int 
valueCount, int bytesPerValue, int childValCount) {

Reply via email to