[ https://issues.apache.org/jira/browse/DRILL-5325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16039806#comment-16039806 ]
ASF GitHub Bot commented on DRILL-5325:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/808#discussion_r120497990

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java ---
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+public class SortMemoryManager {
+
+  /**
+   * Maximum memory this operator may use. Usually comes from the
+   * operator definition, but may be overridden by a configuration
+   * parameter for unit testing.
+   */
+
+  private final long memoryLimit;
+
+  /**
+   * Estimated size of the records for this query, updated on each
+   * new batch received from upstream.
+   */
+
+  private int estimatedRowWidth;
+
+  /**
+   * Size of the merge batches that this operator produces. Generally
+   * the same as the merge batch size, unless low memory forces a smaller
+   * value.
+   */
+
+  private int expectedMergeBatchSize;
+
+  /**
+   * Estimate of the input batch size based on the largest batch seen
+   * thus far.
+   */
+  private int estimatedInputBatchSize;
+
+  /**
+   * Maximum memory level before spilling occurs. That is, we can buffer input
+   * batches in memory until we reach the level given by the buffer memory pool.
+   */
+
+  private long bufferMemoryLimit;
+
+  /**
+   * Maximum memory that can hold batches during the merge
+   * phase.
+   */
+
+  private long mergeMemoryLimit;
+
+  /**
+   * The target size for merge batches sent downstream.
+   */
+
+  private int preferredMergeBatchSize;
+
+  /**
+   * The configured size for each spill batch.
+   */
+  private int preferredSpillBatchSize;
+
+  /**
+   * Estimated number of rows that fit into a single spill batch.
+   */
+
+  private int spillBatchRowCount;
+
+  /**
+   * The estimated actual spill batch size which depends on the
+   * details of the data rows for any particular query.
+   */
+
+  private int expectedSpillBatchSize;
+
+  /**
+   * The number of records to add to each output batch sent to the
+   * downstream operator or spilled to disk.
+   */
+
+  private int mergeBatchRowCount;
+
+  private SortConfig config;
+
+// private long spillPoint;
+
+// private long minMergeMemory;
+
+  private int estimatedInputSize;
+
+  private boolean potentialOverflow;
+
+  public SortMemoryManager(SortConfig config, long memoryLimit) {
+    this.config = config;
+
+    // The maximum memory this operator can use as set by the
+    // operator definition (propagated to the allocator.)
+
+    if (config.maxMemory() > 0) {
+      this.memoryLimit = Math.min(memoryLimit, config.maxMemory());
+    } else {
+      this.memoryLimit = memoryLimit;
+    }
+
+    preferredSpillBatchSize = config.spillBatchSize();;
+    preferredMergeBatchSize = config.mergeBatchSize();
+  }
+
+  /**
+   * Update the data-driven memory use numbers including:
+   * <ul>
+   * <li>The average size of incoming records.</li>
+   * <li>The estimated spill and output batch size.</li>
+   * <li>The estimated number of average-size records per
+   * spill and output batch.</li>
+   * <li>The amount of memory set aside to hold the incoming
+   * batches before spilling starts.</li>
+   * </ul>
+   * <p>
+   * Under normal circumstances, the amount of memory available is much
+   * larger than the input, spill or merge batch sizes. The primary question
+   * is to determine how many input batches we can buffer during the load
+   * phase, and how many spill batches we can merge during the merge
+   * phase.
+   *
+   * @param batchSize the overall size of the current batch received from
+   * upstream
+   * @param batchRowWidth the width in bytes (including overhead) of each
+   * row in the current input batch
+   * @param batchRowCount the number of actual (not filtered) records in
+   * that upstream batch
+   */
+
+  public void updateEstimates(int batchSize, int batchRowWidth, int batchRowCount) {
+
+    // The record count should never be zero, but better safe than sorry...
+
+    if (batchRowCount == 0) {
+      return; }
+
+
+    // Update input batch estimates.
+    // Go no further if nothing changed.
+
+    if (! updateInputEstimates(batchSize, batchRowWidth, batchRowCount)) {
+      return;
+    }
+
+    updateSpillSettings();
+    updateMergeSettings();
+    adjustForLowMemory();
+    logSettings(batchRowCount);
+  }
+
+  private boolean updateInputEstimates(int batchSize, int batchRowWidth, int batchRowCount) {
+
+    // The row width may end up as zero if all fields are nulls or some
+    // other unusual situation. In this case, assume a width of 10 just
+    // to avoid lots of special case code.
+
+    if (batchRowWidth == 0) {
+      batchRowWidth = 10;
+    }
+
+    // We know the batch size and number of records. Use that to estimate
+    // the average record size. Since a typical batch has many records,
+    // the average size is a fairly good estimator. Note that the batch
+    // size includes not just the actual vector data, but any unused space
+    // resulting from power-of-two allocation. This means that we don't
+    // have to do size adjustments for input batches as we will do below
+    // when estimating the size of other objects.
+
+    // Record sizes may vary across batches. To be conservative, use
+    // the largest size observed from incoming batches.
+
+    int origRowEstimate = estimatedRowWidth;
+    estimatedRowWidth = Math.max(estimatedRowWidth, batchRowWidth);
+
+    // Maintain an estimate of the incoming batch size: the largest
+    // batch yet seen. Used to reserve memory for the next incoming
+    // batch. Because we are using the actual observed batch size,
+    // the size already includes overhead due to power-of-two rounding.
+
+    long origInputBatchSize = estimatedInputBatchSize;
+    estimatedInputBatchSize = Math.max(estimatedInputBatchSize, batchSize);
+
+    // Estimate the total size of each incoming batch plus sv2. Note that, due
+    // to power-of-two rounding, the allocated sv2 size might be twice the data size.
+
+    estimatedInputSize = estimatedInputBatchSize + 4 * batchRowCount;
+
+    // Return whether anything changed.
+
+    return estimatedRowWidth != origRowEstimate || estimatedInputBatchSize != origInputBatchSize;
+  }
+
+  /**
+   * Determine the number of records to spill per spill batch. The goal is to
+   * spill batches of either 64K records, or as many records as fit into the
+   * amount of memory dedicated to each spill batch, whichever is less.
+   */
+
+  private void updateSpillSettings() {
+
+    spillBatchRowCount = rowsPerBatch(preferredSpillBatchSize);
+
+    // Compute the actual spill batch size which may be larger or smaller
+    // than the preferred size depending on the row width. Double the estimated
+    // memory needs to allow for power-of-two rounding.
+
+    expectedSpillBatchSize = batchForRows(spillBatchRowCount);
+
+    // Determine the minimum memory needed for spilling. Spilling is done just
+    // before accepting a spill batch, so we must spill if we don't have room for a
+    // (worst case) input batch. To spill, we need room for the spill batch created
+    // by merging the batches already in memory.
+
+    bufferMemoryLimit = memoryLimit - expectedSpillBatchSize;
+  }
+
+  /**
+   * Determine the number of records per batch per merge step. The goal is to
+   * merge batches of either 64K records, or as many records as fit into the
+   * amount of memory dedicated to each merge batch, whichever is less.
+   */
+
+  private void updateMergeSettings() {
+
+    mergeBatchRowCount = rowsPerBatch(preferredMergeBatchSize);
+    expectedMergeBatchSize = batchForRows(mergeBatchRowCount);
+
+    // The merge memory pool assumes we can spill all input batches. The memory
+    // available to hold spill batches for merging is total memory minus the
+    // expected output batch size.
+
+    mergeMemoryLimit = memoryLimit - expectedMergeBatchSize;
+  }
+
+  /**
+   * In a low-memory situation we have to approach the memory assignment
+   * problem from a different angle. Memory is low enough that we can't
+   * fit the incoming batches (of a size decided by the upstream operator)
+   * and our usual spill or merge batch sizes. Instead, we have to
+   * determine the largest spill and merge batch sizes possible given
+   * the available memory, input batch size and row width. We shrink the
+   * sizes of the batches we control to try to make things fit into limited
+   * memory. At some point, however, if we cannot fit even two input
+   * batches and even the smallest merge match, then we will run into an
+   * out-of-memory condition and we log a warning.
+   * <p>
+   * Note that these calculations are a bit crazy: it is Drill that
+   * decided to allocate the small memory, it is Drill that created the
+   * large incoming batches, and so it is Drill that created the low
+   * memory situation. Over time, a better fix for this condition is to
+   * control memory usage at the query level so that the sort is guaranteed
+   * to have sufficient memory. But, since we don't yet have the luxury
+   * of making such changes, we just live with the situation as we find
+   * it.
+   */
+
+  private void adjustForLowMemory() {
+
+    long loadHeadroom = bufferMemoryLimit - 2 * estimatedInputSize;
+    long mergeHeadroom = mergeMemoryLimit - 2 * expectedSpillBatchSize;
+    if (loadHeadroom >= 0 && mergeHeadroom >= 0) {
+      return;
+    }
+
+    lowMemorySpillBatchSize();
+    lowMemoryMergeBatchSize();
+
+    // Sanity check: if we've been given too little memory to make progress,
+    // issue a warning but proceed anyway. Should only occur if something is
+    // configured terribly wrong.
+
+    long minNeeds = 2 * estimatedInputSize + expectedSpillBatchSize;
+    if (minNeeds > memoryLimit) {
+      ExternalSortBatch.logger.warn("Potential memory overflow during load phase! " +
+          "Minumum needed = {} bytes, actual available = {} bytes",
+          minNeeds, memoryLimit);
+      bufferMemoryLimit = 0;
+      potentialOverflow = true;
+    }
+
+    // Sanity check
+
+    minNeeds = 2 * expectedSpillBatchSize + expectedMergeBatchSize;
+    if (minNeeds > memoryLimit) {
+      ExternalSortBatch.logger.warn("Potential memory overflow during merge phase! " +
+          "Minumum needed = {} bytes, actual available = {} bytes",
--- End diff --

Fixed.


> Implement sub-operator unit tests for managed external sort
> -----------------------------------------------------------
>
>                 Key: DRILL-5325
>                 URL: https://issues.apache.org/jira/browse/DRILL-5325
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Tools, Build & Test
>    Affects Versions: 1.11.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>             Fix For: 1.11.0
>
>
> Validate the proposed sub-operator test framework by creating low-level unit
> tests for the managed version of the external sort.
> The external sort has a small number of existing tests, but those tests are
> quite superficial; the "managed sort" project found many bugs. The managed
> sort itself was tested with ad-hoc system-level tests created using the new
> "cluster fixture" framework. But, again, such tests could not reach deep
> inside the sort code to exercise very specific conditions.
> As a result, we spent far too much time using QA functional tests to identify
> specific code issues.
> Using the sub-operator unit test framework, we can instead test each bit of
> functionality at the unit test level.
> If doing so works, and is practical, it can serve as a model for other
> operator testing projects.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
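The budgeting steps described in the quoted comments are easier to follow with concrete numbers. The sketch below is a standalone walk-through of that arithmetic, not Drill code: the bodies of rowsPerBatch() and batchForRows() are not shown in the diff above, so the versions here are assumptions reconstructed from the comments (row count derived from the preferred batch size, capped near 64K, and the byte estimate doubled to allow for power-of-two rounding). Only the buffer/merge limit formulas and the low-memory headroom check mirror the quoted code.

```java
// Standalone sketch of the sort memory budgeting described in the diff above.
// rowsPerBatch()/batchForRows() bodies are assumptions, not the Drill versions.
public class SortMemoryMathSketch {

  static final int ROW_WIDTH = 300;              // estimatedRowWidth, bytes
  static final long MEMORY_LIMIT = 100_000_000;  // operator memory limit, bytes
  static final int SPILL_BATCH_TARGET = 8_000_000;
  static final int MERGE_BATCH_TARGET = 16_000_000;

  // Rows per batch: preferred batch size divided by row width, capped near 64K.
  static int rowsPerBatch(int preferredBatchSize) {
    return Math.max(1, Math.min(preferredBatchSize / ROW_WIDTH, 0xFFFF));
  }

  // Bytes for a batch of the given row count, doubled for power-of-two rounding.
  static int batchForRows(int rowCount) {
    return rowCount * ROW_WIDTH * 2;
  }

  public static void main(String[] args) {
    // Spill setup: hold back room for one spill batch before accepting input.
    int spillBatchRowCount = rowsPerBatch(SPILL_BATCH_TARGET);
    int expectedSpillBatchSize = batchForRows(spillBatchRowCount);
    long bufferMemoryLimit = MEMORY_LIMIT - expectedSpillBatchSize;

    // Merge setup: hold back room for one downstream output batch.
    int mergeBatchRowCount = rowsPerBatch(MERGE_BATCH_TARGET);
    int expectedMergeBatchSize = batchForRows(mergeBatchRowCount);
    long mergeMemoryLimit = MEMORY_LIMIT - expectedMergeBatchSize;

    // Low-memory check: need room for two input batches during load and two
    // spill batches during merge, as in adjustForLowMemory() above.
    long estimatedInputSize = 8_000_000 + 4 * 20_000;  // 8 MB batch, 20,000 rows + sv2
    long loadHeadroom = bufferMemoryLimit - 2 * estimatedInputSize;
    long mergeHeadroom = mergeMemoryLimit - 2 * expectedSpillBatchSize;

    System.out.println("spill batch rows  = " + spillBatchRowCount);     // 26,666
    System.out.println("spill batch bytes = " + expectedSpillBatchSize); // ~16 MB
    System.out.println("buffer limit      = " + bufferMemoryLimit);      // ~84 MB
    System.out.println("merge batch bytes = " + expectedMergeBatchSize); // ~32 MB
    System.out.println("merge limit       = " + mergeMemoryLimit);       // ~68 MB
    System.out.println("load headroom     = " + loadHeadroom);           // positive
    System.out.println("merge headroom    = " + mergeHeadroom);          // positive
  }
}
```

With these numbers both headrooms come out positive, so the low-memory path would not be taken; shrinking MEMORY_LIMIT toward the sum of two input batches plus a spill batch is where the warning in the quoted code would start to fire.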
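On the sub-operator testing theme of this issue: because the memory manager takes only a SortConfig and a memory limit, it can be exercised directly from a unit test without standing up a Drillbit. The sketch below only illustrates that shape; it assumes SortConfig lives in the same package and can be stubbed with Mockito, stubs only the accessors visible in the quoted diff, and is not the actual test added by the pull request.

```java
package org.apache.drill.exec.physical.impl.xsort.managed;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.junit.Test;

public class SortMemoryManagerSketchTest {

  @Test
  public void updateEstimatesSmokeTest() {
    // Stub only the SortConfig accessors that appear in the quoted diff.
    // maxMemory() is left at Mockito's default of zero, so the operator
    // limit passed to the constructor is used as-is.
    SortConfig config = mock(SortConfig.class);
    when(config.spillBatchSize()).thenReturn(8 * 1024 * 1024);
    when(config.mergeBatchSize()).thenReturn(16 * 1024 * 1024);

    // 40 MB operator limit, as the operator definition would normally supply.
    SortMemoryManager manager = new SortMemoryManager(config, 40 * 1024 * 1024);

    // Feed in an 8 MB batch of 20,000 rows averaging 400 bytes each and let
    // the manager recompute its spill and merge settings.
    manager.updateEstimates(8 * 1024 * 1024, 400, 20_000);

    // A real test would assert on the resulting buffer and merge limits via
    // whatever accessors the class exposes (not shown in the quoted diff).
  }
}
```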