[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027254#comment-16027254 ]
ASF GitHub Bot commented on DRILL-5457: --------------------------------------- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r118813928 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -400,114 +782,411 @@ public IterOutcome getOutcome() { @Override public int getOutputCount() { - // return outputCount; return lastBatchOutputCount; } @Override public void cleanup() { - if (htable != null) { - htable.clear(); - htable = null; - } + if ( schema == null ) { return; } // not set up; nothing to clean + for ( int i = 0; i < numPartitions; i++) { + if (htables[i] != null) { + htables[i].clear(); + htables[i] = null; + } + if ( batchHolders[i] != null) { + for (BatchHolder bh : batchHolders[i]) { + bh.clear(); + } + batchHolders[i].clear(); + batchHolders[i] = null; + } + + // delete any (still active) output spill file + if ( outputStream[i] != null && spillFiles[i] != null) { + try { + spillSet.delete(spillFiles[i]); + } catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",spillFiles[i]); + } + } + } + // delete any spill file left in unread spilled partitions + while ( ! 
spilledPartitionsList.isEmpty() ) { + SpilledPartition sp = spilledPartitionsList.remove(0); + try { + spillSet.delete(sp.spillFile); + } catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile); + } + } + spillSet.close(); // delete the spill directory(ies) htIdxHolder = null; materializedValueFields = null; outStartIdxHolder = null; outNumRecordsHolder = null; + } - if (batchHolders != null) { - for (BatchHolder bh : batchHolders) { + // First free the memory used by the given (spilled) partition (i.e., hash table plus batches) + // then reallocate them in pristine state to allow the partition to continue receiving rows + private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException, IOException { + assert htables[part] != null; + htables[part].reset(); + if ( batchHolders[part] != null) { + for (BatchHolder bh : batchHolders[part]) { bh.clear(); } - batchHolders.clear(); - batchHolders = null; + batchHolders[part].clear(); } + batchHolders[part] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received. } -// private final AggOutcome setOkAndReturn() { -// this.outcome = IterOutcome.OK; -// for (VectorWrapper<?> v : outgoing) { -// v.getValueVector().getMutator().setValueCount(outputCount); -// } -// return AggOutcome.RETURN_OUTCOME; -// } private final void incIndex() { underlyingIndex++; if (underlyingIndex >= incoming.getRecordCount()) { currentIndex = Integer.MAX_VALUE; return; } - currentIndex = getVectorIndex(underlyingIndex); + try { currentIndex = getVectorIndex(underlyingIndex); } + catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc);} } private final void resetIndex() { underlyingIndex = -1; incIndex(); } - private void addBatchHolder() { + private boolean isSpilled(int part) { + return outputStream[part] != null; + } + /** + * Which partition to choose for flushing out (i.e. spill or return) ? 
+ * - The current partition (to which a new bach holder is added) has a priority, + * because its last batch holder is full. + * - Also the largest prior spilled partition has some priority, as it is already spilled; + * but spilling too few rows (e.g. a single batch) gets us nothing. + * - So the largest non-spilled partition has some priority, to get more memory freed. + * Need to weigh the above three options. + * + * @param currPart - The partition that hit the memory limit (gets a priority) + * @return The partition (number) chosen to be spilled + */ + private int chooseAPartitionToFlush(int currPart) { + if ( ! is2ndPhase ) { return currPart; } // 1st phase: just use the current partition + int currPartSize = batchHolders[currPart].size(); + if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if size is 1 + // first find the largest spilled partition + int maxSizeSpilled = -1; + int indexMaxSpilled = -1; + for (int isp = 0; isp < numPartitions; isp++ ) { + if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) { + maxSizeSpilled = batchHolders[isp].size(); + indexMaxSpilled = isp; + } + } + // Give the current (if already spilled) some priority + if ( isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) { + maxSizeSpilled = currPartSize ; + indexMaxSpilled = currPart; + } + // now find the largest non-spilled partition + int maxSize = -1; + int indexMax = -1; + // Use the largest spilled (if found) as a base line, with a factor of 4 + if ( indexMaxSpilled > -1 && maxSizeSpilled > 1 ) { + indexMax = indexMaxSpilled; + maxSize = 4 * maxSizeSpilled ; + } + for ( int insp = 0; insp < numPartitions; insp++) { + if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) { + indexMax = insp; + maxSize = batchHolders[insp].size(); + } + } + // again - priority to the current partition + if ( ! 
isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) { + return currPart; + } + if ( maxSize <= 1 ) { // Can not make progress by spilling a single batch! + return -1; // try skipping this spill + } + return indexMax; + } + + /** + * Iterate through the batches of the given partition, writing them to a file + * + * @param part The partition (number) to spill + */ + private void spillAPartition(int part) { + + ArrayList<BatchHolder> currPartition = batchHolders[part]; + rowsInPartition = 0; + if ( EXTRA_DEBUG_SPILL ) { + logger.debug("HashAggregate: Spilling partition {} current cycle {} part size {}", part, cycleNum, currPartition.size()); + } + + if ( currPartition.size() == 0 ) { return; } // in case empty - nothing to spill + + // If this is the first spill for this partition, create an output stream + if ( ! isSpilled(part) ) { + + spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? Integer.toString(cycleNum) : null); + + try { + outputStream[part] = spillSet.openForOutput(spillFiles[part]); + } catch (IOException e) { + throw new DrillRuntimeException("Hash Aggregation failed to open spill file: " + spillFiles[part]); + } + } + + for (int currOutBatchIndex = 0; currOutBatchIndex < currPartition.size(); currOutBatchIndex++ ) { + + // get the number of records in the batch holder that are pending output + int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput(); + + rowsInPartition += numPendingOutput; // for logging + rowsSpilled += numPendingOutput; + + allocateOutgoing(numPendingOutput); + + currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder); + int numOutputRecords = outNumRecordsHolder.value; + + this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value); + + // set the value count for outgoing batch value vectors --- End diff -- This code more-or-less duplicates the code in the `SpilledRun` class mentioned above. 
Maybe use a single class that we can unit test once rather than having multiple copies of the code? > Support Spill to Disk for the Hash Aggregate Operator > ----------------------------------------------------- > > Key: DRILL-5457 > URL: https://issues.apache.org/jira/browse/DRILL-5457 > Project: Apache Drill > Issue Type: Improvement > Components: Execution - Relational Operators > Affects Versions: 1.10.0 > Reporter: Boaz Ben-Zvi > Assignee: Boaz Ben-Zvi > Fix For: 1.11.0 > > > Support gradually spilling memory to disk as the available memory gets too > small to allow in-memory work for the Hash Aggregate Operator. -- This message was sent by Atlassian JIRA (v6.3.15#6346)