[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17001133#comment-17001133
 ] 

ASF GitHub Bot commented on DRILL-6832:
---------------------------------------

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360501525
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##########
 @@ -286,540 +291,419 @@ public void buildSchema() throws SchemaChangeException 
{
         state = BatchState.DONE;
         break;
       default:
-        break;
+        throw new IllegalStateException("Unexpected iter outcome: " + outcome);
     }
   }
 
+  /**
+   * Process each request for a batch. The first request retrieves
+   * all the incoming batches and sorts them, optionally spilling to
+   * disk as needed. Subsequent calls retrieve the sorted results in
+   * fixed-size batches.
+   */
+
   @Override
   public IterOutcome innerNext() {
-    if (schema != null) {
-      if (spillCount == 0) {
-        return (getSelectionVector4().next()) ? IterOutcome.OK : 
IterOutcome.NONE;
-      } else {
-        Stopwatch w = Stopwatch.createStarted();
-        int count = copier.next(targetRecordCount);
-        if (count > 0) {
-          long t = w.elapsed(TimeUnit.MICROSECONDS);
-          logger.debug("Took {} us to merge {} records", t, count);
-          container.setRecordCount(count);
-          return IterOutcome.OK;
-        } else {
-          logger.debug("copier returned 0 records");
-          return IterOutcome.NONE;
-        }
+    switch (sortState) {
+    case DONE:
+      return NONE;
+    case START:
+      return load();
+    case LOAD:
+      if (!this.retainInMemoryBatchesOnNone) {
+        resetSortState();
       }
+      return (sortState == SortState.DONE) ? NONE : load();
+    case DELIVER:
+      return nextOutputBatch();
+    default:
+      throw new IllegalStateException("Unexpected sort state: " + sortState);
     }
+  }
 
-    int totalCount = 0;
-    int totalBatches = 0; // total number of batches received so far
+  private IterOutcome nextOutputBatch() {
+    // Call next on outputSV4 for it's state to progress in parallel to 
resultsIterator state
+    outputSV4.next();
 
-    try{
-      container.clear();
-      outer: while (true) {
-        IterOutcome upstream;
-        if (first) {
-          upstream = IterOutcome.OK_NEW_SCHEMA;
-        } else {
-          upstream = next(incoming);
-        }
-        if (upstream == IterOutcome.OK && sorter == null) {
-          upstream = IterOutcome.OK_NEW_SCHEMA;
-        }
-        switch (upstream) {
-        case NONE:
-          if (first) {
-            return upstream;
-          }
-          break outer;
-        case NOT_YET:
-          throw new UnsupportedOperationException();
-        case STOP:
-          return upstream;
-        case OK_NEW_SCHEMA:
-        case OK:
-          VectorContainer convertedBatch;
-          // only change in the case that the schema truly changes.  
Artificial schema changes are ignored.
-          if (upstream == IterOutcome.OK_NEW_SCHEMA && 
!incoming.getSchema().equals(schema)) {
-            if (schema != null) {
-              if (unionTypeEnabled) {
-                this.schema = SchemaUtil.mergeSchemas(schema, 
incoming.getSchema());
-              } else {
-                throw new SchemaChangeException("Schema changes not supported 
in External Sort. Please enable Union type");
-              }
-            } else {
-              schema = incoming.getSchema();
-            }
-            convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-            for (BatchGroup b : batchGroups) {
-              b.setSchema(schema);
-            }
-            for (BatchGroup b : spilledBatchGroups) {
-              b.setSchema(schema);
-            }
-            this.sorter = createNewSorter(context, convertedBatch);
-          } else {
-            convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-          }
-          if (first) {
-            first = false;
-          }
-          if (convertedBatch.getRecordCount() == 0) {
-            for (VectorWrapper<?> w : convertedBatch) {
-              w.clear();
-            }
-            break;
-          }
-          SelectionVector2 sv2;
-          if (incoming.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) {
-            sv2 = incoming.getSelectionVector2().clone();
-          } else {
-            try {
-              sv2 = newSV2();
-            } catch(InterruptedException e) {
-              return IterOutcome.STOP;
-            } catch (OutOfMemoryException e) {
-              throw new OutOfMemoryException(e);
-            }
-          }
+    // But if results iterator next returns true that means it has more 
results to pass
+    if (resultsIterator.next()) {
+      container.setRecordCount(getRecordCount());
+      injector.injectUnchecked(context.getExecutionControls(), 
INTERRUPTION_WHILE_MERGING);
+    }
+    // getFinalOutcome will take care of returning correct IterOutcome when 
there is no data to pass and for
+    // EMIT/NONE scenarios
+    return getFinalOutcome();
+  }
 
-          int count = sv2.getCount();
-          totalCount += count;
-          totalBatches++;
-          sorter.setup(context, sv2, convertedBatch);
-          sorter.sort(sv2);
-          RecordBatchData rbd = new RecordBatchData(convertedBatch, 
oAllocator);
-          boolean success = false;
-          try {
-            rbd.setSv2(sv2);
-            batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2(), 
oContext));
-            if (peakNumBatches < batchGroups.size()) {
-              peakNumBatches = batchGroups.size();
-              stats.setLongStat(Metric.PEAK_BATCHES_IN_MEMORY, peakNumBatches);
-            }
-
-            batchesSinceLastSpill++;
-            if (// If we haven't spilled so far, do we have enough memory for 
MSorter if this turns out to be the last incoming batch?
-                (spillCount == 0 && !hasMemoryForInMemorySort(totalCount)) ||
-                // If we haven't spilled so far, make sure we don't exceed the 
maximum number of batches SV4 can address
-                (spillCount == 0 && totalBatches > Character.MAX_VALUE) ||
-                // TODO(DRILL-4438) - consider setting this threshold more 
intelligently,
-                // lowering caused a failing low memory condition (test in 
BasicPhysicalOpUnitTest)
-                // to complete successfully (although it caused perf decrease 
as there was more spilling)
-
-                // current memory used is more than 95% of memory usage limit 
of this operator
-                (oAllocator.getAllocatedMemory() > .95 * 
oAllocator.getLimit()) ||
-                // Number of incoming batches (BatchGroups) exceed the limit 
and number of incoming batches accumulated
-                // since the last spill exceed the defined limit
-                (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill 
>= SPILL_BATCH_GROUP_SIZE)) {
-
-              if (firstSpillBatchCount == 0) {
-                firstSpillBatchCount = batchGroups.size();
-              }
-
-              if (spilledBatchGroups.size() > firstSpillBatchCount / 2) {
-                logger.info("Merging spills");
-                final BatchGroup merged = mergeAndSpill(spilledBatchGroups);
-                if (merged != null) {
-                  spilledBatchGroups.addFirst(merged);
-                }
-              }
-              final BatchGroup merged = mergeAndSpill(batchGroups);
-              if (merged != null) { // make sure we don't add null to 
spilledBatchGroups
-                spilledBatchGroups.add(merged);
-                batchesSinceLastSpill = 0;
-              }
-            }
-            success = true;
-          } finally {
-            if (!success) {
-              rbd.clear();
-            }
-          }
-          break;
-        case OUT_OF_MEMORY:
-          logger.debug("received OUT_OF_MEMORY, trying to spill");
-          if (batchesSinceLastSpill > 2) {
-            final BatchGroup merged = mergeAndSpill(batchGroups);
-            if (merged != null) {
-              spilledBatchGroups.add(merged);
-              batchesSinceLastSpill = 0;
-            }
-          } else {
-            logger.debug("not enough batches to spill, sending OUT_OF_MEMORY 
downstream");
-            return IterOutcome.OUT_OF_MEMORY;
-          }
-          break;
-        default:
-          throw new UnsupportedOperationException();
-        }
-      }
+  /**
+   * Load the results and sort them. May bail out early if an exceptional
+   * condition is passed up from the input batch.
+   *
+   * @return return code: OK_NEW_SCHEMA if rows were sorted,
+   * NONE if no rows
+   */
 
-      if (totalCount == 0) {
-        return IterOutcome.NONE;
-      }
-      if (spillCount == 0) {
+  private IterOutcome load() {
+    logger.trace("Start of load phase");
 
-        if (builder != null) {
-          builder.clear();
-          builder.close();
-        }
-        builder = new SortRecordBatchBuilder(oAllocator);
+    // Don't clear the temporary container created by buildSchema() after each 
load since across EMIT outcome we have
+    // to maintain the ValueVector references for downstream operators
 
-        for (BatchGroup group : batchGroups) {
-          RecordBatchData rbd = new RecordBatchData(group.getContainer(), 
oAllocator);
-          rbd.setSv2(group.getSv2());
-          builder.add(rbd);
-        }
+    // Loop over all input batches
 
-        builder.build(container);
-        sv4 = builder.getSv4();
-        mSorter = createNewMSorter();
-        mSorter.setup(context, oAllocator, getSelectionVector4(), 
this.container);
+    IterOutcome result = OK;
+    for (;;) {
+      result = loadBatch();
 
-        // For testing memory-leak purpose, inject exception after mSorter 
finishes setup
-        injector.injectUnchecked(context.getExecutionControls(), 
INTERRUPTION_AFTER_SETUP);
-        mSorter.sort(this.container);
+      // NONE/EMIT means all batches have been read at this record boundary
+      if (result == NONE || result == EMIT) {
+        break; }
 
-        // sort may have prematurely exited due to should continue returning 
false.
-        if (!context.getExecutorState().shouldContinue()) {
-          return IterOutcome.STOP;
-        }
+      // if result is STOP that means something went wrong.
 
-        // For testing memory-leak purpose, inject exception after mSorter 
finishes sorting
-        injector.injectUnchecked(context.getExecutionControls(), 
INTERRUPTION_AFTER_SORT);
-        sv4 = mSorter.getSV4();
+      if (result == STOP) {
+        return result; }
+    }
 
-        container.buildSchema(SelectionVectorMode.FOUR_BYTE);
-      } else { // some batches were spilled
-        final BatchGroup merged = mergeAndSpill(batchGroups);
-        if (merged != null) {
-          spilledBatchGroups.add(merged);
-        }
-        batchGroups.addAll(spilledBatchGroups);
-        spilledBatchGroups = null; // no need to cleanup spilledBatchGroups, 
all it's batches are in batchGroups now
-
-        logger.warn("Starting to merge. {} batch groups. Current allocated 
memory: {}", batchGroups.size(), oAllocator.getAllocatedMemory());
-        VectorContainer hyperBatch = constructHyperBatch(batchGroups);
-        createCopier(hyperBatch, batchGroups, container, false);
-
-        int estimatedRecordSize = 0;
-        for (VectorWrapper<?> w : batchGroups.get(0)) {
-          try {
-            estimatedRecordSize += TypeHelper.getSize(w.getField().getType());
-          } catch (UnsupportedOperationException e) {
-            estimatedRecordSize += 50;
-          }
-        }
-        targetRecordCount = Math.min(MAX_BATCH_ROW_COUNT, Math.max(1, 
COPIER_BATCH_MEM_LIMIT / estimatedRecordSize));
-        int count = copier.next(targetRecordCount);
-        container.buildSchema(SelectionVectorMode.NONE);
-        container.setRecordCount(count);
+    // Anything to actually sort?
+    resultsIterator = sortImpl.startMerge();
+    if (! resultsIterator.next()) {
+      // If there is no records to sort and we got NONE then just return NONE
+      if (result == NONE) {
+        sortState = SortState.DONE;
+        return NONE;
       }
-
-      return IterOutcome.OK_NEW_SCHEMA;
-
-    } catch (SchemaChangeException ex) {
-      kill(false);
-      context.getExecutorState().fail(UserException.unsupportedError(ex)
-        .message("Sort doesn't currently support sorts with changing 
schemas").build(logger));
-      return IterOutcome.STOP;
-    } catch(ClassTransformationException | IOException ex) {
-      kill(false);
-      context.getExecutorState().fail(ex);
-      return IterOutcome.STOP;
-    } catch (UnsupportedOperationException e) {
-      throw new RuntimeException(e);
     }
-  }
-
-  private boolean hasMemoryForInMemorySort(int currentRecordCount) {
-    long currentlyAvailable =  popConfig.getMaxAllocation() - 
oAllocator.getAllocatedMemory();
 
-    long neededForInMemorySort = 
SortRecordBatchBuilder.memoryNeeded(currentRecordCount) +
-        MSortTemplate.memoryNeeded(currentRecordCount);
-
-    return currentlyAvailable > neededForInMemorySort;
-  }
+    // sort may have prematurely exited due to shouldContinue() returning 
false.
 
-  public BatchGroup mergeAndSpill(LinkedList<BatchGroup> batchGroups) throws 
SchemaChangeException {
-    logger.debug("Copier allocator current allocation {}", 
copierAllocator.getAllocatedMemory());
-    logger.debug("mergeAndSpill: starting total size in memory = {}", 
oAllocator.getAllocatedMemory());
-    VectorContainer outputContainer = new VectorContainer();
-    List<BatchGroup> batchGroupList = Lists.newArrayList();
-    int batchCount = batchGroups.size();
-    for (int i = 0; i < batchCount / 2; i++) {
-      if (batchGroups.size() == 0) {
-        break;
-      }
-      BatchGroup batch = batchGroups.pollLast();
-      assert batch != null : "Encountered a null batch during merge and spill 
operation";
-      batchGroupList.add(batch);
+    if (!context.getExecutorState().shouldContinue()) {
+      sortState = SortState.DONE;
+      return STOP;
     }
 
-    if (batchGroupList.size() == 0) {
-      return null;
-    }
-    int estimatedRecordSize = 0;
-    for (VectorWrapper<?> w : batchGroupList.get(0)) {
-      try {
-        estimatedRecordSize += TypeHelper.getSize(w.getField().getType());
-      } catch (UnsupportedOperationException e) {
-        estimatedRecordSize += 50;
-      }
-    }
-    int targetRecordCount = Math.max(1, COPIER_BATCH_MEM_LIMIT / 
estimatedRecordSize);
-    VectorContainer hyperBatch = constructHyperBatch(batchGroupList);
-    createCopier(hyperBatch, batchGroupList, outputContainer, true);
+    // If we are here that means there is some data to be returned downstream.
+    // We have to prepare output container
+    prepareOutputContainer(resultsIterator);
+    return getFinalOutcome();
+  }
 
-    int count = copier.next(targetRecordCount);
-    assert count > 0;
+  /**
+   * Load and process a single batch, handling schema changes. In general, the
+   * external sort accepts only one schema.
+   *
+   * @return return code depending on the amount of data read from upstream
+   */
 
-    logger.debug("mergeAndSpill: estimated record size = {}, target record 
count = {}", estimatedRecordSize, targetRecordCount);
+  private IterOutcome loadBatch() {
 
-    // 1 output container is kept in memory, so we want to hold on to it and 
transferClone
-    // allows keeping ownership
-    VectorContainer c1 = VectorContainer.getTransferClone(outputContainer, 
oContext);
-    c1.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-    c1.setRecordCount(count);
+    // If this is the very first batch, then AbstractRecordBatch
+    // already loaded it for us in buildSchema().
 
-    String spillDir = dirs.next();
-    Path currSpillPath = new Path(Joiner.on("/").join(spillDir, fileName));
-    currSpillDirs.add(currSpillPath);
-    String outputFile = Joiner.on("/").join(currSpillPath, spillCount++);
-    try {
-        fs.deleteOnExit(currSpillPath);
-    } catch (IOException e) {
-        // since this is meant to be used in a batches's spilling, we don't 
propagate the exception
-        logger.warn("Unable to mark spill directory " + currSpillPath + " for 
deleting on exit", e);
+    if (sortState == SortState.START) {
+      sortState = SortState.LOAD;
+      lastKnownOutcome = OK_NEW_SCHEMA;
+    } else {
+      lastKnownOutcome = next(incoming);
     }
-    stats.setLongStat(Metric.SPILL_COUNT, spillCount);
-    BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext);
-    try (AutoCloseable a = AutoCloseables.all(batchGroupList)) {
-      logger.info("Merging and spilling to {}", outputFile);
-      while ((count = copier.next(targetRecordCount)) > 0) {
-        outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-        outputContainer.setRecordCount(count);
-        // note that addBatch also clears the outputContainer
-        newGroup.addBatch(outputContainer);
+    switch (lastKnownOutcome) {
+    case NONE:
+    case STOP:
+      return lastKnownOutcome;
+    case OK_NEW_SCHEMA:
+      firstBatchOfSchema = true;
+      setupSchema();
+      // Fall through
+
+    case OK:
+    case EMIT:
+
+      // Add the batch to the in-memory generation, spilling if
+      // needed.
+
+      sortImpl.addBatch(incoming);
+      break;
+    case OUT_OF_MEMORY:
+
+      // Note: it is highly doubtful that this code actually works. It
+      // requires that the upstream batches got to a safe place to run
+      // out of memory and that no work was in-flight and thus abandoned.
+      // Consider removing this case once resource management is in place.
+
+      logger.error("received OUT_OF_MEMORY, trying to spill");
+      if (! sortImpl.forceSpill()) {
+        throw UserException.memoryError("Received OUT_OF_MEMORY, but not 
enough batches to spill")
+          .build(logger);
       }
-      injector.injectChecked(context.getExecutionControls(), 
INTERRUPTION_WHILE_SPILLING, IOException.class);
-      newGroup.closeOutputStream();
-    } catch (Throwable e) {
-      // we only need to cleanup newGroup if spill failed
-      try {
-        AutoCloseables.close(e, newGroup);
-      } catch (Throwable t) { /* close() may hit the same IO issue; just 
ignore */ }
-      throw UserException.resourceError(e)
-        .message("External Sort encountered an error while spilling to disk")
-              .addContext(e.getMessage() /* more detail */)
-        .build(logger);
-    } finally {
-      hyperBatch.clear();
+      break;
+    default:
+      throw new IllegalStateException("Unexpected iter outcome: " + 
lastKnownOutcome);
     }
-    logger.debug("mergeAndSpill: final total size in memory = {}", 
oAllocator.getAllocatedMemory());
-    logger.info("Completed spilling to {}", outputFile);
-    return newGroup;
+    return lastKnownOutcome;
   }
 
-  private SelectionVector2 newSV2() throws OutOfMemoryException, 
InterruptedException {
-    @SuppressWarnings("resource")
-    SelectionVector2 sv2 = new SelectionVector2(oAllocator);
-    if (!sv2.allocateNewSafe(incoming.getRecordCount())) {
-      try {
-        final BatchGroup merged = mergeAndSpill(batchGroups);
-        if (merged != null) {
-          spilledBatchGroups.add(merged);
-        } else {
-          throw UserException.memoryError("Unable to allocate sv2 for %d 
records, and not enough batchGroups to spill.",
-              incoming.getRecordCount())
-            .addContext("batchGroups.size", batchGroups.size())
-            .addContext("spilledBatchGroups.size", spilledBatchGroups.size())
-            .addContext("allocated memory", oAllocator.getAllocatedMemory())
-            .addContext("allocator limit", oAllocator.getLimit())
+  /**
+   * Handle a new schema from upstream. The ESB is quite limited in its ability
+   * to handle schema changes.
+   */
+
+  private void setupSchema()  {
+
+    // First batch: we won't have a schema.
+
+    if (schema == null) {
+      schema = incoming.getSchema();
+    } else if (incoming.getSchema().equals(schema)) {
+      // Nothing to do.  Artificial schema changes are ignored.
+    } else if (unionTypeEnabled) {
+      schema = SchemaUtil.mergeSchemas(schema, incoming.getSchema());
+    } else {
+      throw UserException.unsupportedError()
+            .message("Schema changes not supported in External Sort. Please 
enable Union type.")
+            .addContext("Previous schema", schema.toString())
+            .addContext("Incoming schema", incoming.getSchema().toString())
             .build(logger);
-        }
-      } catch (SchemaChangeException e) {
-        throw new RuntimeException(e);
-      }
-      int waitTime = 1;
-      while (true) {
-        try {
-          Thread.sleep(waitTime * 1000);
-        } catch(final InterruptedException e) {
-          if (!context.getExecutorState().shouldContinue()) {
-            throw e;
-          }
-        }
-        waitTime *= 2;
-        if (sv2.allocateNewSafe(incoming.getRecordCount())) {
-          break;
-        }
-        if (waitTime >= 32) {
-          throw new OutOfMemoryException("Unable to allocate sv2 buffer after 
repeated attempts");
-        }
-      }
     }
-    for (int i = 0; i < incoming.getRecordCount(); i++) {
-      sv2.setIndex(i, (char) i);
-    }
-    sv2.setRecordCount(incoming.getRecordCount());
-    return sv2;
+    sortImpl.setSchema(schema);
   }
 
-  private VectorContainer constructHyperBatch(List<BatchGroup> batchGroupList) 
{
-    VectorContainer cont = new VectorContainer();
-    for (MaterializedField field : schema) {
-      ValueVector[] vectors = new ValueVector[batchGroupList.size()];
-      int i = 0;
-      for (BatchGroup group : batchGroupList) {
-        vectors[i++] = group.getValueAccessorById(
-            field.getValueClass(),
-            
group.getValueVectorId(SchemaPath.getSimplePath(field.getName())).getFieldIds())
-            .getValueVector();
-      }
-      cont.add(vectors);
-    }
-    cont.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
-    return cont;
+  @Override
+  public WritableBatch getWritableBatch() {
+    throw new UnsupportedOperationException("A sort batch is not writable.");
   }
 
-  private MSorter createNewMSorter() throws ClassTransformationException, 
IOException, SchemaChangeException {
-    return createNewMSorter(context, this.popConfig.getOrderings(), this, 
MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
+  @Override
+  protected void killIncoming(boolean sendUpstream) {
+    incoming.kill(sendUpstream);
   }
 
-  private MSorter createNewMSorter(FragmentContext context, List<Ordering> 
orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet 
leftMapping, MappingSet
-    rightMapping)
-          throws ClassTransformationException, IOException, 
SchemaChangeException {
-    CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, 
context.getOptions());
-    ClassGenerator<MSorter> g = cg.getRoot();
-    g.setMappingSet(mainMapping);
-
-    for (Ordering od : orderings) {
-      // first, we rewrite the evaluation stack for each side of the 
comparison.
-      ErrorCollector collector = new ErrorCollectorImpl();
-      final LogicalExpression expr = 
ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, 
context.getFunctionRegistry());
-      if (collector.hasErrors()) {
-        throw new SchemaChangeException("Failure while materializing 
expression. " + collector.toErrorString());
+  /**
+   * Extreme paranoia to avoid leaving resources unclosed in the case
+   * of an error. Since generally only the first error is of interest,
+   * we track only the first exception, not potential cascading downstream
+   * exceptions.
+   * <p>
+   * Some Drill code ends up calling close() two or more times. The code
+   * here protects itself from these undesirable semantics.
+   * </p>
+   */
+
+  @Override
+  public void close() {
+
+    // Sanity check: if close is called twice, just ignore
+    // the second call.
+
+    if (sortImpl == null) {
+      return;
+    }
+
+    RuntimeException ex = null;
+
+    // If we got far enough to have a results iterator, close
+    // that first.
+
+    try {
+      if (resultsIterator != null) {
+        resultsIterator.close();
+        resultsIterator = null;
       }
-      g.setMappingSet(leftMapping);
-      HoldingContainer left = g.addExpr(expr, 
ClassGenerator.BlkCreateMode.FALSE);
-      g.setMappingSet(rightMapping);
-      HoldingContainer right = g.addExpr(expr, 
ClassGenerator.BlkCreateMode.FALSE);
-      g.setMappingSet(mainMapping);
-
-      // next we wrap the two comparison sides and add the expression block 
for the comparison.
-      LogicalExpression fh =
-          FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), 
left, right,
-                                                         
context.getFunctionRegistry());
-      HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
-      JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
-
-      if (od.getDirection() == Direction.ASCENDING) {
-        jc._then()._return(out.getValue());
-      }else{
-        jc._then()._return(out.getValue().minus());
+    } catch (RuntimeException e) {
+      ex = e;
+    }
+
+    // Then close the "guts" of the sort operation.
+    try {
+      if (sortImpl != null) {
+        sortImpl.close();
+        sortImpl = null;
       }
-      g.rotateBlock();
+    } catch (RuntimeException e) {
+      ex = (ex == null) ? e : ex;
     }
 
-    g.rotateBlock();
-    g.getEvalBlock()._return(JExpr.lit(0));
+    // The call to super.close() clears out the output container.
+    // Doing so requires the allocator here, so it must be closed
+    // (when closing the operator context) after the super call.
 
-    cg.plainJavaCapable(true); // This class can generate plain-old Java.
-    // Uncomment out this line to debug the generated code.
-//    cg.saveCodeForDebugging(true);
-    return context.getImplementationClass(cg);
-  }
+    try {
+      outputWrapperContainer.clear();
+      outputSV4.clear();
+      super.close();
+    } catch (RuntimeException e) {
+      ex = (ex == null) ? e : ex;
+    }
 
-  public SingleBatchSorter createNewSorter(FragmentContext context, 
VectorAccessible batch)
-          throws ClassTransformationException, IOException, 
SchemaChangeException {
-    CodeGenerator<SingleBatchSorter> cg = 
CodeGenerator.get(SingleBatchSorter.TEMPLATE_DEFINITION, context.getOptions());
-    cg.plainJavaCapable(true); // This class can generate plain-old Java.
+    // Finally close the operator context (which closes the
+    // child allocator.)
 
-    // Uncomment out this line to debug the generated code.
-//    cg.saveCodeForDebugging(true);
-    generateComparisons(cg.getRoot(), batch);
-    return context.getImplementationClass(cg);
+    try {
+      oContext.close();
+    } catch (RuntimeException e) {
+      ex = ex == null ? e : ex;
+    }
+    if (ex != null) {
+      throw ex;
+    }
   }
 
-  private void generateComparisons(ClassGenerator<?> g, VectorAccessible 
batch) throws SchemaChangeException {
-    g.setMappingSet(MAIN_MAPPING);
+  /**
+   * Workaround for DRILL-5656. We wish to release the batches for an
+   * in-memory sort once data is delivered. Normally, we can release them
+   * just before returning NONE. But, the StreamingAggBatch requires that
+   * the batches still be present on NONE. This method "sniffs" the input
+   * provided, and if the external sort, sets a mode that retains the
+   * batches. Yes, it is a horrible hack. But, necessary until the Drill
+   * iterator protocol can be revised.
+   *
+   * @param incoming the incoming batch for some operator which may
+   * (or may not) be an external sort (or, an external sort wrapped
+   * in a batch iterator validator.)
+   */
 
-    for (Ordering od : popConfig.getOrderings()) {
-      // first, we rewrite the evaluation stack for each side of the 
comparison.
-      ErrorCollector collector = new ErrorCollectorImpl();
-      final LogicalExpression expr = 
ExpressionTreeMaterializer.materialize(od.getExpr(), batch, 
collector,context.getFunctionRegistry());
-      if (collector.hasErrors()) {
-        throw new SchemaChangeException("Failure while materializing 
expression. " + collector.toErrorString());
-      }
-      g.setMappingSet(LEFT_MAPPING);
-      HoldingContainer left = g.addExpr(expr, 
ClassGenerator.BlkCreateMode.FALSE);
-      g.setMappingSet(RIGHT_MAPPING);
-      HoldingContainer right = g.addExpr(expr, 
ClassGenerator.BlkCreateMode.FALSE);
-      g.setMappingSet(MAIN_MAPPING);
-
-      // next we wrap the two comparison sides and add the expression block 
for the comparison.
-      LogicalExpression fh =
-          FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), 
left, right,
-                                                         
context.getFunctionRegistry());
-      HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
-      JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
-
-      if (od.getDirection() == Direction.ASCENDING) {
-        jc._then()._return(out.getValue());
-      }else{
-        jc._then()._return(out.getValue().minus());
-      }
-      g.rotateBlock();
+  public static void retainSv4OnNone(RecordBatch incoming) {
+    if (incoming instanceof IteratorValidatorBatchIterator) {
+      incoming = ((IteratorValidatorBatchIterator) incoming).getIncoming();
     }
+    if (incoming instanceof ExternalSortBatch) {
+      ((ExternalSortBatch) incoming).retainInMemoryBatchesOnNone = true;
+    }
+  }
 
-    g.rotateBlock();
-    g.getEvalBlock()._return(JExpr.lit(0));
+  public static void releaseBatches(RecordBatch incoming) {
+    if (incoming instanceof IteratorValidatorBatchIterator) {
+      incoming = ((IteratorValidatorBatchIterator) incoming).getIncoming();
+    }
+    if (incoming instanceof ExternalSortBatch) {
+      ExternalSortBatch esb = (ExternalSortBatch) incoming;
+      esb.resetSortState();
+    }
   }
 
-  private void createCopier(VectorAccessible batch, List<BatchGroup> 
batchGroupList, VectorContainer outputContainer, boolean spilling) throws 
SchemaChangeException {
-    try {
-      if (copier == null) {
-        CodeGenerator<PriorityQueueCopier> cg = 
CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, 
context.getOptions());
-        cg.plainJavaCapable(true);
-        // Uncomment out this line to debug the generated code.
-//        cg.saveCodeForDebugging(true);
-        ClassGenerator<PriorityQueueCopier> g = cg.getRoot();
-
-        generateComparisons(g, batch);
-
-        g.setMappingSet(COPIER_MAPPING_SET);
-        CopyUtil.generateCopies(g, batch, true);
-        g.setMappingSet(MAIN_MAPPING);
-        copier = context.getImplementationClass(cg);
-      } else {
-        copier.close();
-      }
+  private void releaseResources() {
+    if (resultsIterator != null) {
+      resultsIterator.close();
+    }
 
-      BufferAllocator allocator = spilling ? copierAllocator : oAllocator;
-      for (VectorWrapper<?> i : batch) {
-        ValueVector v = TypeHelper.getNewVector(i.getField(), allocator);
-        outputContainer.add(v);
-      }
-      copier.setup(context, allocator, batch, batchGroupList, outputContainer);
-    } catch (ClassTransformationException | IOException e) {
-      throw new RuntimeException(e);
+    // We only zero vectors for actual output container
+    outputWrapperContainer.clear();
+    outputSV4.clear();
+    container.zeroVectors();
+
+    // Close sortImpl for this boundary
+    if (sortImpl != null) {
+      sortImpl.close();
+    }
+  }
+
+  /**
+   * Method to reset sort state after every EMIT outcome is seen to process 
next batch of incoming records which
+   * belongs to different record boundary.
+   */
+  private void resetSortState() {
 
 Review comment:
   If it's ok to reassign ```sortState``` after ```releaseResources()``` call, 
I would suggest rewriting the method body to: 
   ```java
       // This means if it has received NONE/EMIT outcome and flag to retain is 
false which will be the case in presence of
       // StreamingAggBatch only since it will explicitly call releaseBacthes 
on ExternalSort when its done consuming
       // all the data buffer.
   
       // Close the iterator here to release any remaining resources such
       // as spill files. This is important when a query has a join: the
       // first branch sort may complete before the second branch starts;
       // it may be quite a while after returning the last batch before the
       // fragment executor calls this operator's close method.
       //
       // Note however, that the StreamingAgg operator REQUIRES that the sort
       // retain the batches behind an SV4 when doing an in-memory sort because
       // the StreamingAgg retains a reference to that data that it will use
       // after receiving a NONE result code. See DRILL-5656.
       releaseResources();
   
       if (lastKnownOutcome == EMIT) {
         sortState = SortState.LOAD;
         sortImpl = createNewSortImpl();
         sortImpl.setSchema(schema);
         resultsIterator = new SortImpl.EmptyResults(outputWrapperContainer);
       } else {
         sortState = SortState.DONE;
       }
   ```
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> ------------------------------------------
>
>                 Key: DRILL-6832
>                 URL: https://issues.apache.org/jira/browse/DRILL-6832
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.14.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>            Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to