[ https://issues.apache.org/jira/browse/DRILL-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15847966#comment-15847966 ]
ASF GitHub Bot commented on DRILL-5080:
---------------------------------------
Github user Ben-Zvi commented on a diff in the pull request:
https://github.com/apache/drill/pull/717#discussion_r98790336
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java ---
@@ -0,0 +1,1321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.physical.config.ExternalSort;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
+import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
+import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
+import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.SchemaUtil;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.testing.ControlsInjector;
+import org.apache.drill.exec.testing.ControlsInjectorFactory;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractContainerVector;
+
+import com.google.common.collect.Lists;
+
+/**
+ * External sort batch: a sort batch which can spill to disk in
+ * order to operate within a defined memory footprint.
+ * <p>
+ * <h4>Basic Operation</h4>
+ * The operator has three key phases:
+ * <p>
+ * <ul>
+ * <li>The load phase in which batches are read from upstream.</li>
+ * <li>The merge phase in which spilled batches are combined to
+ * reduce the number of files below the configured limit. (Best
+ * practice is to configure the system to avoid this phase.)</li>
+ * <li>The delivery phase in which batches are combined to produce
+ * the final output.</li>
+ * </ul>
+ * During the load phase:
+ * <p>
+ * <ul>
+ * <li>The incoming (upstream) operator provides a series of batches.</li>
+ * <li>This operator sorts each batch, and accumulates them in an in-memory
+ * buffer.</li>
+ * <li>If the in-memory buffer becomes too large, this operator selects
+ * a subset of the buffered batches to spill.</li>
+ * <li>Each spill set is merged to create a new, sorted collection of
+ * batches, and each is spilled to disk.</li>
+ * <li>To allow the use of multiple disk storage, each spill group is written
+ * round-robin to a set of spill directories.</li>
+ * </ul>
+ * <p>
+ * During the sort/merge phase:
+ * <p>
+ * <ul>
+ * <li>When the input operator is complete, this operator merges the accumulated
+ * batches (which may be all in memory or partially on disk), and returns
+ * them to the output (downstream) operator in chunks of no more than
+ * 32K records.</li>
+ * <li>The final merge must combine a collection of in-memory and spilled
+ * batches. Several limits apply to the maximum "width" of this merge. For
+ * example, each open spill run consumes a file handle, and we may wish
+ * to limit the number of file handles. A consolidation phase combines
+ * in-memory and spilled batches prior to the final merge to control final
+ * merge width.</li>
+ * <li>A special case occurs if no batches were spilled. In this case, the input
+ * batches are sorted in memory without merging.</li>
+ * </ul>
+ * <p>
+ * Many complex details are involved in doing the above; the details are explained
+ * in the methods of this class.
+ * <p>
+ * <h4>Configuration Options</h4>
+ * <dl>
+ * <dt>drill.exec.sort.external.spill.fs</dt>
+ * <dd>The file system (file://, hdfs://, etc.) of the spill directory.</dd>
+ * <dt>drill.exec.sort.external.spill.directories</dt>
+ * <dd>The (comma? space?) separated list of directories, on the above file
+ * system, to which to spill files in round-robin fashion. The query will
+ * fail if any one of the directories becomes full.</dd>
+ * <dt>drill.exec.sort.external.spill.file_size</dt>
+ * <dd>Target size for first-generation spill files. Set this large
+ * enough to get nice long writes, but not so large that spill directories
+ * are overwhelmed.</dd>
+ * <dt>drill.exec.sort.external.mem_limit</dt>
+ * <dd>Maximum memory to use for the in-memory buffer. (Primarily for testing.)</dd>
+ * <dt>drill.exec.sort.external.batch_limit</dt>
+ * <dd>Maximum number of batches to hold in memory. (Primarily for testing.)</dd>
+ * <dt>drill.exec.sort.external.spill.max_count</dt>
+ * <dd>Maximum number of batches to add to “first generation” files.
+ * Defaults to 0 (no limit). (Primarily for testing.)</dd>
+ * <dt>drill.exec.sort.external.spill.min_count</dt>
+ * <dd>Minimum number of batches to add to “first generation” files.
+ * Defaults to 0 (no limit). (Primarily for testing.)</dd>
+ * <dt>drill.exec.sort.external.merge_limit</dt>
+ * <dd>Sets the maximum number of runs to be merged in a single pass (limits
+ * the number of open files.)</dd>
+ * </dl>
+ * <p>
+ * The memory limit observed by this operator is the smallest of:
+ * <ul>
+ * <li>The maximum allocation allowed by the allocator assigned to this batch,</li>
+ * <li>The maximum limit set for this operator by the Foreman, or</li>
+ * <li>The maximum limit configured in the mem_limit parameter above. (Primarily for
+ * testing.)</li>
+ * </ul>
+ * <h4>Output</h4>
+ * It is helpful to note that the sort operator will produce one of two kinds of
+ * output batches.
+ * <ul>
+ * <li>A large output with sv4 if data is sorted in memory. The sv4 addresses
+ * the entire in-memory sort set. A selection vector remover will copy results
+ * into new batches of a size determined by that operator.</li>
+ * <li>A series of batches, without a selection vector, if the sort spills to
+ * disk. In this case, the downstream operator will still be a selection vector
+ * remover, but there is nothing for that operator to remove. Each batch is
+ * of the size set by {@link #MAX_MERGED_BATCH_SIZE}.</li>
+ * </ul>
+ * Note that, even in the in-memory sort case, this operator could do the copying
+ * to eliminate the extra selection vector remover. That is left as an exercise
+ * for another time.
+ * <h4>Logging</h4>
+ * Logging in this operator serves two purposes:
+ * <ul>
+ * <li>Normal diagnostic information.</li>
+ * <li>Capturing the essence of the operator functionality for analysis in unit
+ * tests.</li>
+ * </ul>
+ * Test logging is designed to capture key events and timings. Take care
+ * when changing or removing log messages as you may need to adjust unit tests
+ * accordingly.
+ */
+
+public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
+ protected static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ExternalSortBatch.class);
+
+ private final RecordBatch incoming;
+
+ /**
+ * Memory allocator for this operator itself. Incoming batches are
+ * transferred into this allocator. Intermediate batches used during
+ * merge also reside here.
+ */
+
+ private final BufferAllocator allocator;
+
+ /**
+ * Schema of batches that this operator produces.
+ */
+
+ private BatchSchema schema;
+
+ private LinkedList<BatchGroup.InputBatch> bufferedBatches = Lists.newLinkedList();
+ private LinkedList<BatchGroup.SpilledRun> spilledRuns = Lists.newLinkedList();
+ private SelectionVector4 sv4;
+
+ /**
+ * The number of records to add to each output batch sent to the
+ * downstream operator or spilled to disk.
+ */
+
+ private int outputBatchRecordCount;
+ private int peakNumBatches = -1;
+
+ /**
+ * The copier uses the COPIER_BATCH_MEM_LIMIT to estimate the target
+ * number of records to return in each batch.
+ * <p>
+ * For reference, see {@link FlattenTemplate#OUTPUT_MEMORY_LIMIT}.
+ * <p>
+ * WARNING: Do not allow any one vector to grow beyond 16 MB. Drill contains a
+ * design flaw that may give rise to fatal memory fragmentation if we allow it to
+ * allocate larger vectors.
+ */
+
+ private static final int MAX_MERGED_BATCH_SIZE = 16 * 1024 * 1024;
+
+ /**
+ * Smallest allowed output batch size. The smallest output batch
+ * created even under constrained memory conditions.
+ */
+ private static final int MIN_MERGED_BATCH_SIZE = 256 * 1024;
+
+ /**
+ * The preferred amount of memory to set aside to output batches
+ * expressed as a ratio of available memory.
+ */
+
+ private static final float MERGE_BATCH_ALLOWANCE = 0.10F;
+
+ public static final String INTERRUPTION_AFTER_SORT = "after-sort";
+ public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
+ public static final String INTERRUPTION_WHILE_SPILLING = "spilling";
+
+ private long memoryLimit;
+
+ /**
+ * Iterates over the final, sorted results.
+ */
+
+ private SortResults resultsIterator;
+
+ /**
+ * Manages the set of spill directories and files.
+ */
+
+ private final SpillSet spillSet;
+
+ /**
+ * Manages the copier used to merge a collection of batches into
+ * a new set of batches.
+ */
+
+ private final CopierHolder copierHolder;
+
+ private enum SortState { START, LOAD, DELIVER, DONE }
+ private SortState sortState = SortState.START;
+ private int inputRecordCount = 0;
+ private int inputBatchCount = 0; // total number of batches received so far
+ private final OperatorCodeGenerator opCodeGen;
+
+ /**
+ * Estimated size of the records for this query, updated on each
+ * new batch received from upstream.
+ */
+
+ private int estimatedRecordSize;
+
+ /**
+ * Estimated size of the spill and output batches that this
+ * operator produces, estimated from the estimated record
+ * size.
+ */
+
+ private long estimatedOutputBatchSize;
+ private long estimatedInputBatchSize;
+
+ /**
+ * Maximum number of batches to hold in memory.
+ * (Primarily for testing.)
+ */
+
+ private int bufferedBatchLimit;
+ private int mergeLimit;
+ private int minSpillLimit;
+ private int maxSpillLimit;
+ private long spillFileSize;
+ private long minimumBufferSpace;
+
+ /**
+ * Minimum memory level before spilling occurs. That is, we can buffer input
+ * batches in memory until we are down to the level given by the spill point.
+ */
+
+ private long spillPoint;
+ private long mergeMemoryPool;
+ private long preferredMergeBatchSize;
+
+ // WARNING: The enum here is used within this class. But, the members of
+ // this enum MUST match those in the (unmanaged) ExternalSortBatch since
+ // that is the enum used in the UI to display metrics for the query profile.
+
+ public enum Metric implements MetricDef {
+ SPILL_COUNT, // number of times operator spilled to disk
+ RETIRED1, // Was: peak value for totalSizeInMemory
+ // But operator already provides this value
+ PEAK_BATCHES_IN_MEMORY, // maximum number of batches kept in memory
+ MERGE_COUNT, // Number of second+ generation merges
+ MIN_BUFFER, // Minimum memory level observed in operation.
+ INPUT_BATCHES; // Number of batches read from upstream.
+
+ @Override
+ public int metricId() {
+ return ordinal();
+ }
+ }
+
+ /**
+ * Iterates over the final sorted results. Implemented differently
+ * depending on whether the results are in-memory or spilled to
+ * disk.
+ */
+
+ public interface SortResults {
+ boolean next();
+ void close();
+ int getBatchCount();
+ int getRecordCount();
+ }
+
+ public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) {
+ super(popConfig, context, true);
+ this.incoming = incoming;
+ allocator = oContext.getAllocator();
+ opCodeGen = new OperatorCodeGenerator(context, popConfig);
+
+ spillSet = new SpillSet(context, popConfig);
+ copierHolder = new CopierHolder(context, allocator, opCodeGen);
+ configure(context.getConfig());
+ }
+
+ private void configure(DrillConfig config) {
+
+ // The maximum memory this operator can use. It is either the
+ // limit set on the allocator or on the operator, whichever is
+ // less.
+
+ memoryLimit = Math.min(popConfig.getMaxAllocation(), allocator.getLimit());
+
+ // Optional configured memory limit, typically used only for testing.
+
+ long configLimit = config.getBytes(ExecConstants.EXTERNAL_SORT_MAX_MEMORY);
+ if (configLimit > 0) {
+ memoryLimit = Math.min(memoryLimit, configLimit);
+ }
+
+ // Optional limit on the number of buffered in-memory batches.
+ // 0 means no limit. Used primarily for testing. Must allow at least two
+ // batches or no merging can occur.
+
+ bufferedBatchLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, Integer.MAX_VALUE, 2);
+
+ // Optional limit on the number of spilled runs to merge in a single
+ // pass. Limits the number of open file handles. Must allow at least
+ // two batches to merge to make progress.
+
+ mergeLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, Integer.MAX_VALUE, 2);
+
+ // Limits on the minimum and maximum buffered batches to spill per
+ // spill event.
+
+ minSpillLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MIN_SPILL, Integer.MAX_VALUE, 2);
+ maxSpillLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MAX_SPILL, Integer.MAX_VALUE, 2);
+ if (minSpillLimit > maxSpillLimit) {
+ minSpillLimit = Math.min(minSpillLimit, maxSpillLimit);
+ maxSpillLimit = minSpillLimit;
+ }
+
+ // Limits the size of first-generation spill files.
+
+ spillFileSize = config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE);
+
+ // Set the target output batch size. Use the maximum size, but only if
+ // this represents less than 10% of available memory. Otherwise, use 10%
+ // of memory, but no smaller than the minimum size. In any event, an
+ // output batch can contain no fewer than a single record.
+
+ long maxAllowance = (long) (memoryLimit * MERGE_BATCH_ALLOWANCE);
+ preferredMergeBatchSize = Math.min(maxAllowance, MAX_MERGED_BATCH_SIZE);
+ preferredMergeBatchSize = Math.max(preferredMergeBatchSize, MIN_MERGED_BATCH_SIZE);
+
+ logger.debug("Config: memory limit = {}, batch limit = {}, " +
+ "min, max spill limit: {}, {}, merge limit = {}, merge batch size = {}",
+ memoryLimit, bufferedBatchLimit, minSpillLimit, maxSpillLimit, mergeLimit,
+ preferredMergeBatchSize);
+ }
+
+ private int getConfigLimit(DrillConfig config, String paramName, int valueIfZero, int minValue) {
+ int limit = config.getInt(paramName);
+ if (limit > 0) {
+ limit = Math.max(limit, minValue);
+ } else {
+ limit = valueIfZero;
+ }
+ return limit;
+ }
+
+ @Override
+ public int getRecordCount() {
+ if (sv4 != null) {
+ return sv4.getCount();
+ }
+ return container.getRecordCount();
+ }
+
+ @Override
+ public SelectionVector4 getSelectionVector4() {
+ return sv4;
+ }
+
+ private void closeBatchGroups(Collection<? extends BatchGroup> groups) {
+ for (BatchGroup group: groups) {
+ try {
+ group.close();
+ } catch (Exception e) {
+ // collect all failure and make sure to cleanup all remaining batches
+ // Originally we would have thrown a RuntimeException that would propagate to FragmentExecutor.closeOutResources()
+ // where it would have been passed to context.fail()
+ // passing the exception directly to context.fail(e) will let the cleanup process continue instead of stopping
+ // right away, this will also make sure we collect any additional exception we may get while cleaning up
+ context.fail(e);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (bufferedBatches != null) {
+ closeBatchGroups(bufferedBatches);
+ bufferedBatches = null;
+ }
+ if (spilledRuns != null) {
+ closeBatchGroups(spilledRuns);
+ spilledRuns = null;
+ }
+ } finally {
+ if (sv4 != null) {
+ sv4.clear();
+ }
+ if (resultsIterator != null) {
+ resultsIterator.close();
+ }
+ copierHolder.close();
+ spillSet.close();
+ opCodeGen.close();
+
+ // The call to super.close() clears out the output container.
+ // Doing so requires the allocator here, so it must be closed
+ // after the super call.
+
+ super.close();
+ allocator.close();
+ }
+ }
+
+ /**
+ * Called by {@link AbstractRecordBatch} as a fast-path to obtain
+ * the first record batch and setup the schema of this batch in order
+ * to quickly return the schema to the client. Note that this method
+ * fetches the first batch from upstream which will be waiting for
+ * us the first time that {@link #innerNext()} is called.
+ */
+
+ @Override
+ public void buildSchema() {
+ IterOutcome outcome = next(incoming);
+ switch (outcome) {
+ case OK:
+ case OK_NEW_SCHEMA:
+ for (VectorWrapper<?> w : incoming) {
+ @SuppressWarnings("resource")
+ ValueVector v = container.addOrGet(w.getField());
+ if (v instanceof AbstractContainerVector) {
+ w.getValueVector().makeTransferPair(v); // Can we remove this hack?
+ v.clear();
+ }
+ v.allocateNew(); // Can we remove this? - SVR fails with NPE (TODO)
--- End diff --
Why allocate on line 482 and then finish the loop iteration (which would turn
the allocation into garbage)?
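
As a side note on the sizing logic in `configure()` in the diff above, here is a minimal standalone sketch of how the target merge batch size appears to be derived: 10% of the memory limit, capped at 16 MB and floored at 256 KB. The class name `MergeBatchSizeSketch` and the helper `preferredMergeBatchSize` are hypothetical and not part of the PR; only the three constants and the order of the min/max calls are taken from the diff.

```java
public class MergeBatchSizeSketch {
  // Values copied from the constants shown in the diff.
  static final int MAX_MERGED_BATCH_SIZE = 16 * 1024 * 1024;
  static final int MIN_MERGED_BATCH_SIZE = 256 * 1024;
  static final float MERGE_BATCH_ALLOWANCE = 0.10F;

  // Hypothetical helper that mirrors the three statements in configure():
  // take 10% of the memory limit, cap it at the maximum batch size,
  // then raise it to the minimum batch size if it fell below.
  static long preferredMergeBatchSize(long memoryLimit) {
    long maxAllowance = (long) (memoryLimit * MERGE_BATCH_ALLOWANCE);
    long size = Math.min(maxAllowance, MAX_MERGED_BATCH_SIZE);
    return Math.max(size, MIN_MERGED_BATCH_SIZE);
  }

  public static void main(String[] args) {
    // Illustrative memory limits (assumed values): 1 MB, 100 MB, 1 GB.
    long[] limits = { 1L << 20, 100L << 20, 1L << 30 };
    for (long limit : limits) {
      System.out.println(limit + " -> " + preferredMergeBatchSize(limit));
    }
  }
}
```

If this reading is right, the floor wins under tight memory: with a 1 MB limit the result is 256 KB, which is well above 10% of the limit. That may be intended, but it seems worth a comment in the code.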
> Create a memory-managed version of the External Sort operator
> -------------------------------------------------------------
>
> Key: DRILL-5080
> URL: https://issues.apache.org/jira/browse/DRILL-5080
> Project: Apache Drill
> Issue Type: Improvement
> Affects Versions: 1.8.0
> Reporter: Paul Rogers
> Assignee: Paul Rogers
> Fix For: 1.10
>
> Attachments: ManagedExternalSortDesign.pdf
>
>
> We propose to create a "managed" version of the external sort operator that
> works to a clearly-defined memory limit. Attached is a design specification
> for the work.
> The project will include fixing a number of bugs related to the external
> sort, include as sub-tasks of this umbrella task.