Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/717#discussion_r99024407
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
 ---
    @@ -0,0 +1,1321 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.xsort.managed;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import org.apache.drill.common.AutoCloseables;
    +import org.apache.drill.common.config.DrillConfig;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.ops.MetricDef;
    +import org.apache.drill.exec.physical.config.ExternalSort;
    +import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
    +import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
    +import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
    +import 
org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
    +import org.apache.drill.exec.record.AbstractRecordBatch;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
    +import org.apache.drill.exec.record.RecordBatch;
    +import org.apache.drill.exec.record.SchemaUtil;
    +import org.apache.drill.exec.record.VectorContainer;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.record.WritableBatch;
    +import org.apache.drill.exec.record.selection.SelectionVector2;
    +import org.apache.drill.exec.record.selection.SelectionVector4;
    +import org.apache.drill.exec.testing.ControlsInjector;
    +import org.apache.drill.exec.testing.ControlsInjectorFactory;
    +import org.apache.drill.exec.vector.ValueVector;
    +import org.apache.drill.exec.vector.complex.AbstractContainerVector;
    +
    +import com.google.common.collect.Lists;
    +
    +/**
    + * External sort batch: a sort batch which can spill to disk in
    + * order to operate within a defined memory footprint.
    + * <p>
    + * <h4>Basic Operation</h4>
    + * The operator has three key phases:
    + * <p>
    + * <ul>
    + * <li>The load phase in which batches are read from upstream.</li>
    + * <li>The merge phase in which spilled batches are combined to
    + * reduce the number of files below the configured limit. (Best
    + * practice is to configure the system to avoid this phase.)
    + * <li>The delivery phase in which batches are combined to produce
    + * the final output.</li>
    + * </ul>
    + * During the load phase:
    + * <p>
    + * <ul>
    + * <li>The incoming (upstream) operator provides a series of batches.</li>
    + * <li>This operator sorts each batch, and accumulates them in an in-memory
    + * buffer.</li>
    + * <li>If the in-memory buffer becomes too large, this operator selects
    + * a subset of the buffered batches to spill.</li>
    + * <li>Each spill set is merged to create a new, sorted collection of
    + * batches, and each is spilled to disk.</li>
    + * <li>To allow the use of multiple disk storage, each spill group is 
written
    + * round-robin to a set of spill directories.</li>
    + * </ul>
    + * <p>
    + * During the sort/merge phase:
    + * <p>
    + * <ul>
    + * <li>When the input operator is complete, this operator merges the 
accumulated
    + * batches (which may be all in memory or partially on disk), and returns
    + * them to the output (downstream) operator in chunks of no more than
    + * 32K records.</li>
    + * <li>The final merge must combine a collection of in-memory and spilled
    + * batches. Several limits apply to the maximum "width" of this merge. For
    + * example, we each open spill run consumes a file handle, and we may wish
    + * to limit the number of file handles. A consolidation phase combines
    + * in-memory and spilled batches prior to the final merge to control final
    + * merge width.</li>
    + * <li>A special case occurs if no batches were spilled. In this case, the 
input
    + * batches are sorted in memory without merging.</li>
    + * </ul>
    + * <p>
    + * Many complex details are involved in doing the above; the details are 
explained
    + * in the methods of this class.
    + * <p>
    + * <h4>Configuration Options</h4>
    + * <dl>
    + * <dt>drill.exec.sort.external.spill.fs</dt>
    + * <dd>The file system (file://, hdfs://, etc.) of the spill 
directory.</dd>
    + * <dt>drill.exec.sort.external.spill.directories</dt>
    + * <dd>The (comma? space?) separated list of directories, on the above file
    + * system, to which to spill files in round-robin fashion. The query will
    + * fail if any one of the directories becomes full.</dt>
    + * <dt>drill.exec.sort.external.spill.file_size</dt>
    + * <dd>Target size for first-generation spill files Set this to large
    + * enough to get nice long writes, but not so large that spill directories
    + * are overwhelmed.</dd>
    + * <dt>drill.exec.sort.external.mem_limit</dt>
    + * <dd>Maximum memory to use for the in-memory buffer. (Primarily for 
testing.)</dd>
    + * <dt>drill.exec.sort.external.batch_limit</dt>
    + * <dd>Maximum number of batches to hold in memory. (Primarily for 
testing.)</dd>
    + * <dt>drill.exec.sort.external.spill.max_count</dt>
    + * <dd>Maximum number of batches to add to “first generation” files.
    + * Defaults to 0 (no limit). (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.spill.min_count</dt>
    + * <dd>Minimum number of batches to add to “first generation” files.
    + * Defaults to 0 (no limit). (Primarily for testing.)</dd>
    + * <dt>drill.exec.sort.external.merge_limit</dt>
    + * <dd>Sets the maximum number of runs to be merged in a single pass 
(limits
    + * the number of open files.)</dd>
    + * </dl>
    + * <p>
    + * The memory limit observed by this operator is the lesser of:
    + * <ul>
    + * <li>The maximum allocation allowed the the allocator assigned to this 
batch, or</li>
    + * <li>The maximum limit set for this operator by the Foreman.</li>
    + * <li>The maximum limit configured in the mem_limit parameter above. 
(Primarily for
    + * testing.</li>
    + * </ul>
    + * <h4>Output</h4>
    + * It is helpful to note that the sort operator will produce one of two 
kinds of
    + * output batches.
    + * <ul>
    + * <li>A large output with sv4 if data is sorted in memory. The sv4 
addresses
    + * the entire in-memory sort set. A selection vector remover will copy 
results
    + * into new batches of a size determined by that operator.</li>
    + * <li>A series of batches, without a selection vector, if the sort spills 
to
    + * disk. In this case, the downstream operator will still be a selection 
vector
    + * remover, but there is nothing for that operator to remove. Each batch is
    + * of the size set by {@link #MAX_MERGED_BATCH_SIZE}.</li>
    + * </ul>
    + * Note that, even in the in-memory sort case, this operator could do the 
copying
    + * to eliminate the extra selection vector remover. That is left as an 
exercise
    + * for another time.
    + * <h4>Logging</h4>
    + * Logging in this operator serves two purposes:
    + * <li>
    + * <ul>
    + * <li>Normal diagnostic information.</li>
    + * <li>Capturing the essence of the operator functionality for analysis in 
unit
    + * tests.</li>
    + * </ul>
    + * Test logging is designed to capture key events and timings. Take care
    + * when changing or removing log messages as you may need to adjust unit 
tests
    + * accordingly.
    + */
    +
    +public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
    +  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
    +  protected static final ControlsInjector injector = 
