This is an automated email from the ASF dual-hosted git repository. boaz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 02255d8573497cb88e19624ea0054d0121498646 Author: Padma Penumarthy <[email protected]> AuthorDate: Mon Jun 25 19:06:51 2018 -0700 DRILL-6512: Remove unnecessary processing overhead from RecordBatchSizer closes #1341 --- .../physical/impl/aggregate/HashAggTemplate.java | 4 +- .../exec/physical/impl/common/HashPartition.java | 2 +- .../physical/impl/common/HashTableTemplate.java | 2 +- .../physical/impl/flatten/FlattenRecordBatch.java | 2 +- .../exec/physical/impl/xsort/managed/SortImpl.java | 10 +- .../drill/exec/record/JoinBatchMemoryManager.java | 7 +- .../exec/record/RecordBatchMemoryManager.java | 12 ++- .../apache/drill/exec/record/RecordBatchSizer.java | 120 ++++++++++++--------- .../exec/physical/unit/TestOutputBatchSize.java | 2 +- .../org/apache/drill/test/DrillTestWrapper.java | 2 +- .../drill/test/rowSet/AbstractSingleRowSet.java | 2 +- .../apache/drill/test/rowSet/IndirectRowSet.java | 2 +- 12 files changed, 96 insertions(+), 71 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 320f296..3b50471 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -517,10 +517,10 @@ public abstract class HashAggTemplate implements HashAggregator { logger.trace("Incoming sizer: {}",sizer); // An empty batch only has the schema, can not tell actual length of varchars // else use the actual varchars length, each capped at 50 (to match the space allocation) - long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50(); + long estInputRowWidth = sizer.rowCount() == 0 ? sizer.getStdRowWidth() : sizer.getNetRowWidthCap50(); // Get approx max (varchar) column width to get better memory allocation - maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE); + maxColumnWidth = Math.max(sizer.getMaxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE); maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE); // diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java index e525530..d80237c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java @@ -249,7 +249,7 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { tmpBatchesList.add(currentBatch); partitionBatchesCount++; - long batchSize = new RecordBatchSizer(currentBatch).actualSize(); + long batchSize = new RecordBatchSizer(currentBatch).getActualSize(); inMemoryBatchStats.add(new HashJoinMemoryCalculator.BatchStat(currentBatch.getRecordCount(), batchSize)); partitionInMemorySize += batchSize; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index bb0b1ad..6c9a398 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -438,7 +438,7 @@ public abstract class HashTableTemplate implements HashTable { size += ledger.getAccountedSize(); } - size += new RecordBatchSizer(htContainer).actualSize(); + size += new RecordBatchSizer(htContainer).getActualSize(); return size; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index 2f92d52..8421d6e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -136,7 +136,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { final int avgRowWidthFlattenColumn = columnSize.getNetSizePerEntry(); // Average rowWidth excluding the flatten column. - final int avgRowWidthWithOutFlattenColumn = getRecordBatchSizer().netRowWidth() - avgRowWidthFlattenColumn; + final int avgRowWidthWithOutFlattenColumn = getRecordBatchSizer().getNetRowWidth() - avgRowWidthFlattenColumn; // Average rowWidth of single element in the flatten list. // subtract the offset vector size from column data size. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java index 55a20bd..03fb751 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java @@ -298,14 +298,14 @@ public class SortImpl { // during the transfer, we immediately follow the transfer with an SV2 // allocation that will fail if we are over the allocation limit. - if (isSpillNeeded(sizer.actualSize())) { + if (isSpillNeeded(sizer.getActualSize())) { spillFromMemory(); } // Sanity check. We should now be below the buffer memory maximum. long startMem = allocator.getAllocatedMemory(); - bufferedBatches.add(incoming, sizer.netSize()); + bufferedBatches.add(incoming, sizer.getNetBatchSize()); // Compute batch size, including allocation of an sv2. @@ -314,7 +314,7 @@ public class SortImpl { // Update the minimum buffer space metric. - metrics.updateInputMetrics(sizer.rowCount(), sizer.actualSize()); + metrics.updateInputMetrics(sizer.rowCount(), sizer.getActualSize()); metrics.updateMemory(memManager.freeMemory(endMem)); metrics.updatePeakBatches(bufferedBatches.size()); @@ -322,8 +322,8 @@ public class SortImpl { // the effective count as given by the selection vector // (which may exclude some records due to filtering.) - validateBatchSize(sizer.actualSize(), batchSize); - if (memManager.updateEstimates((int) batchSize, sizer.netRowWidth(), sizer.rowCount())) { + validateBatchSize(sizer.getActualSize(), batchSize); + if (memManager.updateEstimates((int) batchSize, sizer.getNetRowWidth(), sizer.rowCount())) { // If estimates changed, discard the helper based on the old estimates. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java index 16b06fe..2ebe887 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java @@ -37,13 +37,14 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager { private int updateInternal(int inputIndex, int outputPosition, boolean useAggregate) { updateIncomingStats(inputIndex); - rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocSize(); + rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocWidth(); final int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX]; - // If outgoing row width is 0, just return. This is possible for empty batches or + // If outgoing row width is 0 or there is no change in outgoing row width, just return. + // This is possible for empty batches or // when first set of batches come with OK_NEW_SCHEMA and no data. - if (newOutgoingRowWidth == 0) { + if (newOutgoingRowWidth == 0 || newOutgoingRowWidth == getOutgoingRowWidth()) { return getOutputRowCount(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java index 993f3cb..a270ced 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java @@ -157,9 +157,9 @@ public class RecordBatchMemoryManager { public void update(RecordBatch recordBatch, int index) { // Get sizing information for the batch. setRecordBatchSizer(index, new RecordBatchSizer(recordBatch)); - setOutgoingRowWidth(getRecordBatchSizer(index).netRowWidth()); + setOutgoingRowWidth(getRecordBatchSizer(index).getNetRowWidth()); // Number of rows in outgoing batch - setOutputRowCount(getOutputBatchSize(), getRecordBatchSizer(index).netRowWidth()); + setOutputRowCount(getOutputBatchSize(), getRecordBatchSizer(index).getNetRowWidth()); updateIncomingStats(index); } @@ -201,6 +201,10 @@ public class RecordBatchMemoryManager { return (Math.min(MAX_NUM_ROWS, Math.max(Integer.highestOneBit(rowCount) - 1, MIN_NUM_ROWS))); } + public static int computeRowCount(int batchSize, int rowWidth) { + return adjustOutputRowCount(RecordBatchSizer.safeDivide(batchSize, rowWidth)); + } + public void setOutgoingRowWidth(int outgoingRowWidth) { this.outgoingRowWidth = outgoingRowWidth; } @@ -249,13 +253,13 @@ public class RecordBatchMemoryManager { Preconditions.checkArgument(index >= 0 && index < numInputs); Preconditions.checkArgument(inputBatchStats[index] != null); inputBatchStats[index].incNumBatches(); - inputBatchStats[index].incSumBatchSizes(sizer[index].netSize()); + inputBatchStats[index].incSumBatchSizes(sizer[index].getNetBatchSize()); inputBatchStats[index].incTotalRecords(sizer[index].rowCount()); } public void updateIncomingStats() { inputBatchStats[DEFAULT_INPUT_INDEX].incNumBatches(); - inputBatchStats[DEFAULT_INPUT_INDEX].incSumBatchSizes(sizer[DEFAULT_INPUT_INDEX].netSize()); + inputBatchStats[DEFAULT_INPUT_INDEX].incSumBatchSizes(sizer[DEFAULT_INPUT_INDEX].getNetBatchSize()); inputBatchStats[DEFAULT_INPUT_INDEX].incTotalRecords(sizer[DEFAULT_INPUT_INDEX].rowCount()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java index 4b8ae80..83287ee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchSizer.java @@ -27,7 +27,6 @@ import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.AllocationManager.BufferLedger; import org.apache.drill.exec.memory.BaseAllocator; -import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.NullableVector; @@ -53,11 +52,6 @@ import static org.apache.drill.exec.vector.AllocationHelper.STD_REPETITION_FACTO */ public class RecordBatchSizer { - // TODO consolidate common memory estimation helpers - public static final double PAYLOAD_FROM_BUFFER = SortMemoryManager.PAYLOAD_FROM_BUFFER; - public static final double FRAGMENTATION_FACTOR = 1.0 / PAYLOAD_FROM_BUFFER; - public static final double BUFFER_FROM_PAYLOAD = SortMemoryManager.BUFFER_FROM_PAYLOAD; - private static final int OFFSET_VECTOR_WIDTH = UInt4Vector.VALUE_WIDTH; private static final int BIT_VECTOR_WIDTH = UInt1Vector.VALUE_WIDTH; @@ -644,12 +638,6 @@ public class RecordBatchSizer { */ private int rowCount; /** - * Standard row width using Drill meta-data. Note: this information is - * 100% bogus. Do not use it. - */ - @Deprecated - private int stdRowWidth; - /** * Actual batch size summing all buffers used to store data * for the batch. */ @@ -669,9 +657,12 @@ public class RecordBatchSizer { /** * actual row size if input is not empty. Otherwise, standard size. */ - private int rowAllocSize; - private boolean hasSv2; + private int rowAllocWidth; + private int stdRowWidth; + + public SelectionVector2 sv2 = null; private int sv2Size; + private int avgDensity; private Set<BufferLedger> ledgers = Sets.newIdentityHashSet(); @@ -724,47 +715,24 @@ public class RecordBatchSizer { for (VectorWrapper<?> vw : va) { ColumnSize colSize = measureColumn(vw.getValueVector(), ""); columnSizes.put(vw.getField().getName(), colSize); - stdRowWidth += colSize.getStdDataSizePerEntry(); netBatchSize += colSize.getTotalNetSize(); maxSize = Math.max(maxSize, colSize.getTotalDataSize()); if (colSize.metadata.isNullable()) { nullableCount++; } netRowWidth += colSize.getNetSizePerEntry(); - rowAllocSize += colSize.getAllocSizePerEntry(); - } - - for (BufferLedger ledger : ledgers) { - accountedMemorySize += ledger.getAccountedSize(); - } - - if (rowCount > 0) { - grossRowWidth = safeDivide(accountedMemorySize, rowCount); - } - - if (sv2 != null) { - sv2Size = sv2.getBuffer(false).capacity(); - accountedMemorySize += sv2Size; - hasSv2 = true; } - - computeEstimates(); - } - - private void computeEstimates() { - grossRowWidth = safeDivide(accountedMemorySize, rowCount); - avgDensity = safeDivide(netBatchSize * 100L, accountedMemorySize); + this.sv2 = sv2; } public void applySv2() { - if (hasSv2) { + if (sv2 == null) { return; } - hasSv2 = true; sv2Size = BaseAllocator.nextPowerOfTwo(2 * rowCount); + avgDensity = safeDivide(netBatchSize * 100L, getActualSize()); accountedMemorySize += sv2Size; - computeEstimates(); } /** @@ -856,10 +824,64 @@ public class RecordBatchSizer { } public int rowCount() { return rowCount; } - public int stdRowWidth() { return stdRowWidth; } - public int grossRowWidth() { return grossRowWidth; } - public int netRowWidth() { return netRowWidth; } - public int getRowAllocSize() { return rowAllocSize; } + + public int getStdRowWidth() { + if (stdRowWidth != 0) { + return stdRowWidth; + } + + for (ColumnSize columnSize : columnSizes.values()) { + stdRowWidth += columnSize.getStdDataSizePerEntry(); + } + + return stdRowWidth; + } + + public int getRowAllocWidth() { + if (rowAllocWidth != 0) { + return rowAllocWidth; + } + + for (ColumnSize columnSize : columnSizes.values()) { + rowAllocWidth += columnSize.getAllocSizePerEntry(); + } + + return rowAllocWidth; + } + + public long getActualSize() { + if (accountedMemorySize != 0) { + return accountedMemorySize; + } + + for (BufferLedger ledger : ledgers) { + accountedMemorySize += ledger.getAccountedSize(); + } + + if (sv2 != null) { + sv2Size = sv2.getBuffer(false).capacity(); + accountedMemorySize += sv2Size; + } + + return accountedMemorySize; + } + + public int getGrossRowWidth() { + if (grossRowWidth != 0) { + return grossRowWidth; + } + + grossRowWidth = safeDivide(getActualSize(), rowCount); + + return grossRowWidth; + } + + public int getAvgDensity() { + return safeDivide(netBatchSize * 100L, getActualSize()); + } + + + public int getNetRowWidth() { return netRowWidth; } public Map<String, ColumnSize> columns() { return columnSizes; } /** @@ -868,12 +890,10 @@ public class RecordBatchSizer { * and null marking columns. * @return "real" width of the row */ - public int netRowWidthCap50() { return netRowWidthCap50 + nullableCount; } - public long actualSize() { return accountedMemorySize; } - public boolean hasSv2() { return hasSv2; } - public int avgDensity() { return avgDensity; } - public long netSize() { return netBatchSize; } - public int maxAvgColumnSize() { return maxSize / rowCount; } + public int getNetRowWidthCap50() { return netRowWidthCap50 + nullableCount; } + public boolean hasSv2() { return sv2 != null; } + public long getNetBatchSize() { return netBatchSize; } + public int getMaxAvgColumnSize() { return safeDivide(maxSize, rowCount); } @Override public String toString() { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java index a029832..fd0b494 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java @@ -73,7 +73,7 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase { long totalSize = 0; for (VectorAccessible batch : batches) { RecordBatchSizer sizer = new RecordBatchSizer(batch); - totalSize += sizer.netSize(); + totalSize += sizer.getNetBatchSize(); } return totalSize; } diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java index 051f4b3..e037d02 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java @@ -373,7 +373,7 @@ public class DrillTestWrapper { RecordBatchSizer sizer = new RecordBatchSizer(loader); // Not checking actualSize as accounting is not correct when we do // split and transfer ownership across operators. - Assert.assertTrue(sizer.netSize() <= expectedBatchSize); + Assert.assertTrue(sizer.getNetBatchSize() <= expectedBatchSize); } // TODO: Clean: DRILL-2933: That load(...) no longer throws diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java index 71ca3cf..297a1c5 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/AbstractSingleRowSet.java @@ -52,7 +52,7 @@ public abstract class AbstractSingleRowSet extends AbstractRowSet implements Sin @Override public long size() { RecordBatchSizer sizer = new RecordBatchSizer(container()); - return sizer.actualSize(); + return sizer.getActualSize(); } /** diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java index a2bc5e8..f0ebdc0 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java @@ -139,6 +139,6 @@ public class IndirectRowSet extends AbstractSingleRowSet { @Override public long size() { RecordBatchSizer sizer = new RecordBatchSizer(container(), sv2); - return sizer.actualSize(); + return sizer.getActualSize(); } }
