GitHub user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1091#discussion_r162225832
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java ---
    @@ -94,8 +98,54 @@ private void clear() {
         }
       }
     
    +  private class FlattenMemoryManager {
    +    private final int outputRowCount;
    +    private static final int OFFSET_VECTOR_WIDTH = 4;
    +    private static final int WORST_CASE_FRAGMENTATION_FACTOR = 2;
    +    private static final int MAX_NUM_ROWS = 64 * 1024;
    +    private static final int MIN_NUM_ROWS = 1;
    +
    +    private FlattenMemoryManager(RecordBatch incoming, long outputBatchSize, SchemaPath flattenColumn) {
    +      // Get sizing information for the batch.
    +      RecordBatchSizer sizer = new RecordBatchSizer(incoming);
    +
    +      final TypedFieldId typedFieldId = incoming.getValueVectorId(flattenColumn);
    +      final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
    +
    +      // Get column size of flatten column.
    +      RecordBatchSizer.ColumnSize columnSize = sizer.getColumn(incoming.getValueAccessorById(field.getValueClass(),
    +          typedFieldId.getFieldIds()).getValueVector(), field.getName());
    +
    +      // Average rowWidth of flatten column
    +      final int avgRowWidthFlattenColumn = RecordBatchSizer.safeDivide(columnSize.dataSize, incoming.getRecordCount());
    +
    +      // Average rowWidth excluding the flatten column.
    +      final int avgRowWidthWithOutFlattenColumn = sizer.netRowWidth() - avgRowWidthFlattenColumn;
    +
    +      // Average rowWidth of single element in the flatten list.
    +      // subtract the offset vector size from column data size.
    +      final int avgRowWidthSingleFlattenEntry =
    +          RecordBatchSizer.safeDivide(columnSize.dataSize - (OFFSET_VECTOR_WIDTH * columnSize.valueCount), columnSize.elementCount);
    +
    +      // Average rowWidth of outgoing batch.
    +      final int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + avgRowWidthSingleFlattenEntry;
    +
    +      // Number of rows in outgoing batch
    +      outputRowCount = Math.max(MIN_NUM_ROWS, Math.min(MAX_NUM_ROWS,
    +          RecordBatchSizer.safeDivide((outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR), avgOutgoingRowWidth)));
    +    }
    --- End diff --
    
    It would be great to log this sizing information at debug level to aid in tuning. Doing the same in the sort operator proved highly valuable.
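To make the suggestion concrete, here is a minimal sketch of what such debug logging might look like. It is not Drill code and not the PR's implementation: the class and the `outputRowCount`, `describe`, and `divideOrZero` names are hypothetical; `java.util.logging` at `FINE` stands in for whatever logger the operator actually uses at debug level; the sizer values are passed in as plain numbers rather than read from a `RecordBatchSizer`; and `divideOrZero` assumes `safeDivide` rounds up and returns 0 for a zero divisor.

```java
import java.util.Locale;
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch, not Drill code: mirrors the FlattenMemoryManager arithmetic from the diff
// and logs every intermediate value at FINE (debug) level to help with tuning.
class FlattenSizingLogSketch {
  private static final Logger LOG = Logger.getLogger(FlattenSizingLogSketch.class.getName());

  // Same constants as in the diff.
  private static final int OFFSET_VECTOR_WIDTH = 4;
  private static final int WORST_CASE_FRAGMENTATION_FACTOR = 2;
  private static final int MAX_NUM_ROWS = 64 * 1024;
  private static final int MIN_NUM_ROWS = 1;

  // Assumed stand-in for RecordBatchSizer.safeDivide: ceiling division, 0 when the divisor is 0.
  static int divideOrZero(long num, long denom) {
    if (denom == 0) {
      return 0;
    }
    return (int) ((num + denom - 1) / denom); // ceiling for non-negative num, positive denom
  }

  // Builds the debug message; kept separate so it can be checked without a logger.
  static String describe(long outputBatchSize, int avgFlattenColumnWidth, int avgOtherColumnsWidth,
      int avgFlattenEntryWidth, int avgOutgoingRowWidth, int rowCount) {
    return String.format(Locale.ROOT,
        "flatten sizing: outputBatchSize=%d, avgFlattenColumnWidth=%d, avgOtherColumnsWidth=%d, "
            + "avgFlattenEntryWidth=%d, avgOutgoingRowWidth=%d, outputRowCount=%d",
        outputBatchSize, avgFlattenColumnWidth, avgOtherColumnsWidth,
        avgFlattenEntryWidth, avgOutgoingRowWidth, rowCount);
  }

  // Inputs are plain numbers here; in the operator they would come from RecordBatchSizer.
  static int outputRowCount(long outputBatchSize, int netRowWidth, int incomingRowCount,
      long flattenDataSize, int flattenValueCount, int flattenElementCount) {
    int avgFlattenColumnWidth = divideOrZero(flattenDataSize, incomingRowCount);
    int avgOtherColumnsWidth = netRowWidth - avgFlattenColumnWidth;
    int avgFlattenEntryWidth = divideOrZero(
        flattenDataSize - (long) OFFSET_VECTOR_WIDTH * flattenValueCount, flattenElementCount);
    int avgOutgoingRowWidth = avgOtherColumnsWidth + avgFlattenEntryWidth;
    int rowCount = Math.max(MIN_NUM_ROWS, Math.min(MAX_NUM_ROWS,
        divideOrZero(outputBatchSize / WORST_CASE_FRAGMENTATION_FACTOR, avgOutgoingRowWidth)));

    // Only format the message when debug-level output is actually enabled.
    if (LOG.isLoggable(Level.FINE)) {
      LOG.fine(describe(outputBatchSize, avgFlattenColumnWidth, avgOtherColumnsWidth,
          avgFlattenEntryWidth, avgOutgoingRowWidth, rowCount));
    }
    return rowCount;
  }

  public static void main(String[] args) {
    // Enable FINE output for this demo; the default console handler only shows INFO and above.
    ConsoleHandler handler = new ConsoleHandler();
    handler.setLevel(Level.FINE);
    LOG.addHandler(handler);
    LOG.setLevel(Level.FINE);
    LOG.setUseParentHandlers(false);

    // Made-up batch: 1,000 rows, 120-byte net row width, 10 flatten entries per row.
    System.out.println(outputRowCount(1024 * 1024, 120, 1000, 64_000, 1000, 10_000)); // prints 8457
  }
}
```

With those made-up inputs the average outgoing row is 56 + 6 = 62 bytes, so half of the 1 MiB budget (524,288 bytes) yields 8,457 rows, well under the 64K cap. The single FINE line records each intermediate, which is the kind of detail that makes a mis-sized batch easy to diagnose.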