ControlsInjectorFactory.getInjector(ExternalSortBatch.class);
    +
    +  private final RecordBatch incoming;
    +
    +  /**
    +   * Memory allocator for this operator itself. Incoming batches are
    +   * transferred into this allocator. Intermediate batches used during
    +   * merge also reside here.
    +   */
    +
    +  private final BufferAllocator allocator;
    +
    +  /**
    +   * Schema of batches that this operator produces.
    +   */
    +
    +  private BatchSchema schema;
    +
    +  private LinkedList<BatchGroup.InputBatch> bufferedBatches = 
Lists.newLinkedList();
    +  private LinkedList<BatchGroup.SpilledRun> spilledRuns = 
Lists.newLinkedList();
    +  private SelectionVector4 sv4;
    +
    +  /**
    +   * The number of records to add to each output batch sent to the
    +   * downstream operator or spilled to disk.
    +   */
    +
    +  private int outputBatchRecordCount;
    +  private int peakNumBatches = -1;
    +
    +  /**
    +   * The copier uses the COPIER_BATCH_MEM_LIMIT to estimate the target
    +   * number of records to return in each batch.
    +   * <p>
    +   * For reference, see {@link FlattenTemplate#OUTPUT_MEMORY_LIMIT}.
    +   * <p>
    +   * WARNING: Do not allow any one vector to grow beyond 16 MB. Drill 
contains a
    +   * design flaw that may give rise to fatal memory fragmentation if we 
allow it to
    +   * allocate larger vectors.
    +   */
    +
    +  private static final int MAX_MERGED_BATCH_SIZE = 16 * 1024 * 1024;
    +
    +  /**
    +   * Smallest allowed output batch size. The smallest output batch
    +   * created even under constrained memory conditions.
    +   */
    +  private static final int MIN_MERGED_BATCH_SIZE = 256 * 1024;
    +
    +  /**
    +   * The preferred amount of memory to set aside to output batches
    +   * expressed as a ratio of available memory.
    +   */
    +
    +  private static final float MERGE_BATCH_ALLOWANCE = 0.10F;
    +
    +  public static final String INTERRUPTION_AFTER_SORT = "after-sort";
    +  public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
    +  public static final String INTERRUPTION_WHILE_SPILLING = "spilling";
    +
    +  private long memoryLimit;
    +
    +  /**
    +   * Iterates over the final, sorted results.
    +   */
    +
    +  private SortResults resultsIterator;
    +
    +  /**
    +   * Manages the set of spill directories and files.
    +   */
    +
    +  private final SpillSet spillSet;
    +
    +  /**
    +   * Manages the copier used to merge a collection of batches into
    +   * a new set of batches.
    +   */
    +
    +  private final CopierHolder copierHolder;
    +
    +  private enum SortState { START, LOAD, DELIVER, DONE }
    +  private SortState sortState = SortState.START;
    +  private int inputRecordCount = 0;
    +  private int inputBatchCount = 0; // total number of batches received so 
far
    +  private final OperatorCodeGenerator opCodeGen;
    +
    +  /**
    +   * Estimated size of the records for this query, updated on each
    +   * new batch received from upstream.
    +   */
    +
    +  private int estimatedRecordSize;
    +
    +  /**
    +   * Estimated size of the spill and output batches that this
    +   * operator produces, estimated from the estimated record
    +   * size.
    +   */
    +
    +  private long estimatedOutputBatchSize;
    +  private long estimatedInputBatchSize;
    +
    +  /**
    +   * Maximum number of batches to hold in memory.
    +   * (Primarily for testing.)
    +   */
    +
    +  private int bufferedBatchLimit;
    +  private int mergeLimit;
    +  private int minSpillLimit;
    +  private int maxSpillLimit;
    +  private long spillFileSize;
    +  private long minimumBufferSpace;
    +
    +  /**
    +   * Minimum memory level before spilling occurs. That is, we can buffer 
input
    +   * batches in memory until we are down to the level given by the spill 
point.
    +   */
    +
    +  private long spillPoint;
    +  private long mergeMemoryPool;
    +  private long preferredMergeBatchSize;
    +
    +  // WARNING: The enum here is used within this class. But, the members of
    +  // this enum MUST match those in the (unmanaged) ExternalSortBatch since
    +  // that is the enum used in the UI to display metrics for the query 
