Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/938#discussion_r137939287

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -500,22 +516,45 @@ private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeExcepti
        */
       private void updateEstMaxBatchSize(RecordBatch incoming) {
         if ( estMaxBatchSize > 0 ) { return; } // no handling of a schema (or varchar) change
    +    // Use the sizer to get the input row width and the length of the longest varchar column
         RecordBatchSizer sizer = new RecordBatchSizer(incoming);
         logger.trace("Incoming sizer: {}",sizer);
         // An empty batch only has the schema, can not tell actual length of varchars
         // else use the actual varchars length, each capped at 50 (to match the space allocation)
    -    estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
    -    estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE;
    +    long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
         // Get approx max (varchar) column width to get better memory allocation
    -    maxColumnWidth = Math.max(sizer.maxSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
    +    maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
         maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE);
    -    logger.trace("{} phase. Estimated row width: {} batch size: {} memory limit: {} max column width: {}",
    -        isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth);
    +    //
    +    // Calculate the estimated max (internal) batch (i.e. Keys batch + Values batch) size
    +    // (which is used to decide when to spill)
    +    // Also calculate the values batch size (used as a reserve to overcome an OOM)
    +    //
    +    Iterator<VectorWrapper<?>> outgoingIter = outContainer.iterator();
    +    int fieldId = 0;
    +    while (outgoingIter.hasNext()) {
    +      ValueVector vv = outgoingIter.next().getValueVector();
    +      MaterializedField mr = vv.getField();
    +      int fieldSize = vv instanceof VariableWidthVector ? maxColumnWidth :
    +          TypeHelper.getSize(mr.getType());
    +      estRowWidth += fieldSize;
    +      estOutputRowWidth += fieldSize;
    +      if ( fieldId < numGroupByOutFields ) { fieldId++; }
    +      else { estValuesRowWidth += fieldSize; }
    +    }
    +    // multiply by the max number of rows in a batch to get the final estimated max size
    +    estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
    --- End diff --

    Here, the output batch size is fixed based on the number of rows. Suppose a sort sits downstream of this operator, and the sort has a memory ceiling of x MB. Can the code here create batches larger than x/2 MB, so that the sort is forced to consume batches so large that it cannot buffer two of them and spill? In other words, is there an attempt here to control overall output batch memory use, instead of assuming that we always output `MAX_BATCH_SIZE` rows regardless of memory use?
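    To make the suggestion concrete, here is a minimal sketch (not code from this PR) of a memory-aware cap: derive the per-batch row count from a downstream memory budget and the estimated output row width, and only fall back to the fixed row-count ceiling when memory allows it. The helper, its parameters, and the 16 MB figure are hypothetical; `estOutputRowWidth` and `MAX_BATCH_SIZE` correspond to the values computed in the diff above.

    ```java
    // Hypothetical sketch: size the outgoing batch by a memory budget,
    // not only by a fixed row count.
    public final class OutputBatchSizing {

      /**
       * Picks an output row count so a batch stays within roughly half of the
       * downstream operator's memory ceiling (letting e.g. a sort buffer two
       * batches and still spill), never exceeding the row-count ceiling.
       */
      static int computeOutputRowCount(long downstreamLimitBytes,
                                       long estOutputRowWidthBytes,
                                       int maxBatchRows) {
        long budgetBytes = downstreamLimitBytes / 2;  // the "x/2 MB" above
        long rowsByMemory = budgetBytes / Math.max(1L, estOutputRowWidthBytes);
        return (int) Math.max(1L, Math.min((long) maxBatchRows, rowsByMemory));
      }

      public static void main(String[] args) {
        // 16 MB ceiling, 512-byte estimated output rows: the memory budget
        // (8 MB / 512 B = 16384 rows) wins over a 64K row ceiling.
        System.out.println(computeOutputRowCount(16L << 20, 512L, 64 * 1024));  // 16384
      }
    }
    ```

    With a cap like this, the final `estMaxBatchSize` would be computed from the capped row count rather than always from `MAX_BATCH_SIZE`.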
---