[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17001106#comment-17001106
 ] 

ASF GitHub Bot commented on DRILL-6832:
---------------------------------------

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360450237
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##########
 @@ -286,540 +291,419 @@ public void buildSchema() throws SchemaChangeException 
{
         state = BatchState.DONE;
         break;
       default:
-        break;
+        throw new IllegalStateException("Unexpected iter outcome: " + outcome);
     }
   }
 
+  /**
+   * Process each request for a batch. The first request retrieves
+   * all the incoming batches and sorts them, optionally spilling to
+   * disk as needed. Subsequent calls retrieve the sorted results in
+   * fixed-size batches.
+   */
+
   @Override
   public IterOutcome innerNext() {
-    if (schema != null) {
-      if (spillCount == 0) {
-        return (getSelectionVector4().next()) ? IterOutcome.OK : 
IterOutcome.NONE;
-      } else {
-        Stopwatch w = Stopwatch.createStarted();
-        int count = copier.next(targetRecordCount);
-        if (count > 0) {
-          long t = w.elapsed(TimeUnit.MICROSECONDS);
-          logger.debug("Took {} us to merge {} records", t, count);
-          container.setRecordCount(count);
-          return IterOutcome.OK;
-        } else {
-          logger.debug("copier returned 0 records");
-          return IterOutcome.NONE;
-        }
+    switch (sortState) {
+    case DONE:
+      return NONE;
+    case START:
+      return load();
+    case LOAD:
+      if (!this.retainInMemoryBatchesOnNone) {
+        resetSortState();
       }
+      return (sortState == SortState.DONE) ? NONE : load();
+    case DELIVER:
+      return nextOutputBatch();
+    default:
+      throw new IllegalStateException("Unexpected sort state: " + sortState);
     }
+  }
 
-    int totalCount = 0;
-    int totalBatches = 0; // total number of batches received so far
+  private IterOutcome nextOutputBatch() {
+    // Call next on outputSV4 for its state to progress in parallel to 
resultsIterator state
+    outputSV4.next();
 
-    try{
-      container.clear();
-      outer: while (true) {
-        IterOutcome upstream;
-        if (first) {
-          upstream = IterOutcome.OK_NEW_SCHEMA;
-        } else {
-          upstream = next(incoming);
-        }
-        if (upstream == IterOutcome.OK && sorter == null) {
-          upstream = IterOutcome.OK_NEW_SCHEMA;
-        }
-        switch (upstream) {
-        case NONE:
-          if (first) {
-            return upstream;
-          }
-          break outer;
-        case NOT_YET:
-          throw new UnsupportedOperationException();
-        case STOP:
-          return upstream;
-        case OK_NEW_SCHEMA:
-        case OK:
-          VectorContainer convertedBatch;
-          // only change in the case that the schema truly changes.  
Artificial schema changes are ignored.
-          if (upstream == IterOutcome.OK_NEW_SCHEMA && 
!incoming.getSchema().equals(schema)) {
-            if (schema != null) {
-              if (unionTypeEnabled) {
-                this.schema = SchemaUtil.mergeSchemas(schema, 
incoming.getSchema());
-              } else {
-                throw new SchemaChangeException("Schema changes not supported 
in External Sort. Please enable Union type");
-              }
-            } else {
-              schema = incoming.getSchema();
-            }
-            convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-            for (BatchGroup b : batchGroups) {
-              b.setSchema(schema);
-            }
-            for (BatchGroup b : spilledBatchGroups) {
-              b.setSchema(schema);
-            }
-            this.sorter = createNewSorter(context, convertedBatch);
-          } else {
-            convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-          }
-          if (first) {
-            first = false;
-          }
-          if (convertedBatch.getRecordCount() == 0) {
-            for (VectorWrapper<?> w : convertedBatch) {
-              w.clear();
-            }
-            break;
-          }
-          SelectionVector2 sv2;
-          if (incoming.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) {
-            sv2 = incoming.getSelectionVector2().clone();
-          } else {
-            try {
-              sv2 = newSV2();
-            } catch(InterruptedException e) {
-              return IterOutcome.STOP;
-            } catch (OutOfMemoryException e) {
-              throw new OutOfMemoryException(e);
-            }
-          }
+    // But if results iterator next returns true that means it has more 
results to pass
+    if (resultsIterator.next()) {
+      container.setRecordCount(getRecordCount());
+      injector.injectUnchecked(context.getExecutionControls(), 
INTERRUPTION_WHILE_MERGING);
+    }
+    // getFinalOutcome will take care of returning the correct IterOutcome when 
there is no data to pass and for
+    // EMIT/NONE scenarios
+    return getFinalOutcome();
+  }
 
-          int count = sv2.getCount();
-          totalCount += count;
-          totalBatches++;
-          sorter.setup(context, sv2, convertedBatch);
-          sorter.sort(sv2);
-          RecordBatchData rbd = new RecordBatchData(convertedBatch, 
oAllocator);
-          boolean success = false;
-          try {
-            rbd.setSv2(sv2);
-            batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2(), 
oContext));
-            if (peakNumBatches < batchGroups.size()) {
-              peakNumBatches = batchGroups.size();
-              stats.setLongStat(Metric.PEAK_BATCHES_IN_MEMORY, peakNumBatches);
-            }
-
-            batchesSinceLastSpill++;
-            if (// If we haven't spilled so far, do we have enough memory for 
MSorter if this turns out to be the last incoming batch?
-                (spillCount == 0 && !hasMemoryForInMemorySort(totalCount)) ||
-                // If we haven't spilled so far, make sure we don't exceed the 
maximum number of batches SV4 can address
-                (spillCount == 0 && totalBatches > Character.MAX_VALUE) ||
-                // TODO(DRILL-4438) - consider setting this threshold more 
intelligently,
-                // lowering caused a failing low memory condition (test in 
BasicPhysicalOpUnitTest)
-                // to complete successfully (although it caused perf decrease 
as there was more spilling)
-
-                // current memory used is more than 95% of memory usage limit 
of this operator
-                (oAllocator.getAllocatedMemory() > .95 * 
oAllocator.getLimit()) ||
-                // Number of incoming batches (BatchGroups) exceed the limit 
and number of incoming batches accumulated
-                // since the last spill exceed the defined limit
-                (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill 
>= SPILL_BATCH_GROUP_SIZE)) {
-
-              if (firstSpillBatchCount == 0) {
-                firstSpillBatchCount = batchGroups.size();
-              }
-
-              if (spilledBatchGroups.size() > firstSpillBatchCount / 2) {
-                logger.info("Merging spills");
-                final BatchGroup merged = mergeAndSpill(spilledBatchGroups);
-                if (merged != null) {
-                  spilledBatchGroups.addFirst(merged);
-                }
-              }
-              final BatchGroup merged = mergeAndSpill(batchGroups);
-              if (merged != null) { // make sure we don't add null to 
spilledBatchGroups
-                spilledBatchGroups.add(merged);
-                batchesSinceLastSpill = 0;
-              }
-            }
-            success = true;
-          } finally {
-            if (!success) {
-              rbd.clear();
-            }
-          }
-          break;
-        case OUT_OF_MEMORY:
-          logger.debug("received OUT_OF_MEMORY, trying to spill");
-          if (batchesSinceLastSpill > 2) {
-            final BatchGroup merged = mergeAndSpill(batchGroups);
-            if (merged != null) {
-              spilledBatchGroups.add(merged);
-              batchesSinceLastSpill = 0;
-            }
-          } else {
-            logger.debug("not enough batches to spill, sending OUT_OF_MEMORY 
downstream");
-            return IterOutcome.OUT_OF_MEMORY;
-          }
-          break;
-        default:
-          throw new UnsupportedOperationException();
-        }
-      }
+  /**
+   * Load the results and sort them. May bail out early if an exceptional
+   * condition is passed up from the input batch.
+   *
+   * @return return code: OK_NEW_SCHEMA if rows were sorted,
+   * NONE if no rows
+   */
 
-      if (totalCount == 0) {
-        return IterOutcome.NONE;
-      }
-      if (spillCount == 0) {
+  private IterOutcome load() {
+    logger.trace("Start of load phase");
 
-        if (builder != null) {
-          builder.clear();
-          builder.close();
-        }
-        builder = new SortRecordBatchBuilder(oAllocator);
+    // Don't clear the temporary container created by buildSchema() after each 
load since across EMIT outcome we have
+    // to maintain the ValueVector references for downstream operators
 
-        for (BatchGroup group : batchGroups) {
-          RecordBatchData rbd = new RecordBatchData(group.getContainer(), 
oAllocator);
-          rbd.setSv2(group.getSv2());
-          builder.add(rbd);
-        }
+    // Loop over all input batches
 
-        builder.build(container);
-        sv4 = builder.getSv4();
-        mSorter = createNewMSorter();
-        mSorter.setup(context, oAllocator, getSelectionVector4(), 
this.container);
+    IterOutcome result = OK;
+    for (;;) {
+      result = loadBatch();
 
-        // For testing memory-leak purpose, inject exception after mSorter 
finishes setup
-        injector.injectUnchecked(context.getExecutionControls(), 
INTERRUPTION_AFTER_SETUP);
-        mSorter.sort(this.container);
+      // NONE/EMIT means all batches have been read at this record boundary
+      if (result == NONE || result == EMIT) {
+        break; }
 
-        // sort may have prematurely exited due to should continue returning 
false.
-        if (!context.getExecutorState().shouldContinue()) {
-          return IterOutcome.STOP;
-        }
+      // if result is STOP that means something went wrong.
 
-        // For testing memory-leak purpose, inject exception after mSorter 
finishes sorting
-        injector.injectUnchecked(context.getExecutionControls(), 
INTERRUPTION_AFTER_SORT);
-        sv4 = mSorter.getSV4();
+      if (result == STOP) {
+        return result; }
+    }
 
-        container.buildSchema(SelectionVectorMode.FOUR_BYTE);
-      } else { // some batches were spilled
-        final BatchGroup merged = mergeAndSpill(batchGroups);
-        if (merged != null) {
-          spilledBatchGroups.add(merged);
-        }
-        batchGroups.addAll(spilledBatchGroups);
-        spilledBatchGroups = null; // no need to cleanup spilledBatchGroups, 
all it's batches are in batchGroups now
-
-        logger.warn("Starting to merge. {} batch groups. Current allocated 
memory: {}", batchGroups.size(), oAllocator.getAllocatedMemory());
-        VectorContainer hyperBatch = constructHyperBatch(batchGroups);
-        createCopier(hyperBatch, batchGroups, container, false);
-
-        int estimatedRecordSize = 0;
-        for (VectorWrapper<?> w : batchGroups.get(0)) {
-          try {
-            estimatedRecordSize += TypeHelper.getSize(w.getField().getType());
-          } catch (UnsupportedOperationException e) {
-            estimatedRecordSize += 50;
-          }
-        }
-        targetRecordCount = Math.min(MAX_BATCH_ROW_COUNT, Math.max(1, 
COPIER_BATCH_MEM_LIMIT / estimatedRecordSize));
-        int count = copier.next(targetRecordCount);
-        container.buildSchema(SelectionVectorMode.NONE);
-        container.setRecordCount(count);
+    // Anything to actually sort?
+    resultsIterator = sortImpl.startMerge();
+    if (! resultsIterator.next()) {
+      // If there are no records to sort and we got NONE then just return NONE
+      if (result == NONE) {
+        sortState = SortState.DONE;
+        return NONE;
       }
-
-      return IterOutcome.OK_NEW_SCHEMA;
-
-    } catch (SchemaChangeException ex) {
-      kill(false);
-      context.getExecutorState().fail(UserException.unsupportedError(ex)
-        .message("Sort doesn't currently support sorts with changing 
schemas").build(logger));
-      return IterOutcome.STOP;
-    } catch(ClassTransformationException | IOException ex) {
-      kill(false);
-      context.getExecutorState().fail(ex);
-      return IterOutcome.STOP;
-    } catch (UnsupportedOperationException e) {
-      throw new RuntimeException(e);
     }
-  }
-
-  private boolean hasMemoryForInMemorySort(int currentRecordCount) {
-    long currentlyAvailable =  popConfig.getMaxAllocation() - 
oAllocator.getAllocatedMemory();
 
-    long neededForInMemorySort = 
SortRecordBatchBuilder.memoryNeeded(currentRecordCount) +
-        MSortTemplate.memoryNeeded(currentRecordCount);
-
-    return currentlyAvailable > neededForInMemorySort;
-  }
+    // sort may have prematurely exited due to shouldContinue() returning 
false.
 
-  public BatchGroup mergeAndSpill(LinkedList<BatchGroup> batchGroups) throws 
SchemaChangeException {
-    logger.debug("Copier allocator current allocation {}", 
copierAllocator.getAllocatedMemory());
-    logger.debug("mergeAndSpill: starting total size in memory = {}", 
oAllocator.getAllocatedMemory());
-    VectorContainer outputContainer = new VectorContainer();
-    List<BatchGroup> batchGroupList = Lists.newArrayList();
-    int batchCount = batchGroups.size();
-    for (int i = 0; i < batchCount / 2; i++) {
-      if (batchGroups.size() == 0) {
-        break;
-      }
-      BatchGroup batch = batchGroups.pollLast();
-      assert batch != null : "Encountered a null batch during merge and spill 
operation";
-      batchGroupList.add(batch);
+    if (!context.getExecutorState().shouldContinue()) {
+      sortState = SortState.DONE;
+      return STOP;
     }
 
-    if (batchGroupList.size() == 0) {
-      return null;
-    }
-    int estimatedRecordSize = 0;
-    for (VectorWrapper<?> w : batchGroupList.get(0)) {
-      try {
-        estimatedRecordSize += TypeHelper.getSize(w.getField().getType());
-      } catch (UnsupportedOperationException e) {
-        estimatedRecordSize += 50;
-      }
-    }
-    int targetRecordCount = Math.max(1, COPIER_BATCH_MEM_LIMIT / 
estimatedRecordSize);
-    VectorContainer hyperBatch = constructHyperBatch(batchGroupList);
-    createCopier(hyperBatch, batchGroupList, outputContainer, true);
+    // If we are here that means there is some data to be returned downstream.
+    // We have to prepare output container
+    prepareOutputContainer(resultsIterator);
+    return getFinalOutcome();
+  }
 
-    int count = copier.next(targetRecordCount);
-    assert count > 0;
+  /**
+   * Load and process a single batch, handling schema changes. In general, the
+   * external sort accepts only one schema.
+   *
+   * @return return code depending on the amount of data read from upstream
+   */
 
-    logger.debug("mergeAndSpill: estimated record size = {}, target record 
count = {}", estimatedRecordSize, targetRecordCount);
+  private IterOutcome loadBatch() {
 
-    // 1 output container is kept in memory, so we want to hold on to it and 
transferClone
-    // allows keeping ownership
-    VectorContainer c1 = VectorContainer.getTransferClone(outputContainer, 
oContext);
-    c1.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-    c1.setRecordCount(count);
+    // If this is the very first batch, then AbstractRecordBatch
+    // already loaded it for us in buildSchema().
 
-    String spillDir = dirs.next();
-    Path currSpillPath = new Path(Joiner.on("/").join(spillDir, fileName));
-    currSpillDirs.add(currSpillPath);
-    String outputFile = Joiner.on("/").join(currSpillPath, spillCount++);
-    try {
-        fs.deleteOnExit(currSpillPath);
-    } catch (IOException e) {
-        // since this is meant to be used in a batches's spilling, we don't 
propagate the exception
-        logger.warn("Unable to mark spill directory " + currSpillPath + " for 
deleting on exit", e);
+    if (sortState == SortState.START) {
+      sortState = SortState.LOAD;
+      lastKnownOutcome = OK_NEW_SCHEMA;
+    } else {
+      lastKnownOutcome = next(incoming);
     }
 
 Review comment:
   ```java
       if (sortState == SortState.START) {
         // first batch is preloaded by AbstractRecordBatch.buildSchema()
         sortState = SortState.LOAD;
         lastKnownOutcome = OK_NEW_SCHEMA;
       } else {
         lastKnownOutcome = next(incoming);
       }
   
   ```
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> ------------------------------------------
>
>                 Key: DRILL-6832
>                 URL: https://issues.apache.org/jira/browse/DRILL-6832
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.14.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>            Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to