Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/822#discussion_r118813964
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
@Override
public int getOutputCount() {
- // return outputCount;
return lastBatchOutputCount;
}
@Override
public void cleanup() {
- if (htable != null) {
- htable.clear();
- htable = null;
- }
+ if ( schema == null ) { return; } // not set up; nothing to clean
+ for ( int i = 0; i < numPartitions; i++) {
+ if (htables[i] != null) {
+ htables[i].clear();
+ htables[i] = null;
+ }
+ if ( batchHolders[i] != null) {
+ for (BatchHolder bh : batchHolders[i]) {
+ bh.clear();
+ }
+ batchHolders[i].clear();
+ batchHolders[i] = null;
+ }
+
+ // delete any (still active) output spill file
+ if ( outputStream[i] != null && spillFiles[i] != null) {
+ try {
+ spillSet.delete(spillFiles[i]);
+ } catch(IOException e) {
+ logger.warn("Cleanup: Failed to delete spill file
{}",spillFiles[i]);
+ }
+ }
+ }
+ // delete any spill file left in unread spilled partitions
+ while ( ! spilledPartitionsList.isEmpty() ) {
+ SpilledPartition sp = spilledPartitionsList.remove(0);
+ try {
+ spillSet.delete(sp.spillFile);
+ } catch(IOException e) {
+ logger.warn("Cleanup: Failed to delete spill file
{}",sp.spillFile);
+ }
+ }
+ spillSet.close(); // delete the spill directory(ies)
htIdxHolder = null;
materializedValueFields = null;
outStartIdxHolder = null;
outNumRecordsHolder = null;
+ }
- if (batchHolders != null) {
- for (BatchHolder bh : batchHolders) {
+ // First free the memory used by the given (spilled) partition (i.e., hash table plus batches)
+ // then reallocate them in pristine state to allow the partition to continue receiving rows
+ private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException, IOException {
+ assert htables[part] != null;
+ htables[part].reset();
+ if ( batchHolders[part] != null) {
+ for (BatchHolder bh : batchHolders[part]) {
bh.clear();
}
- batchHolders.clear();
- batchHolders = null;
+ batchHolders[part].clear();
}
+ batchHolders[part] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
}
-// private final AggOutcome setOkAndReturn() {
-// this.outcome = IterOutcome.OK;
-// for (VectorWrapper<?> v : outgoing) {
-// v.getValueVector().getMutator().setValueCount(outputCount);
-// }
-// return AggOutcome.RETURN_OUTCOME;
-// }
private final void incIndex() {
underlyingIndex++;
if (underlyingIndex >= incoming.getRecordCount()) {
currentIndex = Integer.MAX_VALUE;
return;
}
- currentIndex = getVectorIndex(underlyingIndex);
+ try { currentIndex = getVectorIndex(underlyingIndex); }
+ catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc); }
}
private final void resetIndex() {
underlyingIndex = -1;
incIndex();
}
- private void addBatchHolder() {
+ private boolean isSpilled(int part) {
+ return outputStream[part] != null;
+ }
+ /**
+ * Which partition to choose for flushing out (i.e., spill or return)?
+ * - The current partition (to which a new batch holder is added) has priority,
+ * because its last batch holder is full.
+ * - Also the largest prior spilled partition has some priority, as it is already spilled;
+ * but spilling too few rows (e.g. a single batch) gets us nothing.
+ * - So the largest non-spilled partition also has some priority, to get more memory freed.
+ * Need to weigh the above three options.
+ *
+ * @param currPart - The partition that hit the memory limit (gets priority)
+ * @return The partition (number) chosen to be spilled
+ */
+ private int chooseAPartitionToFlush(int currPart) {
+ if ( ! is2ndPhase ) { return currPart; } // 1st phase: just use the current partition
+ int currPartSize = batchHolders[currPart].size();
+ if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if size is 1
+ // first find the largest spilled partition
+ int maxSizeSpilled = -1;
+ int indexMaxSpilled = -1;
+ for (int isp = 0; isp < numPartitions; isp++ ) {
+ if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) {
+ maxSizeSpilled = batchHolders[isp].size();
+ indexMaxSpilled = isp;
+ }
+ }
+ // Give the current (if already spilled) some priority
+ if ( isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) {
+ maxSizeSpilled = currPartSize ;
+ indexMaxSpilled = currPart;
+ }
+ // now find the largest non-spilled partition
+ int maxSize = -1;
+ int indexMax = -1;
+ // Use the largest spilled (if found) as a baseline, with a factor of 4
+ if ( indexMaxSpilled > -1 && maxSizeSpilled > 1 ) {
+ indexMax = indexMaxSpilled;
+ maxSize = 4 * maxSizeSpilled ;
+ }
+ for ( int insp = 0; insp < numPartitions; insp++) {
+ if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) {
+ indexMax = insp;
+ maxSize = batchHolders[insp].size();
+ }
+ }
+ // again - priority to the current partition
+ if ( ! isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) {
+ return currPart;
+ }
+ if ( maxSize <= 1 ) { // Can not make progress by spilling a single
batch!
+ return -1; // try skipping this spill
+ }
+ return indexMax;
+ }
+
+ /**
+ * Iterate through the batches of the given partition, writing them to a file
+ *
+ * @param part The partition (number) to spill
+ */
+ private void spillAPartition(int part) {
+
+ ArrayList<BatchHolder> currPartition = batchHolders[part];
+ rowsInPartition = 0;
+ if ( EXTRA_DEBUG_SPILL ) {
+ logger.debug("HashAggregate: Spilling partition {} current cycle {}
part size {}", part, cycleNum, currPartition.size());
+ }
+
+ if ( currPartition.size() == 0 ) { return; } // in case empty - nothing to spill
+
+ // If this is the first spill for this partition, create an output stream
+ if ( ! isSpilled(part) ) {
+
+ spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? Integer.toString(cycleNum) : null);
+
+ try {
+ outputStream[part] = spillSet.openForOutput(spillFiles[part]);
+ } catch (IOException e) {
+ throw new DrillRuntimeException("Hash Aggregation failed to open
spill file: " + spillFiles[part]);
+ }
+ }
+
+ for (int currOutBatchIndex = 0; currOutBatchIndex < currPartition.size(); currOutBatchIndex++ ) {
+
+ // get the number of records in the batch holder that are pending output
+ int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput();
+
+ rowsInPartition += numPendingOutput; // for logging
+ rowsSpilled += numPendingOutput;
+
+ allocateOutgoing(numPendingOutput);
+
+ currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
+ int numOutputRecords = outNumRecordsHolder.value;
+
+ this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
+
+ // set the value count for outgoing batch value vectors
+ /* int i = 0; */
+ for (VectorWrapper<?> v : outgoing) {
+ v.getValueVector().getMutator().setValueCount(numOutputRecords);
+ /*
+ // print out the first row to be spilled ( varchar, varchar, bigint )
+ try {
+ if (i++ < 2) {
+ NullableVarCharVector vv = ((NullableVarCharVector) v.getValueVector());
+ logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
+ } else {
+ NullableBigIntVector vv = ((NullableBigIntVector) v.getValueVector());
+ logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
+ }
+ } catch (Exception e) { logger.info("While printing the first row - Got an exception = {}", e); }
+ */
+ }
+
+ outContainer.setRecordCount(numPendingOutput);
+ WritableBatch batch = WritableBatch.getBatchNoHVWrap(numPendingOutput, outContainer, false);
+ VectorAccessibleSerializable outputBatch = new VectorAccessibleSerializable(batch, allocator);
+ Stopwatch watch = Stopwatch.createStarted();
+ try {
+ outputBatch.writeToStream(outputStream[part]);
+ } catch (IOException e) {
+ throw new DrillRuntimeException("Hash Aggregation failed to write
to output stream: " + outputStream[part].toString());
+ }
+ outContainer.zeroVectors();
+ logger.trace("HASH AGG: Took {} us to spill {} records",
watch.elapsed(TimeUnit.MICROSECONDS), numPendingOutput);
+ }
+
+ spilledBatchesCount[part] += currPartition.size(); // update count of spilled batches
+
+ logger.trace("HASH AGG: Spilled {} rows from {} batches of partition
{}", rowsInPartition, currPartition.size(), part);
+ }
+
+ private void addBatchHolder(int part) {
+
BatchHolder bh = newBatchHolder();
- batchHolders.add(bh);
+ batchHolders[part].add(bh);
if (EXTRA_DEBUG_1) {
- logger.debug("HashAggregate: Added new batch; num batches = {}.",
batchHolders.size());
+ logger.debug("HashAggregate: Added new batch; num batches = {}.",
batchHolders[part].size());
}
bh.setup();
}
- // Overridden in the generated class when created as plain Java code.
-
+ // These methods are overridden in the generated class when created as plain Java code.
protected BatchHolder newBatchHolder() {
return new BatchHolder();
}
+ protected SpilledRecordbatch newSpilledRecordBatch(String arg1, int arg2, FragmentContext arg4, BatchSchema arg5, OperatorContext arg6) {
+ return new SpilledRecordbatch(arg1, arg2, arg4, arg5, arg6);
+ }
+ /**
+ * Output the next batch from partition "nextPartitionToReturn"
+ *
+ * @return iteration outcome (e.g., OK, NONE ...)
+ */
@Override
public IterOutcome outputCurrentBatch() {
- if (outBatchIndex >= batchHolders.size()) {
- this.outcome = IterOutcome.NONE;
- return outcome;
+
+ // when incoming was an empty batch, just finish up
+ if ( schema == null ) {
+ logger.trace("Incoming was empty; output is an empty batch.");
+ this.outcome = IterOutcome.NONE; // no records were read
+ allFlushed = true;
+ return this.outcome;
--- End diff --
Is the outcome used as a state? Otherwise, why not just
```
return IterOutcome.NONE;
```
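If the field only exists to hand the value back to the caller, a minimal sketch of the direct form (assuming `this.outcome` is not read back later as operator state, e.g. through `getOutcome()`):
```
// hypothetical simplification - only valid if nothing reads
// this.outcome after outputCurrentBatch() returns
if (schema == null) {
  logger.trace("Incoming was empty; output is an empty batch.");
  allFlushed = true;
  return IterOutcome.NONE;
}
```
If the field does double duty as state (other callers check it via `getOutcome()`), then keeping the assignment as in the patch is the right call.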