Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r138433967

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -500,22 +516,45 @@ private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeExcepti
         */
        private void updateEstMaxBatchSize(RecordBatch incoming) {
          if ( estMaxBatchSize > 0 ) { return; } // no handling of a schema (or varchar) change
    +    // Use the sizer to get the input row width and the length of the longest varchar column
          RecordBatchSizer sizer = new RecordBatchSizer(incoming);
          logger.trace("Incoming sizer: {}",sizer);
          // An empty batch only has the schema, can not tell actual length of varchars
          // else use the actual varchars length, each capped at 50 (to match the space allocation)
    -    estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
    -    estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE;
    +    long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
          // Get approx max (varchar) column width to get better memory allocation
    -    maxColumnWidth = Math.max(sizer.maxSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
    +    maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
          maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE);
    -    logger.trace("{} phase. Estimated row width: {} batch size: {} memory limit: {} max column width: {}",
    -        isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth);
    +    //
    +    // Calculate the estimated max (internal) batch (i.e. Keys batch + Values batch) size
    +    // (which is used to decide when to spill)
    +    // Also calculate the values batch size (used as a reserve to overcome an OOM)
    +    //
    +    Iterator<VectorWrapper<?>> outgoingIter = outContainer.iterator();
    +    int fieldId = 0;
    +    while (outgoingIter.hasNext()) {
    +      ValueVector vv = outgoingIter.next().getValueVector();
    +      MaterializedField mr = vv.getField();
    +      int fieldSize = vv instanceof VariableWidthVector ? maxColumnWidth :
    +          TypeHelper.getSize(mr.getType());
    +      estRowWidth += fieldSize;
    +      estOutputRowWidth += fieldSize;
    +      if ( fieldId < numGroupByOutFields ) { fieldId++; }
    +      else { estValuesRowWidth += fieldSize; }
    +    }
    +    // multiply by the max number of rows in a batch to get the final estimated max size
    +    estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
    --- End diff --

    Most of these estimates are for the internal "worst case". Only the "outgoing" one applies to the outgoing batch (and it is also used for spilling, which is internal). In any case, none of these estimates has anything to do with _throttling_ the outgoing batch size; that logic was not changed from the original code (likely up to MAX_BATCH_SIZE rows). Making such a change should be a separate project.
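    To make the worst-case arithmetic concrete, here is a minimal, self-contained sketch of the estimation idea discussed above. All names in it (FieldInfo, estimateRowWidth, estimateMaxBatchSize) and the 65536-row constant are illustrative stand-ins under assumption, not Drill's actual code: variable-width columns contribute a capped width, fixed-width columns their known size, and the sum is multiplied by the maximum row count per batch.

    import java.util.List;

    public class BatchSizeEstimateSketch {

      static final int MAX_BATCH_SIZE = 65536;              // assumed max rows per batch
      static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8;   // assumed floor for varchar width
      static final int VARIABLE_MAX_WIDTH_VALUE_SIZE = 50;  // assumed cap for varchar width

      // Illustrative stand-in for a column's metadata (hypothetical, not Drill's API).
      record FieldInfo(boolean variableWidth, int fixedSize) {}

      // Sum per-column widths: variable-width columns use a capped estimate,
      // fixed-width columns use their known size (mirrors the loop in the patch).
      static long estimateRowWidth(List<FieldInfo> fields, int maxColumnWidth) {
        int capped = Math.min(Math.max(maxColumnWidth, VARIABLE_MIN_WIDTH_VALUE_SIZE),
                              VARIABLE_MAX_WIDTH_VALUE_SIZE);
        long width = 0;
        for (FieldInfo f : fields) {
          width += f.variableWidth() ? capped : f.fixedSize();
        }
        return width;
      }

      // Worst-case internal batch size: the larger of the internal and input row
      // widths, times the max number of rows in a batch.
      static long estimateMaxBatchSize(long estRowWidth, long estInputRowWidth) {
        return Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
      }

      public static void main(String[] args) {
        // Example: one 8-byte fixed key plus one varchar value column.
        List<FieldInfo> fields = List.of(new FieldInfo(false, 8), new FieldInfo(true, 0));
        long rowWidth = estimateRowWidth(fields, 40);       // 8 + 40 = 48 bytes
        long maxBatch = estimateMaxBatchSize(rowWidth, 32); // 48 * 65536 = 3,145,728
        System.out.println("est row width = " + rowWidth + ", est max batch = " + maxBatch);
      }
    }

    With one 8-byte key and one varchar capped at 40 bytes, the estimated row width is 48 bytes and the worst-case batch is 48 * 65536 = 3,145,728 bytes; it is this kind of internal estimate, compared against the memory limit, that drives the spill decision, not any throttling of the outgoing batch.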
---