profile.
    +
    +  public enum Metric implements MetricDef {
    +    SPILL_COUNT,            // number of times operator spilled to disk
    +    RETIRED1,               // Was: peak value for totalSizeInMemory
    +                            // But operator already provides this value
    +    PEAK_BATCHES_IN_MEMORY, // maximum number of batches kept in memory
    +    MERGE_COUNT,            // Number of second+ generation merges
    +    MIN_BUFFER,             // Minimum memory level observed in operation.
    +    INPUT_BATCHES;          // Number of batches read from upstream.
    +
    +    @Override
    +    public int metricId() {
    +      return ordinal();
    +    }
    +  }
    +
    +  /**
    +   * Iterates over the final sorted results. Implemented differently
    +   * depending on whether the results are in-memory or spilled to
    +   * disk.
    +   */
    +
    +  public interface SortResults {
    +    boolean next();
    +    void close();
    +    int getBatchCount();
    +    int getRecordCount();
    +  }
    +
    +  public ExternalSortBatch(ExternalSort popConfig, FragmentContext 
context, RecordBatch incoming) {
    +    super(popConfig, context, true);
    +    this.incoming = incoming;
    +    allocator = oContext.getAllocator();
    +    opCodeGen = new OperatorCodeGenerator(context, popConfig);
    +
    +    spillSet = new SpillSet(context, popConfig);
    +    copierHolder = new CopierHolder(context, allocator, opCodeGen);
    +    configure(context.getConfig());
    +  }
    +
    +  private void configure(DrillConfig config) {
    +
    +    // The maximum memory this operator can use. It is either the
    +    // limit set on the allocator or on the operator, whichever is
    +    // less.
    +
    +    memoryLimit = Math.min(popConfig.getMaxAllocation(), 
allocator.getLimit());
    +
    +    // Optional configured memory limit, typically used only for testing.
    +
    +    long configLimit = 
config.getBytes(ExecConstants.EXTERNAL_SORT_MAX_MEMORY);
    +    if (configLimit > 0) {
    +      memoryLimit = Math.min(memoryLimit, configLimit);
    +    }
    +
    +    // Optional limit on the number of buffered in-memory batches.
    +    // 0 means no limit. Used primarily for testing. Must allow at least 
two
    +    // batches or no merging can occur.
    +
    +    bufferedBatchLimit = getConfigLimit(config, 
ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, Integer.MAX_VALUE, 2);
    +
    +    // Optional limit on the number of spilled runs to merge in a single
    +    // pass. Limits the number of open file handles. Must allow at least
    +    // two batches to merge to make progress.
    +
    +    mergeLimit = getConfigLimit(config, 
ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, Integer.MAX_VALUE, 2);
    +
    +    // Limits on the minimum and maximum buffered batches to spill per
    +    // spill event.
    +
    +    minSpillLimit = getConfigLimit(config, 
ExecConstants.EXTERNAL_SORT_MIN_SPILL, Integer.MAX_VALUE, 2);
    +    maxSpillLimit = getConfigLimit(config, 
ExecConstants.EXTERNAL_SORT_MAX_SPILL, Integer.MAX_VALUE, 2);
    +    if (minSpillLimit > maxSpillLimit) {
    +      minSpillLimit = Math.min(minSpillLimit, maxSpillLimit);
    +      maxSpillLimit = minSpillLimit;
    +    }
    +
    +    // Limits the size of first-generation spill files.
    +
    +    spillFileSize = 
config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE);
    +
    +    // Set the target output batch size. Use the maximum size, but only if
    +    // this represents less than 10% of available memory. Otherwise, use 
