Ben-Zvi commented on a change in pull request #1358: DRILL-6516: EMIT support
in streaming agg
URL: https://github.com/apache/drill/pull/1358#discussion_r199963330
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
##########
@@ -154,83 +188,174 @@ public void buildSchema() throws SchemaChangeException {
public IterOutcome innerNext() {
// if a special batch has been sent, we have no data in the incoming so
exit early
- if (specialBatchSent) {
- return IterOutcome.NONE;
+ if ( done || specialBatchSent) {
+ return NONE;
+ }
+
+ // We sent an OK_NEW_SCHEMA and also encountered the end of a data set. So
we need to send
+ // an EMIT with an empty batch now
+ if (sendEmit) {
+ sendEmit = false;
+ firstBatchForDataSet = true;
+ recordCount = 0;
+ return EMIT;
}
// this is only called on the first batch. Beyond this, the aggregator
manages batches.
if (aggregator == null || first) {
- IterOutcome outcome;
if (first && incoming.getRecordCount() > 0) {
first = false;
- outcome = IterOutcome.OK_NEW_SCHEMA;
+ lastKnownOutcome = OK_NEW_SCHEMA;
} else {
- outcome = next(incoming);
+ lastKnownOutcome = next(incoming);
}
- logger.debug("Next outcome of {}", outcome);
- switch (outcome) {
- case NONE:
- if (first && popConfig.getKeys().size() == 0) {
+ logger.debug("Next outcome of {}", lastKnownOutcome);
+ switch (lastKnownOutcome) {
+ case NONE:
+ if (firstBatchForDataSet && popConfig.getKeys().size() == 0) {
+ // if we have a straight aggregate and empty input batch, we need
to handle it in a different way
+ constructSpecialBatch();
+ // set state to indicate the fact that we have sent a special
batch and input is empty
+ specialBatchSent = true;
+ // If outcome is NONE then we send the special batch in the first
iteration and the NONE
+ // outcome in the next iteration. If outcome is EMIT, we can send
the special
+ // batch and the EMIT outcome at the same time.
+ return getFinalOutcome();
+ }
+ // else fall thru
+ case OUT_OF_MEMORY:
+ case NOT_YET:
+ case STOP:
+ return lastKnownOutcome;
+ case OK_NEW_SCHEMA:
+ if (!createAggregator()) {
+ done = true;
+ return IterOutcome.STOP;
+ }
+ break;
+ case EMIT:
+ if (firstBatchForDataSet && popConfig.getKeys().size() == 0) {
+ // if we have a straight aggregate and empty input batch, we need
to handle it in a different way
+ constructSpecialBatch();
+ // set state to indicate the fact that we have sent a special
batch and input is empty
+ specialBatchSent = true;
+ firstBatchForDataSet = true; // reset on the next iteration
+ // If outcome is NONE then we send the special batch in the first
iteration and the NONE
+ // outcome in the next iteration. If outcome is EMIT, we can send
the special
+ // batch and the EMIT outcome at the same time.
+ return getFinalOutcome();
+ }
+ // else fall thru
+ case OK:
+ break;
+ default:
+ throw new IllegalStateException(String.format("unknown outcome %s",
lastKnownOutcome));
+ }
+ } else {
+ if ( lastKnownOutcome != NONE && firstBatchForDataSet &&
!aggregator.isDone()) {
+ lastKnownOutcome = incoming.next();
+ if (!first && firstBatchForDataSet) {
Review comment:
Isn't `firstBatchForDataSet` guaranteed to be true here, since the enclosing
`if` already checks it? If so, this test reduces to `!first`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services