DRILL-1081: Always return OK_NEW_SCHEMA on first batch in Streaming Agg
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d5967c56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d5967c56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d5967c56

Branch: refs/heads/master
Commit: d5967c564b4cce933210d2e673f128431c632441
Parents: b8d4576
Author: Steven Phillips <sphill...@maprtech.com>
Authored: Wed Jun 25 20:00:25 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Thu Jun 26 09:02:24 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/impl/aggregate/StreamingAggBatch.java | 2 ++
 .../exec/physical/impl/aggregate/StreamingAggTemplate.java | 5 ++++-
 .../physical/impl/validate/IteratorValidatorBatchIterator.java | 4 ++++
 3 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d5967c56/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index b587ad1..2f71bf9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -122,6 +122,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
         first = false;
         done = true;
         return IterOutcome.OK_NEW_SCHEMA;
+      } else if (outcome == IterOutcome.OK && first) {
+        outcome = IterOutcome.OK_NEW_SCHEMA;
       }
       first = false;
       return outcome;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d5967c56/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 4d6e7c4..8a9ba3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -170,6 +170,9 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
               if(EXTRA_DEBUG) logger.debug("Received no more batches, returning.");
               return setOkAndReturn();
             }else{
+              if (first && out == IterOutcome.OK) {
+                out = IterOutcome.OK_NEW_SCHEMA;
+              }
               outcome = out;
               return AggOutcome.CLEANUP_AND_RETURN;
             }
@@ -195,7 +198,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
             if(incoming.getRecordCount() == 0){
               continue;
             }else{
-              if(isSamePrev(previousIndex , previous, currentIndex)){
+              if(previousIndex != -1 && isSamePrev(previousIndex , previous, currentIndex)){
                 if(EXTRA_DEBUG) logger.debug("New value was same as last value of previous batch, adding.");
                 addRecordInc(currentIndex);
                 previousIndex = currentIndex;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d5967c56/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index ee8f37a..c8e9c60 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
 
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
@@ -113,6 +114,9 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
     if (first && state == IterOutcome.NONE) {
       throw new IllegalStateException("The incoming iterator returned a state of NONE on the first batch. There should always be at least one batch output before returning NONE");
     }
+    if (first && state == IterOutcome.OK) {
+      throw new IllegalStateException("The incoming iterator returned a state of OK on the first batch. There should always be a new schema on the first batch. Incoming: " + incoming.getClass().getName());
+    }
     if (first) first = !first;
 
     if(state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) {