[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17001107#comment-17001107
 ] 

ASF GitHub Bot commented on DRILL-6832:
---------------------------------------

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360426272
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##########
 @@ -17,250 +17,255 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import org.apache.calcite.rel.RelFieldCollation.Direction;
-import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.config.DrillConfig;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
+
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.ErrorCollector;
-import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.Order.Ordering;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.compile.sig.GeneratorMapping;
-import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
-import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.ExternalSort;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
-import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import 
org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator;
+import org.apache.drill.exec.physical.impl.xsort.SortImpl.SortResults;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaUtil;
-import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
-import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Iterators;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import com.sun.codemodel.JConditional;
-import com.sun.codemodel.JExpr;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * External sort batch: a sort batch which can spill to disk in
+ * order to operate within a defined memory footprint.
+ * <p>
+ * <h4>Basic Operation</h4>
+ * The operator has three key phases:
+ * <p>
+ * <ul>
+ * <li>The load phase in which batches are read from upstream.</li>
+ * <li>The merge phase in which spilled batches are combined to
+ * reduce the number of files below the configured limit. (Best
+ * practice is to configure the system to avoid this phase.)
+ * <li>The delivery phase in which batches are combined to produce
+ * the final output.</li>
+ * </ul>
+ * During the load phase:
+ * <p>
+ * <ul>
+ * <li>The incoming (upstream) operator provides a series of batches.</li>
+ * <li>This operator sorts each batch, and accumulates them in an in-memory
+ * buffer.</li>
+ * <li>If the in-memory buffer becomes too large, this operator selects
+ * a subset of the buffered batches to spill.</li>
+ * <li>Each spill set is merged to create a new, sorted collection of
+ * batches, and each is spilled to disk.</li>
+ * <li>To allow the use of multiple disk storage, each spill group is written
+ * round-robin to a set of spill directories.</li>
+ * </ul>
+ * <p>
+ * Data is spilled to disk as a "run". A run consists of one or more (typically
+ * many) batches, each of which is itself a sorted run of records.
+ * <p>
+ * During the sort/merge phase:
+ * <p>
+ * <ul>
+ * <li>When the input operator is complete, this operator merges the 
accumulated
+ * batches (which may be all in memory or partially on disk), and returns
+ * them to the output (downstream) operator in chunks of no more than
+ * 64K records.</li>
+ * <li>The final merge must combine a collection of in-memory and spilled
+ * batches. Several limits apply to the maximum "width" of this merge. For
+ * example, each open spill run consumes a file handle, and we may wish
+ * to limit the number of file handles. Further, memory must hold one batch
+ * from each run, so we may need to reduce the number of runs so that the
+ * remaining runs can fit into memory. A consolidation phase combines
+ * in-memory and spilled batches prior to the final merge to control final
+ * merge width.</li>
+ * <li>A special case occurs if no batches were spilled. In this case, the 
input
+ * batches are sorted in memory without merging.</li>
+ * </ul>
+ * <p>
+ * Many complex details are involved in doing the above; the details are 
explained
+ * in the methods of this class.
+ * <p>
+ * <h4>Configuration Options</h4>
+ * <dl>
+ * <dt>drill.exec.sort.external.spill.fs</dt>
+ * <dd>The file system (file://, hdfs://, etc.) of the spill directory.</dd>
+ * <dt>drill.exec.sort.external.spill.directories</dt>
+ * <dd>The comma delimited list of directories, on the above file
+ * system, to which to spill files in round-robin fashion. The query will
+ * fail if any one of the directories becomes full.</dt>
+ * <dt>drill.exec.sort.external.spill.file_size</dt>
+ * <dd>Target size for first-generation spill files Set this to large
+ * enough to get nice long writes, but not so large that spill directories
+ * are overwhelmed.</dd>
+ * <dt>drill.exec.sort.external.mem_limit</dt>
+ * <dd>Maximum memory to use for the in-memory buffer. (Primarily for 
testing.)</dd>
+ * <dt>drill.exec.sort.external.batch_limit</dt>
+ * <dd>Maximum number of batches to hold in memory. (Primarily for 
testing.)</dd>
+ * <dt>drill.exec.sort.external.spill.max_count</dt>
+ * <dd>Maximum number of batches to add to "first generation" files.
+ * Defaults to 0 (no limit). (Primarily for testing.)</dd>
+ * <dt>drill.exec.sort.external.spill.min_count</dt>
+ * <dd>Minimum number of batches to add to "first generation" files.
+ * Defaults to 0 (no limit). (Primarily for testing.)</dd>
+ * <dt>drill.exec.sort.external.merge_limit</dt>
+ * <dd>Sets the maximum number of runs to be merged in a single pass (limits
+ * the number of open files.)</dd>
+ * </dl>
+ * <p>
+ * The memory limit observed by this operator is the lesser of:
+ * <ul>
+ * <li>The maximum allocation allowed the allocator assigned to this batch
+ * as set by the Foreman, or</li>
+ * <li>The maximum limit configured in the mem_limit parameter above. 
(Primarily for
+ * testing.</li>
+ * </ul>
+ * <h4>Output</h4>
+ * It is helpful to note that the sort operator will produce one of two kinds 
of
+ * output batches.
+ * <ul>
+ * <li>A large output with sv4 if data is sorted in memory. The sv4 addresses
+ * the entire in-memory sort set. A selection vector remover will copy results
+ * into new batches of a size determined by that operator.</li>
+ * <li>A series of batches, without a selection vector, if the sort spills to
+ * disk. In this case, the downstream operator will still be a selection vector
+ * remover, but there is nothing for that operator to remove.
+ * </ul>
+ * Note that, even in the in-memory sort case, this operator could do the 
copying
+ * to eliminate the extra selection vector remover. That is left as an exercise
+ * for another time.
+ * <h4>Logging</h4>
+ * Logging in this operator serves two purposes:
+ * <li>
+ * <ul>
+ * <li>Normal diagnostic information.</li>
+ * <li>Capturing the essence of the operator functionality for analysis in unit
+ * tests.</li>
+ * </ul>
+ * Test logging is designed to capture key events and timings. Take care
+ * when changing or removing log messages as you may need to adjust unit tests
+ * accordingly.
+ */
 
 public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
-  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
-  private static final ControlsInjector injector = 
ControlsInjectorFactory.getInjector(ExternalSortBatch.class);
-
-  private static final GeneratorMapping COPIER_MAPPING = new 
GeneratorMapping("doSetup", "doCopy", null, null);
-  private final MappingSet MAIN_MAPPING = new MappingSet( (String) null, null, 
ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
-  private final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, 
ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
-  private final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, 
ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
-  private final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, 
COPIER_MAPPING);
-
-  private final int SPILL_BATCH_GROUP_SIZE;
-  private final int SPILL_THRESHOLD;
-  private final Iterator<String> dirs;
+  static final Logger logger = 
LoggerFactory.getLogger(ExternalSortBatch.class);
+
+  // For backward compatibility, masquerade as the original
+  // external sort. Else, some tests don't pass.
+
+  protected static final ControlsInjector injector =
+      
ControlsInjectorFactory.getInjector(org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.class);
+
+  public static final String INTERRUPTION_AFTER_SORT = "after-sort";
+  public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
+  public static final String INTERRUPTION_WHILE_SPILLING = "spilling";
+  public static final String INTERRUPTION_WHILE_MERGING = "merging";
+  private boolean retainInMemoryBatchesOnNone;
+
   private final RecordBatch incoming;
-  private final BufferAllocator oAllocator;
-  private final BufferAllocator copierAllocator;
 
-  private BatchSchema schema;
-  private SingleBatchSorter sorter;
-  private SortRecordBatchBuilder builder;
-  private MSorter mSorter;
   /**
-   * A single PriorityQueueCopier instance is used for 2 purposes:
-   * 1. Merge sorted batches before spilling
-   * 2. Merge sorted batches when all incoming data fits in memory
+   * Schema of batches that this operator produces.
    */
-  private PriorityQueueCopier copier;
-  private LinkedList<BatchGroup> batchGroups = Lists.newLinkedList();
-  private LinkedList<BatchGroup> spilledBatchGroups = Lists.newLinkedList();
-  private SelectionVector4 sv4;
-  private FileSystem fs;
-  private int spillCount = 0;
-  private int batchesSinceLastSpill = 0;
-  private boolean first = true;
-  private int targetRecordCount;
-  private final String fileName;
-  private Set<Path> currSpillDirs = Sets.newTreeSet();
-  private int firstSpillBatchCount = 0;
-  private int peakNumBatches = -1;
+
+  private BatchSchema schema;
 
   /**
-   * The copier uses the COPIER_BATCH_MEM_LIMIT to estimate the target
-   * number of records to return in each batch.
+   * Iterates over the final, sorted results.
    */
-  private static final int COPIER_BATCH_MEM_LIMIT = 256 * 1024;
 
-  public static final String INTERRUPTION_AFTER_SORT = "after-sort";
-  public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
-  public static final String INTERRUPTION_WHILE_SPILLING = "spilling";
+  private SortResults resultsIterator;
+  private enum SortState { START, LOAD, DELIVER, DONE }
+  private SortState sortState = SortState.START;
+
+  private final SortConfig sortConfig;
+
+  private SortImpl sortImpl;
+
+  private IterOutcome lastKnownOutcome;
+
+  private boolean firstBatchOfSchema;
+
+  private final VectorContainer outputWrapperContainer;
 
-  // Be careful here! This enum is used in TWO places! First, it is used
-  // in this code to build up metrics. Easy enough. But, it is also used
-  // in OperatorMetricRegistry to define the metrics for the
-  // operator ID defined in CoreOperatorType. As a result, the values
-  // defined here are shared between this legacy version AND the new
-  // managed version. (Though the new, managed version has its own
-  // copy of this enum.) The two enums MUST be identical.
+  private final SelectionVector4 outputSV4;
 
   public enum Metric implements MetricDef {
     SPILL_COUNT,            // number of times operator spilled to disk
-    RETIRED1,               // Was: peak value for totalSizeInMemory
+    NOT_USED,               // Was: peak value for totalSizeInMemory
                             // But operator already provides this value
     PEAK_BATCHES_IN_MEMORY, // maximum number of batches kept in memory
-    MERGE_COUNT,            // Used only by the managed version.
-    MIN_BUFFER,             // Used only by the managed version.
-    INPUT_BATCHES;          // Used only by the managed version.
+    MERGE_COUNT,            // Number of second+ generation merges
+    MIN_BUFFER,             // Minimum memory level observed in operation.
+    SPILL_MB;               // Number of MB of data spilled to disk. This
+                            // amount is first written, then later re-read.
+                            // So, disk I/O is twice this amount.
 
     @Override
     public int metricId() {
       return ordinal();
     }
   }
 
-  public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, 
RecordBatch incoming) throws OutOfMemoryException {
+  public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, 
RecordBatch incoming) {
     super(popConfig, context, true);
     this.incoming = incoming;
-    DrillConfig config = context.getConfig();
-    Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, 
config.getString(ExecConstants.EXTERNAL_SORT_SPILL_FILESYSTEM));
-    try {
-      this.fs = FileSystem.get(conf);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    SPILL_BATCH_GROUP_SIZE = 
config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE);
-    SPILL_THRESHOLD = 
config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD);
-    dirs = 
Iterators.cycle(config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS));
-    oAllocator = oContext.getAllocator();
-    copierAllocator = oAllocator.newChildAllocator(oAllocator.getName() + 
":copier",
-        PriorityQueueCopier.INITIAL_ALLOCATION, 
PriorityQueueCopier.MAX_ALLOCATION);
-    FragmentHandle handle = context.getHandle();
-    fileName = String.format("%s_majorfragment%s_minorfragment%s_operator%s", 
QueryIdHelper.getQueryId(handle.getQueryId()),
-        handle.getMajorFragmentId(), handle.getMinorFragmentId(), 
popConfig.getOperatorId());
+    outputWrapperContainer = new VectorContainer(context.getAllocator());
+    outputSV4 = new SelectionVector4(context.getAllocator(), 0);
+    sortConfig = new SortConfig(context.getConfig(), context.getOptions());
+    oContext.setInjector(injector);
+    sortImpl = createNewSortImpl();
+
+    // The upstream operator checks on record count before we have
+    // results. Create an empty result set temporarily to handle
+    // these calls.
+
+    resultsIterator = new SortImpl.EmptyResults(outputWrapperContainer);
   }
 
   @Override
   public int getRecordCount() {
-    if (sv4 != null) {
-      return sv4.getCount();
-    }
-    return container.getRecordCount();
+    return resultsIterator.getRecordCount();
   }
 
   @Override
   public SelectionVector4 getSelectionVector4() {
-    return sv4;
-  }
-
-  private void closeBatchGroups(Collection<BatchGroup> groups) {
-    for (BatchGroup group: groups) {
-      try {
-        group.close();
-      } catch (Exception e) {
-        // collect all failure and make sure to cleanup all remaining batches
-        // Originally we would have thrown a RuntimeException that would 
propagate to FragmentExecutor.closeOutResources()
-        // where it would have been passed to context.fail()
-        // passing the exception directly to context.fail(e) will let the 
cleanup process continue instead of stopping
-        // right away, this will also make sure we collect any additional 
exception we may get while cleaning up
-        context.getExecutorState().fail(e);
-      }
-    }
+    // Return outputSV4 instead of resultsIterator sv4. For resultsIterator 
which has null SV4 outputSV4 will be empty.
+    // But Sort with EMIT outcome will ideally fail in those cases while 
preparing output container as it's not
+    // supported currently, like for spilling scenarios
 
 Review comment:
   maybe convert to javadoc? 
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> ------------------------------------------
>
>                 Key: DRILL-6832
>                 URL: https://issues.apache.org/jira/browse/DRILL-6832
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.14.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>            Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to