[ https://issues.apache.org/jira/browse/DRILL-5325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16039806#comment-16039806 ]

ASF GitHub Bot commented on DRILL-5325:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/808#discussion_r120497990
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java ---
    @@ -0,0 +1,506 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.xsort.managed;
    +
    +public class SortMemoryManager {
    +
    +  /**
    +   * Maximum memory this operator may use. Usually comes from the
    +   * operator definition, but may be overridden by a configuration
    +   * parameter for unit testing.
    +   */
    +  private final long memoryLimit;
    +
    +  /**
    +   * Estimated size of the records for this query, updated on each
    +   * new batch received from upstream.
    +   */
    +  private int estimatedRowWidth;
    +
    +  /**
    +   * Size of the merge batches that this operator produces. Generally
    +   * the same as the preferred merge batch size, unless low memory
    +   * forces a smaller value.
    +   */
    +  private int expectedMergeBatchSize;
    +
    +  /**
    +   * Estimate of the input batch size based on the largest batch seen
    +   * thus far.
    +   */
    +  private int estimatedInputBatchSize;
    +
    +  /**
    +   * Maximum memory level before spilling occurs. That is, we can buffer
    +   * input batches in memory until we reach the level given by the buffer
    +   * memory pool.
    +   */
    +  private long bufferMemoryLimit;
    +
    +  /**
    +   * Maximum memory that can hold batches during the merge
    +   * phase.
    +   */
    +  private long mergeMemoryLimit;
    +
    +  /**
    +   * The target size for merge batches sent downstream.
    +   */
    +  private int preferredMergeBatchSize;
    +
    +  /**
    +   * The configured size for each spill batch.
    +   */
    +  private int preferredSpillBatchSize;
    +
    +  /**
    +   * Estimated number of rows that fit into a single spill batch.
    +   */
    +  private int spillBatchRowCount;
    +
    +  /**
    +   * The estimated actual spill batch size, which depends on the
    +   * details of the data rows for any particular query.
    +   */
    +  private int expectedSpillBatchSize;
    +
    +  /**
    +   * The number of records to add to each output batch sent to the
    +   * downstream operator or spilled to disk.
    +   */
    +  private int mergeBatchRowCount;
    +
    +  private final SortConfig config;
    +
    +  private int estimatedInputSize;
    +
    +  private boolean potentialOverflow;
    +
    +  public SortMemoryManager(SortConfig config, long memoryLimit) {
    +    this.config = config;
    +
    +    // The maximum memory this operator can use as set by the
    +    // operator definition (propagated to the allocator.)
    +
    +    if (config.maxMemory() > 0) {
    +      this.memoryLimit = Math.min(memoryLimit, config.maxMemory());
    +    } else {
    +      this.memoryLimit = memoryLimit;
    +    }
    +
    +    preferredSpillBatchSize = config.spillBatchSize();
    +    preferredMergeBatchSize = config.mergeBatchSize();
    +  }
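    +
    +  // Example (hypothetical numbers): if the allocator grants this operator
    +  // 100 MB but config.maxMemory() is set to 40 MB for a unit test, the
    +  // effective memoryLimit becomes min(100 MB, 40 MB) = 40 MB.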
    +
    +  /**
    +   * Update the data-driven memory use numbers including:
    +   * <ul>
    +   * <li>The average size of incoming records.</li>
    +   * <li>The estimated spill and output batch size.</li>
    +   * <li>The estimated number of average-size records per
    +   * spill and output batch.</li>
    +   * <li>The amount of memory set aside to hold the incoming
    +   * batches before spilling starts.</li>
    +   * </ul>
    +   * <p>
    +   * Under normal circumstances, the amount of memory available is much
    +   * larger than the input, spill, or merge batch sizes. The primary
    +   * question is to determine how many input batches we can buffer during
    +   * the load phase, and how many spill batches we can merge during the
    +   * merge phase.
    +   *
    +   * @param batchSize the overall size of the current batch received from
    +   * upstream
    +   * @param batchRowWidth the width in bytes (including overhead) of each
    +   * row in the current input batch
    +   * @param batchRowCount the number of actual (not filtered) records in
    +   * that upstream batch
    +   */
    +  public void updateEstimates(int batchSize, int batchRowWidth, int batchRowCount) {
    +
    +    // The record count should never be zero, but better safe than sorry...
    +
    +    if (batchRowCount == 0) {
    +      return;
    +    }
    +
    +    // Update input batch estimates.
    +    // Go no further if nothing changed.
    +
    +    if (! updateInputEstimates(batchSize, batchRowWidth, batchRowCount)) {
    +      return;
    +    }
    +
    +    updateSpillSettings();
    +    updateMergeSettings();
    +    adjustForLowMemory();
    +    logSettings(batchRowCount);
    +  }
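    +
    +  // Example (hypothetical numbers): a 4 MB batch of 32K rows at 128 bytes
    +  // per row refreshes the row-width and batch-size estimates, resizes the
    +  // spill and merge batches, then checks that everything still fits.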
    +
    +  private boolean updateInputEstimates(int batchSize, int batchRowWidth, int batchRowCount) {
    +
    +    // The row width may end up as zero if all fields are nulls or some
    +    // other unusual situation. In this case, assume a width of 10 just
    +    // to avoid lots of special case code.
    +
    +    if (batchRowWidth == 0) {
    +      batchRowWidth = 10;
    +    }
    +
    +    // We know the batch size and number of records. Use that to estimate
    +    // the average record size. Since a typical batch has many records,
    +    // the average size is a fairly good estimator. Note that the batch
    +    // size includes not just the actual vector data, but any unused space
    +    // resulting from power-of-two allocation. This means that we don't
    +    // have to do size adjustments for input batches as we will do below
    +    // when estimating the size of other objects.
    +
    +    // Record sizes may vary across batches. To be conservative, use
    +    // the largest size observed from incoming batches.
    +
    +    int origRowEstimate = estimatedRowWidth;
    +    estimatedRowWidth = Math.max(estimatedRowWidth, batchRowWidth);
    +
    +    // Maintain an estimate of the incoming batch size: the largest
    +    // batch yet seen. Used to reserve memory for the next incoming
    +    // batch. Because we are using the actual observed batch size,
    +    // the size already includes overhead due to power-of-two rounding.
    +
    +    int origInputBatchSize = estimatedInputBatchSize;
    +    estimatedInputBatchSize = Math.max(estimatedInputBatchSize, batchSize);
    +
    +    // Estimate the total size of each incoming batch plus sv2. Note that,
    +    // due to power-of-two rounding, the allocated sv2 size might be twice
    +    // the data size.
    +
    +    estimatedInputSize = estimatedInputBatchSize + 4 * batchRowCount;
    +
    +    // Return whether anything changed.
    +
    +    return estimatedRowWidth != origRowEstimate ||
    +           estimatedInputBatchSize != origInputBatchSize;
    +  }
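    +
    +  // Example (hypothetical numbers): a 4 MB batch of 32K rows averages
    +  // 128 bytes per row. The sv2 needs 2 bytes per row, doubled here to
    +  // 4 * batchRowCount to allow for power-of-two rounding, so
    +  // estimatedInputSize = 4 MB + 128 KB.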
    +
    +  /**
    +   * Determine the number of records to spill per spill batch. The goal is
    +   * to spill batches of either 64K records, or as many records as fit into
    +   * the amount of memory dedicated to each spill batch, whichever is less.
    +   */
    +  private void updateSpillSettings() {
    +
    +    spillBatchRowCount = rowsPerBatch(preferredSpillBatchSize);
    +
    +    // Compute the actual spill batch size which may be larger or smaller
    +    // than the preferred size depending on the row width. Double the
    +    // estimated memory needs to allow for power-of-two rounding.
    +
    +    expectedSpillBatchSize = batchForRows(spillBatchRowCount);
    +
    +    // Determine the minimum memory needed for spilling. Spilling is done
    +    // just before accepting an input batch, so we must spill if we don't
    +    // have room for a (worst case) input batch. To spill, we need room
    +    // for the spill batch created by merging the batches already in
    +    // memory.
    +
    +    bufferMemoryLimit = memoryLimit - expectedSpillBatchSize;
    +  }
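    +
    +  // Example (hypothetical numbers): at 128 bytes per row, an 8 MB preferred
    +  // spill batch holds min(64K, 8 MB / 128 B) = 64K rows; with a 40 MB memory
    +  // limit, the buffer pool is 40 MB minus the expected spill batch size.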
    +
    +  /**
    +   * Determine the number of records per batch per merge step. The goal is
    +   * to merge batches of either 64K records, or as many records as fit into
    +   * the amount of memory dedicated to each merge batch, whichever is less.
    +   */
    +  private void updateMergeSettings() {
    +
    +    mergeBatchRowCount = rowsPerBatch(preferredMergeBatchSize);
    +    expectedMergeBatchSize = batchForRows(mergeBatchRowCount);
    +
    +    // The merge memory pool assumes we can spill all input batches. The
    +    // memory available to hold spill batches for merging is total memory
    +    // minus the expected output batch size.
    +
    +    mergeMemoryLimit = memoryLimit - expectedMergeBatchSize;
    +  }
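    +
    +  // Example (hypothetical numbers): with a 40 MB memory limit and a 16 MB
    +  // expected merge batch, mergeMemoryLimit = 24 MB is left to hold the
    +  // spill batches being merged.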
    +
    +  /**
    +   * In a low-memory situation we have to approach the memory assignment
    +   * problem from a different angle. Memory is low enough that we can't
    +   * fit the incoming batches (of a size decided by the upstream operator)
    +   * and our usual spill or merge batch sizes. Instead, we have to
    +   * determine the largest spill and merge batch sizes possible given
    +   * the available memory, input batch size and row width. We shrink the
    +   * sizes of the batches we control to try to make things fit into limited
    +   * memory. At some point, however, if we cannot fit even two input
    +   * batches and even the smallest merge batch, then we will run into an
    +   * out-of-memory condition and we log a warning.
    +   * <p>
    +   * Note that these calculations are a bit crazy: it is Drill that
    +   * decided to allocate the small memory, it is Drill that created the
    +   * large incoming batches, and so it is Drill that created the low
    +   * memory situation. Over time, a better fix for this condition is to
    +   * control memory usage at the query level so that the sort is guaranteed
    +   * to have sufficient memory. But, since we don't yet have the luxury
    +   * of making such changes, we just live with the situation as we find
    +   * it.
    +   */
    +  private void adjustForLowMemory() {
    +
    +    long loadHeadroom = bufferMemoryLimit - 2 * estimatedInputSize;
    +    long mergeHeadroom = mergeMemoryLimit - 2 * expectedSpillBatchSize;
    +    if (loadHeadroom >= 0  &&  mergeHeadroom >= 0) {
    +      return;
    +    }
    +
    +    lowMemorySpillBatchSize();
    +    lowMemoryMergeBatchSize();
    +
    +    // Sanity check: if we've been given too little memory to make
    +    // progress, issue a warning but proceed anyway. Should only occur
    +    // if something is configured terribly wrong.
    +
    +    long minNeeds = 2 * estimatedInputSize + expectedSpillBatchSize;
    +    if (minNeeds > memoryLimit) {
    +      ExternalSortBatch.logger.warn("Potential memory overflow during load phase! " +
    +          "Minimum needed = {} bytes, actual available = {} bytes",
    +          minNeeds, memoryLimit);
    +      bufferMemoryLimit = 0;
    +      potentialOverflow = true;
    +    }
    +
    +    // Sanity check
    +
    +    minNeeds = 2 * expectedSpillBatchSize + expectedMergeBatchSize;
    +    if (minNeeds > memoryLimit) {
    +      ExternalSortBatch.logger.warn("Potential memory overflow during merge phase! " +
    +          "Minimum needed = {} bytes, actual available = {} bytes",
    --- End diff ---
    
    Fixed.
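
    For reference: the excerpt ends before the rowsPerBatch() and
    batchForRows() helpers that updateSpillSettings() and updateMergeSettings()
    call. A minimal sketch consistent with the 64K-record cap and the
    double-for-rounding rule described in the comments above (the actual
    implementations later in SortMemoryManager.java may differ):

        private int rowsPerBatch(int batchSize) {
          // Take the lesser of the 64K sv2 row limit or however many rows of
          // the estimated width fit into the target batch size.
          int rowCount = batchSize / estimatedRowWidth;
          return Math.max(1, Math.min(rowCount, Character.MAX_VALUE));
        }

        private int batchForRows(int rowCount) {
          // Double the raw estimate to allow for power-of-two vector rounding.
          return estimatedRowWidth * rowCount * 2;
        }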


> Implement sub-operator unit tests for managed external sort
> -----------------------------------------------------------
>
>                 Key: DRILL-5325
>                 URL: https://issues.apache.org/jira/browse/DRILL-5325
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Tools, Build & Test
>    Affects Versions: 1.11.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>             Fix For: 1.11.0
>
>
> Validate the proposed sub-operator test framework by creating low-level unit 
> tests for the managed version of the external sort.
> The external sort has a small number of existing tests, but those tests are 
> quite superficial; the "managed sort" project found many bugs. The managed 
> sort itself was tested with ad-hoc system-level tests created using the new 
> "cluster fixture" framework. But, again, such tests could not reach deep 
> inside the sort code to exercise very specific conditions.
> As a result, we spent far too much time using QA functional tests to identify 
> specific code issues.
> Using the sub-operator unit test framework, we can instead test each bit of 
> functionality at the unit test level.
> If doing so works, and is practical, it can serve as a model for other 
> operator testing projects.
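
A sub-operator test of this kind can drive the sort's memory manager directly,
with no Drillbit or query plan. A minimal sketch (SortMemoryManager, SortConfig,
and updateEstimates() appear in the diff above; building SortConfig from
DrillConfig.create() and the JUnit wiring are assumptions, so the real test
fixture API may differ):

    @Test
    public void testMemoryEstimates() {
      // Assumed construction; the test framework may supply a config fixture.
      SortConfig config = new SortConfig(DrillConfig.create());
      SortMemoryManager memManager =
          new SortMemoryManager(config, 40 * 1024 * 1024);
      // Simulate an incoming batch: 4 MB total, 128-byte rows, 32K rows.
      memManager.updateEstimates(4 * 1024 * 1024, 128, 32 * 1024);
      // Assertions on spill/merge batch sizes and memory limits go here.
    }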


