[
https://issues.apache.org/jira/browse/DRILL-5325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16039802#comment-16039802
]
ASF GitHub Bot commented on DRILL-5325:
---------------------------------------
Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/808#discussion_r120498177
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
---
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+public class SortMemoryManager {
+
+ /**
+ * Maximum memory this operator may use. Usually comes from the
+ * operator definition, but may be overridden by a configuration
+ * parameter for unit testing.
+ */
+
+ private final long memoryLimit;
+
+ /**
+ * Estimated size of the records for this query, updated on each
+ * new batch received from upstream.
+ */
+
+ private int estimatedRowWidth;
+
+ /**
+ * Size of the merge batches that this operator produces. Generally
+ * the same as the merge batch size, unless low memory forces a smaller
+ * value.
+ */
+
+ private int expectedMergeBatchSize;
+
+ /**
+ * Estimate of the input batch size based on the largest batch seen
+ * thus far.
+ */
+ private int estimatedInputBatchSize;
+
+ /**
+ * Maximum memory level before spilling occurs. That is, we can buffer
input
+ * batches in memory until we reach the level given by the buffer memory
pool.
+ */
+
+ private long bufferMemoryLimit;
+
+ /**
+ * Maximum memory that can hold batches during the merge
+ * phase.
+ */
+
+ private long mergeMemoryLimit;
+
+ /**
+ * The target size for merge batches sent downstream.
+ */
+
+ private int preferredMergeBatchSize;
+
+ /**
+ * The configured size for each spill batch.
+ */
+ private int preferredSpillBatchSize;
+
+ /**
+ * Estimated number of rows that fit into a single spill batch.
+ */
+
+ private int spillBatchRowCount;
+
+ /**
+ * The estimated actual spill batch size which depends on the
+ * details of the data rows for any particular query.
+ */
+
+ private int expectedSpillBatchSize;
+
+ /**
+ * The number of records to add to each output batch sent to the
+ * downstream operator or spilled to disk.
+ */
+
+ private int mergeBatchRowCount;
+
+ private SortConfig config;
+
+// private long spillPoint;
+
+// private long minMergeMemory;
+
+ private int estimatedInputSize;
+
+ private boolean potentialOverflow;
+
+ public SortMemoryManager(SortConfig config, long memoryLimit) {
+ this.config = config;
+
+ // The maximum memory this operator can use as set by the
+ // operator definition (propagated to the allocator.)
+
+ if (config.maxMemory() > 0) {
+ this.memoryLimit = Math.min(memoryLimit, config.maxMemory());
+ } else {
+ this.memoryLimit = memoryLimit;
+ }
+
+ preferredSpillBatchSize = config.spillBatchSize();;
+ preferredMergeBatchSize = config.mergeBatchSize();
+ }
+
+ /**
+ * Update the data-driven memory use numbers including:
+ * <ul>
+ * <li>The average size of incoming records.</li>
+ * <li>The estimated spill and output batch size.</li>
+ * <li>The estimated number of average-size records per
+ * spill and output batch.</li>
+ * <li>The amount of memory set aside to hold the incoming
+ * batches before spilling starts.</li>
+ * </ul>
+ * <p>
+ * Under normal circumstances, the amount of memory available is much
+ * larger than the input, spill or merge batch sizes. The primary
question
+ * is to determine how many input batches we can buffer during the load
+ * phase, and how many spill batches we can merge during the merge
+ * phase.
+ *
+ * @param batchSize the overall size of the current batch received from
+ * upstream
+ * @param batchRowWidth the width in bytes (including overhead) of each
+ * row in the current input batch
+ * @param batchRowCount the number of actual (not filtered) records in
+ * that upstream batch
+ */
+
+ public void updateEstimates(int batchSize, int batchRowWidth, int
batchRowCount) {
+
+ // The record count should never be zero, but better safe than sorry...
+
+ if (batchRowCount == 0) {
+ return; }
+
+
+ // Update input batch estimates.
+ // Go no further if nothing changed.
+
+ if (! updateInputEstimates(batchSize, batchRowWidth, batchRowCount)) {
+ return;
+ }
+
+ updateSpillSettings();
+ updateMergeSettings();
+ adjustForLowMemory();
+ logSettings(batchRowCount);
+ }
+
+ private boolean updateInputEstimates(int batchSize, int batchRowWidth,
int batchRowCount) {
+
+ // The row width may end up as zero if all fields are nulls or some
+ // other unusual situation. In this case, assume a width of 10 just
+ // to avoid lots of special case code.
+
+ if (batchRowWidth == 0) {
+ batchRowWidth = 10;
+ }
+
+ // We know the batch size and number of records. Use that to estimate
+ // the average record size. Since a typical batch has many records,
+ // the average size is a fairly good estimator. Note that the batch
+ // size includes not just the actual vector data, but any unused space
+ // resulting from power-of-two allocation. This means that we don't
+ // have to do size adjustments for input batches as we will do below
+ // when estimating the size of other objects.
+
+ // Record sizes may vary across batches. To be conservative, use
+ // the largest size observed from incoming batches.
+
+ int origRowEstimate = estimatedRowWidth;
+ estimatedRowWidth = Math.max(estimatedRowWidth, batchRowWidth);
+
+ // Maintain an estimate of the incoming batch size: the largest
+ // batch yet seen. Used to reserve memory for the next incoming
+ // batch. Because we are using the actual observed batch size,
+ // the size already includes overhead due to power-of-two rounding.
+
+ long origInputBatchSize = estimatedInputBatchSize;
+ estimatedInputBatchSize = Math.max(estimatedInputBatchSize, batchSize);
+
+ // Estimate the total size of each incoming batch plus sv2. Note that,
due
+ // to power-of-two rounding, the allocated sv2 size might be twice the
data size.
+
+ estimatedInputSize = estimatedInputBatchSize + 4 * batchRowCount;
+
+ // Return whether anything changed.
+
+ return estimatedRowWidth != origRowEstimate || estimatedInputBatchSize
!= origInputBatchSize;
+ }
+
+ /**
+ * Determine the number of records to spill per spill batch. The goal is
to
+ * spill batches of either 64K records, or as many records as fit into
the
+ * amount of memory dedicated to each spill batch, whichever is less.
+ */
+
+ private void updateSpillSettings() {
+
+ spillBatchRowCount = rowsPerBatch(preferredSpillBatchSize);
+
+ // Compute the actual spill batch size which may be larger or smaller
+ // than the preferred size depending on the row width. Double the
estimated
+ // memory needs to allow for power-of-two rounding.
+
+ expectedSpillBatchSize = batchForRows(spillBatchRowCount);
+
+ // Determine the minimum memory needed for spilling. Spilling is done
just
+ // before accepting a spill batch, so we must spill if we don't have
room for a
+ // (worst case) input batch. To spill, we need room for the spill
batch created
+ // by merging the batches already in memory.
+
+ bufferMemoryLimit = memoryLimit - expectedSpillBatchSize;
+ }
+
+ /**
+ * Determine the number of records per batch per merge step. The goal is
to
+ * merge batches of either 64K records, or as many records as fit into
the
+ * amount of memory dedicated to each merge batch, whichever is less.
+ */
+
+ private void updateMergeSettings() {
+
+ mergeBatchRowCount = rowsPerBatch(preferredMergeBatchSize);
+ expectedMergeBatchSize = batchForRows(mergeBatchRowCount);
+
+ // The merge memory pool assumes we can spill all input batches. The
memory
+ // available to hold spill batches for merging is total memory minus
the
+ // expected output batch size.
+
+ mergeMemoryLimit = memoryLimit - expectedMergeBatchSize;
+ }
+
+ /**
+ * In a low-memory situation we have to approach the memory assignment
+ * problem from a different angle. Memory is low enough that we can't
+ * fit the incoming batches (of a size decided by the upstream operator)
+ * and our usual spill or merge batch sizes. Instead, we have to
+ * determine the largest spill and merge batch sizes possible given
+ * the available memory, input batch size and row width. We shrink the
+ * sizes of the batches we control to try to make things fit into limited
+ * memory. At some point, however, if we cannot fit even two input
+ * batches and even the smallest merge match, then we will run into an
+ * out-of-memory condition and we log a warning.
+ * <p>
+ * Note that these calculations are a bit crazy: it is Drill that
+ * decided to allocate the small memory, it is Drill that created the
+ * large incoming batches, and so it is Drill that created the low
+ * memory situation. Over time, a better fix for this condition is to
+ * control memory usage at the query level so that the sort is guaranteed
+ * to have sufficient memory. But, since we don't yet have the luxury
+ * of making such changes, we just live with the situation as we find
+ * it.
+ */
+
+ private void adjustForLowMemory() {
+
+ long loadHeadroom = bufferMemoryLimit - 2 * estimatedInputSize;
+ long mergeHeadroom = mergeMemoryLimit - 2 * expectedSpillBatchSize;
+ if (loadHeadroom >= 0 && mergeHeadroom >= 0) {
+ return;
+ }
+
+ lowMemorySpillBatchSize();
+ lowMemoryMergeBatchSize();
+
+ // Sanity check: if we've been given too little memory to make
progress,
+ // issue a warning but proceed anyway. Should only occur if something
is
+ // configured terribly wrong.
+
+ long minNeeds = 2 * estimatedInputSize + expectedSpillBatchSize;
+ if (minNeeds > memoryLimit) {
+ ExternalSortBatch.logger.warn("Potential memory overflow during load
phase! " +
+ "Minumum needed = {} bytes, actual available = {} bytes",
+ minNeeds, memoryLimit);
+ bufferMemoryLimit = 0;
+ potentialOverflow = true;
+ }
+
+ // Sanity check
+
+ minNeeds = 2 * expectedSpillBatchSize + expectedMergeBatchSize;
+ if (minNeeds > memoryLimit) {
+ ExternalSortBatch.logger.warn("Potential memory overflow during
merge phase! " +
+ "Minumum needed = {} bytes, actual available = {} bytes",
+ minNeeds, memoryLimit);
+ mergeMemoryLimit = 0;
+ potentialOverflow = true;
+ }
+ }
+
+ /**
+ * If we are in a low-memory condition, then we might not have room for
the
+ * default spill batch size. In that case, pick a smaller size based on
+ * the observation that we need two input batches and
+ * one spill batch to make progress.
+ */
+
+ private void lowMemorySpillBatchSize() {
+
+ // The "expected" size is with power-of-two rounding in some vectors.
+ // We later work backwards to the row count assuming average internal
+ // fragmentation.
+
+ // Must hold two input batches. Use (most of) the rest for the spill
batch.
+
+ expectedSpillBatchSize = (int) (memoryLimit - 2 * estimatedInputSize);
+
+ // But, in the merge phase, we need two spill batches and one output
batch.
+ // (Assume that the spill and merge are equal sizes.)
+ // Use 3/4 of memory for each batch (to allow power-of-two rounding:
+
+ expectedSpillBatchSize = (int) Math.min(expectedSpillBatchSize,
memoryLimit/3);
+
+ // Never going to happen, but let's ensure we don't somehow create
large batches.
+
+ expectedSpillBatchSize = Math.max(expectedSpillBatchSize,
SortConfig.MIN_SPILL_BATCH_SIZE);
+
+ // Must hold at least one row to spill. That is, we can make progress
if we
+ // create spill files that consist of single-record batches.
+
+ expectedSpillBatchSize = Math.max(expectedSpillBatchSize,
estimatedRowWidth);
+
+ // Work out the spill batch count needed by the spill code. Allow room
for
+ // power-of-two rounding.
+
+ spillBatchRowCount = rowsPerBatch(expectedSpillBatchSize);
+
+ // Finally, figure out when we must spill.
+
+ bufferMemoryLimit = memoryLimit - expectedSpillBatchSize;
+ }
+
+ /**
+ * For merge batch, we must hold at least two spill batches and
+ * one output batch.
+ */
+
+ private void lowMemoryMergeBatchSize() {
+ expectedMergeBatchSize = (int) (memoryLimit - 2 *
expectedSpillBatchSize);
+ expectedMergeBatchSize = Math.max(expectedMergeBatchSize,
SortConfig.MIN_MERGE_BATCH_SIZE);
+ expectedMergeBatchSize = Math.max(expectedMergeBatchSize,
estimatedRowWidth);
+ mergeBatchRowCount = rowsPerBatch(expectedMergeBatchSize);
+ mergeMemoryLimit = memoryLimit - expectedMergeBatchSize;
+ }
+
+ /**
+ * Log the calculated values. Turn this on if things seem amiss.
+ * Message will appear only when the values change.
+ */
+
+ private void logSettings(int actualRecordCount) {
+
+ ExternalSortBatch.logger.debug("Input Batch Estimates: record size =
{} bytes; input batch = {} bytes, {} records",
+ estimatedRowWidth, estimatedInputBatchSize,
actualRecordCount);
+ ExternalSortBatch.logger.debug("Merge batch size = {} bytes, {}
records; spill file size: {} bytes",
+ expectedSpillBatchSize, spillBatchRowCount,
config.spillFileSize());
+ ExternalSortBatch.logger.debug("Output batch size = {} bytes, {}
records",
+ expectedMergeBatchSize, mergeBatchRowCount);
+ ExternalSortBatch.logger.debug("Available memory: {}, buffer memory =
{}, merge memory = {}",
+ memoryLimit, bufferMemoryLimit, mergeMemoryLimit);
+ }
+
+ public enum MergeAction { SPILL, MERGE, NONE }
+
+ public static class MergeTask {
+ public MergeAction action;
+ public int count;
+
+ public MergeTask(MergeAction action, int count) {
+ this.action = action;
+ this.count = count;
+ }
+ }
+
+ public MergeTask consolidateBatches(long allocMemory, int inMemCount,
int spilledRunsCount) {
+
+ // Determine additional memory needed to hold one batch from each
+ // spilled run.
+
+ // If the on-disk batches and in-memory batches need more memory than
+ // is available, spill some in-memory batches.
+
+ if (inMemCount > 0) {
+ long mergeSize = spilledRunsCount * expectedSpillBatchSize;
+ if (allocMemory + mergeSize > mergeMemoryLimit) {
+ return new MergeTask(MergeAction.SPILL, 0);
+ }
+ }
+
+ // Maximum batches that fit into available memory.
+
+ int mergeLimit = (int) ((mergeMemoryLimit - allocMemory) /
expectedSpillBatchSize);
+
+ // Can't merge more than the merge limit.
+
+ mergeLimit = Math.min(mergeLimit, config.mergeLimit());
+
+ // How many batches to merge?
+
+ int mergeCount = spilledRunsCount - mergeLimit;
+ if (mergeCount <= 0) {
+ return new MergeTask(MergeAction.NONE, 0);
+ }
+
+ // We will merge. This will create yet another spilled
+ // run. Account for that.
+
+ mergeCount += 1;
+
+ // Must merge at least 2 batches to make progress.
+ // This is the the (at least one) excess plus the allowance
+ // above for the new one.
+
+ // Can't merge more than the limit.
+
+ mergeCount = Math.min(mergeCount, config.mergeLimit());
+
+ // Do the merge, then loop to try again in case not
+ // all the target batches spilled in one go.
+
+ return new MergeTask(MergeAction.MERGE, mergeCount);
+ }
+
+ /**
+ * Compute the number of rows per batch assuming that the batch is
+ * subject to average internal fragmentation due to power-of-two
+ * rounding on vectors.
+ * <p>
+ * <pre>[____|__$__]</pre>
+ * In the above, the brackets represent the whole vector. The
+ * first half is always full. When the first half filled, the second
+ * half was allocated. On average, the second half will be half full.
+ *
+ * @param batchSize expected batch size, including internal fragmentation
+ * @return number of rows that fit into the batch
+ */
+
+ private int rowsPerBatch(int batchSize) {
+ int rowCount = batchSize * 3 / 4 / estimatedRowWidth;
+ return Math.max(1, Math.min(rowCount, Character.MAX_VALUE));
+ }
+
+ /**
+ * Compute the expected number of rows that fit into a given size
+ * batch, accounting for internal fragmentation due to power-of-two
+ * rounding on vector allocations.
+ *
+ * @param rowCount the desired number of rows in the batch
+ * @return the size of resulting batch, including power-of-two
+ * rounding.
+ */
+
+ private int batchForRows(int rowCount) {
+ return estimatedRowWidth * rowCount * 4 / 3;
+ }
+
+ // Must spill if we are below the spill point (the amount of memory
+ // needed to do the minimal spill.)
+
+ public boolean isSpillNeeded(long allocatedBytes, int incomingSize) {
+ return allocatedBytes + incomingSize >= bufferMemoryLimit;
+ }
+
+ public boolean hasMemoryMergeCapacity(long allocatedBytes, long
neededForInMemorySort) {
+ return (freeMemory(allocatedBytes) >= neededForInMemorySort);
+ }
+
+ public long freeMemory(long allocatedBytes) {
+ return memoryLimit - allocatedBytes;
+ }
+
+ public long getMergeMemoryLimit() { return mergeMemoryLimit; }
+ public int getSpillBatchRowCount() { return spillBatchRowCount; }
+ public int getMergeBatchRowCount() { return mergeBatchRowCount; }
+
+ // Primarily for testing
+
--- End diff --
Fixed.
> Implement sub-operator unit tests for managed external sort
> -----------------------------------------------------------
>
> Key: DRILL-5325
> URL: https://issues.apache.org/jira/browse/DRILL-5325
> Project: Apache Drill
> Issue Type: Improvement
> Components: Tools, Build & Test
> Affects Versions: 1.11.0
> Reporter: Paul Rogers
> Assignee: Paul Rogers
> Fix For: 1.11.0
>
>
> Validate the proposed sub-operator test framework, by creating low-level unit
> tests for the managed version of the external sort.
> The external sort has a small number of existing tests, but those tests are
> quite superficial; the "managed sort" project found many bugs. The managed
> sort itself was tested with ad-hoc system-level tests created using the new
> "cluster fixture" framework. But, again, such tests could not reach deep
> inside the sort code to exercise very specific conditions.
> As a result, we spent far too much time using QA functional tests to identify
> specific code issues.
> Using the sub-opeator unit test framework, we can instead test each bit of
> functionality at the unit test level.
> If doing so works, and is practical, it can serve as a model for other
> operator testing projects.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)