Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r138433967
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -500,22 +516,45 @@ private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeExcepti
        */
       private void updateEstMaxBatchSize(RecordBatch incoming) {
         if ( estMaxBatchSize > 0 ) { return; }  // no handling of a schema (or varchar) change
    +    // Use the sizer to get the input row width and the length of the longest varchar column
         RecordBatchSizer sizer = new RecordBatchSizer(incoming);
         logger.trace("Incoming sizer: {}",sizer);
         // An empty batch only has the schema, can not tell actual length of varchars
         // else use the actual varchars length, each capped at 50 (to match the space allocation)
    -    estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
    -    estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE;
    +    long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
     
         // Get approx max (varchar) column width to get better memory allocation
    -    maxColumnWidth = Math.max(sizer.maxSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
    +    maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
         maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE);
     
    -    logger.trace("{} phase. Estimated row width: {}  batch size: {}  memory limit: {}  max column width: {}",
    -        isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth);
    +    //
    +    // Calculate the estimated max (internal) batch (i.e. Keys batch + Values batch) size
    +    // (which is used to decide when to spill)
    +    // Also calculate the values batch size (used as a reserve to overcome an OOM)
    +    //
    +    Iterator<VectorWrapper<?>> outgoingIter = outContainer.iterator();
    +    int fieldId = 0;
    +    while (outgoingIter.hasNext()) {
    +      ValueVector vv = outgoingIter.next().getValueVector();
    +      MaterializedField mr = vv.getField();
    +      int fieldSize = vv instanceof VariableWidthVector ? maxColumnWidth :
    +          TypeHelper.getSize(mr.getType());
    +      estRowWidth += fieldSize;
    +      estOutputRowWidth += fieldSize;
    +      if ( fieldId < numGroupByOutFields ) { fieldId++; }
    +      else { estValuesRowWidth += fieldSize; }
    +    }
    +    // multiply by the max number of rows in a batch to get the final estimated max size
    +    estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
    --- End diff ---
    
    Most of these estimates are for the internal "worst case". Only the "outgoing" one is for the outgoing batch (which is also used for spilling, and spilling is internal).
    In any case, none of these estimates has anything to do with _throttling_ the outgoing batch size; that logic was not changed from the original code (likely up to MAX_BATCH_SIZE rows).
    Making such a change should be a separate project.
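
    For illustration, here is a minimal, hypothetical sketch of how such a worst-case estimate can feed a spill decision. The names SpillEstimateSketch, shouldSpill, and allocatedMemory are invented for this sketch and are not the actual HashAggTemplate API; the 64K rows-per-batch cap for MAX_BATCH_SIZE is an assumption.

    // A minimal, hypothetical sketch (not the actual HashAggTemplate code) of how
    // a worst-case batch-size estimate can drive a spill decision.
    public class SpillEstimateSketch {

      // Assumption: batches are capped at 64K rows (the value-vector limit).
      static final int MAX_BATCH_SIZE = 65536;

      // Worst-case internal batch size: the wider of the internal and input
      // row-width estimates, times the maximum number of rows per batch.
      static long estimateMaxBatchSize(long estRowWidth, long estInputRowWidth) {
        return Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
      }

      // Hypothetical check: spill once the memory already allocated plus one more
      // worst-case batch would exceed the operator's memory limit.
      static boolean shouldSpill(long allocatedMemory, long estMaxBatchSize, long memoryLimit) {
        return allocatedMemory + estMaxBatchSize > memoryLimit;
      }

      public static void main(String[] args) {
        long estMax = estimateMaxBatchSize(120L, 96L);  // e.g. 120-byte internal rows
        System.out.println("estMaxBatchSize = " + estMax + " bytes");                // 7864320
        System.out.println("spill? " + shouldSpill(50_000_000L, estMax, 52_000_000L)); // true
      }
    }

    The key point the sketch makes concrete: the estimate is deliberately pessimistic (worst-case row width times maximum row count), so it governs when to spill, not how large the outgoing batch actually ends up being.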


