Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r137939287
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -500,22 +516,45 @@ private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeExcepti
        */
       private void updateEstMaxBatchSize(RecordBatch incoming) {
         if ( estMaxBatchSize > 0 ) { return; }  // no handling of a schema (or varchar) change
    +    // Use the sizer to get the input row width and the length of the longest varchar column
         RecordBatchSizer sizer = new RecordBatchSizer(incoming);
         logger.trace("Incoming sizer: {}",sizer);
         // An empty batch only has the schema, can not tell actual length of varchars
         // else use the actual varchars length, each capped at 50 (to match the space allocation)
    -    estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
    -    estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE;
    +    long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
     
         // Get approx max (varchar) column width to get better memory allocation
    -    maxColumnWidth = Math.max(sizer.maxSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
    +    maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
         maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE);
     
    -    logger.trace("{} phase. Estimated row width: {}  batch size: {}  memory limit: {}  max column width: {}",
    -        isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth);
    +    //
    +    // Calculate the estimated max (internal) batch (i.e. Keys batch + Values batch) size
    +    // (which is used to decide when to spill)
    +    // Also calculate the values batch size (used as a reserve to overcome an OOM)
    +    //
    +    Iterator<VectorWrapper<?>> outgoingIter = outContainer.iterator();
    +    int fieldId = 0;
    +    while (outgoingIter.hasNext()) {
    +      ValueVector vv = outgoingIter.next().getValueVector();
    +      MaterializedField mr = vv.getField();
    +      int fieldSize = vv instanceof VariableWidthVector ? maxColumnWidth :
    +          TypeHelper.getSize(mr.getType());
    +      estRowWidth += fieldSize;
    +      estOutputRowWidth += fieldSize;
    +      if ( fieldId < numGroupByOutFields ) { fieldId++; }
    +      else { estValuesRowWidth += fieldSize; }
    +    }
    +    // multiply by the max number of rows in a batch to get the final estimated max size
    +    estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
    --- End diff ---
    
    Here, the output batch size is fixed by the number of rows. Suppose a sort consumes the output of this operator, and that sort has a memory ceiling of x MB. Can the code here create batches larger than x/2 MB, meaning the sort is forced to consume batches so large that it cannot buffer two of them and still spill?
    
    In other words, is there an attempt here to control overall output batch memory use, rather than just assuming that we always output `MAX_BATCH_SIZE` rows regardless of memory use?
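    
    Not part of this patch, just to make the question concrete: a minimal sketch of a memory-bounded output row count. The `OUTPUT_BATCH_BYTES` budget and the 64K row cap are illustrative assumptions standing in for `MAX_BATCH_SIZE`; `estOutputRowWidth` is the per-row output width computed above.
    
    ```java
    // Hypothetical sketch only, not Drill code: derive the outgoing row count from a
    // memory budget rather than always emitting MAX_BATCH_SIZE rows.
    public class OutputBatchSizeSketch {
      static final int MAX_BATCH_SIZE = 64 * 1024;       // row-count cap (64K rows assumed for the example)
      static final long OUTPUT_BATCH_BYTES = 8L << 20;   // assumed 8 MB output budget (illustrative)
    
      static int outputRowCount(long estOutputRowWidth) {
        // Guard against a zero/unknown row width; fall back to the plain row-count cap.
        if (estOutputRowWidth <= 0) { return MAX_BATCH_SIZE; }
        long rowsByMemory = OUTPUT_BATCH_BYTES / estOutputRowWidth;
        return (int) Math.min(MAX_BATCH_SIZE, Math.max(1, rowsByMemory));
      }
    
      public static void main(String[] args) {
        System.out.println(outputRowCount(100));    // narrow rows: 65536 -- the row cap binds
        System.out.println(outputRowCount(1024));   // 1 KB rows: 8192 -- the memory budget binds
      }
    }
    ```
    
    With something along these lines, a downstream sort with an x MB ceiling would see batches no larger than the configured budget, no matter how wide the rows turn out to be.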

