more logging in streaming agg
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/31283c32 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/31283c32 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/31283c32 Branch: refs/heads/master Commit: 31283c32299fd9607f870ceb95518e0c33515132 Parents: 4935b19 Author: Jacques Nadeau <jacq...@apache.org> Authored: Sat Jun 7 17:56:21 2014 -0700 Committer: Jacques Nadeau <jacq...@apache.org> Committed: Sat Jun 7 17:56:21 2014 -0700 ---------------------------------------------------------------------- .../impl/aggregate/StreamingAggTemplate.java | 86 +++++++++++--------- 1 file changed, 47 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31283c32/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java index e3eb6fe..48e3100 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java @@ -61,14 +61,14 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { this.currentIndex = this.getVectorIndex(underlyingIndex); } - + private void allocateOutgoing() { for (VectorAllocator a : allocators) { if(EXTRA_DEBUG) logger.debug("Allocating {} with {} records.", a, 20000); a.alloc(20000); } } - + @Override public IterOutcome getOutcome() { return outcome; @@ -84,16 +84,16 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { this.outcome = IterOutcome.STOP; return AggOutcome.CLEANUP_AND_RETURN; } - + @Override public AggOutcome doWork() { try{ // outside loop to ensure that first is set to false after the first run. - + // if we're in the first state, allocate outgoing. if(first){ allocateOutgoing(); } - + // pick up a remainder batch if we have one. if(remainderBatch != null){ if (!outputToBatch( previousIndex )) return tooBigFailure(); @@ -101,24 +101,25 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { remainderBatch = null; return setOkAndReturn(); } - - + + // setup for new output and pick any remainder. if (pendingOutput) { allocateOutgoing(); pendingOutput = false; + if(EXTRA_DEBUG) logger.debug("Attempting to output remainder."); if (!outputToBatch( previousIndex)) return tooBigFailure(); } - + if(newSchema){ return AggOutcome.UPDATE_AGGREGATOR; } - + if(lastOutcome != null){ outcome = lastOutcome; return AggOutcome.CLEANUP_AND_RETURN; } - + outside: while(true){ // loop through existing records, adding as necessary. for (; underlyingIndex < incoming.getRecordCount(); incIndex()) { @@ -138,21 +139,21 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { } else { if(EXTRA_DEBUG) logger.debug("Output failed."); if(outputCount == 0) return tooBigFailure(); - + // mark the pending output but move forward for the next cycle. pendingOutput = true; previousIndex = currentIndex; incIndex(); return setOkAndReturn(); - + } } previousIndex = currentIndex; } - - + + InternalBatch previous = null; - + try{ while(true){ previous = new InternalBatch(incoming); @@ -169,13 +170,13 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { outcome = out; return AggOutcome.CLEANUP_AND_RETURN; } - - + + case NOT_YET: this.outcome = out; return AggOutcome.RETURN_OUTCOME; - + case OK_NEW_SCHEMA: if(EXTRA_DEBUG) logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount()); if(addedRecordCount > 0){ @@ -185,7 +186,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { return setOkAndReturn(); } cleanup(); - return AggOutcome.UPDATE_AGGREGATOR; + return AggOutcome.UPDATE_AGGREGATOR; case OK: resetIndex(); if(incoming.getRecordCount() == 0){ @@ -204,7 +205,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { if(addedRecordCount > 0){ if( !outputToBatchPrev( previous, previousIndex, outputCount) ){ remainderBatch = previous; - return setOkAndReturn(); + return setOkAndReturn(); } continue outside; } @@ -229,10 +230,10 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { }finally{ if(first) first = !first; } - + } - - + + private final void incIndex(){ underlyingIndex++; if(underlyingIndex >= incoming.getRecordCount()){ @@ -241,12 +242,12 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { } currentIndex = getVectorIndex(underlyingIndex); } - + private final void resetIndex(){ underlyingIndex = -1; incIndex(); } - + private final AggOutcome setOkAndReturn(){ if(first){ this.outcome = IterOutcome.OK_NEW_SCHEMA; @@ -260,16 +261,22 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { } private final boolean outputToBatch(int inIndex){ - boolean success = outputRecordKeys(inIndex, outputCount) // - && outputRecordValues(outputCount) // - && resetValues(); - if(success){ - if(EXTRA_DEBUG) logger.debug("Outputting values to {}", outputCount); - outputCount++; - addedRecordCount = 0; + + if(!outputRecordKeys(inIndex, outputCount)){ + if(EXTRA_DEBUG) logger.debug("Failure while outputting keys {}", outputCount); + return false; } - - return success; + + if(!outputRecordValues(outputCount)){ + if(EXTRA_DEBUG) logger.debug("Failure while outputting values {}", outputCount); + return false; + } + + if(EXTRA_DEBUG) logger.debug("{} values output successfully", outputCount); + resetValues(); + outputCount++; + addedRecordCount = 0; + return true; } private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex){ @@ -277,21 +284,22 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { && outputRecordValues(outIndex) // && resetValues(); if(success){ + resetValues(); outputCount++; addedRecordCount = 0; } - + return success; } - + private void addRecordInc(int index){ addRecord(index); this.addedRecordCount++; } - + @Override public void cleanup(){ - if(remainderBatch != null) remainderBatch.clear(); + if(remainderBatch != null) remainderBatch.clear(); } @@ -304,5 +312,5 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { public abstract boolean outputRecordValues(@Named("outIndex") int outIndex); public abstract int getVectorIndex(@Named("recordIndex") int recordIndex); public abstract boolean resetValues(); - + }