Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r118814041
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
    @@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
     
       @Override
       public int getOutputCount() {
    -    // return outputCount;
         return lastBatchOutputCount;
       }
     
       @Override
       public void cleanup() {
    -    if (htable != null) {
    -      htable.clear();
    -      htable = null;
    -    }
    +      if ( schema == null ) { return; } // not set up; nothing to clean
    +      for ( int i = 0; i < numPartitions; i++) {
    +          if (htables[i] != null) {
    +              htables[i].clear();
    +              htables[i] = null;
    +          }
    +          if ( batchHolders[i] != null) {
    +              for (BatchHolder bh : batchHolders[i]) {
    +                    bh.clear();
    +              }
    +              batchHolders[i].clear();
    +              batchHolders[i] = null;
    +          }
    +
    +          // delete any (still active) output spill file
    +          if ( outputStream[i] != null && spillFiles[i] != null) {
    +            try {
    +              spillSet.delete(spillFiles[i]);
    +            } catch(IOException e) {
    +              logger.warn("Cleanup: Failed to delete spill file 
{}",spillFiles[i]);
    +            }
    +          }
    +      }
    +      // delete any spill file left in unread spilled partitions
    +      while ( ! spilledPartitionsList.isEmpty() ) {
    +        SpilledPartition sp = spilledPartitionsList.remove(0);
    +        try {
    +          spillSet.delete(sp.spillFile);
    +        } catch(IOException e) {
    +          logger.warn("Cleanup: Failed to delete spill file 
{}",sp.spillFile);
    +        }
    +      }
    +      spillSet.close(); // delete the spill directory(ies)
         htIdxHolder = null;
         materializedValueFields = null;
         outStartIdxHolder = null;
         outNumRecordsHolder = null;
    +  }
     
    -    if (batchHolders != null) {
    -      for (BatchHolder bh : batchHolders) {
    +  // First free the memory used by the given (spilled) partition (i.e., 
hash table plus batches)
    +  // then reallocate them in pristine state to allow the partition to 
continue receiving rows
    +  private void reinitPartition(int part) throws SchemaChangeException, 
ClassTransformationException, IOException {
    +    assert htables[part] != null;
    +    htables[part].reset();
    +    if ( batchHolders[part] != null) {
    +      for (BatchHolder bh : batchHolders[part]) {
             bh.clear();
           }
    -      batchHolders.clear();
    -      batchHolders = null;
    +      batchHolders[part].clear();
         }
    +    batchHolders[part] = new ArrayList<BatchHolder>(); // First 
BatchHolder is created when the first put request is received.
       }
     
    -//  private final AggOutcome setOkAndReturn() {
    -//    this.outcome = IterOutcome.OK;
    -//    for (VectorWrapper<?> v : outgoing) {
    -//      v.getValueVector().getMutator().setValueCount(outputCount);
    -//    }
    -//    return AggOutcome.RETURN_OUTCOME;
    -//  }
     
       private final void incIndex() {
         underlyingIndex++;
         if (underlyingIndex >= incoming.getRecordCount()) {
           currentIndex = Integer.MAX_VALUE;
           return;
         }
    -    currentIndex = getVectorIndex(underlyingIndex);
    +    try { currentIndex = getVectorIndex(underlyingIndex); }
    +    catch (SchemaChangeException sc) { throw new 
DrillRuntimeException(sc);}
       }
     
       private final void resetIndex() {
         underlyingIndex = -1;
         incIndex();
       }
     
    -  private void addBatchHolder() {
    +  private boolean isSpilled(int part) {
    +    return outputStream[part] != null;
    +  }
    +  /**
    +   * Which partition to choose for flushing out (i.e. spill or return) ?
    +   * - The current partition (to which a new bach holder is added) has a 
priority,
    +   *   because its last batch holder is full.
    +   * - Also the largest prior spilled partition has some priority, as it 
is already spilled;
    +   *   but spilling too few rows (e.g. a single batch) gets us nothing.
    +   * - So the largest non-spilled partition has some priority, to get more 
memory freed.
    +   * Need to weigh the above three options.
    +   *
    +   *  @param currPart - The partition that hit the memory limit (gets a 
priority)
    +   *  @return The partition (number) chosen to be spilled
    +   */
    +  private int chooseAPartitionToFlush(int currPart) {
    +    if ( ! is2ndPhase ) { return currPart; } // 1st phase: just use the 
current partition
    +    int currPartSize = batchHolders[currPart].size();
    +    if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current 
if size is 1
    +    // first find the largest spilled partition
    +    int maxSizeSpilled = -1;
    +    int indexMaxSpilled = -1;
    +    for (int isp = 0; isp < numPartitions; isp++ ) {
    +      if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) {
    +        maxSizeSpilled = batchHolders[isp].size();
    +        indexMaxSpilled = isp;
    +      }
    +    }
    +    // Give the current (if already spilled) some priority
    +    if ( isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) {
    +      maxSizeSpilled = currPartSize ;
    +      indexMaxSpilled = currPart;
    +    }
    +    // now find the largest non-spilled partition
    +    int maxSize = -1;
    +    int indexMax = -1;
    +    // Use the largest spilled (if found) as a base line, with a factor of 
4
    +    if ( indexMaxSpilled > -1 && maxSizeSpilled > 1 ) {
    +      indexMax = indexMaxSpilled;
    +      maxSize = 4 * maxSizeSpilled ;
    +    }
    +    for ( int insp = 0; insp < numPartitions; insp++) {
    +      if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) {
    +        indexMax = insp;
    +        maxSize = batchHolders[insp].size();
    +      }
    +    }
    +    // again - priority to the current partition
    +    if ( ! isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) {
    +      return currPart;
    +    }
    +    if ( maxSize <= 1 ) { // Can not make progress by spilling a single 
batch!
    +      return -1; // try skipping this spill
    +    }
    +    return indexMax;
    +  }
    +
    +  /**
    +   * Iterate through the batches of the given partition, writing them to a 
file
    +   *
    +   * @param part The partition (number) to spill
    +   */
    +  private void spillAPartition(int part) {
    +
    +    ArrayList<BatchHolder> currPartition = batchHolders[part];
    +    rowsInPartition = 0;
    +    if ( EXTRA_DEBUG_SPILL ) {
    +      logger.debug("HashAggregate: Spilling partition {} current cycle {} 
part size {}", part, cycleNum, currPartition.size());
    +    }
    +
    +    if ( currPartition.size() == 0 ) { return; } // in case empty - 
nothing to spill
    +
    +    // If this is the first spill for this partition, create an output 
stream
    +    if ( ! isSpilled(part) ) {
    +
    +      spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? 
Integer.toString(cycleNum) : null);
    +
    +      try {
    +        outputStream[part] = spillSet.openForOutput(spillFiles[part]);
    +      } catch (IOException e) {
    +        throw new DrillRuntimeException("Hash Aggregation failed to open 
spill file: " + spillFiles[part]);
    +      }
    +    }
    +
    +    for (int currOutBatchIndex = 0; currOutBatchIndex < 
currPartition.size(); currOutBatchIndex++ ) {
    +
    +      // get the number of records in the batch holder that are pending 
output
    +      int numPendingOutput = 
currPartition.get(currOutBatchIndex).getNumPendingOutput();
    +
    +      rowsInPartition += numPendingOutput;  // for logging
    +      rowsSpilled += numPendingOutput;
    +
    +      allocateOutgoing(numPendingOutput);
    +
    +      currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, 
outNumRecordsHolder);
    +      int numOutputRecords = outNumRecordsHolder.value;
    +
    +      this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, 
outStartIdxHolder.value, outNumRecordsHolder.value);
    +
    +      // set the value count for outgoing batch value vectors
    +      /* int i = 0; */
    +      for (VectorWrapper<?> v : outgoing) {
    +        v.getValueVector().getMutator().setValueCount(numOutputRecords);
    +        /*
    +        // print out the first row to be spilled ( varchar, varchar, 
bigint )
    +        try {
    +          if (i++ < 2) {
    +            NullableVarCharVector vv = ((NullableVarCharVector) 
v.getValueVector());
    +            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
    +          } else {
    +            NullableBigIntVector vv = ((NullableBigIntVector) 
v.getValueVector());
    +            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
    +          }
    +        } catch (Exception e) { logger.info("While printing the first row 
- Got an exception = {}",e); }
    +        */
    +      }
    +
    +      outContainer.setRecordCount(numPendingOutput);
    +      WritableBatch batch = 
WritableBatch.getBatchNoHVWrap(numPendingOutput, outContainer, false);
    +      VectorAccessibleSerializable outputBatch = new 
VectorAccessibleSerializable(batch, allocator);
    +      Stopwatch watch = Stopwatch.createStarted();
    +      try {
    +        outputBatch.writeToStream(outputStream[part]);
    +      } catch (IOException e) {
    +        throw new DrillRuntimeException("Hash Aggregation failed to write 
to output stream: " + outputStream[part].toString());
    +      }
    +      outContainer.zeroVectors();
    +      logger.trace("HASH AGG: Took {} us to spill {} records", 
watch.elapsed(TimeUnit.MICROSECONDS), numPendingOutput);
    +    }
    +
    +    spilledBatchesCount[part] += currPartition.size(); // update count of 
spilled batches
    +
    +    logger.trace("HASH AGG: Spilled {} rows from {} batches of partition 
{}", rowsInPartition, currPartition.size(), part);
    +  }
    +
    +  private void addBatchHolder(int part) {
    +
         BatchHolder bh = newBatchHolder();
    -    batchHolders.add(bh);
    +    batchHolders[part].add(bh);
     
         if (EXTRA_DEBUG_1) {
    -      logger.debug("HashAggregate: Added new batch; num batches = {}.", 
batchHolders.size());
    +      logger.debug("HashAggregate: Added new batch; num batches = {}.", 
batchHolders[part].size());
         }
     
         bh.setup();
       }
     
    -  // Overridden in the generated class when created as plain Java code.
    -
    +  // These methods are overridden in the generated class when created as 
plain Java code.
       protected BatchHolder newBatchHolder() {
         return new BatchHolder();
       }
    +  protected SpilledRecordbatch newSpilledRecordBatch(String arg1, int 
arg2, FragmentContext arg4, BatchSchema arg5, OperatorContext arg6) {
    +    return new SpilledRecordbatch(arg1, arg2, arg4, arg5, arg6);
    +  }
     
    +  /**
    +   * Output the next batch from partition "nextPartitionToReturn"
    +   *
    +   * @return iteration outcome (e.g., OK, NONE ...)
    +   */
       @Override
       public IterOutcome outputCurrentBatch() {
    -    if (outBatchIndex >= batchHolders.size()) {
    -      this.outcome = IterOutcome.NONE;
    -      return outcome;
    +
    +    // when incoming was an empty batch, just finish up
    +    if ( schema == null ) {
    +      logger.trace("Incoming was empty; output is an empty batch.");
    +      this.outcome = IterOutcome.NONE; // no records were read
    +      allFlushed = true;
    +      return this.outcome;
         }
     
    -    // get the number of records in the batch holder that are pending 
output
    -    int numPendingOutput = 
batchHolders.get(outBatchIndex).getNumPendingOutput();
    +    // Initialization (covers the case of early output)
    +    ArrayList<BatchHolder> currPartition = batchHolders[earlyPartition];
    +    int currOutBatchIndex = outBatchIndex[earlyPartition];
    +    int partitionToReturn = earlyPartition;
    +
    +    if ( ! earlyOutput ) {
    +      // Update the next partition to return (if needed)
    +      // skip fully returned (or spilled) partitions
    +      while (nextPartitionToReturn < numPartitions) {
    +        //
    +        // If this partition was spilled - spill the rest of it and skip it
    +        //
    +        if ( isSpilled(nextPartitionToReturn) ) {
    +          spillAPartition(nextPartitionToReturn); // spill the rest
    +          SpilledPartition sp = new SpilledPartition();
    +          sp.spillFile = spillFiles[nextPartitionToReturn];
    +          sp.spilledBatches = spilledBatchesCount[nextPartitionToReturn];
    +          sp.cycleNum = cycleNum; // remember the current cycle
    +          sp.origPartn = nextPartitionToReturn; // for debugging / filename
    +          sp.prevOrigPartn = originalPartition; // for debugging / filename
    +          spilledPartitionsList.add(sp);
    +          try {
    +            reinitPartition(nextPartitionToReturn); // free the memory
    +          } catch (Exception e) {throw new RuntimeException(e);}
    +          try {
    +            long posn = 
spillSet.getPosition(outputStream[nextPartitionToReturn]);
    +            spillSet.tallyWriteBytes(posn); // for the IO stats
    +            outputStream[nextPartitionToReturn].close();
    +          } catch (IOException e) { throw new RuntimeException(e); }
    +          outputStream[nextPartitionToReturn] = null;
    +        }
    +        else {
    +          currPartition = batchHolders[nextPartitionToReturn];
    +          currOutBatchIndex = outBatchIndex[nextPartitionToReturn];
    +          // If curr batch (partition X index) is not empty - proceed to 
return it
    +          if (currOutBatchIndex < currPartition.size() && 0 != 
currPartition.get(currOutBatchIndex).getNumPendingOutput()) {
    +            break;
    +          }
    +        }
    +        nextPartitionToReturn++; // else check next partition
    +      }
    +
    +      // if passed the last partition
    +      if (nextPartitionToReturn >= numPartitions) {
    +        // The following "if" is probably never used; due to a similar 
check at the end of this method
    +        if ( spilledPartitionsList.isEmpty() ) { // and no spilled 
partitions
    +          allFlushed = true;
    +          this.outcome = IterOutcome.NONE;
    +          if ( is2ndPhase ) {
    +            stats.setLongStat(Metric.SPILL_MB, // update stats - total MB 
spilled
    +                (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 
1024.0));
    +          }
    +          return outcome;  // then return NONE
    +        }
    +        // Else - there are still spilled partitions to process - pick one 
and handle just like a new incoming
    +        buildComplete = false; // go back and call doWork() again
    +        handlingSpills = true; // beginning to work on the spill files
    +        // pick a spilled partition; set a new incoming ...
    +        SpilledPartition sp = spilledPartitionsList.remove(0);
    +        SpilledRecordbatch newIncoming = 
newSpilledRecordBatch(sp.spillFile, sp.spilledBatches, context, schema, 
oContext);
    +        originalPartition = sp.origPartn; // used for the filename
    +        logger.trace("Reading back spilled original partition {} as an 
incoming",originalPartition);
    +        // prevOriginalPartition = sp.prevOrigPartn;
    +        // Initialize .... new incoming, new set of partitions
    +        try { initializeSetup(newIncoming); } catch (Exception e) { throw 
new RuntimeException(e); }
    +        // update the cycle num if needed
    +        // The current cycle num should always be one larger than in the 
spilled partition
    +        if ( cycleNum == sp.cycleNum ) {
    +          cycleNum = 1 + sp.cycleNum;
    +          stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // update stats
    +          // report memory stressful situations
    +          if ( cycleNum == 2 ) { logger.info("SECONDARY SPILLING "); }
    +          if ( cycleNum == 3 ) { logger.info("TERTIARY SPILLING "); }
    +        }
    +        if ( EXTRA_DEBUG_SPILL ) {
    +          logger.debug("Start reading spilled partition {} (prev {}) from 
cycle {} (with {} batches). More {} spilled partitions left.",
    +              sp.origPartn, sp.prevOrigPartn, sp.cycleNum, 
sp.spilledBatches, spilledPartitionsList.size());
    +        }
    +        return IterOutcome.RESTART;
    +      }
    +
    +      partitionToReturn = nextPartitionToReturn ;
     
    -    if (numPendingOutput == 0) {
    -      this.outcome = IterOutcome.NONE;
    -      return outcome;
         }
     
    +    // get the number of records in the batch holder that are pending 
output
    +    int numPendingOutput = 
currPartition.get(currOutBatchIndex).getNumPendingOutput();
    +
    +    // The following accounting is for logging, metrics, etc.
    +    rowsInPartition += numPendingOutput ;
    +    if ( ! handlingSpills ) { rowsNotSpilled += numPendingOutput; }
    +    else { rowsSpilledReturned += numPendingOutput; }
    +    if ( earlyOutput ) { rowsReturnedEarly += numPendingOutput; }
    +
         allocateOutgoing(numPendingOutput);
     
    -    batchHolders.get(outBatchIndex).outputValues(outStartIdxHolder, 
outNumRecordsHolder);
    +    currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, 
outNumRecordsHolder);
         int numOutputRecords = outNumRecordsHolder.value;
     
         if (EXTRA_DEBUG_1) {
           logger.debug("After output values: outStartIdx = {}, outNumRecords = 
{}", outStartIdxHolder.value, outNumRecordsHolder.value);
         }
    -    this.htable.outputKeys(outBatchIndex, this.outContainer, 
outStartIdxHolder.value, outNumRecordsHolder.value);
    +
    +    this.htables[partitionToReturn].outputKeys(currOutBatchIndex, 
this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
     
         // set the value count for outgoing batch value vectors
         for (VectorWrapper<?> v : outgoing) {
           v.getValueVector().getMutator().setValueCount(numOutputRecords);
         }
     
    -//    outputCount += numOutputRecords;
    -
         this.outcome = IterOutcome.OK;
     
    -    logger.debug("HashAggregate: Output current batch index {} with {} 
records.", outBatchIndex, numOutputRecords);
    +    // logger.debug("HashAggregate: Output {} current batch index {} with 
{} records for partition {}.", earlyOutput ? "(early)" : "",
    +    //    outBatchIndex, numOutputRecords, partitionToReturn);
    +    if ( EXTRA_DEBUG_SPILL && is2ndPhase ) {
    +      logger.debug("So far returned {} + SpilledReturned {}  total {} 
(spilled {})",rowsNotSpilled,rowsSpilledReturned,
    +        rowsNotSpilled+rowsSpilledReturned,
    +        rowsSpilled);
    +    }
     
         lastBatchOutputCount = numOutputRecords;
    -    outBatchIndex++;
    -    if (outBatchIndex == batchHolders.size()) {
    -      allFlushed = true;
    +    outBatchIndex[partitionToReturn]++;
    +    // if just flushed the last batch in the partition
    +    if (outBatchIndex[partitionToReturn] == currPartition.size()) {
    +
    +      if ( EXTRA_DEBUG_SPILL ) {
    +        logger.debug("HashAggregate: {} Flushed partition {} with {} 
batches total {} rows",
    +            earlyOutput ? "(Early)" : "",
    +            partitionToReturn, outBatchIndex[partitionToReturn], 
rowsInPartition);
    +      }
    +      rowsInPartition = 0; // reset to count for the next partition
    +
    +      try {
    +        // deallocate memory used by this partition, and re-initialize
    +        reinitPartition(partitionToReturn);
    +      } catch (SchemaChangeException sce) {
    +        throw new DrillRuntimeException("Hash Aggregation can not handle 
schema changes.");
    +      } catch (Exception e) { /* just ignore */ }
    --- End diff --
    
    Even, say, an OOM?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to