[ https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17001128#comment-17001128 ]

ASF GitHub Bot commented on DRILL-6832:
---------------------------------------

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old "unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360444465
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##########
 @@ -286,540 +291,419 @@ public void buildSchema() throws SchemaChangeException {
         state = BatchState.DONE;
         break;
       default:
-        break;
+        throw new IllegalStateException("Unexpected iter outcome: " + outcome);
     }
   }
 
+  /**
+   * Process each request for a batch. The first request retrieves
+   * all the incoming batches and sorts them, optionally spilling to
+   * disk as needed. Subsequent calls retrieve the sorted results in
+   * fixed-size batches.
+   */
+
   @Override
   public IterOutcome innerNext() {
-    if (schema != null) {
-      if (spillCount == 0) {
-        return (getSelectionVector4().next()) ? IterOutcome.OK : IterOutcome.NONE;
-      } else {
-        Stopwatch w = Stopwatch.createStarted();
-        int count = copier.next(targetRecordCount);
-        if (count > 0) {
-          long t = w.elapsed(TimeUnit.MICROSECONDS);
-          logger.debug("Took {} us to merge {} records", t, count);
-          container.setRecordCount(count);
-          return IterOutcome.OK;
-        } else {
-          logger.debug("copier returned 0 records");
-          return IterOutcome.NONE;
-        }
+    switch (sortState) {
+    case DONE:
+      return NONE;
+    case START:
+      return load();
+    case LOAD:
+      if (!this.retainInMemoryBatchesOnNone) {
+        resetSortState();
       }
+      return (sortState == SortState.DONE) ? NONE : load();
+    case DELIVER:
+      return nextOutputBatch();
+    default:
+      throw new IllegalStateException("Unexpected sort state: " + sortState);
     }
+  }
 
-    int totalCount = 0;
-    int totalBatches = 0; // total number of batches received so far
+  private IterOutcome nextOutputBatch() {
+    // Call next on outputSV4 for its state to progress in parallel to resultsIterator state
+    outputSV4.next();
 
-    try{
-      container.clear();
-      outer: while (true) {
-        IterOutcome upstream;
-        if (first) {
-          upstream = IterOutcome.OK_NEW_SCHEMA;
-        } else {
-          upstream = next(incoming);
-        }
-        if (upstream == IterOutcome.OK && sorter == null) {
-          upstream = IterOutcome.OK_NEW_SCHEMA;
-        }
-        switch (upstream) {
-        case NONE:
-          if (first) {
-            return upstream;
-          }
-          break outer;
-        case NOT_YET:
-          throw new UnsupportedOperationException();
-        case STOP:
-          return upstream;
-        case OK_NEW_SCHEMA:
-        case OK:
-          VectorContainer convertedBatch;
-          // only change in the case that the schema truly changes.  Artificial schema changes are ignored.
-          if (upstream == IterOutcome.OK_NEW_SCHEMA && !incoming.getSchema().equals(schema)) {
-            if (schema != null) {
-              if (unionTypeEnabled) {
-                this.schema = SchemaUtil.mergeSchemas(schema, incoming.getSchema());
-              } else {
-                throw new SchemaChangeException("Schema changes not supported in External Sort. Please enable Union type");
-              }
-            } else {
-              schema = incoming.getSchema();
-            }
-            convertedBatch = SchemaUtil.coerceContainer(incoming, schema, oContext);
-            for (BatchGroup b : batchGroups) {
-              b.setSchema(schema);
-            }
-            for (BatchGroup b : spilledBatchGroups) {
-              b.setSchema(schema);
-            }
-            this.sorter = createNewSorter(context, convertedBatch);
-          } else {
-            convertedBatch = SchemaUtil.coerceContainer(incoming, schema, oContext);
-          }
-          if (first) {
-            first = false;
-          }
-          if (convertedBatch.getRecordCount() == 0) {
-            for (VectorWrapper<?> w : convertedBatch) {
-              w.clear();
-            }
-            break;
-          }
-          SelectionVector2 sv2;
-          if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
-            sv2 = incoming.getSelectionVector2().clone();
-          } else {
-            try {
-              sv2 = newSV2();
-            } catch(InterruptedException e) {
-              return IterOutcome.STOP;
-            } catch (OutOfMemoryException e) {
-              throw new OutOfMemoryException(e);
-            }
-          }
+    // But if results iterator next returns true that means it has more results to pass
+    if (resultsIterator.next()) {
+      container.setRecordCount(getRecordCount());
+      injector.injectUnchecked(context.getExecutionControls(), INTERRUPTION_WHILE_MERGING);
+    }
+    // getFinalOutcome will take care of returning correct IterOutcome when there is no data to pass and for
+    // EMIT/NONE scenarios
+    return getFinalOutcome();
+  }
 
-          int count = sv2.getCount();
-          totalCount += count;
-          totalBatches++;
-          sorter.setup(context, sv2, convertedBatch);
-          sorter.sort(sv2);
-          RecordBatchData rbd = new RecordBatchData(convertedBatch, oAllocator);
-          boolean success = false;
-          try {
-            rbd.setSv2(sv2);
-            batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2(), oContext));
-            if (peakNumBatches < batchGroups.size()) {
-              peakNumBatches = batchGroups.size();
-              stats.setLongStat(Metric.PEAK_BATCHES_IN_MEMORY, peakNumBatches);
-            }
-
-            batchesSinceLastSpill++;
-            if (// If we haven't spilled so far, do we have enough memory for MSorter if this turns out to be the last incoming batch?
-                (spillCount == 0 && !hasMemoryForInMemorySort(totalCount)) ||
-                // If we haven't spilled so far, make sure we don't exceed the maximum number of batches SV4 can address
-                (spillCount == 0 && totalBatches > Character.MAX_VALUE) ||
-                // TODO(DRILL-4438) - consider setting this threshold more intelligently,
-                // lowering caused a failing low memory condition (test in BasicPhysicalOpUnitTest)
-                // to complete successfully (although it caused perf decrease as there was more spilling)
-
-                // current memory used is more than 95% of memory usage limit of this operator
-                (oAllocator.getAllocatedMemory() > .95 * oAllocator.getLimit()) ||
-                // Number of incoming batches (BatchGroups) exceed the limit and number of incoming batches accumulated
-                // since the last spill exceed the defined limit
-                (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
-
-              if (firstSpillBatchCount == 0) {
-                firstSpillBatchCount = batchGroups.size();
-              }
-
-              if (spilledBatchGroups.size() > firstSpillBatchCount / 2) {
-                logger.info("Merging spills");
-                final BatchGroup merged = mergeAndSpill(spilledBatchGroups);
-                if (merged != null) {
-                  spilledBatchGroups.addFirst(merged);
-                }
-              }
-              final BatchGroup merged = mergeAndSpill(batchGroups);
-              if (merged != null) { // make sure we don't add null to spilledBatchGroups
-                spilledBatchGroups.add(merged);
-                batchesSinceLastSpill = 0;
-              }
-            }
-            success = true;
-          } finally {
-            if (!success) {
-              rbd.clear();
-            }
-          }
-          break;
-        case OUT_OF_MEMORY:
-          logger.debug("received OUT_OF_MEMORY, trying to spill");
-          if (batchesSinceLastSpill > 2) {
-            final BatchGroup merged = mergeAndSpill(batchGroups);
-            if (merged != null) {
-              spilledBatchGroups.add(merged);
-              batchesSinceLastSpill = 0;
-            }
-          } else {
-            logger.debug("not enough batches to spill, sending OUT_OF_MEMORY downstream");
-            return IterOutcome.OUT_OF_MEMORY;
-          }
-          break;
-        default:
-          throw new UnsupportedOperationException();
-        }
-      }
+  /**
+   * Load the results and sort them. May bail out early if an exceptional
+   * condition is passed up from the input batch.
+   *
+   * @return return code: OK_NEW_SCHEMA if rows were sorted,
+   * NONE if no rows
+   */
 
-      if (totalCount == 0) {
-        return IterOutcome.NONE;
-      }
-      if (spillCount == 0) {
+  private IterOutcome load() {
+    logger.trace("Start of load phase");
 
-        if (builder != null) {
-          builder.clear();
-          builder.close();
-        }
-        builder = new SortRecordBatchBuilder(oAllocator);
+    // Don't clear the temporary container created by buildSchema() after each load since across EMIT outcome we have
+    // to maintain the ValueVector references for downstream operators
 
-        for (BatchGroup group : batchGroups) {
-          RecordBatchData rbd = new RecordBatchData(group.getContainer(), oAllocator);
-          rbd.setSv2(group.getSv2());
-          builder.add(rbd);
-        }
+    // Loop over all input batches
 
 
 Review comment:
   ```suggestion
   
   ```
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> ------------------------------------------
>
>                 Key: DRILL-6832
>                 URL: https://issues.apache.org/jira/browse/DRILL-6832
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.14.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>            Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.
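
As a rough illustration of the last two bullets, the conditional dispatch being removed looks something like the sketch below. This is a hypothetical, self-contained example: the class names, the createBefore/createAfter methods, and the exec.sort.disable_managed option key are assumptions used for illustration, not Drill's actual batch-creator code.

```java
import java.util.Map;

// Hypothetical sketch only; names are assumptions, not Drill's real API.
public class SortCreatorSketch {

  // Stand-in for the option that let users fall back to the legacy sort (assumed key).
  static final String DISABLE_MANAGED_OPTION = "exec.sort.disable_managed";

  interface SortBatch { }
  static class LegacySortBatch implements SortBatch { }   // old "unmanaged" sort
  static class ManagedSortBatch implements SortBatch { }  // new "managed" sort

  // Before the change: pick an implementation based on the option.
  static SortBatch createBefore(Map<String, Boolean> options) {
    if (Boolean.TRUE.equals(options.get(DISABLE_MANAGED_OPTION))) {
      return new LegacySortBatch();
    }
    return new ManagedSortBatch();
  }

  // After the change: the option and the legacy class are gone, so the creator
  // always returns the managed implementation.
  static SortBatch createAfter() {
    return new ManagedSortBatch();
  }

  public static void main(String[] args) {
    System.out.println(createBefore(Map.of(DISABLE_MANAGED_OPTION, true)).getClass().getSimpleName());
    System.out.println(createAfter().getClass().getSimpleName());
  }
}
```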



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