10%
    +    // of memory, but no smaller than the minimum size. In any event, an
    +    // output batch can contain no fewer than a single record.
    +
    +    long maxAllowance = (long) (memoryLimit * MERGE_BATCH_ALLOWANCE);
    +    preferredMergeBatchSize = Math.min(maxAllowance, 
MAX_MERGED_BATCH_SIZE);
    +    preferredMergeBatchSize = Math.max(preferredMergeBatchSize, 
MIN_MERGED_BATCH_SIZE);
    +
    +    logger.debug("Config: memory limit = {}, batch limit = {}, " +
    +                 "min, max spill limit: {}, {}, merge limit = {}, merge 
batch size = {}",
    +                  memoryLimit, bufferedBatchLimit, minSpillLimit, 
maxSpillLimit, mergeLimit,
    +                  preferredMergeBatchSize);
    +  }
    +
    +  private int getConfigLimit(DrillConfig config, String paramName, int 
valueIfZero, int minValue) {
    +    int limit = config.getInt(paramName);
    +    if (limit > 0) {
    +      limit = Math.max(limit, minValue);
    +    } else {
    +      limit = valueIfZero;
    +    }
    +    return limit;
    +  }
    +
    +  @Override
    +  public int getRecordCount() {
    +    if (sv4 != null) {
    +      return sv4.getCount();
    +    }
    +    return container.getRecordCount();
    +  }
    +
    +  @Override
    +  public SelectionVector4 getSelectionVector4() {
    +    return sv4;
    +  }
    +
    +  private void closeBatchGroups(Collection<? extends BatchGroup> groups) {
    +    for (BatchGroup group: groups) {
    +      try {
    +        group.close();
    +      } catch (Exception e) {
    +        // collect all failure and make sure to cleanup all remaining 
batches
    +        // Originally we would have thrown a RuntimeException that would 
propagate to FragmentExecutor.closeOutResources()
    +        // where it would have been passed to context.fail()
    +        // passing the exception directly to context.fail(e) will let the 
cleanup process continue instead of stopping
    +        // right away, this will also make sure we collect any additional 
exception we may get while cleaning up
    +        context.fail(e);
    +      }
    +    }
    +  }
    +
    +  @Override
    +  public void close() {
    +    try {
    +      if (bufferedBatches != null) {
    +        closeBatchGroups(bufferedBatches);
    +        bufferedBatches = null;
    +      }
    +      if (spilledRuns != null) {
    +        closeBatchGroups(spilledRuns);
    +        spilledRuns = null;
    +      }
    +    } finally {
    +      if (sv4 != null) {
    +        sv4.clear();
    +      }
    +      if (resultsIterator != null) {
    +        resultsIterator.close();
    +      }
    +      copierHolder.close();
    +      spillSet.close();
    +      opCodeGen.close();
    +
    +      // The call to super.close() clears out the output container.
    +      // Doing so requires the allocator here, so it must be closed
    +      // after the super call.
    +
    +      super.close();
    +      allocator.close();
    +    }
    +  }
    +
    +  /**
    +   * Called by {@link AbstractRecordBatch} as a fast-path to obtain
    +   * the first record batch and setup the schema of this batch in order
    +   * to quickly return the schema to the client. Note that this method
    +   * fetches the first batch from upstream which will be waiting for
    +   * us the first time that {@link #innerNext()} is called.
    +   */
    +
    +  @Override
    +  public void buildSchema() {
    +    IterOutcome outcome = next(incoming);
    +    switch (outcome) {
    +      case OK:
    +      case OK_NEW_SCHEMA:
    +        for (VectorWrapper<?> w : incoming) {
    +          @SuppressWarnings("resource")
    +          ValueVector v = container.addOrGet(w.getField());
    +          if (v instanceof AbstractContainerVector) {
    +            w.getValueVector().makeTransferPair(v); // Can we remove this 
hack?
    +            v.clear();
    +          }
    +          v.allocateNew(); // Can we remove this? - SVR fails with NPE 
(TODO)
    +        }
    +        container.buildSchema(SelectionVectorMode.NONE);
    +        container.setRecordCount(0);
    +        break;
    +      case STOP:
    +        state = BatchState.STOP;
    +        break;
    +      case OUT_OF_MEMORY:
    +        state = BatchState.OUT_OF_MEMORY;
    +        break;
    +      case NONE:
    +        state = BatchState.DONE;
    +        break;
    +      default:
    +        break;
    +    }
    +  }
    +
    +  /**
    +   * Process each request for a batch. The first request retrieves
    +   * the all incoming batches and sorts them, optionally spilling to
    +   * disk as needed. Subsequent calls retrieve the sorted results in
    +   * fixed-size batches.
    +   */
    +
    +  @Override
    +  public IterOutcome innerNext() {
    +    switch (sortState) {
    +    case DONE:
    +      return IterOutcome.NONE;
    +    case START:
    +    case LOAD:
    +      return load();
    +    case DELIVER:
    +      return nextOutputBatch();
    +    default:
    +      throw new IllegalStateException("Unexpected sort state: " + 
sortState);
    +    }
    +  }
    +
    +  private IterOutcome nextOutputBatch() {
    +    if (resultsIterator.next()) {
    +      return IterOutcome.OK;
    +    } else {
    +      logger.trace("Deliver phase complete: Returned {} batches, {} 
records",
    +                    resultsIterator.getBatchCount(), 
resultsIterator.getRecordCount());
    +      sortState = SortState.DONE;
    +      return IterOutcome.NONE;
    +    }
    +  }
    +
    +  /**
    +   * Load and process a single batch, handling schema changes. In general, 
the
    +   * external sort accepts only one schema. It can handle compatible 
schemas
    +   * (which seems to mean the same columns in possibly different orders.)
    +   *
    +   * @return return code depending on the amount of data read from upstream
    +   */
    +
    +  private IterOutcome loadBatch() {
    +
    +    // If this is the very first batch, then AbstractRecordBatch
    +    // already loaded it for us in buildSchema().
    +
    +    IterOutcome upstream;
    +    if (sortState == SortState.START) {
    +      sortState = SortState.LOAD;
    +      upstream = IterOutcome.OK_NEW_SCHEMA;
    +    } else {
    +      upstream = next(incoming);
    +    }
    +    switch (upstream) {
    +    case NONE:
    +      return upstream;
    +    case NOT_YET:
    +      throw new UnsupportedOperationException();
    +    case STOP:
    +      return upstream;
    +    case OK_NEW_SCHEMA:
    +    case OK:
    +      setupSchema(upstream);
    +
    +      // Add the batch to the in-memory generation, spilling if
    +      // needed.
    +
    +      processBatch();
    +      break;
    +    case OUT_OF_MEMORY:
    +
    +      // Note: it is highly doubtful that this code actually works. It
    +      // requires that the upstream batches got to a safe place to run
    +      // out of memory and that no work as in-flight and thus abandoned.
    +      // Consider removing this case once resource management is in place.
    +
    +      logger.debug("received OUT_OF_MEMORY, trying to spill");
    +      if (bufferedBatches.size() > 2) {
    +        spillFromMemory();
    +      } else {
    +        logger.debug("not enough batches to spill, sending OUT_OF_MEMORY 
downstream");
    +        return IterOutcome.OUT_OF_MEMORY;
    +      }
    +      break;
    +    default:
    +      throw new UnsupportedOperationException();
    +    }
    +    return IterOutcome.OK;
    +  }
    +
    +  /**
    +   * Load the results and sort them. May bail out early if an exceptional
    +   * condition is passed up from the input batch.
    +   *
    +   * @return return code: OK_NEW_SCHEMA if rows were sorted,
    +   * NONE if no rows
    +   */
    +
    +  private IterOutcome load() {
    +    logger.trace("Start of load phase");
    +
    +    // Clear the temporary container created by
    +    // buildSchema().
    +
    +    container.clear();
    +
    +    // Loop over all input batches
    +
    +    for (;;) {
    +      IterOutcome result = loadBatch();
    +
    +      // None means all batches have been read.
    +
    +      if (result == IterOutcome.NONE) {
    +        break; }
    +
    +      // Any outcome other than OK means something went wrong.
    +
    +      if (result != IterOutcome.OK) {
    +        return result; }
    +    }
    +
    +    // Anything to actually sort?
    +
    +    if (inputRecordCount == 0) {
    +      sortState = SortState.DONE;
    +      return IterOutcome.NONE;
    +    }
    +    logger.trace("Completed load phase: read {} batches", inputBatchCount);
    +
    +    // Do the merge of the loaded batches. The merge can be done entirely 
in memory if
    +    // the results fit; else we have to do a disk-based merge of
    +    // pre-sorted spilled batches.
    +
    +    if (canUseMemoryMerge()) {
    +      return sortInMemory();
    +    } else {
    +      return mergeSpilledRuns();
    +    }
    +  }
    +
    +  /**
    +   * All data has been read from the upstream batch. Determine if we
    +   * can use a fast in-memory sort, or must use a merge (which typically,
    +   * but not always, involves spilled batches.)
    +   *
    +   * @return whether sufficient resources exist to do an in-memory sort
    +   * if all batches are still in memory
    +   */
    +
    +  private boolean canUseMemoryMerge() {
    +    if (spillSet.hasSpilled()) { return false; }
    +
    +    // Do we have enough memory for MSorter (the in-memory sorter)?
    +
    +    long allocMem = allocator.getAllocatedMemory();
    +    long availableMem = memoryLimit - allocMem;
    +    long neededForInMemorySort = 
MSortTemplate.memoryNeeded(inputRecordCount);
    +    if (availableMem < neededForInMemorySort) { return false; }
    +
    +    // Make sure we don't exceed the maximum number of batches SV4 can 
address.
    +
    +    if (bufferedBatches.size() > Character.MAX_VALUE) { return false; }
    +
    +    // We can do an in-memory merge.
    +
    +    return true;
    +  }
    +
    +  /**
    +   * Handle a new schema from upstream. The ESB is quite limited in its 
ability
    +   * to handle schema changes.
    +   *
    +   * @param upstream the status code from upstream: either OK or 
OK_NEW_SCHEMA
    +   */
    +
    +  private void setupSchema(IterOutcome upstream)  {
    +
    +    // First batch: we won't have a schema.
    +
    +    if (schema == null) {
    +      schema = incoming.getSchema();
    +
    +    // Subsequent batches, nothing to do if same schema.
    +
    +    } else if (upstream == IterOutcome.OK) {
    +      return;
    +
    +    // Only change in the case that the schema truly changes. Artificial 
schema changes are ignored.
    +
    +    } else if (incoming.getSchema().equals(schema)) {
    +      return;
    +    } else if (unionTypeEnabled) {
    +        schema = SchemaUtil.mergeSchemas(schema, incoming.getSchema());
    +
    +        // New schema: must generate a new sorter and copier.
    +
    +        opCodeGen.setSchema(schema);
    +    } else {
    +      throw UserException.unsupportedError()
    +            .message("Schema changes not supported in External Sort. 
Please enable Union type.")
    +            .build(logger);
    +    }
    +
    +    // Coerce all existing batches to the new schema.
    +
    +    for (BatchGroup b : bufferedBatches) {
    +//      System.out.println("Before: " + allocator.getAllocatedMemory()); 
// Debug only
    +      b.setSchema(schema);
    +//      System.out.println("After: " + allocator.getAllocatedMemory()); // 
Debug only
    +    }
    +    for (BatchGroup b : spilledRuns) {
    +      b.setSchema(schema);
    +    }
    +  }
    +
    +  /**
    +   * Convert an incoming batch into the agree-upon format. (Also seems to
    +   * make a persistent shallow copy of the batch saved until we are ready
    +   * to sort or spill.)
    +   *
    +   * @return the converted batch, or null if the incoming batch is empty
    +   */
    +
    +  private VectorContainer convertBatch() {
    +
    +    // Must accept the batch even if no records. Then clear
    +    // the vectors to release memory since we won't do any
    +    // further processing with the empty batch.
    +
    +    VectorContainer convertedBatch = SchemaUtil.coerceContainer(incoming, 
schema, oContext);
    +    if (incoming.getRecordCount() == 0) {
    +      for (VectorWrapper<?> w : convertedBatch) {
    +        w.clear();
    +      }
    +      return null;
    +    }
    +    return convertedBatch;
    +  }
    +
    +  private SelectionVector2 makeSelectionVector() {
    +    if (incoming.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) {
    +      return incoming.getSelectionVector2().clone();
    +    } else {
    +      return newSV2();
    +    }
    +  }
    +
    +  /**
    +   * Process the converted incoming batch by adding it to the in-memory 
store
    +   * of data, or spilling data to disk when necessary.
    +   */
    +
    +  @SuppressWarnings("resource")
    +  private void processBatch() {
    +
    +    // The heart of the external sort operator: spill to disk when
    +    // the in-memory generation exceeds the allowed memory limit.
    +    // Preemptively spill BEFORE accepting the new batch into our memory
    +    // pool. The allocator will throw an OOM exception if we accept the
    +    // batch when we are near the limit - despite the fact that the batch
    +    // is already in memory and no new memory is allocated during the 
transfer.
    +
    +    if (isSpillNeeded()) {
    +      spillFromMemory();
    +    }
    +
    +    // Sanity check. We should now be above the spill point.
    +
    +    long startMem = allocator.getAllocatedMemory();
    +    if (memoryLimit - startMem < spillPoint) {
    +      logger.error( "ERROR: Failed to spill below the spill point. Spill 
point = {}, free memory = {}",
    +                    spillPoint, memoryLimit - startMem);
    +    }
    +
    +    // Convert the incoming batch to the agreed-upon schema.
    +    // No converted batch means we got an empty input batch.
    +    // Converting the batch transfers memory ownership to our
    +    // allocator. This gives a round-about way to learn the batch
    +    // size: check the before and after memory levels, then use
    +    // the difference as the batch size, in bytes.
    +
    +    VectorContainer convertedBatch = convertBatch();
    +    if (convertedBatch == null) {
    +      return;
    +    }
    +
    +    SelectionVector2 sv2 = makeSelectionVector();
    +
    +    // Compute batch size, including allocation of an sv2.
    +
    +    long endMem = allocator.getAllocatedMemory();
    +    long batchSize = endMem - startMem;
    +    int count = sv2.getCount();
    +    inputRecordCount += count;
    +    inputBatchCount++;
    +    stats.setLongStat(Metric.INPUT_BATCHES, inputBatchCount);
    +
    +    // Update the minimum buffer space metric.
    +
    +    if (minimumBufferSpace == 0) {
    +      minimumBufferSpace = endMem;
    +    } else {
    +      minimumBufferSpace = Math.min(minimumBufferSpace, endMem);
    +    }
    +    stats.setLongStat(Metric.MIN_BUFFER, minimumBufferSpace);
    +
    +    // Update the size based on the actual record count, not
    +    // the effective count as given by the selection vector
    +    // (which may exclude some records due to filtering.)
    +
    +    updateMemoryEstimates(batchSize, convertedBatch.getRecordCount());
    +
    +    // Sort the incoming batch using either the original selection vector,
    +    // or a new one created here.
    +
    +    SingleBatchSorter sorter;
    +    sorter = opCodeGen.getSorter(convertedBatch);
    +    try {
    +      sorter.setup(context, sv2, convertedBatch);
    +    } catch (SchemaChangeException e) {
    +      convertedBatch.clear();
    +      throw UserException.unsupportedError(e)
    +            .message("Unexpected schema change.")
    +            .build(logger);
    +    }
    +    try {
    +      sorter.sort(sv2);
    +    } catch (SchemaChangeException e) {
    +      throw UserException.unsupportedError(e)
    +                .message("Unexpected schema change.")
    +                .build(logger);
    +    }
    +    RecordBatchData rbd = new RecordBatchData(convertedBatch, allocator);
    +    try {
    +      rbd.setSv2(sv2);
    +      bufferedBatches.add(new BatchGroup.InputBatch(rbd.getContainer(), 
rbd.getSv2(), oContext, batchSize));
    +      if (peakNumBatches < bufferedBatches.size()) {
    +        peakNumBatches = bufferedBatches.size();
    +        stats.setLongStat(Metric.PEAK_BATCHES_IN_MEMORY, peakNumBatches);
    +      }
    +
    +    } catch (Throwable t) {
    +      rbd.clear();
    +      throw t;
    +    }
    +  }
    +
    +  /**
    +   * Update the data-driven memory use numbers including:
    +   * <ul>
    +   * <li>The average size of incoming records.</li>
    +   * <li>The estimated spill and output batch size.</li>
    +   * <li>The estimated number of average-size records per
    +   * spill and output batch.</li>
    +   * <li>The amount of memory set aside to hold the incoming
    +   * batches before spilling starts.</li>
    +   * </ul>
    +   *
    +   * @param actualBatchSize the overall size of the current batch received 
from
    +   * upstream
    +   * @param actualRecordCount the number of actual (not filtered) records 
in
    +   * that upstream batch
    +   */
    +
    +  private void updateMemoryEstimates(long actualBatchSize, int 
actualRecordCount) {
    +
    +    // The record count should never be zero, but better safe than sorry...
    +
    +    if (actualRecordCount == 0) {
    +      return; }
    +
    +    // We know the batch size and number of records. Use that to estimate
    +    // the average record size. Since a typical batch has many records,
    +    // the average size is a fairly good estimator. Note that the batch
    +    // size includes not just the actual vector data, but any unused space
    +    // resulting from power-of-two allocation. This means that we don't
    +    // have to do size adjustments for input batches as we will do below
    +    // when estimating the size of other objects.
    +
    +    int batchRecordSize = (int) (actualBatchSize / actualRecordCount);
    +
    +    // Record sizes may vary across batches. To be conservative, use
    +    // the largest size observed from incoming batches.
    +
    +    int origEstimate = estimatedRecordSize;
    +    estimatedRecordSize = Math.max(estimatedRecordSize, batchRecordSize);
    +
    +    // Go no further if nothing changed.
    +
    +    if (estimatedRecordSize == origEstimate) {
    +      return; }
    +
    +    // Maintain an estimate of the incoming batch size: the largest
    +    // batch yet seen. Used to reserve memory for the next incoming
    +    // batch.
    +
    +    estimatedInputBatchSize = Math.max(estimatedInputBatchSize, 
actualBatchSize);
    +
    +    // Estimate the input record count. Add one due to rounding.
    +
    +    long estimatedInputRecordCount = estimatedInputBatchSize / 
estimatedRecordSize + 1;
    +
    +    // Estimate the total size of each incoming batch plus sv2. Note that, 
due
    +    // to power-of-two rounding, the allocated size might be twice the 
data size.
    +
    +    long estimatedInputSize = estimatedInputBatchSize + 4 * 
estimatedInputRecordCount;
    +
    +    // Determine the number of records to spill per merge step. The goal 
is to
    +    // spill batches of either 32K records, or as many records as fit into 
the
    +    // amount of memory dedicated to each batch, whichever is less.
    +
    +    outputBatchRecordCount = (int) Math.max(1, preferredMergeBatchSize / 
estimatedRecordSize);
    +    outputBatchRecordCount = Math.min(outputBatchRecordCount, 
Short.MAX_VALUE);
    +
    +    // Compute the estimated size of batches that this operator creates.
    +    // Note that this estimate DOES NOT apply to incoming batches as we 
have
    +    // no control over those. Note that the output batch size should be 
about
    +    // the same as the MAX_MERGED_BATCH_SIZE constant used to create the
    +    // estimated record count. But, it can be bigger if we have jumbo sized
    +    // records larger than the target output size.
    +
    +    estimatedOutputBatchSize = outputBatchRecordCount * 
estimatedRecordSize;
    +    estimatedOutputBatchSize = Math.max(estimatedOutputBatchSize, 
preferredMergeBatchSize);
    +
    +    // Determine the minimum memory needed for spilling. Spilling is done 
just
    +    // before accepting a batch, so we must spill if we don't have room 
for a
    +    // (worst case) input batch. To spill, we need room for the output 
batch created
    +    // by merging the batches already in memory. Double this to allow for 
power-of-two
    +    // memory allocations, then add one more as a margin of safety.
    +
    +    spillPoint = estimatedInputBatchSize + 3 * estimatedOutputBatchSize;
    +
    +    // Determine the minimum total memory we would need to receive two 
input
    +    // batches (the minimum needed to make progress) and the allowance for 
the
    +    // output batch.
    +
    +    long minLoadMemory = spillPoint + estimatedInputSize;
    +
    +    // The merge memory pool assumes we can spill all input batches. To 
make
    +    // progress, we must have at least two merge batches (same size as an 
output
    +    // batch) and one output batch. Again, double to allow for power-of-two
    +    // allocation and add one for a margin of error.
    +
    +    long minMergeMemory = (2*3 + 1) * estimatedOutputBatchSize;
    +
    +    // Determine how much memory can be used to hold in-memory batches of 
spilled
    +    // runs when reading from disk.
    +
    +    mergeMemoryPool = Math.max(minMergeMemory,
    +                               (long) ((memoryLimit - 3 * 
estimatedOutputBatchSize) * 0.95));
    +
    +    // Sanity check: if we've been given too little memory to make 
progress,
    +    // issue a warning but proceed anyway. Should only occur if something 
is
    +    // configured terribly wrong.
    +
    +    long minMemoryNeeds = Math.max(minLoadMemory, minMergeMemory);
    +    if (minMemoryNeeds > memoryLimit) {
    +      logger.warn("updateMemoryEstimates: potential memory overflow! " +
    +                   "Minumum needed = {} bytes, actual available = {} 
bytes",
    +                   minMemoryNeeds, memoryLimit);
    +    }
    +
    +    // Log the calculated values. Turn this on if things seem amiss.
    +    // Message will appear only when the values change.
    +
    +    logger.debug("Memory Estimates: record size = {} bytes; input batch = 
{} bytes, {} records; " +
    +                  "output batch size = {} bytes, {} records; " +
    +                  "Available memory: {}, spill point = {}, min. merge 
memory = {}",
    +                estimatedRecordSize, estimatedInputBatchSize, 
estimatedInputRecordCount,
    +                estimatedOutputBatchSize, outputBatchRecordCount,
    +                memoryLimit, spillPoint, minMergeMemory);
    +  }
    +
    +  /**
    +   * Determine if spill is needed before receiving the new record batch.
    +   * Spilling is driven purely by memory availability (and an optional
    +   * batch limit for testing.)
    +   *
    +   * @return true if spilling is needed, false otherwise
    +   */
    +
    +  private boolean isSpillNeeded() {
    +
    +    // Can't spill if less than two batches else the merge
    +    // can't make progress.
    +
    +    if (bufferedBatches.size() < 2) {
    +      return false; }
    +
    +    // Must spill if we are below the spill point (the amount of memory
    +    // needed to do the minimal spill.)
    +
    +    long freeMemory = memoryLimit - allocator.getAllocatedMemory();
    +
    +    // Sanity check. If the memory goes negative, the calcs are off.
    +
    +    if (freeMemory < 0) {
    +      logger.error("ERROR: Free memory is negative: {}. Spill point = {}",
    +                   freeMemory, spillPoint);
    +    }
    +    if (freeMemory <= spillPoint) {
    +      return true; }
    +
    +    // Number of incoming batches (BatchGroups) exceed the limit and 
number of incoming
    +    // batches accumulated since the last spill exceed the defined limit
    +
    +    return bufferedBatches.size() > bufferedBatchLimit;
    +  }
    +
    +  /**
    +   * Perform an in-memory sort of the buffered batches. Obviously can
    +   * be used only for the non-spilling case.
    +   *
    +   * @return DONE if no rows, OK_NEW_SCHEMA if at least one row
    +   */
    +
    +  private IterOutcome sortInMemory() {
    +    logger.info("Starting in-memory sort. Batches = {}, Records = {}, 
Memory = {}",
    +                bufferedBatches.size(), inputRecordCount, 
allocator.getAllocatedMemory());
    +
    +    // Note the difference between how we handle batches here and in the 
spill/merge
    +    // case. In the spill/merge case, this class decides on the batch size 
to send
    +    // downstream. However, in the in-memory case, we must pass along all 
batches
    +    // in a single SV4. Attempts to do paging will result in errors. In 
the memory
    +    // merge case, the downstream Selection Vector Remover will split the 
one
    +    // big SV4 into multiple smaller batches to send further downstream.
    +
    +    // If the sort fails or is empty, clean up here. Otherwise, cleanup is 
done
    +    // by closing the resultsIterator after all results are returned 
downstream.
    +
    +    InMemorySorter memoryMerge = new InMemorySorter(context, allocator, 
opCodeGen);
    +    try {
    +      sv4 = memoryMerge.sort(bufferedBatches, this, container);
    +      if (sv4 == null) {
    +        sortState = SortState.DONE;
    +        return IterOutcome.STOP;
    +      } else {
    +        logger.info("Completed in-memory sort. Memory = {}",
    +                allocator.getAllocatedMemory());
    +        resultsIterator = memoryMerge;
    +        memoryMerge = null;
    +        sortState = SortState.DELIVER;
    +        return IterOutcome.OK_NEW_SCHEMA;
    +      }
    +    } finally {
    +      if (memoryMerge != null) {
    +        memoryMerge.close();
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Perform merging of (typically spilled) batches. First consolidates 
batches
    +   * as needed, then performs a final merge that is read one batch at a 
time
    +   * to deliver batches to the downstream operator.
    +   *
    +   * @return always returns OK_NEW_SCHEMA
    +   */
    +
    +  private IterOutcome mergeSpilledRuns() {
    +    logger.info("Starting consolidate phase. Batches = {}, Records = {}, 
Memory = {}, In-memory batches {}, spilled runs {}",
    +                inputBatchCount, inputRecordCount, 
allocator.getAllocatedMemory(),
    +                bufferedBatches.size(), spilledRuns.size());
    +
    +    // Consolidate batches to a number that can be merged in
    +    // a single last pass.
    +
    +    int mergeCount = 0;
    +    while (consolidateBatches()) {
    +      mergeCount++;
    +    }
    +    stats.addLongStat(Metric.MERGE_COUNT, mergeCount);
    +
    +    // Merge in-memory batches and spilled runs for the final merge.
    +
    +    List<BatchGroup> allBatches = new LinkedList<>();
    +    allBatches.addAll(bufferedBatches);
    +    bufferedBatches.clear();
    +    allBatches.addAll(spilledRuns);
    +    spilledRuns.clear();
    +
    +    logger.info("Starting merge phase. Runs = {}, Alloc. memory = {}", 
allBatches.size(), allocator.getAllocatedMemory());
    +
    +    // Do the final merge as a results iterator.
    +
    +    CopierHolder.BatchMerger merger = copierHolder.startFinalMerge(schema, 
allBatches, container, outputBatchRecordCount);
    +    merger.next();
    +    resultsIterator = merger;
    +    sortState = SortState.DELIVER;
    +    return IterOutcome.OK_NEW_SCHEMA;
    +  }
    +
    +  private boolean consolidateBatches() {
    +
    +    // Determine additional memory needed to hold one batch from each
    +    // spilled run.
    +
    +    int inMemCount = bufferedBatches.size();
    +    int spilledRunsCount = spilledRuns.size();
    +
    +    // Can't merge more than will fit into memory at one time.
    +
    +    int maxMergeWidth = (int) (mergeMemoryPool / estimatedOutputBatchSize);
    +    maxMergeWidth = Math.min(mergeLimit, maxMergeWidth);
    +
    +    // If we can't fit all batches in memory, must spill any in-memory
    +    // batches to make room for multiple spill-merge-spill cycles.
    +
    +    if (inMemCount > 0) {
    +      if (spilledRunsCount > maxMergeWidth) {
    +        spillFromMemory();
    +        return true;
    +      }
    +
    +      // If we just plain have too many batches to merge, spill some
    +      // in-memory batches to reduce the burden.
    +
    +      if (inMemCount + spilledRunsCount > mergeLimit) {
    +        spillFromMemory();
    +        return true;
    +      }
    +
    +      // If the on-disk batches and in-memory batches need more memory than
    +      // is available, spill some in-memory batches.
    +
    +      long allocated = allocator.getAllocatedMemory();
    +      long totalNeeds = spilledRunsCount * estimatedOutputBatchSize + 
allocated;
    +      if (totalNeeds > mergeMemoryPool) {
    +        spillFromMemory();
    +        return true;
    +      }
    +    }
    +
    +    // Merge on-disk batches if we have too many.
    +
    +    int mergeCount = spilledRunsCount - maxMergeWidth;
    +    if (mergeCount <= 0) {
    +      return false;
    +    }
    +
    +    // We will merge. This will create yet another spilled
    +    // run. Account for that.
    +
    +    mergeCount += 1;
    +
    +    // Must merge at least 2 batches to make progress.
    +
    +    mergeCount = Math.max(2, mergeCount);
    +
    +    mergeCount = Math.min(mergeCount, maxMergeWidth);
    +
    +    // Do the merge, then loop to try again in case not
    +    // all the target batches spilled in one go.
    +
    +    logger.trace("Merging {} on-disk runs, Alloc. memory = {}",
    +        mergeCount, allocator.getAllocatedMemory());
    +    mergeAndSpill(spilledRuns, mergeCount);
    +    return true;
    +  }
    +
    +  /**
    +   * This operator has accumulated a set of sorted incoming record batches.
    +   * We wish to spill some of them to disk. To do this, a "copier"
    +   * merges the target batches to produce a stream of new (merged) batches
    +   * which are then written to disk.
    +   * <p>
    +   * This method spills only half the accumulated batches
    +   * minimizing unnecessary disk writes. The exact count must lie between
    +   * the minimum and maximum spill counts.
    +    */
    +
    +  private void spillFromMemory() {
    +    int spillCount;
    +    if (spillFileSize == 0) {
    +      // If no spill limit given, guess half the batches.
    +
    +      spillCount = bufferedBatches.size() / 2;
    +    } else {
    +      // Spill file size given. Figure out how many
    +      // batches.
    +
    +      long estSize = 0;
    +      spillCount = 0;
    +      for (InputBatch batch : bufferedBatches) {
    +        estSize += batch.getDataSize();
    +        if (estSize > spillFileSize) {
    +          break; }
    +        spillCount++;
    +      }
    +    }
    +
    +    // Upper spill bound (optional)
    +
    +    spillCount = Math.min(spillCount, maxSpillLimit);
    +
    +    // Lower spill bound (at least 2, perhaps more)
    +
    +    spillCount = Math.max(spillCount, minSpillLimit);
    +
    +    // Should not happen, but just to be sure...
    +
    +    if (spillCount == 0) {
    +      return; }
    +
    +    // Do the actual spill.
    +
    +    logger.trace("Starting spill from memory. Memory = {}, Batch count = 
{}, Spill count = {}",
    +                 allocator.getAllocatedMemory(), bufferedBatches.size(), 
spillCount);
    +    mergeAndSpill(bufferedBatches, spillCount);
    +  }
    +
    +  private void mergeAndSpill(LinkedList<? extends BatchGroup> source, int 
count) {
    +    if (count == 0) {
    +      return; }
    +    spilledRuns.add(doMergeAndSpill(source, count));
    +  }
    +
    +  private BatchGroup.SpilledRun doMergeAndSpill(LinkedList<? extends 
BatchGroup> batchGroups, int spillCount) {
    +    List<BatchGroup> batchesToSpill = Lists.newArrayList();
    +    spillCount = Math.min(batchGroups.size(), spillCount);
    +    assert spillCount > 0 : "Spill count to mergeAndSpill must not be 
zero";
    +    long spillSize = 0;
    +    for (int i = 0; i < spillCount; i++) {
    +      @SuppressWarnings("resource")
    +      BatchGroup batch = batchGroups.pollFirst();
    +      assert batch != null : "Encountered a null batch during merge and 
spill operation";
    +      batchesToSpill.add(batch);
    +      spillSize += batch.getDataSize();
    +    }
    +
    +    // Merge the selected set of matches and write them to the
    +    // spill file. After each write, we release the memory associated
    +    // with the just-written batch.
    +
    +    String outputFile = spillSet.getNextSpillFile();
    +    stats.setLongStat(Metric.SPILL_COUNT, spillSet.getFileCount());
    +    BatchGroup.SpilledRun newGroup = null;
    +    try (AutoCloseable ignored = AutoCloseables.all(batchesToSpill);
    +         CopierHolder.BatchMerger merger = copierHolder.startMerge(schema, 
batchesToSpill, outputBatchRecordCount)) {
    +      logger.trace("Merging and spilling to {}", outputFile);
    +      newGroup = new BatchGroup.SpilledRun(spillSet, outputFile, oContext, 
spillSize);
    +
    +      // The copier will merge records from the buffered batches into
    +      // the outputContainer up to targetRecordCount number of rows.
    +      // The actual count may be less if fewer records are available.
    +
    +      while (merger.next()) {
    +
    +        // Add a new batch of records (given by merger.getOutput()) to the 
spill
    +        // file, opening the file if not yet open, and creating the target
    +        // directory if it does not yet exist.
    +        //
    +        // note that addBatch also clears the merger's output container
    +
    +        newGroup.addBatch(merger.getOutput());
    +      }
    +      injector.injectChecked(context.getExecutionControls(), 
INTERRUPTION_WHILE_SPILLING, IOException.class);
    +      newGroup.closeOutputStream();
    +      logger.trace("mergeAndSpill: completed, memory = {}, spilled {} 
records to {}",
    +                   allocator.getAllocatedMemory(), 
merger.getRecordCount(), outputFile);
    +      return newGroup;
    +    } catch (Throwable e) {
    +      // we only need to cleanup newGroup if spill failed
    +      try {
    +        if (newGroup != null) {
    +          AutoCloseables.close(e, newGroup);
    +        }
    +      } catch (Throwable t) { /* close() may hit the same IO issue; just 
ignore */ }
    +
    +      // Here the merger is holding onto a partially-completed batch.
    +      // It will release the memory in the close() call.
    +
    +      try {
    +        // Rethrow so we can organize how to handle the error.
    +
    +        throw e;
    +      }
    +
    +      // If error is a User Exception, just use as is.
    +
    +      catch (UserException ue) { throw ue; }
    +      catch (Throwable ex) {
    +        throw UserException.resourceError(ex)
    +              .message("External Sort encountered an error while spilling 
to disk")
    +              .build(logger);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Allocate and initialize the selection vector used as the sort index.
    +   * Assumes that memory is available for the vector since memory 
management
    +   * ensured space is available.
    +   *
    +   * @return a new, populated selection vector 2
    +   */
    +
    +  private SelectionVector2 newSV2() {
    +    SelectionVector2 sv2 = new SelectionVector2(allocator);
    +    if (!sv2.allocateNewSafe(incoming.getRecordCount())) {
    +      throw UserException.resourceError(new OutOfMemoryException("Unable 
to allocate sv2 buffer"))
    +            .build(logger);
    +    }
    +    for (int i = 0; i < incoming.getRecordCount(); i++) {
    --- End diff --
    
    Is this initialization/population of the SV2 really needed ?  Probably the 
sort() later fills up the right values there.
    p.s. thanks for cleaning - the original/non-managed version of newSV2() is 
a mess. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to