http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
index 213720f..cd03b70 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
@@ -19,7 +19,160 @@ package org.apache.drill.exec.physical.impl.xsort.managed;
 
 import com.google.common.annotations.VisibleForTesting;
 
+/**
+ * Computes the memory needs for input batches, spill batches and merge
+ * batches. The key challenges that this code tries to overcome are:
+ * <ul>
+ * <li>Drill is not designed for small memory allocations,
+ * but the planner may provide such allocations because the memory per
+ * query is divided among slices (minor fragments) and among buffering
+ * operators, leaving very little per operator.</li>
+ * <li>Drill does not provide the detailed memory information needed to
+ * carefully manage memory in tight constraints.</li>
+ * <li>But, Drill has a death penalty for going over the memory limit.</li>
+ * </ul>
+ * As a result, this class is a bit of a hack: it attempts to consider a
+ * number of ill-defined factors in order to divide up memory use in a
+ * way that prevents OOM errors.
+ * <p>
+ * First, it is necessary to differentiate two concepts:
+ * <ul>
+ * <li>The <i>data size</i> of a batch: the amount of memory needed to hold
+ * the data itself. The data size is constant for any given batch.</li>
+ * <li>The <i>buffer size</i> of the buffers that hold the data. The buffer
+ * size varies wildly depending on how the batch was produced.</li>
+ * </ul>
+ * The three kinds of buffer layouts seen to date include:
+ * <ul>
+ * <li>One buffer per vector component (data, offsets, null flags, etc.)
+ * – created by readers, project and other operators.</li>
+ * <li>One buffer for the entire batch, with each vector component using
+ * a slice of the overall buffer – the case for batches deserialized from
+ * exchanges.</li>
+ * <li>One buffer for each top-level vector, with component vectors
+ * using slices of the overall vector buffer – the result of reading
+ * spilled batches from disk.</li>
+ * </ul>
+ * In each case, buffer sizes are power-of-two rounded from the data size.
+ * But since the data is grouped differently in each case, the resulting buffer
+ * sizes vary considerably.
+ * <p>
+ * As a result, we can never be sure of the amount of memory needed for a
+ * batch. So, we have to estimate based on a number of factors:
+ * <ul>
+ * <li>Uses the {@link RecordBatchSizer} to estimate the data size and
+ * buffer size of each incoming batch.</li>
+ * <li>Estimates the internal fragmentation due to power-of-two rounding.</li>
+ * <li>Configured preferences for spill and output batches.</li>
+ * </ul>
+ * The code handles "normal" and "low" memory conditions.
+ * <ul>
+ * <li>In normal memory, we simply work out the number of preferred-size
+ * batches that fit in memory (based on the predicted buffer size.)</li>
+ * <li>In low memory, we divide up the available memory to produce the
+ * spill and merge batch sizes. The sizes will be less than the configured
+ * preference.</li>
+ * </ul>
+ * <p>
+ * The sort has two key configured parameters: the spill file size and the
+ * size of the output (downstream) batch. The spill file size is chosen to
+ * be large enough to ensure efficient I/O, but not so large as to overwhelm
+ * any one spill directory. The output batch size is chosen to be large enough
+ * to amortize the per-batch overhead over the maximum number of records, but
+ * not so large as to overwhelm downstream operators. Setting these parameters
+ * is a judgment call.
+ * <p>
+ * Under limited memory, the above sizes may be too big for the space available.
+ * For example, the default spill file size is 256 MB. But, if the sort is
+ * only given 50 MB, then spill files will be smaller. The default output batch
+ * size is 16 MB, but if the sort is given only 20 MB, then the output batch must
+ * be smaller. The low memory logic starts with the memory available and works
+ * backwards to figure out spill batch size, output batch size and spill file
+ * size. The sizes will be smaller than optimal, but as large as will fit in
+ * the memory provided.
+ */
+
 public class SortMemoryManager {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
+
+  /**
+   * Estimate for typical internal fragmentation in a buffer due to power-of-two
+   * rounding on vectors.
+   * <p>
+   * <pre>[____|__$__]</pre>
+   * In the above, the brackets represent the whole vector. The
+   * first half is always full. The $ represents the end of data.
+   * When the first half filled, the second
+   * half was allocated. On average, the second half will be half full.
+   * This means that, on average, 1/4 of the allocated space is
+   * unused (the definition of internal fragmentation.)
+   */
+
+  public static final double INTERNAL_FRAGMENTATION_ESTIMATE = 1.0/4.0;
+
+  /**
+   * Given a buffer, this is the assumed amount of space
+   * available for data. (Adding more will double the buffer
+   * size half the time.)
+   */
+
+  public static final double PAYLOAD_FROM_BUFFER = 1 - INTERNAL_FRAGMENTATION_ESTIMATE;
+
+  /**
+   * Given a data size, this is the multiplier to create the buffer
+   * size estimate. (Note: since we work with aggregate batches, we
+   * cannot simply round up to the next power of two: rounding is done
+   * on a vector-by-vector basis. Here we need to estimate the aggregate
+   * effect of rounding.)
+   */
+
+  public static final double BUFFER_FROM_PAYLOAD = 3.0 / 2.0;
+
+  /**
+   * On really bad days, we will add one more byte (or value) to a vector
+   * than fits in a power-of-two sized buffer, forcing a doubling. In this
+   * case, half the resulting buffer is empty.
+   */
+
+  public static final double WORST_CASE_BUFFER_RATIO = 2.0;
+
+  /**
+   * Desperate attempt to keep spill batches from being too small in low memory.
+   * <p>
+   * The number is also used for logging: the system will log a warning if
+   * batches fall below this number which may represent too little memory
+   * allocated for the job at hand. (Queries operate on big data: many records.
+   * Batches with too few records are a probable performance hit. But, what is
+   * too few? It is a judgment call.)
+   */
+
+  public static final int MIN_ROWS_PER_SORT_BATCH = 100;
+  public static final double LOW_MEMORY_MERGE_BATCH_RATIO = 0.25;
+
+  public static class BatchSizeEstimate {
+    int dataSize;
+    int expectedBufferSize;
+    int maxBufferSize;
+
+    public void setFromData(int dataSize) {
+      this.dataSize = dataSize;
+      expectedBufferSize = multiply(dataSize, BUFFER_FROM_PAYLOAD);
+      maxBufferSize = multiply(dataSize, WORST_CASE_BUFFER_RATIO);
+    }
+
+    public void setFromBuffer(int bufferSize) {
+      expectedBufferSize = bufferSize;
+      dataSize = multiply(bufferSize, PAYLOAD_FROM_BUFFER);
+      maxBufferSize = multiply(dataSize, WORST_CASE_BUFFER_RATIO);
+    }
+
+    public void setFromWorstCaseBuffer(int bufferSize) {
+      maxBufferSize = bufferSize;
+      dataSize = multiply(maxBufferSize, 1 / WORST_CASE_BUFFER_RATIO);
+      expectedBufferSize = multiply(dataSize, BUFFER_FROM_PAYLOAD);
+    }
+  }
 
   /**
    * Maximum memory this operator may use. Usually comes from the
@@ -42,13 +195,13 @@ public class SortMemoryManager {
    * value.
    */
 
-  private int expectedMergeBatchSize;
+  private final BatchSizeEstimate mergeBatchSize = new BatchSizeEstimate();
 
   /**
    * Estimate of the input batch size based on the largest batch seen
    * thus far.
    */
 
-  private int estimatedInputBatchSize;
+  private final BatchSizeEstimate inputBatchSize = new BatchSizeEstimate();
 
   /**
    * Maximum memory level before spilling occurs. That is, we can buffer input
@@ -86,7 +239,7 @@ public class SortMemoryManager {
    * details of the data rows for any particular query.
    */
 
-  private int expectedSpillBatchSize;
+  private final BatchSizeEstimate spillBatchSize = new BatchSizeEstimate();
 
   /**
    * The number of records to add to each output batch sent to the
@@ -97,24 +250,41 @@ public class SortMemoryManager {
 
   private SortConfig config;
 
-  private int estimatedInputSize;
-
   private boolean potentialOverflow;
 
-  public SortMemoryManager(SortConfig config, long memoryLimit) {
+  private boolean isLowMemory;
+
+  private boolean performanceWarning;
+
+  public SortMemoryManager(SortConfig config, long opMemoryLimit) {
     this.config = config;
 
     // The maximum memory this operator can use as set by the
     // operator definition (propagated to the allocator.)
 
-    if (config.maxMemory() > 0) {
-      this.memoryLimit = Math.min(memoryLimit, config.maxMemory());
-    } else {
-      this.memoryLimit = memoryLimit;
-    }
+    final long configMemoryLimit = config.maxMemory();
+    memoryLimit = (configMemoryLimit == 0) ? opMemoryLimit
+                : Math.min(opMemoryLimit, configMemoryLimit);
 
     preferredSpillBatchSize = config.spillBatchSize();;
     preferredMergeBatchSize = config.mergeBatchSize();
+
+    // Initialize the buffer memory limit for the first batch.
+    // Assume 1/2 of (allocated - spill batch size).
+
+    bufferMemoryLimit = (memoryLimit - config.spillBatchSize()) / 2;
+    if (bufferMemoryLimit < 0) {
+
+      // Bad news: not enough for even the spill batch.
+      // Assume half of memory, will adjust later.
+
+      bufferMemoryLimit = memoryLimit / 2;
+    }
+
+    if (memoryLimit == opMemoryLimit) {
+      logger.debug("Memory config: Allocator limit = {}", memoryLimit);
+    } else {
+      logger.debug("Memory config: Allocator limit = {}, Configured limit: {}",
+                   opMemoryLimit, memoryLimit);
+    }
   }
 
   /**
@@ -134,36 +304,39 @@ public class SortMemoryManager {
    * phase, and how many spill batches we can merge during the merge
    * phase.
    *
-   * @param batchSize the overall size of the current batch received from
+   * @param batchDataSize the overall size of the current batch received from
    * upstream
    * @param batchRowWidth the average width in bytes (including overhead) of
    * rows in the current input batch
    * @param batchRowCount the number of actual (not filtered) records in
    * that upstream batch
+   * @return true if the estimates changed, false if the previous estimates
+   * remain valid
    */
 
-  public void updateEstimates(int batchSize, int batchRowWidth, int batchRowCount) {
+  public boolean updateEstimates(int batchDataSize, int batchRowWidth, int batchRowCount) {
 
     // The record count should never be zero, but better safe than sorry...
 
     if (batchRowCount == 0) {
-      return; }
+      return false; }
 
     // Update input batch estimates.
    // Go no further if nothing changed.
 
-    if (! updateInputEstimates(batchSize, batchRowWidth, batchRowCount)) {
-      return;
+    if (! updateInputEstimates(batchDataSize, batchRowWidth, batchRowCount)) {
+      return false;
     }
 
     updateSpillSettings();
     updateMergeSettings();
     adjustForLowMemory();
     logSettings(batchRowCount);
+    return true;
   }
 
-  private boolean updateInputEstimates(int batchSize, int batchRowWidth, int batchRowCount) {
+  private boolean updateInputEstimates(int batchDataSize, int batchRowWidth, int batchRowCount) {
 
     // The row width may end up as zero if all fields are nulls or some
     // other unusual situation. In this case, assume a width of 10 just
@@ -192,17 +365,13 @@ public class SortMemoryManager {
     // batch. Because we are using the actual observed batch size,
     // the size already includes overhead due to power-of-two rounding.
 
-    long origInputBatchSize = estimatedInputBatchSize;
-    estimatedInputBatchSize = Math.max(estimatedInputBatchSize, batchSize);
-
-    // Estimate the total size of each incoming batch plus sv2. Note that, due
-    // to power-of-two rounding, the allocated sv2 size might be twice the data size.
-
-    estimatedInputSize = estimatedInputBatchSize + 4 * batchRowCount;
+    long origInputBatchSize = inputBatchSize.dataSize;
+    inputBatchSize.setFromData(Math.max(inputBatchSize.dataSize, batchDataSize));
 
     // Return whether anything changed.
 
-    return estimatedRowWidth != origRowEstimate || estimatedInputBatchSize != origInputBatchSize;
+    return estimatedRowWidth > origRowEstimate ||
+           inputBatchSize.dataSize > origInputBatchSize;
   }
 
   /**
@@ -215,18 +384,23 @@ public class SortMemoryManager {
 
     spillBatchRowCount = rowsPerBatch(preferredSpillBatchSize);
 
+    // But, don't allow spill batches to be too small; we pay too
+    // much overhead cost for small row counts.
+
+    spillBatchRowCount = Math.max(spillBatchRowCount, MIN_ROWS_PER_SORT_BATCH);
+
     // Compute the actual spill batch size which may be larger or smaller
-    // than the preferred size depending on the row width. Double the estimated
-    // memory needs to allow for power-of-two rounding.
+    // than the preferred size depending on the row width.
 
-    expectedSpillBatchSize = batchForRows(spillBatchRowCount);
+    spillBatchSize.setFromData(spillBatchRowCount * estimatedRowWidth);
 
     // Determine the minimum memory needed for spilling. Spilling is done just
     // before accepting a spill batch, so we must spill if we don't have room for a
     // (worst case) input batch. To spill, we need room for the spill batch created
-    // by merging the batches already in memory.
+    // by merging the batches already in memory. This is a memory calculation,
+    // so use the buffer size for the spill batch.
-    bufferMemoryLimit = memoryLimit - expectedSpillBatchSize;
+    bufferMemoryLimit = memoryLimit - 2 * spillBatchSize.maxBufferSize;
   }
 
   /**
@@ -238,13 +412,21 @@ public class SortMemoryManager {
 
   private void updateMergeSettings() {
 
     mergeBatchRowCount = rowsPerBatch(preferredMergeBatchSize);
-    expectedMergeBatchSize = batchForRows(mergeBatchRowCount);
+
+    // But, don't allow merge batches to be too small; we pay too
+    // much overhead cost for small row counts.
+
+    mergeBatchRowCount = Math.max(mergeBatchRowCount, MIN_ROWS_PER_SORT_BATCH);
+
+    // Compute the actual merge batch size.
+
+    mergeBatchSize.setFromData(mergeBatchRowCount * estimatedRowWidth);
 
     // The merge memory pool assumes we can spill all input batches. The memory
     // available to hold spill batches for merging is total memory minus the
     // expected output batch size.
 
-    mergeMemoryLimit = memoryLimit - expectedMergeBatchSize;
+    mergeMemoryLimit = memoryLimit - mergeBatchSize.maxBufferSize;
   }
 
   /**
@@ -271,22 +453,27 @@ public class SortMemoryManager {
 
   private void adjustForLowMemory() {
 
-    long loadHeadroom = bufferMemoryLimit - 2 * estimatedInputSize;
-    long mergeHeadroom = mergeMemoryLimit - 2 * expectedSpillBatchSize;
-    if (loadHeadroom >= 0 && mergeHeadroom >= 0) {
-      return;
-    }
+    potentialOverflow = false;
+    performanceWarning = false;
 
-    lowMemorySpillBatchSize();
-    lowMemoryMergeBatchSize();
+    // Input batches are assumed to have typical fragmentation. Experience
+    // shows that spilled batches have close to the maximum fragmentation.
+
+    long loadHeadroom = bufferMemoryLimit - 2 * inputBatchSize.expectedBufferSize;
+    long mergeHeadroom = mergeMemoryLimit - 2 * spillBatchSize.maxBufferSize;
+    isLowMemory = (loadHeadroom < 0 || mergeHeadroom < 0);
+    if (! isLowMemory) {
+      return; }
+
+    lowMemoryInternalBatchSizes();
 
     // Sanity check: if we've been given too little memory to make progress,
     // issue a warning but proceed anyway. Should only occur if something is
     // configured terribly wrong.
 
-    long minNeeds = 2 * estimatedInputSize + expectedSpillBatchSize;
+    long minNeeds = 2 * inputBatchSize.expectedBufferSize + spillBatchSize.maxBufferSize;
     if (minNeeds > memoryLimit) {
-      ExternalSortBatch.logger.warn("Potential memory overflow during load phase! " +
+      logger.warn("Potential memory overflow during load phase! " +
           "Minimum needed = {} bytes, actual available = {} bytes",
           minNeeds, memoryLimit);
       bufferMemoryLimit = 0;
@@ -295,14 +482,36 @@ public class SortMemoryManager {
 
     // Sanity check
 
-    minNeeds = 2 * expectedSpillBatchSize + expectedMergeBatchSize;
+    minNeeds = 2 * spillBatchSize.expectedBufferSize + mergeBatchSize.expectedBufferSize;
     if (minNeeds > memoryLimit) {
-      ExternalSortBatch.logger.warn("Potential memory overflow during merge phase! " +
+      logger.warn("Potential memory overflow during merge phase! " +
          "Minimum needed = {} bytes, actual available = {} bytes",
          minNeeds, memoryLimit);
       mergeMemoryLimit = 0;
       potentialOverflow = true;
     }
+
+    // Performance warning
+
+    if (potentialOverflow) {
+      return;
+    }
+    if (spillBatchSize.dataSize < config.spillBatchSize() &&
+        spillBatchRowCount < Character.MAX_VALUE) {
+      logger.warn("Potential performance degradation due to low memory. " +
+                  "Preferred spill batch size: {}, actual: {}, rows per batch: {}",
+                  config.spillBatchSize(), spillBatchSize.dataSize,
+                  spillBatchRowCount);
+      performanceWarning = true;
+    }
+    if (mergeBatchSize.dataSize < config.mergeBatchSize() &&
+        mergeBatchRowCount < Character.MAX_VALUE) {
+      logger.warn("Potential performance degradation due to low memory. " +
+                  "Preferred merge batch size: {}, actual: {}, rows per batch: {}",
+                  config.mergeBatchSize(), mergeBatchSize.dataSize,
+                  mergeBatchRowCount);
+      performanceWarning = true;
+    }
   }
 
   /**
@@ -312,52 +521,66 @@ public class SortMemoryManager {
    * one spill batch to make progress.
    */
 
-  private void lowMemorySpillBatchSize() {
+  private void lowMemoryInternalBatchSizes() {
 
     // The "expected" size is with power-of-two rounding in some vectors.
     // We later work backwards to the row count assuming average internal
     // fragmentation.
 
-    // Must hold two input batches. Use (most of) the rest for the spill batch.
+    // Must hold two input batches. Use half of the rest for the spill batch.
+    // In a really bad case, the number here may be negative. We'll fix
+    // it below.
 
-    expectedSpillBatchSize = (int) (memoryLimit - 2 * estimatedInputSize);
+    int spillBufferSize = (int) (memoryLimit - 2 * inputBatchSize.maxBufferSize) / 2;
 
     // But, in the merge phase, we need two spill batches and one output batch.
     // (Assume that the spill and merge are equal sizes.)
-    // Use 3/4 of memory for each batch (to allow power-of-two rounding:
 
-    expectedSpillBatchSize = (int) Math.min(expectedSpillBatchSize, memoryLimit/3);
+    spillBufferSize = (int) Math.min(spillBufferSize, memoryLimit/4);
 
-    // Never going to happen, but let's ensure we don't somehow create large batches.
+    // Compute the size from the buffer. Assume worst-case
+    // fragmentation (as is typical when reading from the spill file.)
 
-    expectedSpillBatchSize = Math.max(expectedSpillBatchSize, SortConfig.MIN_SPILL_BATCH_SIZE);
+    spillBatchSize.setFromWorstCaseBuffer(spillBufferSize);
 
     // Must hold at least one row to spill. That is, we can make progress if we
     // create spill files that consist of single-record batches.
 
-    expectedSpillBatchSize = Math.max(expectedSpillBatchSize, estimatedRowWidth);
+    int spillDataSize = Math.min(spillBatchSize.dataSize, config.spillBatchSize());
+    spillDataSize = Math.max(spillDataSize, estimatedRowWidth);
+    if (spillDataSize != spillBatchSize.dataSize) {
+      spillBatchSize.setFromData(spillDataSize);
+    }
 
     // Work out the spill batch count needed by the spill code. Allow room for
     // power-of-two rounding.
 
-    spillBatchRowCount = rowsPerBatch(expectedSpillBatchSize);
+    spillBatchRowCount = rowsPerBatch(spillBatchSize.dataSize);
 
     // Finally, figure out when we must spill.
 
-    bufferMemoryLimit = memoryLimit - expectedSpillBatchSize;
-  }
+    bufferMemoryLimit = memoryLimit - 2 * spillBatchSize.maxBufferSize;
+    bufferMemoryLimit = Math.max(bufferMemoryLimit, 0);
 
-  /**
-   * For merge batch, we must hold at least two spill batches and
-   * one output batch.
-   */
+    // Assume two spill batches must be merged (plus safety margin.)
+    // The rest can be given to the merge batch.
+
+    long mergeBufferSize = memoryLimit - 2 * spillBatchSize.maxBufferSize;
+
+    // The above calcs assume that the merge batch size is the same as
+    // the spill batch size (the division by four above.)
+    // For merge batch, we must hold at least two spill batches and
+    // one output batch, which is why we assumed 3 spill batches.
-  private void lowMemoryMergeBatchSize() {
-    expectedMergeBatchSize = (int) (memoryLimit - 2 * expectedSpillBatchSize);
-    expectedMergeBatchSize = Math.max(expectedMergeBatchSize, SortConfig.MIN_MERGE_BATCH_SIZE);
-    expectedMergeBatchSize = Math.max(expectedMergeBatchSize, estimatedRowWidth);
-    mergeBatchRowCount = rowsPerBatch(expectedMergeBatchSize);
-    mergeMemoryLimit = memoryLimit - expectedMergeBatchSize;
+    mergeBatchSize.setFromBuffer((int) mergeBufferSize);
+    int mergeDataSize = Math.min(mergeBatchSize.dataSize, config.mergeBatchSize());
+    mergeDataSize = Math.max(mergeDataSize, estimatedRowWidth);
+    if (mergeDataSize != mergeBatchSize.dataSize) {
+      mergeBatchSize.setFromData(mergeDataSize);
+    }
+
+    mergeBatchRowCount = rowsPerBatch(mergeBatchSize.dataSize);
+    mergeMemoryLimit = Math.max(2 * spillBatchSize.expectedBufferSize, memoryLimit - mergeBatchSize.maxBufferSize);
   }
 
   /**
@@ -367,14 +590,34 @@ public class SortMemoryManager {
 
   private void logSettings(int actualRecordCount) {
 
-    ExternalSortBatch.logger.debug("Input Batch Estimates: record size = {} bytes; input batch = {} bytes, {} records",
-        estimatedRowWidth, estimatedInputBatchSize, actualRecordCount);
-    ExternalSortBatch.logger.debug("Merge batch size = {} bytes, {} records; spill file size: {} bytes",
-        expectedSpillBatchSize, spillBatchRowCount, config.spillFileSize());
-    ExternalSortBatch.logger.debug("Output batch size = {} bytes, {} records",
-        expectedMergeBatchSize, mergeBatchRowCount);
-    ExternalSortBatch.logger.debug("Available memory: {}, buffer memory = {}, merge memory = {}",
+    logger.debug("Input Batch Estimates: record size = {} bytes; net = {} bytes, gross = {}, records = {}",
+        estimatedRowWidth, inputBatchSize.dataSize,
+        inputBatchSize.expectedBufferSize, actualRecordCount);
+    logger.debug("Spill batch size: net = {} bytes, gross = {} bytes, records = {}; spill file = {} bytes",
+        spillBatchSize.dataSize, spillBatchSize.expectedBufferSize,
+        spillBatchRowCount, config.spillFileSize());
+    logger.debug("Output batch size: net = {} bytes, gross = {} bytes, records = {}",
+        mergeBatchSize.dataSize, mergeBatchSize.expectedBufferSize,
+        mergeBatchRowCount);
+    logger.debug("Available memory: {}, buffer memory = {}, merge memory = {}",
         memoryLimit, bufferMemoryLimit, mergeMemoryLimit);
+
+    // Performance warnings due to low row counts per batch.
+    // Low row counts cause excessive per-batch overhead and hurt
+    // performance.
+
+    if (spillBatchRowCount < MIN_ROWS_PER_SORT_BATCH) {
+      logger.warn("Potential performance degradation due to low memory or large input row. " +
+                  "Preferred spill batch row count: {}, actual: {}",
+                  MIN_ROWS_PER_SORT_BATCH, spillBatchRowCount);
+      performanceWarning = true;
+    }
+    if (mergeBatchRowCount < MIN_ROWS_PER_SORT_BATCH) {
+      logger.warn("Potential performance degradation due to low memory or large input row. " +
+                  "Preferred merge batch row count: {}, actual: {}",
+                  MIN_ROWS_PER_SORT_BATCH, mergeBatchRowCount);
+      performanceWarning = true;
+    }
   }
 
   public enum MergeAction { SPILL, MERGE, NONE }
@@ -389,86 +632,120 @@ public class SortMemoryManager {
     }
   }
 
+  /**
+   * Choose a consolidation option during the merge phase depending on memory
+   * available. Preference is given to moving directly onto merging (with no
+   * additional spilling) when possible. But, if memory pressures don't allow
+   * this, we must spill batches and/or merge on-disk spilled runs, to reduce
+   * the final set of runs to something that can be merged in the available
+   * memory.
+ * <p> + * Logic is here (returning an enum) rather than in the merge code to allow + * unit testing without actually needing batches in memory. + * + * @param allocMemory + * amount of memory currently allocated (this class knows the total + * memory available) + * @param inMemCount + * number of incoming batches in memory (the number is important, not + * the in-memory size; we get the memory size from + * <tt>allocMemory</tt>) + * @param spilledRunsCount + * the number of runs sitting on disk to be merged + * @return whether to <tt>SPILL</tt> in-memory batches, whether to + * <tt>MERGE<tt> on-disk batches to create a new, larger run, or whether + * to do nothing (<tt>NONE</tt>) and instead advance to the final merge + */ + public MergeTask consolidateBatches(long allocMemory, int inMemCount, int spilledRunsCount) { - // Determine additional memory needed to hold one batch from each - // spilled run. + assert allocMemory == 0 || inMemCount > 0; + assert inMemCount + spilledRunsCount > 0; - // If the on-disk batches and in-memory batches need more memory than - // is available, spill some in-memory batches. + // If only one spilled run, then merging is not productive regardless + // of memory limits. - if (inMemCount > 0) { - long mergeSize = spilledRunsCount * expectedSpillBatchSize; - if (allocMemory + mergeSize > mergeMemoryLimit) { - return new MergeTask(MergeAction.SPILL, 0); - } + if (inMemCount == 0 && spilledRunsCount <= 1) { + return new MergeTask(MergeAction.NONE, 0); } - // Maximum batches that fit into available memory. + // If memory is above the merge memory limit, then must spill + // merge to create room for a merge batch. - int mergeLimit = (int) ((mergeMemoryLimit - allocMemory) / expectedSpillBatchSize); + if (allocMemory > mergeMemoryLimit) { + return new MergeTask(MergeAction.SPILL, 0); + } - // Can't merge more than the merge limit. + // Determine additional memory needed to hold one batch from each + // spilled run. + + // Maximum spill batches that fit into available memory. - mergeLimit = Math.min(mergeLimit, config.mergeLimit()); + int memMergeLimit = (int) ((mergeMemoryLimit - allocMemory) / + spillBatchSize.expectedBufferSize); + memMergeLimit = Math.max(0, memMergeLimit); - // How many batches to merge? + // If batches are in memory, and we need more memory to merge + // them all than is actually available, then spill some in-memory + // batches. + + if (inMemCount > 0 && memMergeLimit < spilledRunsCount) { + return new MergeTask(MergeAction.SPILL, 0); + } - int mergeCount = spilledRunsCount - mergeLimit; - if (mergeCount <= 0) { + // If all batches fit in memory, then no need for a second-generation + // merge/spill. + + memMergeLimit = Math.min(memMergeLimit, config.mergeLimit()); + int mergeRunCount = spilledRunsCount - memMergeLimit; + if (mergeRunCount <= 0) { return new MergeTask(MergeAction.NONE, 0); } - // We will merge. This will create yet another spilled - // run. Account for that. + // We need a second generation load-merge-spill cycle + // to reduce the number of spilled runs to a smaller set + // that will fit in memory. + + // Merging creates another batch. Include one more run + // in the merge to create space for the new run. + + mergeRunCount += 1; - mergeCount += 1; + // Merge only as many batches as fit in memory. + // Use all memory for this process; no need to reserve space for a + // merge output batch. Assume worst case since we are forced to + // accept spilled batches blind: we can't limit reading based on memory + // limits. 
+    // Subtract one to allow for the output spill batch.
+
+    memMergeLimit = (int)(memoryLimit / spillBatchSize.maxBufferSize) - 1;
+    mergeRunCount = Math.min(mergeRunCount, memMergeLimit);
 
     // Must merge at least 2 batches to make progress.
-    // This is the the (at least one) excess plus the allowance
-    // above for the new one.
+    // We know we have at least two because of the check done above.
 
-    // Can't merge more than the limit.
+    mergeRunCount = Math.max(mergeRunCount, 2);
 
-    mergeCount = Math.min(mergeCount, config.mergeLimit());
+    // Can't merge more than the merge limit.
 
-    // Do the merge, then loop to try again in case not
-    // all the target batches spilled in one go.
+    mergeRunCount = Math.min(mergeRunCount, config.mergeLimit());
 
-    return new MergeTask(MergeAction.MERGE, mergeCount);
+    return new MergeTask(MergeAction.MERGE, mergeRunCount);
   }
 
   /**
-   * Compute the number of rows per batch assuming that the batch is
-   * subject to average internal fragmentation due to power-of-two
-   * rounding on vectors.
-   * <p>
-   * <pre>[____|__$__]</pre>
-   * In the above, the brackets represent the whole vector. The
-   * first half is always full. When the first half filled, the second
-   * half was allocated. On average, the second half will be half full.
+   * Compute the number of rows that fit into a given batch data size.
    *
    * @param batchSize expected batch size, including internal fragmentation
    * @return number of rows that fit into the batch
    */
 
   private int rowsPerBatch(int batchSize) {
-    int rowCount = batchSize * 3 / 4 / estimatedRowWidth;
+    int rowCount = batchSize / estimatedRowWidth;
     return Math.max(1, Math.min(rowCount, Character.MAX_VALUE));
   }
 
-  /**
-   * Compute the expected number of rows that fit into a given size
-   * batch, accounting for internal fragmentation due to power-of-two
-   * rounding on vector allocations.
-   *
-   * @param rowCount the desired number of rows in the batch
-   * @return the size of resulting batch, including power-of-two
-   * rounding.
-   */
-
-  private int batchForRows(int rowCount) {
-    return estimatedRowWidth * rowCount * 4 / 3;
+  public static int multiply(int byteSize, double multiplier) {
+    return (int) Math.floor(byteSize * multiplier);
   }
 
   // Must spill if we are below the spill point (the amount of memory
@@ -497,17 +774,21 @@ public class SortMemoryManager {
   @VisibleForTesting
   public int getRowWidth() { return estimatedRowWidth; }
   @VisibleForTesting
-  public int getInputBatchSize() { return estimatedInputBatchSize; }
+  public BatchSizeEstimate getInputBatchSize() { return inputBatchSize; }
   @VisibleForTesting
   public int getPreferredSpillBatchSize() { return preferredSpillBatchSize; }
   @VisibleForTesting
   public int getPreferredMergeBatchSize() { return preferredMergeBatchSize; }
   @VisibleForTesting
-  public int getSpillBatchSize() { return expectedSpillBatchSize; }
+  public BatchSizeEstimate getSpillBatchSize() { return spillBatchSize; }
   @VisibleForTesting
-  public int getMergeBatchSize() { return expectedMergeBatchSize; }
+  public BatchSizeEstimate getMergeBatchSize() { return mergeBatchSize; }
   @VisibleForTesting
   public long getBufferMemoryLimit() { return bufferMemoryLimit; }
   @VisibleForTesting
   public boolean mayOverflow() { return potentialOverflow; }
+  @VisibleForTesting
+  public boolean isLowMemory() { return isLowMemory; }
+  @VisibleForTesting
+  public boolean hasPerformanceWarning() { return performanceWarning; }
 }
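[Editor's note] To make the revised accounting concrete, here is a minimal usage sketch, not part of the commit, based only on the signatures visible in this patch. The 70 MB limit, row width, and batch sizes are made-up numbers, and the `action`/`count` fields of `MergeTask` are assumed from how the class is constructed above:

    package org.apache.drill.exec.physical.impl.xsort.managed;

    import org.apache.drill.common.config.DrillConfig;

    public class SortMemoryManagerExample {
      public static void main(String[] args) {
        SortConfig config = new SortConfig(DrillConfig.create());
        SortMemoryManager manager = new SortMemoryManager(config, 70 * 1024 * 1024);

        // Report an incoming batch: 8 MB of data, 300-byte rows, 10,000 rows.
        // Returns true because the estimates changed from their initial state.
        boolean changed = manager.updateEstimates(8 * 1024 * 1024, 300, 10_000);

        // The manager predicts buffer sizes from data sizes using the
        // fragmentation constants: expected = data * 3/2, worst case = data * 2.
        SortMemoryManager.BatchSizeEstimate spill = manager.getSpillBatchSize();
        System.out.println("spill data = " + spill.dataSize +
            ", expected buffer = " + spill.expectedBufferSize +
            ", worst case = " + spill.maxBufferSize);
        System.out.println("low memory? " + manager.isLowMemory() +
            ", may overflow? " + manager.mayOverflow());

        // During the merge phase, ask what to do given the current allocation,
        // the in-memory batch count and the spilled run count.
        SortMemoryManager.MergeTask task = manager.consolidateBatches(
            20 * 1024 * 1024, 5, 3);
        if (task.action == SortMemoryManager.MergeAction.SPILL) {  // assumed field names
          System.out.println("spill in-memory batches");
        } else if (task.action == SortMemoryManager.MergeAction.MERGE) {
          System.out.println("merge " + task.count + " spilled runs");
        }
      }
    }

The sketch lives in the same package as the sort so that the package-private `BatchSizeEstimate` fields are accessible, mirroring what the tests later in this patch do.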
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
index 4231cf4..0f27884 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
@@ -83,10 +83,9 @@ public class SorterWrapper extends BaseSortWrapper {
 
     ClassGenerator<SingleBatchSorter> g = cg.getRoot();
     cg.plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
-    cg.saveCodeForDebugging(true);
+//    cg.saveCodeForDebugging(true);
     generateComparisons(g, batch, logger);
     return getInstance(cg, logger);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
index a6042c6..b75ce77 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.SpilledRun;
 import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorInitializer;
 import org.apache.drill.exec.record.VectorContainer;
 
 import com.google.common.collect.Lists;
@@ -86,13 +87,14 @@ public class SpilledRuns {
     return batchesToSpill;
   }
 
-  public void mergeAndSpill(List<BatchGroup> batchesToSpill, int spillBatchRowCount) {
-    spilledRuns.add(safeMergeAndSpill(batchesToSpill, spillBatchRowCount));
+  public void mergeAndSpill(List<BatchGroup> batchesToSpill, int spillBatchRowCount, VectorInitializer allocHelper) {
+    spilledRuns.add(safeMergeAndSpill(batchesToSpill, spillBatchRowCount, allocHelper));
     logger.trace("Completed spill: memory = {}",
                  context.getAllocator().getAllocatedMemory());
   }
 
-  public void mergeRuns(int targetCount, long mergeMemoryPool, int spillBatchRowCount) {
+  public void mergeRuns(int targetCount, long mergeMemoryPool,
+                        int spillBatchRowCount, VectorInitializer allocHelper) {
 
     long allocated = context.getAllocator().getAllocatedMemory();
     mergeMemoryPool -= context.getAllocator().getAllocatedMemory();
@@ -128,12 +130,12 @@ public class SpilledRuns {
     // Do the actual spill.
 
     List<BatchGroup> batchesToSpill = prepareSpillBatches(spilledRuns, mergeCount);
-    mergeAndSpill(batchesToSpill, spillBatchRowCount);
+    mergeAndSpill(batchesToSpill, spillBatchRowCount, allocHelper);
   }
 
-  private BatchGroup.SpilledRun safeMergeAndSpill(List<? extends BatchGroup> batchesToSpill, int spillBatchRowCount) {
+  private BatchGroup.SpilledRun safeMergeAndSpill(List<? extends BatchGroup> batchesToSpill, int spillBatchRowCount, VectorInitializer allocHelper) {
     try {
-      return doMergeAndSpill(batchesToSpill, spillBatchRowCount);
+      return doMergeAndSpill(batchesToSpill, spillBatchRowCount, allocHelper);
     }
     // If error is a User Exception, just use as is.
@@ -145,7 +147,8 @@ public class SpilledRuns {
     }
   }
 
-  private BatchGroup.SpilledRun doMergeAndSpill(List<? extends BatchGroup> batchesToSpill, int spillBatchRowCount) throws Throwable {
+  private BatchGroup.SpilledRun doMergeAndSpill(List<? extends BatchGroup> batchesToSpill,
+      int spillBatchRowCount, VectorInitializer allocHelper) throws Throwable {
 
     // Merge the selected set of matches and write them to the
     // spill file. After each write, we release the memory associated
@@ -155,7 +158,8 @@ public class SpilledRuns {
     BatchGroup.SpilledRun newGroup = null;
     VectorContainer dest = new VectorContainer();
     try (AutoCloseable ignored = AutoCloseables.all(batchesToSpill);
-         PriorityQueueCopierWrapper.BatchMerger merger = copierHolder.startMerge(schema, batchesToSpill, dest, spillBatchRowCount)) {
+         PriorityQueueCopierWrapper.BatchMerger merger = copierHolder.startMerge(schema, batchesToSpill,
+                                                             dest, spillBatchRowCount, allocHelper)) {
       newGroup = new BatchGroup.SpilledRun(spillSet, outputFile, context.getAllocator());
       logger.trace("Spilling {} batches, into spill batches of {} rows, to {}",
                    batchesToSpill.size(), spillBatchRowCount, outputFile);
@@ -175,9 +179,9 @@ public class SpilledRuns {
       }
       context.injectChecked(ExternalSortBatch.INTERRUPTION_WHILE_SPILLING, IOException.class);
       newGroup.closeOutputStream();
-      logger.trace("Spilled {} output batches, each of {} by bytes, {} records to {}",
-          merger.getBatchCount(), merger.getRecordCount(),
-          merger.getEstBatchSize(), outputFile);
+      logger.trace("Spilled {} output batches, each of {} bytes, {} records, to {}",
+          merger.getBatchCount(), merger.getEstBatchSize(),
+          spillBatchRowCount, outputFile);
       newGroup.setBatchSize(merger.getEstBatchSize());
       return newGroup;
     } catch (Throwable e) {
@@ -192,7 +196,8 @@ public class SpilledRuns {
     }
   }
 
-  public SortResults finalMerge(List<? extends BatchGroup> bufferedBatches, VectorContainer container, int mergeRowCount) {
+  public SortResults finalMerge(List<? extends BatchGroup> bufferedBatches,
+      VectorContainer container, int mergeRowCount, VectorInitializer allocHelper) {
     List<BatchGroup> allBatches = new LinkedList<>();
     allBatches.addAll(bufferedBatches);
     bufferedBatches.clear();
@@ -200,7 +205,7 @@ public class SpilledRuns {
     spilledRuns.clear();
     logger.debug("Starting merge phase. Runs = {}, Alloc. memory = {}",
memory = {}", allBatches.size(), context.getAllocator().getAllocatedMemory()); - return copierHolder.startMerge(schema, allBatches, container, mergeRowCount); + return copierHolder.startMerge(schema, allBatches, container, mergeRowCount, allocHelper); } public void close() { http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index ca275c7..eb90614 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -117,17 +117,19 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements return IterOutcome.STOP; } next = b.next(); - }finally{ + } finally { stats.startProcessing(); } - switch(next){ + switch(next) { case OK_NEW_SCHEMA: stats.batchReceived(inputIndex, b.getRecordCount(), true); break; case OK: stats.batchReceived(inputIndex, b.getRecordCount(), false); break; + default: + break; } return next; http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java index 44c6b1a..4e47051 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -81,6 +81,7 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper< } } + @SuppressWarnings("resource") @Override public VectorWrapper<?> getChildWrapper(int[] ids) { if (ids.length == 1) { @@ -105,6 +106,7 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper< return new HyperVectorWrapper<ValueVector>(vectors[0].getField(), vectors); } + @SuppressWarnings("resource") @Override public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath) { ValueVector v = vectors[0]; @@ -112,7 +114,6 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper< } @Override - @SuppressWarnings("unchecked") public VectorWrapper<T> cloneAndTransfer(BufferAllocator allocator) { return new HyperVectorWrapper<T>(f, vectors, false); // T[] newVectors = (T[]) Array.newInstance(vectors.getClass().getComponentType(), vectors.length); @@ -128,12 +129,14 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper< return new HyperVectorWrapper<T>(f, v, releasable); } + @SuppressWarnings("unchecked") public void addVector(ValueVector v) { Preconditions.checkArgument(v.getClass() == this.getVectorClass(), String.format("Cannot add vector type %s to hypervector type %s for field %s", v.getClass(), this.getVectorClass(), v.getField())); vectors = (T[]) ArrayUtils.add(vectors, v);// TODO optimize this so not copying every time } + @SuppressWarnings("unchecked") public void addVectors(ValueVector[] vv) { vectors = (T[]) ArrayUtils.add(vectors, vv); } http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index 0daa6b3..b4ae2d2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,8 +19,6 @@ package org.apache.drill.exec.record; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.selection.SelectionVector2; -import org.apache.drill.exec.record.selection.SelectionVector4; /** * A record batch contains a set of field values for a particular range of http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorInitializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorInitializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorInitializer.java new file mode 100644 index 0000000..5dd348e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorInitializer.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+
+/**
+ * Prototype mechanism to allocate vectors based on expected
+ * data sizes. This version uses a name-based map of fields
+ * to sizes. Better to represent the batch structurally and
+ * simply iterate over the schema rather than doing a per-field
+ * lookup. But, the mechanisms needed to do the efficient solution
+ * don't exist yet.
+ */
+
+public class VectorInitializer {
+
+  private static class AllocationHint {
+    public final int entryWidth;
+    public final int elementCount;
+
+    private AllocationHint(int width, int elements) {
+      entryWidth = width;
+      elementCount = elements;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder buf = new StringBuilder()
+          .append("{");
+      String sep = "";
+      if (entryWidth > 0) {
+        buf.append("width=")
+           .append(entryWidth);
+        sep = ", ";
+      }
+      if (elementCount > 0) {
+        buf.append(sep)
+           .append("elements=")
+           .append(elementCount);
+      }
+      buf.append("}");
+      return buf.toString();
+    }
+  }
+
+  private Map<String, AllocationHint> hints = new HashMap<>();
+
+  public void variableWidth(String name, int width) {
+    hints.put(name, new AllocationHint(width, 1));
+  }
+
+  public void fixedWidthArray(String name, int elements) {
+    hints.put(name, new AllocationHint(0, elements));
+  }
+
+  public void variableWidthArray(String name, int width, int elements) {
+    hints.put(name, new AllocationHint(width, elements));
+  }
+
+  public void allocateBatch(VectorAccessible va, int recordCount) {
+    for (VectorWrapper<?> w: va) {
+      allocateVector(w.getValueVector(), "", recordCount);
+    }
+  }
+
+  private void allocateVector(ValueVector vector, String prefix, int recordCount) {
+    String key = prefix + vector.getField().getName();
+    AllocationHint hint = hints.get(key);
+    if (vector instanceof AbstractMapVector) {
+      allocateMap((AbstractMapVector) vector, prefix, recordCount, hint);
+    } else {
+      allocateVector(vector, recordCount, hint);
+    }
+//    Set<BufferLedger> ledgers = new HashSet<>();
+//    vector.getLedgers(ledgers);
+//    int size = 0;
+//    for (BufferLedger ledger : ledgers) {
+//      size += ledger.getAccountedSize();
+//    }
+//    System.out.println(key + ": " + vector.getField().toString() +
+//        " " +
+//        ((hint == null) ? "no hint" : hint.toString()) +
+//        ", " + size);
+  }
+
+  private void allocateVector(ValueVector vector, int recordCount, AllocationHint hint) {
+    if (hint == null) {
+      // Use hard-coded values. Same as ScanBatch
+
+      AllocationHelper.allocate(vector, recordCount, 50, 10);
+    } else {
+      AllocationHelper.allocate(vector, recordCount, hint.entryWidth, hint.elementCount);
+    }
+  }
+
+  private void allocateMap(AbstractMapVector map, String prefix, int recordCount, AllocationHint hint) {
+    if (map instanceof RepeatedMapVector) {
+      ((RepeatedMapVector) map).allocateOffsetsNew(recordCount);
+      if (hint == null) {
+        recordCount *= 10;
+      } else {
+        recordCount *= hint.elementCount;
+      }
+    }
+    prefix += map.getField().getName() + ".";
+    for (ValueVector vector : map) {
+      allocateVector(vector, prefix, recordCount);
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("[" + getClass().getSimpleName())
+       .append(" ");
+    boolean first = true;
+    for (Entry<String, AllocationHint> entry : hints.entrySet()) {
+      if (! first) {
+        buf.append(", ");
+      }
+      first = false;
+      buf.append("[")
+         .append(entry.getKey())
+         .append(" ")
+         .append(entry.getValue().toString())
+         .append("]");
+    }
+    buf.append("]");
+    return buf.toString();
+  }
+}
See the NOTICE file * distributed with this work for additional information @@ -80,6 +80,7 @@ public class WritableBatch implements AutoCloseable { len += b.capacity(); } + @SuppressWarnings("resource") DrillBuf newBuf = allocator.buffer(len); try { /* Copy data from each buffer into the compound buffer */ @@ -101,7 +102,9 @@ public class WritableBatch implements AutoCloseable { for (VectorWrapper<?> vv : container) { SerializedField fmd = fields.get(vectorIndex); + @SuppressWarnings("resource") ValueVector v = vv.getValueVector(); + @SuppressWarnings("resource") DrillBuf bb = newBuf.slice(bufferOffset, fmd.getBufferLength()); // v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength())); v.load(fmd, bb); http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index 0d341df..7ed9220 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -293,6 +293,7 @@ public class Drillbit implements AutoCloseable { return start(config, null); } + @SuppressWarnings("resource") public static Drillbit start(final DrillConfig config, final RemoteServiceSet remoteServiceSet) throws DrillbitStartupException { logger.debug("Starting new Drillbit."); http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 437862e..41ecc95 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -251,8 +251,6 @@ drill.exec: { external: { // Drill uses the managed External Sort Batch by default. // Set this to true to use the legacy, unmanaged version. - // Disabled in the intial commit, to be enabled after - // tests are committed. disable_managed: true, // Limit on the number of batches buffered in memory. // Primarily for testing. @@ -282,9 +280,9 @@ drill.exec: { directories: ${drill.exec.spill.directories}, // Size of the batches written to, and read from, the spill files. // Determines the ratio of memory to input data size for a single- - // generation sort. Smaller values give larger ratios, but at a - // (high) cost of much greater disk seek times. - spill_batch_size = 8M, + // generation sort. Smaller values are better, but too small + // incurs per-batch overhead. + spill_batch_size = 1M, // Preferred file size for "first-generation" spill files. // Set large enough to get long, continuous writes, but not so // large as to overwhelm a temp directory. @@ -292,7 +290,8 @@ drill.exec: { file_size: 256M, // Size of the batch sent downstream from the sort operator during // the merge phase. Don't change this unless you know what you are doing, - // larger sizes can result in memory fragmentation. + // larger sizes can result in memory fragmentation, smaller sizes + // in excessive operator iterator overhead. 
merge_batch_size = 16M } } http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java index 7700a1e..ee350ce 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java @@ -31,7 +31,6 @@ import org.junit.Test; import java.util.List; public class TestUnionAll extends BaseTestQuery{ -// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestUnionAll.class); private static final String sliceTargetSmall = "alter session set `planner.slice_target` = 1"; private static final String sliceTargetDefault = "alter session reset `planner.slice_target`"; http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java index 05670c5..cfb8645 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java @@ -17,9 +17,6 @@ */ package org.apache.drill.exec.cache; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.File; @@ -119,8 +116,6 @@ public class TestBatchSerialization extends DrillTest { */ private void verifySerialize(SingleRowSet rowSet, SingleRowSet expected) throws IOException { - long origSize = rowSet.size(); - File dir = OperatorFixture.getTempDir("serial"); File outFile = new File(dir, "serialze.dat"); try (OutputStream out = new BufferedOutputStream(new FileOutputStream(outFile))) { @@ -135,7 +130,6 @@ public class TestBatchSerialization extends DrillTest { .read()); } - assertTrue(origSize >= result.size()); new RowSetComparison(expected) .verifyAndClearAll(result); outFile.delete(); http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java index 76f0935..73f9b6d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java @@ -440,6 +440,11 @@ public class TestWindowFrame extends BaseTestQuery { .go(); } + // Note: This test is unstable. It works when forcing the merge/sort batch + // size to 20, but not for other sizes. The problem is either that the results + // are not ordered (and so subject to sort instability), or there is some bug + // somewhere in the window functions. 
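[Editor's note] For reference, these options can also be overridden programmatically; the test changes later in this patch set the same keys through a config builder. A minimal sketch, assuming the `ConfigBuilder` test helper from `org.apache.drill.test` used elsewhere in this patch series; the sizes are illustrative:

    package org.apache.drill.exec.physical.impl.xsort.managed;

    import org.apache.drill.common.config.DrillConfig;
    import org.apache.drill.exec.ExecConstants;
    import org.apache.drill.test.ConfigBuilder;  // test-support helper; assumed available

    public class SortConfigExample {
      public static void main(String[] args) {
        // Override the spill and merge batch sizes (values are illustrative).
        DrillConfig drillConfig = new ConfigBuilder()
            .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, 2 * 1024 * 1024)
            .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, 8 * 1024 * 1024)
            .build();
        SortConfig sortConfig = new SortConfig(drillConfig);
        System.out.println("spill batch = " + sortConfig.spillBatchSize() +
            ", merge batch = " + sortConfig.mergeBatchSize());
      }
    }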
+ @Test public void test4657() throws Exception { testBuilder() http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java index f643d5f..7e63600 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java @@ -35,14 +35,17 @@ import org.apache.drill.test.ClientFixture; import org.apache.drill.test.ClusterFixture; import org.apache.drill.test.DrillTest; import org.apache.drill.test.FixtureBuilder; +import org.apache.drill.test.SecondaryTest; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.rules.TestRule; +@Category(SecondaryTest.class) public class TestSimpleExternalSort extends DrillTest { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSimpleExternalSort.class); - @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(80000); + @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(80_000); @Test public void mergeSortWithSv2Managed() throws Exception { @@ -100,7 +103,7 @@ public class TestSimpleExternalSort extends DrillTest { ClientFixture client = cluster.clientFixture()) { chooseImpl(client, testLegacy); List<QueryDataBatch> results = client.queryBuilder().physicalResource("xsort/one_key_sort_descending.json").results(); - assertEquals(1000000, client.countResults(results)); + assertEquals(1_000_000, client.countResults(results)); validateResults(client.allocator(), results); } } http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java index 5a1bf6d..bbb48af 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java @@ -64,8 +64,8 @@ public class TestSortSpillWithException extends ClusterTest { // inject exception in sort while spilling final String controls = Controls.newBuilder() .addExceptionOnBit( - org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.class, - org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.INTERRUPTION_WHILE_SPILLING, + ExternalSortBatch.class, + ExternalSortBatch.INTERRUPTION_WHILE_SPILLING, IOException.class, cluster.drillbit().getContext().getEndpoint()) .build(); @@ -87,8 +87,8 @@ public class TestSortSpillWithException extends ClusterTest { // inject exception in sort while spilling final String controls = Controls.newBuilder() .addExceptionOnBit( - org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.class, - org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.INTERRUPTION_WHILE_SPILLING, + ExternalSortBatch.class, + 
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
index 5a1bf6d..bbb48af 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
@@ -64,8 +64,8 @@ public class TestSortSpillWithException extends ClusterTest {
     // inject exception in sort while spilling
     final String controls = Controls.newBuilder()
       .addExceptionOnBit(
-          org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.class,
-          org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
+          ExternalSortBatch.class,
+          ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
           IOException.class,
           cluster.drillbit().getContext().getEndpoint())
       .build();
@@ -87,8 +87,8 @@ public class TestSortSpillWithException extends ClusterTest {
     // inject exception in sort while spilling
     final String controls = Controls.newBuilder()
       .addExceptionOnBit(
-          org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.class,
-          org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
+          ExternalSortBatch.class,
+          ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
           IOException.class,
           cluster.drillbit().getContext().getEndpoint())
       .build();

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
index 1a4d4b2..8ba34ef 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
@@ -104,7 +104,7 @@ public class SortTestUtilities {
     VectorContainer dest = new VectorContainer();
     @SuppressWarnings("resource")
     BatchMerger merger = copier.startMerge(schema.toBatchSchema(SelectionVectorMode.NONE),
-        batches, dest, rowCount);
+        batches, dest, rowCount, null);
     verifyResults(merger, dest);
     dest.clear();
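This change, and the TestCopier change below, track a new fifth parameter on startMerge. Judging from the TODO added in TestCopier, the parameter supplies a vector allocator used to pre-size output vectors from batch metadata; the tests pass null, which appears to fall back to default allocation. The two call shapes, for comparison (the parameter's exact type is not visible in this diff, so treat this reading as an assumption):

    // Before: output vectors sized by default allocation rules.
    BatchMerger merger = copier.startMerge(schema, batches, dest, rowCount);

    // After: an optional allocator argument (null = defaults) lets production
    // code pre-size output vectors using gathered batch metadata.
    BatchMerger merger = copier.startMerge(schema, batches, dest, rowCount, null);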
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
index 0050747..6464b5a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
@@ -69,8 +69,13 @@ public class TestCopier extends DrillTest {
     PriorityQueueCopierWrapper copier = SortTestUtilities.makeCopier(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED);
     VectorContainer dest = new VectorContainer();
     try {
+      // TODO: Create a vector allocator to pass as last parameter so
+      // that the test uses the same vector allocator as the production
+      // code. Only nuisance is that we don't have the required metadata
+      // readily at hand here...
       @SuppressWarnings({ "resource", "unused" })
-      BatchMerger merger = copier.startMerge(schema, batches, dest, 10);
+      BatchMerger merger = copier.startMerge(schema, batches, dest, 10, null);
       fail();
     } catch (AssertionError e) {
       // Expected

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
index 6bff088..69e9e1b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
@@ -47,12 +47,14 @@ public class TestExternalSortInternals extends DrillTest {
     assertEquals(Integer.MAX_VALUE, sortConfig.mergeLimit());
     // Default size: 256 MiB
     assertEquals(256 * ONE_MEG, sortConfig.spillFileSize());
-    // Default size: 8 MiB
-    assertEquals(8 * ONE_MEG, sortConfig.spillBatchSize());
+    // Default size: 1 MiB
+    assertEquals(ONE_MEG, sortConfig.spillBatchSize());
     // Default size: 16 MiB
     assertEquals(16 * ONE_MEG, sortConfig.mergeBatchSize());
     // Default: unlimited
     assertEquals(Integer.MAX_VALUE, sortConfig.getBufferedBatchLimit());
+    // Default: 64K
+    assertEquals(Character.MAX_VALUE, sortConfig.getMSortBatchSize());
   }
@@ -69,6 +71,7 @@ public class TestExternalSortInternals extends DrillTest {
       .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, 500_000)
       .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, 600_000)
       .put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 50)
+      .put(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE, 10)
       .build();
     SortConfig sortConfig = new SortConfig(drillConfig);
     assertEquals(2000 * 1024, sortConfig.maxMemory());
@@ -77,6 +80,7 @@
     assertEquals(500_000, sortConfig.spillBatchSize());
     assertEquals(600_000, sortConfig.mergeBatchSize());
     assertEquals(50, sortConfig.getBufferedBatchLimit());
+    assertEquals(10, sortConfig.getMSortBatchSize());
   }
@@ -90,6 +94,7 @@
       .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, SortConfig.MIN_SPILL_BATCH_SIZE - 1)
       .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, SortConfig.MIN_MERGE_BATCH_SIZE - 1)
       .put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 1)
+      .put(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE, 0)
       .build();
     SortConfig sortConfig = new SortConfig(drillConfig);
     assertEquals(SortConfig.MIN_MERGE_LIMIT, sortConfig.mergeLimit());
@@ -97,13 +102,14 @@
     assertEquals(SortConfig.MIN_SPILL_BATCH_SIZE, sortConfig.spillBatchSize());
     assertEquals(SortConfig.MIN_MERGE_BATCH_SIZE, sortConfig.mergeBatchSize());
     assertEquals(2, sortConfig.getBufferedBatchLimit());
+    assertEquals(1, sortConfig.getMSortBatchSize());
   }

   @Test
   public void testMemoryManagerBasics() {
     DrillConfig drillConfig = DrillConfig.create();
     SortConfig sortConfig = new SortConfig(drillConfig);
-    long memoryLimit = 50 * ONE_MEG;
+    long memoryLimit = 70 * ONE_MEG;
     SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);

     // Basic setup
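The hunks below replace the plain int returned by getInputBatchSize() and friends with a size estimate that distinguishes a batch's data size from its likely buffer size. A rough model of that estimate, built from the fragmentation constants documented in SortMemoryManager (the class name and fields here are assumptions for illustration; the real type may differ):

    // Rough model only, for illustration.
    public class BatchSizeEstimateModel {
      public int dataSize;            // bytes of actual data in the batch
      public int expectedBufferSize;  // data size plus average power-of-two rounding waste
      public int maxBufferSize;       // worst case: every vector just forced a doubling

      public void setFromData(int dataSize) {
        this.dataSize = dataSize;
        // BUFFER_FROM_PAYLOAD = 3/2: average rounding overhead across vectors
        expectedBufferSize = SortMemoryManager.multiply(dataSize, SortMemoryManager.BUFFER_FROM_PAYLOAD);
        // WORST_CASE_BUFFER_RATIO = 2: one extra value doubles the buffer
        maxBufferSize = SortMemoryManager.multiply(dataSize, SortMemoryManager.WORST_CASE_BUFFER_RATIO);
      }
    }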
@@ -120,35 +126,35 @@ public class TestExternalSortInternals extends DrillTest {
     int rowCount = 10000;
     int batchSize = rowWidth * rowCount * 2;
-    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+    assertTrue(memManager.updateEstimates(batchSize, rowWidth, rowCount));
     verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);

     // Zero rows - no update
-    memManager.updateEstimates(batchSize, rowWidth, 0);
+    assertFalse(memManager.updateEstimates(batchSize, rowWidth, 0));
     assertEquals(rowWidth, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);

     // Larger batch size, update batch size
     rowCount = 20000;
     batchSize = rowWidth * rowCount * 2;
-    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+    assertTrue(memManager.updateEstimates(batchSize, rowWidth, rowCount));
     verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);

     // Smaller batch size: no change
     rowCount = 5000;
     int lowBatchSize = rowWidth * rowCount * 2;
-    memManager.updateEstimates(lowBatchSize, rowWidth, rowCount);
+    assertFalse(memManager.updateEstimates(lowBatchSize, rowWidth, rowCount));
     assertEquals(rowWidth, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);

     // Different batch density, update batch size
     rowCount = 10000;
     batchSize = rowWidth * rowCount * 5;
-    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+    assertTrue(memManager.updateEstimates(batchSize, rowWidth, rowCount));
     verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);

     // Smaller row size, no update
@@ -156,23 +162,23 @@ public class TestExternalSortInternals extends DrillTest {
     int lowRowWidth = 200;
     rowCount = 10000;
     lowBatchSize = rowWidth * rowCount * 2;
-    memManager.updateEstimates(lowBatchSize, lowRowWidth, rowCount);
+    assertFalse(memManager.updateEstimates(lowBatchSize, lowRowWidth, rowCount));
     assertEquals(rowWidth, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);

     // Larger row size, updates calcs
     rowWidth = 400;
     rowCount = 10000;
     lowBatchSize = rowWidth * rowCount * 2;
-    memManager.updateEstimates(lowBatchSize, rowWidth, rowCount);
+    assertTrue(memManager.updateEstimates(lowBatchSize, rowWidth, rowCount));
     verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);

     // EOF: very low density
-    memManager.updateEstimates(lowBatchSize, rowWidth, 5);
+    assertFalse(memManager.updateEstimates(lowBatchSize, rowWidth, 5));
     assertEquals(rowWidth, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
   }

   private void verifyCalcs(SortConfig sortConfig, long memoryLimit, SortMemoryManager memManager, int batchSize,
@@ -183,7 +189,7 @@ public class TestExternalSortInternals extends DrillTest {

     // Row and batch sizes should be exact
     assertEquals(rowWidth, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);

     // Spill sizes will be rounded, but within reason.
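The next hunk loosens several assertEquals checks to assertTrue. The reason: memory budgets are now charged a batch's expected buffer size rather than its raw data size, so the relationship becomes an inequality. Illustrative arithmetic, assuming the 3/2 expected-buffer multiplier from SortMemoryManager:

    // Numbers are illustrative only.
    int batchDataSize = 300 * 20_000 * 2;                      // 12_000_000 bytes of data
    int batchBufferSize = SortMemoryManager.multiply(
        batchDataSize, SortMemoryManager.BUFFER_FROM_PAYLOAD); // 18_000_000 bytes of buffer
    // Old invariant: bufferMemoryLimit == memoryLimit - spillBatchSize (data bytes)
    // New invariant: bufferMemoryLimit <= memoryLimit - spillBatchBufferSize,
    // because the spill batch is charged its rounded buffer size.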
@@ -191,9 +197,9 @@ public class TestExternalSortInternals extends DrillTest {
     assertTrue(count >= memManager.getSpillBatchRowCount());
     assertTrue(count/2 <= memManager.getSpillBatchRowCount());
     int spillSize = memManager.getSpillBatchRowCount() * rowWidth;
-    assertTrue(spillSize <= memManager.getSpillBatchSize());
-    assertTrue(spillSize >= memManager.getSpillBatchSize()/2);
-    assertEquals(memoryLimit - memManager.getSpillBatchSize(), memManager.getBufferMemoryLimit());
+    assertTrue(spillSize <= memManager.getSpillBatchSize().dataSize);
+    assertTrue(spillSize >= memManager.getSpillBatchSize().dataSize/2);
+    assertTrue(memManager.getBufferMemoryLimit() <= memoryLimit - memManager.getSpillBatchSize().expectedBufferSize);

     // Merge sizes will also be rounded, within reason.
@@ -201,9 +207,9 @@
     assertTrue(count >= memManager.getMergeBatchRowCount());
     assertTrue(count/2 <= memManager.getMergeBatchRowCount());
     int mergeSize = memManager.getMergeBatchRowCount() * rowWidth;
-    assertTrue(mergeSize <= memManager.getMergeBatchSize());
-    assertTrue(mergeSize >= memManager.getMergeBatchSize()/2);
-    assertEquals(memoryLimit - memManager.getMergeBatchSize(), memManager.getMergeMemoryLimit());
+    assertTrue(mergeSize <= memManager.getMergeBatchSize().dataSize);
+    assertTrue(mergeSize >= memManager.getMergeBatchSize().dataSize/2);
+    assertTrue(memManager.getMergeMemoryLimit() <= memoryLimit - memManager.getMergeBatchSize().expectedBufferSize);
   }

   @Test
@@ -220,7 +226,7 @@ public class TestExternalSortInternals extends DrillTest {
     int batchSize = rowCount * 2;
     memManager.updateEstimates(batchSize, rowWidth, rowCount);
     assertEquals(10, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);

     // Truncate spill, merge batch row count
@@ -234,12 +240,12 @@ public class TestExternalSortInternals extends DrillTest {

     // Small, but non-zero, row
-    rowWidth = 20;
+    rowWidth = 10;
     rowCount = 10000;
     batchSize = rowWidth * rowCount;
     memManager.updateEstimates(batchSize, rowWidth, rowCount);
     assertEquals(rowWidth, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);

     // Truncate spill, merge batch row count
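In the low-memory tests that follow, the input batch is sized from a buffer budget and then discounted to a data size, rather than computed directly from data. Spelled out with the test's own numbers (PAYLOAD_FROM_BUFFER = 0.75 per SortMemoryManager):

    int memoryLimit = 10 * ONE_MEG;                            // 10_485_760 bytes
    int bufferBudget = memoryLimit / 4;                        // 2_621_440 bytes of buffer
    int batchSize = SortMemoryManager.multiply(
        bufferBudget, SortMemoryManager.PAYLOAD_FROM_BUFFER);  // 1_966_080 bytes of data
    int rowCount = batchSize / 1000;                           // 1_966 rows of 1_000 bytes
    batchSize = rowCount * 1000;                               // 1_966_000 bytes, as asserted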
@@ -256,69 +262,89 @@ public class TestExternalSortInternals extends DrillTest {
   public void testLowMemory() {
     DrillConfig drillConfig = DrillConfig.create();
     SortConfig sortConfig = new SortConfig(drillConfig);
-    long memoryLimit = 10 * ONE_MEG;
+    int memoryLimit = 10 * ONE_MEG;
     SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);

     // Tight squeeze, but can be made to work.
-    // Input batches are a quarter of memory.
+    // Input batch buffer size is a quarter of memory.
     int rowWidth = 1000;
-    int rowCount = (int) (memoryLimit / 4 / rowWidth);
-    int batchSize = rowCount * rowWidth;
+    int batchSize = SortMemoryManager.multiply(memoryLimit / 4, SortMemoryManager.PAYLOAD_FROM_BUFFER);
+    int rowCount = batchSize / rowWidth;
+    batchSize = rowCount * rowWidth;
     memManager.updateEstimates(batchSize, rowWidth, rowCount);
     assertEquals(rowWidth, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
     assertFalse(memManager.mayOverflow());
+    assertTrue(memManager.hasPerformanceWarning());

     // Spill, merge batches should be constrained

-    int spillBatchSize = memManager.getSpillBatchSize();
+    int spillBatchSize = memManager.getSpillBatchSize().dataSize;
     assertTrue(spillBatchSize < memManager.getPreferredSpillBatchSize());
     assertTrue(spillBatchSize >= rowWidth);
     assertTrue(spillBatchSize <= memoryLimit / 3);
     assertTrue(spillBatchSize + 2 * batchSize <= memoryLimit);
     assertTrue(spillBatchSize / rowWidth >= memManager.getSpillBatchRowCount());

-    int mergeBatchSize = memManager.getMergeBatchSize();
+    int mergeBatchSize = memManager.getMergeBatchSize().dataSize;
     assertTrue(mergeBatchSize < memManager.getPreferredMergeBatchSize());
     assertTrue(mergeBatchSize >= rowWidth);
     assertTrue(mergeBatchSize + 2 * spillBatchSize <= memoryLimit);
     assertTrue(mergeBatchSize / rowWidth >= memManager.getMergeBatchRowCount());

     // Should spill after just two or three batches

-    assertFalse(memManager.isSpillNeeded(0, batchSize));
-    assertFalse(memManager.isSpillNeeded(batchSize, batchSize));
-    assertTrue(memManager.isSpillNeeded(2 * batchSize, batchSize));
+    int inputBufferSize = memManager.getInputBatchSize().expectedBufferSize;
+    assertFalse(memManager.isSpillNeeded(0, inputBufferSize));
+    assertFalse(memManager.isSpillNeeded(batchSize, inputBufferSize));
+    assertTrue(memManager.isSpillNeeded(3 * inputBufferSize, inputBufferSize));
+  }
+
+  @Test
+  public void testLowerMemory() {
+    DrillConfig drillConfig = DrillConfig.create();
+    SortConfig sortConfig = new SortConfig(drillConfig);
+    int memoryLimit = 10 * ONE_MEG;
+    SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);

     // Tighter squeeze, but can be made to work.
     // Input batches are 3/8 of memory; two fill 3/4,
     // but small spill and merge batches allow progress.
-    rowWidth = 1000;
-    rowCount = (int) (memoryLimit * 3 / 8 / rowWidth);
+    int rowWidth = 1000;
+    int batchSize = SortMemoryManager.multiply(memoryLimit * 3 / 8, SortMemoryManager.PAYLOAD_FROM_BUFFER);
+    int rowCount = batchSize / rowWidth;
     batchSize = rowCount * rowWidth;
     memManager.updateEstimates(batchSize, rowWidth, rowCount);
     assertEquals(rowWidth, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
     assertFalse(memManager.mayOverflow());
+    assertTrue(memManager.hasPerformanceWarning());

     // Spill, merge batches should be constrained

-    spillBatchSize = memManager.getSpillBatchSize();
+    int spillBatchSize = memManager.getSpillBatchSize().dataSize;
     assertTrue(spillBatchSize < memManager.getPreferredSpillBatchSize());
     assertTrue(spillBatchSize >= rowWidth);
     assertTrue(spillBatchSize <= memoryLimit / 3);
     assertTrue(spillBatchSize + 2 * batchSize <= memoryLimit);
-    assertTrue(memManager.getSpillBatchRowCount() > 1);
+    assertTrue(memManager.getSpillBatchRowCount() >= 1);
     assertTrue(spillBatchSize / rowWidth >= memManager.getSpillBatchRowCount());

-    mergeBatchSize = memManager.getMergeBatchSize();
+    int mergeBatchSize = memManager.getMergeBatchSize().dataSize;
     assertTrue(mergeBatchSize < memManager.getPreferredMergeBatchSize());
     assertTrue(mergeBatchSize >= rowWidth);
     assertTrue(mergeBatchSize + 2 * spillBatchSize <= memoryLimit);
     assertTrue(memManager.getMergeBatchRowCount() > 1);
     assertTrue(mergeBatchSize / rowWidth >= memManager.getMergeBatchRowCount());
+
+    // Should spill after just two batches
+
+    int inputBufferSize = memManager.getInputBatchSize().expectedBufferSize;
+    assertFalse(memManager.isSpillNeeded(0, inputBufferSize));
+    assertFalse(memManager.isSpillNeeded(batchSize, inputBufferSize));
+    assertTrue(memManager.isSpillNeeded(2 * inputBufferSize, inputBufferSize));
   }

   @Test
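Note that isSpillNeeded now takes the incoming batch's buffer size rather than its data size. A plausible reading of the check the asserts above exercise (a sketch under that assumption, not the production code):

    boolean isSpillNeeded(long allocatedMemory, long incomingBufferSize) {
      // Spill once accepting the incoming batch's buffers would push total
      // allocation past the memory reserved for buffered input batches.
      return allocatedMemory + incomingBufferSize >= bufferMemoryLimit;
    }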
@@ -333,21 +359,22 @@ public class TestExternalSortInternals extends DrillTest {

     // Have to back off the exact size a bit to allow for internal fragmentation
     // in the merge and output batches.
-    int rowWidth = (int) (memoryLimit / 3 * 0.75);
+    int rowWidth = (int) (memoryLimit / 3 / 2);
     int rowCount = 1;
     int batchSize = rowWidth;
     memManager.updateEstimates(batchSize, rowWidth, rowCount);
     assertEquals(rowWidth, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
     assertFalse(memManager.mayOverflow());
+    assertTrue(memManager.hasPerformanceWarning());

-    int spillBatchSize = memManager.getSpillBatchSize();
+    int spillBatchSize = memManager.getSpillBatchSize().dataSize;
     assertTrue(spillBatchSize >= rowWidth);
     assertTrue(spillBatchSize <= memoryLimit / 3);
     assertTrue(spillBatchSize + 2 * batchSize <= memoryLimit);
     assertEquals(1, memManager.getSpillBatchRowCount());

-    int mergeBatchSize = memManager.getMergeBatchSize();
+    int mergeBatchSize = memManager.getMergeBatchSize().dataSize;
     assertTrue(mergeBatchSize >= rowWidth);
     assertTrue(mergeBatchSize + 2 * spillBatchSize <= memoryLimit);
     assertEquals(1, memManager.getMergeBatchRowCount());
@@ -357,12 +384,26 @@ public class TestExternalSortInternals extends DrillTest {
     assertFalse(memManager.isSpillNeeded(0, batchSize));
     assertFalse(memManager.isSpillNeeded(batchSize, batchSize));
     assertTrue(memManager.isSpillNeeded(2 * batchSize, batchSize));
+  }

-    // In trouble now, can't fit even three rows.
+  @Test
+  public void testMemoryOverflow() {
+    DrillConfig drillConfig = DrillConfig.create();
+    SortConfig sortConfig = new SortConfig(drillConfig);
+    long memoryLimit = 10 * ONE_MEG;
+    SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
+
+    // In trouble now, can't fit even two input batches.
+    // A better implementation would spill the first batch to a file,
+    // leave it open, and append the second batch. Slicing each big input
+    // batch into small spill batches would allow the sort to proceed as
+    // long as it can hold a single input batch and a single merge batch. But
+    // the current implementation requires that all batches to be spilled be
+    // in memory at the same time...

-    rowWidth = (int) (memoryLimit / 2);
-    rowCount = 1;
-    batchSize = rowWidth;
+    int rowWidth = (int) (memoryLimit / 2);
+    int rowCount = 1;
+    int batchSize = rowWidth;
     memManager.updateEstimates(batchSize, rowWidth, rowCount);
     assertTrue(memManager.mayOverflow());
   }
@@ -406,7 +447,7 @@ public class TestExternalSortInternals extends DrillTest {

     memManager.updateEstimates(batchSize, rowWidth, rowCount);

-    int spillBatchSize = memManager.getSpillBatchSize();
+    int spillBatchSize = memManager.getSpillBatchSize().dataSize;

     // Test various memory fill levels
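The overflow case above is easy to verify by hand: with worst-case power-of-two rounding, a single 5 MiB row can claim roughly the entire 10 MiB budget in buffer space, before any spill or merge batch is accounted for. Roughly, using the WORST_CASE_BUFFER_RATIO constant (the exact production check may differ):

    long memoryLimit = 10 * ONE_MEG;
    int rowWidth = (int) (memoryLimit / 2);                    // 5_242_880 bytes of data
    int worstCaseBuffer = SortMemoryManager.multiply(
        rowWidth, SortMemoryManager.WORST_CASE_BUFFER_RATIO);  // 10_485_760 bytes of buffer
    // One buffered batch can consume the whole budget, leaving nothing for a
    // second input batch, a spill batch, or a merge batch: hence mayOverflow().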
@@ -432,63 +473,67 @@
       .build();
     SortConfig sortConfig = new SortConfig(drillConfig);
     // Allow four spill batches, 8 MB each, plus one output of 16
-    long memoryLimit = 50 * ONE_MEG;
+    // Allow for internal fragmentation
+    // 96 > (4 * 8 + 16) * 2
+    long memoryLimit = 96 * ONE_MEG;
     SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);

-    // Prime the estimates
+    // Prime the estimates. Batch size is data size, not buffer size.
     int rowWidth = 300;
     int rowCount = 10000;
     int batchSize = rowWidth * rowCount * 2;
     memManager.updateEstimates(batchSize, rowWidth, rowCount);
-    int spillBatchSize = memManager.getSpillBatchSize();
-    int mergeBatchSize = memManager.getMergeBatchSize();
+    assertFalse(memManager.isLowMemory());
+    int spillBatchBufferSize = memManager.getSpillBatchSize().expectedBufferSize;
+    int inputBatchBufferSize = memManager.getInputBatchSize().expectedBufferSize;

     // One in-mem batch, no merging.
-    long allocMemory = memoryLimit - mergeBatchSize;
+    long allocMemory = inputBatchBufferSize;
     MergeTask task = memManager.consolidateBatches(allocMemory, 1, 0);
     assertEquals(MergeAction.NONE, task.action);

     // Many in-mem batches, just enough to merge
-    allocMemory = memoryLimit - mergeBatchSize;
-    int memBatches = (int) (allocMemory / batchSize);
-    allocMemory = memBatches * batchSize;
+    int memBatches = (int) (memManager.getMergeMemoryLimit() / inputBatchBufferSize);
+    allocMemory = memBatches * inputBatchBufferSize;
     task = memManager.consolidateBatches(allocMemory, memBatches, 0);
     assertEquals(MergeAction.NONE, task.action);

     // Spills if no room for spill and in-memory batches
-    task = memManager.consolidateBatches(allocMemory, memBatches, 1);
+    int spillCount = (int) Math.ceil((memManager.getMergeMemoryLimit() - allocMemory) / (1.0 * spillBatchBufferSize));
+    assertTrue(spillCount >= 1);
+    task = memManager.consolidateBatches(allocMemory, memBatches, spillCount);
     assertEquals(MergeAction.SPILL, task.action);

     // One more in-mem batch: now needs to spill
     memBatches++;
-    allocMemory = memBatches * batchSize;
+    allocMemory = memBatches * inputBatchBufferSize;
     task = memManager.consolidateBatches(allocMemory, memBatches, 0);
     assertEquals(MergeAction.SPILL, task.action);

     // No spill for various in-mem/spill run combinations
-    allocMemory = memoryLimit - spillBatchSize - mergeBatchSize;
-    memBatches = (int) (allocMemory / batchSize);
-    allocMemory = memBatches * batchSize;
+    long freeMem = memManager.getMergeMemoryLimit() - spillBatchBufferSize;
+    memBatches = (int) (freeMem / inputBatchBufferSize);
+    allocMemory = memBatches * inputBatchBufferSize;
     task = memManager.consolidateBatches(allocMemory, memBatches, 1);
     assertEquals(MergeAction.NONE, task.action);

-    allocMemory = memoryLimit - 2 * spillBatchSize - mergeBatchSize;
-    memBatches = (int) (allocMemory / batchSize);
-    allocMemory = memBatches * batchSize;
+    freeMem = memManager.getMergeMemoryLimit() - 2 * spillBatchBufferSize;
+    memBatches = (int) (freeMem / inputBatchBufferSize);
+    allocMemory = memBatches * inputBatchBufferSize;
     task = memManager.consolidateBatches(allocMemory, memBatches, 2);
     assertEquals(MergeAction.NONE, task.action);

     // No spill if no in-memory, only spill, and spill fits
-    long freeMem = memoryLimit - mergeBatchSize;
-    int spillBatches = (int) (freeMem / spillBatchSize);
+    freeMem = memManager.getMergeMemoryLimit();
+    int spillBatches = (int) (freeMem / spillBatchBufferSize);
     task = memManager.consolidateBatches(0, 0, spillBatches);
     assertEquals(MergeAction.NONE, task.action);
@@ -503,6 +548,47 @@ public class TestExternalSortInternals extends DrillTest {
     task = memManager.consolidateBatches(0, 0, spillBatches + 2);
     assertEquals(MergeAction.MERGE, task.action);
     assertEquals(3, task.count);
+
+    // If only one spilled run, and no in-memory batches,
+    // skip merge.
+
+    task = memManager.consolidateBatches(0, 0, 1);
+    assertEquals(MergeAction.NONE, task.action);
+
+    // Very large number of spilled runs. Limit to what fits in memory.
+
+    task = memManager.consolidateBatches(0, 0, 1000);
+    assertEquals(MergeAction.MERGE, task.action);
+    assertTrue(task.count <= (int) (memoryLimit / spillBatchBufferSize) - 1);
+  }
+
+  @Test
+  public void testMergeCalcsExtreme() {
+
+    DrillConfig drillConfig = DrillConfig.create();
+    SortConfig sortConfig = new SortConfig(drillConfig);
+
+    // Force odd situation in which the spill batch is larger
+    // than memory. Won't actually run, but needed to test
+    // odd merge case.
+
+    long memoryLimit = ONE_MEG / 2;
+    SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
+
+    // Prime the estimates. Batch size is data size, not buffer size.
+
+    int rowWidth = (int) memoryLimit;
+    int rowCount = 1;
+    int batchSize = rowWidth;
+
+    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+    assertTrue(memManager.getMergeMemoryLimit() < rowWidth);
+
+    // Only one spill batch; that batch is above the merge memory limit,
+    // but nothing useful comes from merging.
+
+    MergeTask task = memManager.consolidateBatches(0, 0, 1);
+    assertEquals(MergeAction.NONE, task.action);
   }

   @Test
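Read together, the merge-calcs tests above pin down a small decision table for consolidateBatches. Paraphrased from the asserts (this summarizes the tests, not the production implementation):

    // MergeAction.NONE:  in-memory batches plus spilled runs fit the merge
    //                    budget, or exactly one spilled run remains with no
    //                    in-memory data (re-merging one run gains nothing).
    // MergeAction.SPILL: in-memory batches, together with existing spilled
    //                    runs, leave no room to merge; flush to disk first.
    // MergeAction.MERGE: no in-memory data but too many spilled runs to open
    //                    at once; task.count caps the runs merged per pass at
    //                    what fits, at most memoryLimit / spillBatchBufferSize - 1.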
