[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17001109#comment-17001109
 ] 

ASF GitHub Bot commented on DRILL-6832:
---------------------------------------

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360415588
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##########
 @@ -17,250 +17,255 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import org.apache.calcite.rel.RelFieldCollation.Direction;
-import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.config.DrillConfig;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
+
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.ErrorCollector;
-import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.Order.Ordering;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.compile.sig.GeneratorMapping;
-import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
-import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.ExternalSort;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
-import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import 
org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator;
+import org.apache.drill.exec.physical.impl.xsort.SortImpl.SortResults;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaUtil;
-import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
-import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Iterators;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import com.sun.codemodel.JConditional;
-import com.sun.codemodel.JExpr;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * External sort batch: a sort batch which can spill to disk in
+ * order to operate within a defined memory footprint.
+ * <p>
+ * <h4>Basic Operation</h4>
+ * The operator has three key phases:
+ * <p>
+ * <ul>
+ * <li>The load phase in which batches are read from upstream.</li>
+ * <li>The merge phase in which spilled batches are combined to
+ * reduce the number of files below the configured limit. (Best
+ * practice is to configure the system to avoid this phase.)
+ * <li>The delivery phase in which batches are combined to produce
+ * the final output.</li>
+ * </ul>
+ * During the load phase:
+ * <p>
+ * <ul>
+ * <li>The incoming (upstream) operator provides a series of batches.</li>
+ * <li>This operator sorts each batch, and accumulates them in an in-memory
+ * buffer.</li>
+ * <li>If the in-memory buffer becomes too large, this operator selects
+ * a subset of the buffered batches to spill.</li>
+ * <li>Each spill set is merged to create a new, sorted collection of
+ * batches, and each is spilled to disk.</li>
+ * <li>To allow the use of multiple disk storage, each spill group is written
+ * round-robin to a set of spill directories.</li>
+ * </ul>
+ * <p>
+ * Data is spilled to disk as a "run". A run consists of one or more (typically
+ * many) batches, each of which is itself a sorted run of records.
+ * <p>
+ * During the sort/merge phase:
+ * <p>
+ * <ul>
+ * <li>When the input operator is complete, this operator merges the 
accumulated
+ * batches (which may be all in memory or partially on disk), and returns
+ * them to the output (downstream) operator in chunks of no more than
+ * 64K records.</li>
+ * <li>The final merge must combine a collection of in-memory and spilled
+ * batches. Several limits apply to the maximum "width" of this merge. For
+ * example, each open spill run consumes a file handle, and we may wish
+ * to limit the number of file handles. Further, memory must hold one batch
+ * from each run, so we may need to reduce the number of runs so that the
+ * remaining runs can fit into memory. A consolidation phase combines
+ * in-memory and spilled batches prior to the final merge to control final
+ * merge width.</li>
+ * <li>A special case occurs if no batches were spilled. In this case, the 
input
+ * batches are sorted in memory without merging.</li>
+ * </ul>
+ * <p>
+ * Many complex details are involved in doing the above; the details are 
explained
+ * in the methods of this class.
+ * <p>
+ * <h4>Configuration Options</h4>
+ * <dl>
+ * <dt>drill.exec.sort.external.spill.fs</dt>
+ * <dd>The file system (file://, hdfs://, etc.) of the spill directory.</dd>
+ * <dt>drill.exec.sort.external.spill.directories</dt>
+ * <dd>The comma delimited list of directories, on the above file
+ * system, to which to spill files in round-robin fashion. The query will
+ * fail if any one of the directories becomes full.</dt>
+ * <dt>drill.exec.sort.external.spill.file_size</dt>
+ * <dd>Target size for first-generation spill files Set this to large
+ * enough to get nice long writes, but not so large that spill directories
+ * are overwhelmed.</dd>
+ * <dt>drill.exec.sort.external.mem_limit</dt>
+ * <dd>Maximum memory to use for the in-memory buffer. (Primarily for 
testing.)</dd>
+ * <dt>drill.exec.sort.external.batch_limit</dt>
+ * <dd>Maximum number of batches to hold in memory. (Primarily for 
testing.)</dd>
+ * <dt>drill.exec.sort.external.spill.max_count</dt>
+ * <dd>Maximum number of batches to add to "first generation" files.
+ * Defaults to 0 (no limit). (Primarily for testing.)</dd>
+ * <dt>drill.exec.sort.external.spill.min_count</dt>
+ * <dd>Minimum number of batches to add to "first generation" files.
+ * Defaults to 0 (no limit). (Primarily for testing.)</dd>
+ * <dt>drill.exec.sort.external.merge_limit</dt>
+ * <dd>Sets the maximum number of runs to be merged in a single pass (limits
+ * the number of open files.)</dd>
+ * </dl>
+ * <p>
+ * The memory limit observed by this operator is the lesser of:
+ * <ul>
+ * <li>The maximum allocation allowed the allocator assigned to this batch
+ * as set by the Foreman, or</li>
+ * <li>The maximum limit configured in the mem_limit parameter above. 
(Primarily for
+ * testing.</li>
+ * </ul>
+ * <h4>Output</h4>
+ * It is helpful to note that the sort operator will produce one of two kinds 
of
+ * output batches.
+ * <ul>
+ * <li>A large output with sv4 if data is sorted in memory. The sv4 addresses
+ * the entire in-memory sort set. A selection vector remover will copy results
+ * into new batches of a size determined by that operator.</li>
+ * <li>A series of batches, without a selection vector, if the sort spills to
+ * disk. In this case, the downstream operator will still be a selection vector
+ * remover, but there is nothing for that operator to remove.
+ * </ul>
+ * Note that, even in the in-memory sort case, this operator could do the 
copying
+ * to eliminate the extra selection vector remover. That is left as an exercise
+ * for another time.
+ * <h4>Logging</h4>
+ * Logging in this operator serves two purposes:
+ * <li>
+ * <ul>
+ * <li>Normal diagnostic information.</li>
+ * <li>Capturing the essence of the operator functionality for analysis in unit
+ * tests.</li>
+ * </ul>
+ * Test logging is designed to capture key events and timings. Take care
+ * when changing or removing log messages as you may need to adjust unit tests
+ * accordingly.
+ */
 
 public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
-  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
-  private static final ControlsInjector injector = 
ControlsInjectorFactory.getInjector(ExternalSortBatch.class);
-
-  private static final GeneratorMapping COPIER_MAPPING = new 
GeneratorMapping("doSetup", "doCopy", null, null);
-  private final MappingSet MAIN_MAPPING = new MappingSet( (String) null, null, 
ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
-  private final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, 
ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
-  private final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, 
ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
-  private final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, 
COPIER_MAPPING);
-
-  private final int SPILL_BATCH_GROUP_SIZE;
-  private final int SPILL_THRESHOLD;
-  private final Iterator<String> dirs;
+  static final Logger logger = 
LoggerFactory.getLogger(ExternalSortBatch.class);
+
+  // For backward compatibility, masquerade as the original
+  // external sort. Else, some tests don't pass.
+
 
 Review comment:
   ```suggestion
   
   ```
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> ------------------------------------------
>
>                 Key: DRILL-6832
>                 URL: https://issues.apache.org/jira/browse/DRILL-6832
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.14.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>            Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to