Repository: drill
Updated Branches:
  refs/heads/master 767711919 -> cc97cd471
DRILL-2664: In StreamingAgg fix issues with handling the case where output batch is full.

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/cc97cd47
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/cc97cd47
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/cc97cd47

Branch: refs/heads/master
Commit: cc97cd471c40030d136bb93a1873e01b6cd6e8ef
Parents: 7677119
Author: Aman Sinha <[email protected]>
Authored: Wed Apr 1 19:00:27 2015 -0700
Committer: Aman Sinha <[email protected]>
Committed: Wed Apr 1 19:00:27 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/impl/WriterRecordBatch.java    |  2 +-
 .../physical/impl/aggregate/StreamingAggTemplate.java  | 10 +++++-----
 2 files changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/drill/blob/cc97cd47/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 74a674e..45022f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -126,8 +126,8 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
         }
       } while(upstream != IterOutcome.NONE);
     }catch(Exception ex){
-      kill(false);
       logger.error("Failure during query", ex);
+      kill(false);
       context.fail(ex);
       return IterOutcome.STOP;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/cc97cd47/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 36f9f29..bf27187 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -79,11 +79,12 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     }
     try { // outside loop to ensure that first is set to false after the first run.
       outputCount = 0;
+      // allocate outgoing since either this is the first time or if a subsequent time we would
+      // have sent the previous outgoing batch to downstream operator
+      allocateOutgoing();
 
-      // if we're in the first state, allocate outgoing.
       if (first) {
         this.currentIndex = incoming.getRecordCount() == 0 ? 0 : this.getVectorIndex(underlyingIndex);
-        allocateOutgoing();
       }
 
       if (incoming.getRecordCount() == 0) {
@@ -232,11 +233,11 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
               }
               previousIndex = currentIndex;
               if (addedRecordCount > 0) {
-                if (!outputToBatchPrev(previous, previousIndex, outputCount)) {
+                if (outputToBatchPrev(previous, previousIndex, outputCount)) {
                   if (EXTRA_DEBUG) {
                     logger.debug("Output container is full. flushing it.");
-                    return setOkAndReturn();
                   }
+                  return setOkAndReturn();
                 }
                 continue outside;
               }
@@ -320,7 +321,6 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     outputRecordKeysPrev(b1, inIndex, outIndex);
     outputRecordValues(outIndex);
     resetValues();
-    resetValues();
     outputCount++;
     addedRecordCount = 0;
