Github user Ben-Zvi commented on a diff in the pull request:
https://github.com/apache/drill/pull/1101#discussion_r164607265
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
---
@@ -516,43 +501,48 @@ private void initializeSetup(RecordBatch newIncoming)
throws SchemaChangeExcepti
* @param incoming
*/
private void updateEstMaxBatchSize(RecordBatch incoming) {
- if ( estMaxBatchSize > 0 ) { return; } // no handling of a schema (or
varchar) change
// Use the sizer to get the input row width and the length of the
longest varchar column
- RecordBatchSizer sizer = new RecordBatchSizer(incoming);
- logger.trace("Incoming sizer: {}",sizer);
- // An empty batch only has the schema, can not tell actual length of
varchars
- // else use the actual varchars length, each capped at 50 (to match
the space allocation)
- long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() :
sizer.netRowWidthCap50();
+ final RecordBatchSizer incomingColumnSizes = new
RecordBatchSizer(incoming);
+ final Map<String, RecordBatchSizer.ColumnSize> columnSizeMap =
incomingColumnSizes.getColumnSizeMap();
+ keySizes = CaseInsensitiveMap.newHashMap();
- // Get approx max (varchar) column width to get better memory
allocation
- maxColumnWidth = Math.max(sizer.maxAvgColumnSize(),
VARIABLE_MIN_WIDTH_VALUE_SIZE);
- maxColumnWidth = Math.min(maxColumnWidth,
VARIABLE_MAX_WIDTH_VALUE_SIZE);
+ logger.trace("Incoming sizer: {}",incomingColumnSizes);
- //
// Calculate the estimated max (internal) batch (i.e. Keys batch +
Values batch) size
// (which is used to decide when to spill)
// Also calculate the values batch size (used as a reserve to overcome
an OOM)
- //
- Iterator<VectorWrapper<?>> outgoingIter = outContainer.iterator();
- int fieldId = 0;
- while (outgoingIter.hasNext()) {
- ValueVector vv = outgoingIter.next().getValueVector();
- MaterializedField mr = vv.getField();
- int fieldSize = vv instanceof VariableWidthVector ? maxColumnWidth :
- TypeHelper.getSize(mr.getType());
- estRowWidth += fieldSize;
- estOutputRowWidth += fieldSize;
- if ( fieldId < numGroupByOutFields ) { fieldId++; }
- else { estValuesRowWidth += fieldSize; }
+
+ for (int columnIndex = 0; columnIndex < numGroupByOutFields;
columnIndex++) {
+ final VectorWrapper vectorWrapper =
outContainer.getValueVector(columnIndex);
+ final String columnName = vectorWrapper.getField().getName();
+ final int columnSize = columnSizeMap.get(columnName).estSize;
+ keySizes.put(columnName, columnSize);
+ estOutputRowWidth += columnSize;
}
+
+ long estValuesRowWidth = 0;
+
+ for (int columnIndex = numGroupByOutFields; columnIndex <
outContainer.getNumberOfColumns(); columnIndex++) {
+ VectorWrapper vectorWrapper =
outContainer.getValueVector(columnIndex);
+ RecordBatchSizer.ColumnSize columnSize = new
RecordBatchSizer.ColumnSize(vectorWrapper.getValueVector());
+ estOutputRowWidth += columnSize.estSize;
+ estValuesRowWidth += columnSize.estSize;
+ }
+
// multiply by the max number of rows in a batch to get the final
estimated max size
- estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) *
MAX_BATCH_SIZE;
+ estMaxBatchSize = Math.max(estOutputRowWidth, 1) * MAX_BATCH_SIZE;
// (When there are no aggr functions, use '1' as later code relies on
this size being non-zero)
+ // Note: estValuesBatchSize cannot be 0 because a zero value for
estValuesBatchSize will cause reserveValueBatchMemory to have a value of 0. And
the meaning
+ // of a reserveValueBatchMemory value of 0 has multiple meanings in
different contexts. So estValuesBatchSize has an enforced minimum value of 1,
without this
+ // estValuesBatchsize could have a value of 0 in the case were there
are no value columns and all the columns are key columns.
estValuesBatchSize = Math.max(estValuesRowWidth, 1) * MAX_BATCH_SIZE;
estOutgoingAllocSize = estValuesBatchSize; // initially assume same
size
- logger.trace("{} phase. Estimated internal row width: {} Values row
width: {} batch size: {} memory limit: {} max column width: {}",
-
isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estValuesRowWidth,estMaxBatchSize,allocator.getLimit(),maxColumnWidth);
+ if (logger.isTraceEnabled()) {
--- End diff --
Why is this "if()" needed? If trace is not enabled, then nothing is logged
anyway.
---