Ben-Zvi commented on a change in pull request #1358: DRILL-6516: EMIT support
in streaming agg
URL: https://github.com/apache/drill/pull/1358#discussion_r199974888
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
##########
@@ -189,83 +209,128 @@ else if (isSame( previousIndex, currentIndex )) {
logger.debug("Received IterOutcome of {}", out);
}
switch (out) {
- case NONE:
- done = true;
- lastOutcome = out;
- if (first && addedRecordCount == 0) {
- return setOkAndReturn();
- } else if (addedRecordCount > 0) {
- outputToBatchPrev(previous, previousIndex, outputCount); // No
need to check the return value
- // (output container full or not) as we are not going to
insert any more records.
- if (EXTRA_DEBUG) {
- logger.debug("Received no more batches, returning.");
+ case NONE:
+ done = true;
+ lastOutcome = out;
+ if (firstBatchForDataSet && addedRecordCount == 0) {
+ return setOkAndReturn(out);
+ } else if (addedRecordCount > 0) {
+ outputToBatchPrev(previous, previousIndex, outputCount); //
No need to check the return value
+ // (output container full or not) as we are not going to
insert any more records.
+ if (EXTRA_DEBUG) {
+ logger.debug("Received no more batches, returning.");
+ }
+ return setOkAndReturn(out);
+ } else {
+ // not first batch and record Count == 0
+ outcome = out;
+ return AggOutcome.CLEANUP_AND_RETURN;
}
- return setOkAndReturn();
- } else {
- if (first && out == IterOutcome.OK) {
- out = IterOutcome.OK_NEW_SCHEMA;
+ // EMIT is handled like OK, except that we do not loop back to
process the
+ // next incoming batch; we return instead
+ case EMIT:
+ if (incoming.getRecordCount() == 0) {
+ if (addedRecordCount > 0) {
+ outputToBatchPrev(previous, previousIndex, outputCount);
+ }
+ resetIndex();
+ return setOkAndReturn(out);
+ } else {
+ resetIndex();
+ if (previousIndex != -1 && isSamePrev(previousIndex,
previous, currentIndex)) {
+ if (EXTRA_DEBUG) {
+ logger.debug("New value was same as last value of
previous batch, adding.");
+ }
+ addRecordInc(currentIndex);
+ previousIndex = currentIndex;
+ incIndex();
+ if (EXTRA_DEBUG) {
+ logger.debug("Continuing outside");
+ }
+ processRemainingRecordsInBatch();
+ // currentIndex has been reset to int_max so use previous
index.
+ outputToBatch(previousIndex);
+ resetIndex();
+ return setOkAndReturn(out);
+ } else { // not the same
+ if (EXTRA_DEBUG) {
+ logger.debug("This is not the same as the previous, add
record and continue outside.");
+ }
+ if (addedRecordCount > 0) {
+ if (outputToBatchPrev(previous, previousIndex,
outputCount)) {
+ if (EXTRA_DEBUG) {
+ logger.debug("Output container is full. flushing
it.");
+ }
+ return setOkAndReturn(out);
+ }
+ }
+ previousIndex = -1;
+ processRemainingRecordsInBatch();
+ outputToBatch(previousIndex); // currentIndex has been
reset to int_max so use previous index.
+ resetIndex();
+ return setOkAndReturn(out);
+ }
}
- outcome = out;
- return AggOutcome.CLEANUP_AND_RETURN;
- }
-
- case NOT_YET:
- this.outcome = out;
- return AggOutcome.RETURN_OUTCOME;
-
- case OK_NEW_SCHEMA:
- if (EXTRA_DEBUG) {
- logger.debug("Received new schema. Batch has {} records.",
incoming.getRecordCount());
- }
- if (addedRecordCount > 0) {
- outputToBatchPrev(previous, previousIndex, outputCount); // No
need to check the return value
- // (output container full or not) as we are not going to
insert anymore records.
+
+ case NOT_YET:
+ this.outcome = out;
+ return AggOutcome.RETURN_OUTCOME;
+
+ case OK_NEW_SCHEMA:
+ firstBatchForSchema = true;
+ //lastOutcome = out;
if (EXTRA_DEBUG) {
- logger.debug("Wrote out end of previous batch, returning.");
+ logger.debug("Received new schema. Batch has {} records.",
incoming.getRecordCount());
}
- newSchema = true;
- return setOkAndReturn();
- }
- cleanup();
- return AggOutcome.UPDATE_AGGREGATOR;
- case OK:
- resetIndex();
- if (incoming.getRecordCount() == 0) {
- continue;
- } else {
- if (previousIndex != -1 && isSamePrev(previousIndex ,
previous, currentIndex)) {
+ if (addedRecordCount > 0) {
+ outputToBatchPrev(previous, previousIndex, outputCount); //
No need to check the return value
+ // (output container full or not) as we are not going to
insert anymore records.
if (EXTRA_DEBUG) {
- logger.debug("New value was same as last value of previous
batch, adding.");
+ logger.debug("Wrote out end of previous batch,
returning.");
}
- addRecordInc(currentIndex);
- previousIndex = currentIndex;
- incIndex();
- if (EXTRA_DEBUG) {
- logger.debug("Continuing outside");
- }
- continue outside;
- } else { // not the same
- if (EXTRA_DEBUG) {
- logger.debug("This is not the same as the previous, add
record and continue outside.");
- }
- if (addedRecordCount > 0) {
- if (outputToBatchPrev(previous, previousIndex,
outputCount)) {
- if (EXTRA_DEBUG) {
- logger.debug("Output container is full. flushing it.");
+ newSchema = true;
+ return setOkAndReturn(out);
+ }
+ cleanup();
+ return AggOutcome.UPDATE_AGGREGATOR;
+ case OK:
+ resetIndex();
+ if (incoming.getRecordCount() == 0) {
+ continue;
Review comment:
Should this be `continue outside` ?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services