This is an automated email from the ASF dual-hosted git repository. parthc pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit a77fd142d86dd5648cda8866b8ff3af39c7b6b11 Author: Parth Chandra <par...@apache.org> AuthorDate: Mon Jun 18 21:34:20 2018 -0700 DRILL-6516: EMIT support in streaming agg This closes #1358 --- .../physical/impl/aggregate/StreamingAggBatch.java | 274 ++++++--- .../impl/aggregate/StreamingAggTemplate.java | 425 +++++++++----- .../impl/aggregate/StreamingAggregator.java | 30 +- .../impl/agg/TestStreamingAggEmitOutcome.java | 614 +++++++++++++++++++++ .../impl/lateraljoin/TestE2EUnnestAndLateral.java | 63 +++ 5 files changed, 1198 insertions(+), 208 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index caeed50..882c36d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -67,15 +67,49 @@ import com.sun.codemodel.JExpr; import com.sun.codemodel.JVar; import org.apache.drill.exec.vector.complex.writer.BaseWriter; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; + public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatch.class); private StreamingAggregator aggregator; private final RecordBatch incoming; private List<BaseWriter.ComplexWriter> complexWriters; - private boolean done = false; - private boolean first = true; - private int recordCount = 0; + // + // Streaming agg can be in (a) a normal pipeline or (b) a pipeline that is part of a subquery involving + // lateral and unnest. In case (a), the aggregator proceeds normally until it sees a group change or a NONE. If a + // group has changed, the aggregated data is sent downstream and the aggregation continues with the next group. If + // a NONE is seen, the aggregator completes, sends data downstream, and cleans up. + // In case (b), the aggregator behaves similarly to case (a) if a group change or NONE is observed. However, it will + // also encounter a new state, EMIT, every time unnest processes a new row. In this case the aggregator must complete the + // aggregation, send out the results, AND reset to receive more data. To make the treatment of these two cases + // similar, we define the aggregation operation in terms of data sets. + // Data Set = The set of data that the aggregator is currently aggregating. In a normal query, the entire data is + // a single data set. In the case of a Lateral subquery, every row processed by unnest is a data set. There can, + // therefore, be one or more data sets in an aggregation. + // Data Sets may have multiple batches and may contain one or more empty batches. A data set may consist entirely + // of empty batches. + // Schema may change across Data Sets. + // A corner case is a Data Set having many empty batches in the beginning. Such a data set may see a + // schema change once the first non-empty batch is received. + // Schema change within a Data Set is not supported.
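+ // + // For illustration (a hypothetical sequence, not taken from a specific query): if unnest processes two rows and + // each produces a single batch of data, the outcome sequence seen by the aggregator might be OK_NEW_SCHEMA, EMIT, + // OK, EMIT, NONE -- two data sets, each terminated by an EMIT, with NONE marking the end of all data.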
+ // + // We will define some states for internal management + // + private boolean done = false; // END of all data + private boolean first = true; // Beginning of new data set. True during the build schema phase. False once the first + // call to inner next is made. + private boolean sendEmit = false; // In the case where we see an OK_NEW_SCHEMA along with the end of a data set + // we send out a batch with OK_NEW_SCHEMA first, then in the next iteration, + // we send out an empty batch with EMIT. + private IterOutcome lastKnownOutcome = OK; // keep track of the outcome from the previous call to incoming.next + private boolean firstBatchForSchema = true; // true if the current batch came in with an OK_NEW_SCHEMA + private boolean firstBatchForDataSet = true; // true if the current batch is the first batch in a data set + private int recordCount = 0; // number of records output in the current data set + private BatchSchema incomingSchema; /* @@ -154,83 +188,174 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { public IterOutcome innerNext() { // if a special batch has been sent, we have no data in the incoming so exit early - if (specialBatchSent) { - return IterOutcome.NONE; + if (done || specialBatchSent) { + return NONE; + } + + // We sent an OK_NEW_SCHEMA and also encountered the end of a data set. So we need to send + // an EMIT with an empty batch now + if (sendEmit) { + sendEmit = false; + firstBatchForDataSet = true; + recordCount = 0; + return EMIT; } // this is only called on the first batch. Beyond this, the aggregator manages batches. if (aggregator == null || first) { - IterOutcome outcome; if (first && incoming.getRecordCount() > 0) { first = false; - outcome = IterOutcome.OK_NEW_SCHEMA; + lastKnownOutcome = OK_NEW_SCHEMA; } else { - outcome = next(incoming); + lastKnownOutcome = next(incoming); } - logger.debug("Next outcome of {}", outcome); - switch (outcome) { - case NONE: - if (first && popConfig.getKeys().size() == 0) { + logger.debug("Next outcome of {}", lastKnownOutcome); + switch (lastKnownOutcome) { + case NONE: + if (firstBatchForDataSet && popConfig.getKeys().size() == 0) { + // if we have a straight aggregate and empty input batch, we need to handle it in a different way + constructSpecialBatch(); + // set state to indicate the fact that we have sent a special batch and input is empty + specialBatchSent = true; + // If outcome is NONE then we send the special batch in the first iteration and the NONE + // outcome in the next iteration. If outcome is EMIT, we can send the special + // batch and the EMIT outcome at the same time. + return getFinalOutcome(); + } + // else fall thru + case OUT_OF_MEMORY: + case NOT_YET: + case STOP: + return lastKnownOutcome; + case OK_NEW_SCHEMA: + if (!createAggregator()) { + done = true; + return IterOutcome.STOP; + } + break; + case EMIT: + // if we get an EMIT with an empty batch as the first (and therefore only) batch + // we have to do the special handling + if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && incoming.getRecordCount() == 0) { + constructSpecialBatch(); + // set state to indicate the fact that we have sent a special batch and input is empty + specialBatchSent = true; + firstBatchForDataSet = true; // reset on the next iteration + // If outcome is NONE then we send the special batch in the first iteration and the NONE + // outcome in the next iteration. If outcome is EMIT, we can send the special + // batch and the EMIT outcome at the same time.
+ return getFinalOutcome(); + } + // else fall thru + case OK: + break; + default: + throw new IllegalStateException(String.format("unknown outcome %s", lastKnownOutcome)); + } + } else { + if (lastKnownOutcome != NONE && firstBatchForDataSet && !aggregator.isDone()) { + lastKnownOutcome = incoming.next(); + if (!first) { + // Setup needs to be called again. During setup, generated code saves a reference to the vectors + // pointed to by the incoming batch so that the dereferencing of the vector wrappers to get to + // the vectors does not have to be done at each call to eval. However, after an EMIT is seen, + // the vectors are replaced and the reference to the old vectors is no longer valid + try { + aggregator.setup(oContext, incoming, this); + } catch (SchemaChangeException e) { + UserException.Builder exceptionBuilder = UserException.functionError(e) + .message("A schema change exception occurred while calling setup() in generated code."); + throw exceptionBuilder.build(logger); + } + } + } + // We sent an EMIT in the previous iteration, so we must be starting a new data set + if (firstBatchForDataSet) { + done = false; + sendEmit = false; + specialBatchSent = false; + firstBatchForDataSet = false; + } + } + AggOutcome aggOutcome = aggregator.doWork(lastKnownOutcome); + recordCount = aggregator.getOutputCount(); + container.setRecordCount(recordCount); + logger.debug("Aggregator response {}, records {}", aggOutcome, aggregator.getOutputCount()); + // overwrite the outcome variable since we no longer need to remember the first batch outcome + lastKnownOutcome = aggregator.getOutcome(); + switch (aggOutcome) { + case CLEANUP_AND_RETURN: + if (!first) { + container.zeroVectors(); + } + done = true; + ExternalSortBatch.releaseBatches(incoming); + return lastKnownOutcome; + case RETURN_AND_RESET: + // We could have received a string of empty batches until we hit an EMIT + if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && recordCount == 0) { // if we have a straight aggregate and empty input batch, we need to handle it in a different way constructSpecialBatch(); - first = false; // set state to indicate the fact that we have sent a special batch and input is empty specialBatchSent = true; - return IterOutcome.OK; + // If outcome is NONE then we send the special batch in the first iteration and the NONE + // outcome in the next iteration. If outcome is EMIT, we can send the special + // batch and the EMIT outcome at the same time. + return getFinalOutcome(); } - case OUT_OF_MEMORY: - case NOT_YET: - case STOP: - return outcome; - case OK_NEW_SCHEMA: - if (!createAggregator()) { - done = true; + firstBatchForDataSet = true; + if (first) { + first = false; + } + if (lastKnownOutcome == OK_NEW_SCHEMA) { + sendEmit = true; + } + // Release external sort batches after EMIT is seen + ExternalSortBatch.releaseBatches(incoming); + return lastKnownOutcome; + case RETURN_OUTCOME: + // In case of complex writer expression, vectors would be added to batch run-time. + // We have to re-build the schema. + if (complexWriters != null) { + container.buildSchema(SelectionVectorMode.NONE); + } + if (lastKnownOutcome == IterOutcome.NONE) { + // we will set the 'done' flag in the next call to innerNext and use the lastKnownOutcome + // to determine whether we should set the flag or not. + // This is so that if someone calls getRecordCount in between calls to innerNext, we will + // return the correct record count (if the done flag is set, we will return 0).
+ if (first) { + first = false; + return OK_NEW_SCHEMA; + } else { + return OK; + } + } else if (lastKnownOutcome == OK && first) { + lastKnownOutcome = OK_NEW_SCHEMA; + } else if (lastKnownOutcome != IterOutcome.OUT_OF_MEMORY) { + first = false; + } + return lastKnownOutcome; + case UPDATE_AGGREGATOR: + // We could get this either between data sets or within a data set. + // If the former, we can handle the change and so need to update the aggregator and + // continue. If the latter, we cannot (currently) handle the schema change, so throw + // an exception. + // This case is not tested since there are no unit tests for this and there is no support + // from the sort operator for this case + if (lastKnownOutcome == EMIT) { + createAggregator(); + return OK_NEW_SCHEMA; + } else { + context.getExecutorState().fail(UserException.unsupportedError().message(SchemaChangeException + .schemaChanged("Streaming aggregate does not support schema changes", incomingSchema, + incoming.getSchema()).getMessage()).build(logger)); + close(); + killIncoming(false); return IterOutcome.STOP; } - break; - case OK: - break; default: - throw new IllegalStateException(String.format("unknown outcome %s", outcome)); - } - } - AggOutcome out = aggregator.doWork(); - recordCount = aggregator.getOutputCount(); - logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount()); - switch (out) { - case CLEANUP_AND_RETURN: - if (!first) { - container.zeroVectors(); - } - done = true; - ExternalSortBatch.releaseBatches(incoming); - // fall through - case RETURN_OUTCOME: - IterOutcome outcome = aggregator.getOutcome(); - // In case of complex writer expression, vectors would be added to batch run-time. - // We have to re-build the schema. - if (complexWriters != null) { - container.buildSchema(SelectionVectorMode.NONE); - } - if (outcome == IterOutcome.NONE && first) { - first = false; - done = true; - return IterOutcome.OK_NEW_SCHEMA; - } else if (outcome == IterOutcome.OK && first) { - outcome = IterOutcome.OK_NEW_SCHEMA; - } else if (outcome != IterOutcome.OUT_OF_MEMORY) { - first = false; - } - return outcome; - case UPDATE_AGGREGATOR: - context.getExecutorState().fail(UserException.unsupportedError() - .message(SchemaChangeException.schemaChanged("Streaming aggregate does not support schema changes", incomingSchema, incoming.getSchema()).getMessage()) - .build(logger)); - close(); - killIncoming(false); - return IterOutcome.STOP; - default: - throw new IllegalStateException(String.format("Unknown state %s.", out)); + throw new IllegalStateException(String.format("Unknown state %s.", aggOutcome)); } } @@ -309,7 +434,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); // Uncomment this line to debug the generated code.
- //cg.getCodeGenerator().saveCodeForDebugging(true); + // cg.getCodeGenerator().saveCodeForDebugging(true); container.clear(); LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().size()]; @@ -496,6 +621,25 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } } + private IterOutcome getFinalOutcome() { + IterOutcome outcomeToReturn; + + if (firstBatchForDataSet) { + firstBatchForDataSet = false; + } + if (firstBatchForSchema) { + outcomeToReturn = OK_NEW_SCHEMA; + firstBatchForSchema = false; + } else if (lastKnownOutcome == EMIT) { + firstBatchForDataSet = true; + outcomeToReturn = EMIT; + } else { + // get the outcome to return before calling refresh since that resets the lastKnownOutcome to OK + outcomeToReturn = (recordCount == 0) ? NONE : OK; + } + return outcomeToReturn; + } + @Override public void close() { super.close(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java index fb4d508..a752c7e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java @@ -25,26 +25,49 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorWrapper; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; + public abstract class StreamingAggTemplate implements StreamingAggregator { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggregator.class); private static final boolean EXTRA_DEBUG = false; private static final int OUTPUT_BATCH_SIZE = 32*1024; + // lastOutcome is set ONLY if the lastOutcome was NONE or STOP private IterOutcome lastOutcome = null; + + // First batch after build schema phase private boolean first = true; + private boolean firstBatchForSchema = true; // true if the current batch came in with an OK_NEW_SCHEMA. + private boolean firstBatchForDataSet = true; // true if the current batch is the first batch in a data set + private boolean newSchema = false; - private int previousIndex = -1; + + // End of all data + private boolean done = false; + + // index in the incoming (sv4/sv2/vector) private int underlyingIndex = 0; - private int currentIndex; + // The indexes below refer to the actual record indexes in the input batch + // (i.e. if a selection vector, the sv4/sv2 entry has been dereferenced; if a vector, then the record index itself) + private int previousIndex = -1; // the last index that has been processed. Initialized to -1 every time a new + // aggregate group begins (including every time a new data set begins) + private int currentIndex; // current index being processed /** * Number of records added to the current aggregation group. */ private long addedRecordCount = 0; + // There are two outcomes from the aggregator. One is the aggregator's outcome defined in + // StreamingAggregator.AggOutcome.
The other is the outcome from the last call to incoming.next private IterOutcome outcome; + // Number of aggregation groups added into the output batch private int outputCount = 0; private RecordBatch incoming; + // the Streaming Agg Batch that this aggregator belongs to private StreamingAggBatch outgoing; - private boolean done = false; + private OperatorContext context; @@ -73,45 +96,67 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { } @Override - public AggOutcome doWork() { - if (done) { + public AggOutcome doWork(IterOutcome outerOutcome) { + if (done || outerOutcome == NONE) { outcome = IterOutcome.NONE; return AggOutcome.CLEANUP_AND_RETURN; } - try { // outside loop to ensure that first is set to false after the first run. + + try { // outside block to ensure that first is set to false after the first run. outputCount = 0; // allocate outgoing since either this is the first time or if a subsequent time we would // have sent the previous outgoing batch to downstream operator allocateOutgoing(); - if (first) { + if (firstBatchForDataSet) { this.currentIndex = incoming.getRecordCount() == 0 ? 0 : this.getVectorIndex(underlyingIndex); - // consume empty batches until we get one with data. - if (incoming.getRecordCount() == 0) { - outer: while (true) { - IterOutcome out = outgoing.next(0, incoming); - switch (out) { - case OK_NEW_SCHEMA: - case OK: - if (incoming.getRecordCount() == 0) { - continue; - } else { - currentIndex = this.getVectorIndex(underlyingIndex); - break outer; - } - case OUT_OF_MEMORY: - outcome = out; - return AggOutcome.RETURN_OUTCOME; - case NONE: - out = IterOutcome.OK_NEW_SCHEMA; - case STOP: - default: - lastOutcome = out; - outcome = out; - done = true; - return AggOutcome.CLEANUP_AND_RETURN; - } + if (outerOutcome == OK_NEW_SCHEMA) { + firstBatchForSchema = true; + } + // consume empty batches until we get one with data (unless we got an EMIT). If we got an EMIT, + // then this is the first batch in the data set, it is empty, and it arrived with the EMIT. + if (incoming.getRecordCount() == 0) { + if (outerOutcome != EMIT) { + outer: + while (true) { + IterOutcome out = outgoing.next(0, incoming); + switch (out) { + case OK_NEW_SCHEMA: + //lastOutcome = out; + firstBatchForSchema = true; + case OK: + if (incoming.getRecordCount() == 0) { + continue; + } else { + currentIndex = this.getVectorIndex(underlyingIndex); + break outer; + } + case OUT_OF_MEMORY: + outcome = out; + return AggOutcome.RETURN_OUTCOME; + case EMIT: + if (incoming.getRecordCount() == 0) { + // When we see an EMIT we let the agg record batch know that it should either + // send out an EMIT or an OK_NEW_SCHEMA, followed by an EMIT. To do that we simply return + // RETURN_AND_RESET with the outcome so the record batch can take care of it. + return setOkAndReturnEmit(); + } else { + break outer; + } + + case NONE: + out = IterOutcome.OK_NEW_SCHEMA; + case STOP: + default: + lastOutcome = out; + outcome = out; + done = true; + return AggOutcome.CLEANUP_AND_RETURN; + } // switch (out) + } // while empty batches are seen + } else { + return setOkAndReturnEmit(); } } } @@ -121,49 +166,24 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { return AggOutcome.UPDATE_AGGREGATOR; } - if (lastOutcome != null) { + // if the previous iteration has an outcome that was terminal, don't do anything.
+ if (lastOutcome != null /*&& lastOutcome != IterOutcome.OK_NEW_SCHEMA*/) { outcome = lastOutcome; return AggOutcome.CLEANUP_AND_RETURN; } outside: while(true) { - // loop through existing records, adding as necessary. - for (; underlyingIndex < incoming.getRecordCount(); incIndex()) { - if (EXTRA_DEBUG) { - logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); - } - if (previousIndex == -1) { - if (EXTRA_DEBUG) { - logger.debug("Adding the initial row's keys and values."); - } - addRecordInc(currentIndex); - } - else if (isSame( previousIndex, currentIndex )) { - if (EXTRA_DEBUG) { - logger.debug("Values were found the same, adding."); - } - addRecordInc(currentIndex); - } else { - if (EXTRA_DEBUG) { - logger.debug("Values were different, outputting previous batch."); - } - if(!outputToBatch(previousIndex)) { - // There is still space in outgoing container, so proceed to the next input. - if (EXTRA_DEBUG) { - logger.debug("Output successful."); - } - addRecordInc(currentIndex); - } else { - if (EXTRA_DEBUG) { - logger.debug("Output container has reached its capacity. Flushing it."); - } - - // Update the indices to set the state for processing next record in incoming batch in subsequent doWork calls. - previousIndex = -1; - return setOkAndReturn(); - } - } - previousIndex = currentIndex; + // loop through existing records, adding as necessary. + if(!processRemainingRecordsInBatch()) { + // output batch is full. Return. + return setOkAndReturn(); + } + // if the current batch came with an EMIT, we're done + if(outerOutcome == EMIT) { + // output the last record + outputToBatch(previousIndex); + resetIndex(); + return setOkAndReturnEmit(); } /** @@ -189,83 +209,122 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { logger.debug("Received IterOutcome of {}", out); } switch (out) { - case NONE: - done = true; - lastOutcome = out; - if (first && addedRecordCount == 0) { - return setOkAndReturn(); - } else if (addedRecordCount > 0) { - outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value - // (output container full or not) as we are not going to insert any more records. - if (EXTRA_DEBUG) { - logger.debug("Received no more batches, returning."); + case NONE: + done = true; + lastOutcome = out; + if (firstBatchForDataSet && addedRecordCount == 0) { + return setOkAndReturn(); + } else if (addedRecordCount > 0) { + outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value + // (output container full or not) as we are not going to insert any more records. 
+ if (EXTRA_DEBUG) { + logger.debug("Received no more batches, returning."); + } + return setOkAndReturn(); + } else { + // not first batch and record Count == 0 + outcome = out; + return AggOutcome.CLEANUP_AND_RETURN; } - return setOkAndReturn(); - } else { - if (first && out == IterOutcome.OK) { - out = IterOutcome.OK_NEW_SCHEMA; + // EMIT is handled like OK, except that we do not loop back to process the + // next incoming batch; we return instead + case EMIT: + if (incoming.getRecordCount() == 0) { + if (addedRecordCount > 0) { + outputToBatchPrev(previous, previousIndex, outputCount); + } + } else { + resetIndex(); + if (previousIndex != -1 && isSamePrev(previousIndex, previous, currentIndex)) { + if (EXTRA_DEBUG) { + logger.debug("New value was same as last value of previous batch, adding."); + } + addRecordInc(currentIndex); + previousIndex = currentIndex; + incIndex(); + if (EXTRA_DEBUG) { + logger.debug("Continuing outside"); + } + } else { // not the same + if (EXTRA_DEBUG) { + logger.debug("This is not the same as the previous, add record and continue outside."); + } + if (addedRecordCount > 0) { + if (outputToBatchPrev(previous, previousIndex, outputCount)) { + if (EXTRA_DEBUG) { + logger.debug("Output container is full. flushing it."); + } + return setOkAndReturnEmit(); + } + } + // important to set the previous index to -1 since we start a new group + previousIndex = -1; + } + processRemainingRecordsInBatch(); + outputToBatch(previousIndex); // currentIndex has been reset to int_max so use previous index. } - outcome = out; - return AggOutcome.CLEANUP_AND_RETURN; - } - - case NOT_YET: - this.outcome = out; - return AggOutcome.RETURN_OUTCOME; - - case OK_NEW_SCHEMA: - if (EXTRA_DEBUG) { - logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount()); - } - if (addedRecordCount > 0) { - outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value - // (output container full or not) as we are not going to insert anymore records. + resetIndex(); + return setOkAndReturnEmit(); + + case NOT_YET: + this.outcome = out; + return AggOutcome.RETURN_OUTCOME; + + case OK_NEW_SCHEMA: + firstBatchForSchema = true; + //lastOutcome = out; if (EXTRA_DEBUG) { - logger.debug("Wrote out end of previous batch, returning."); + logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount()); } - newSchema = true; - return setOkAndReturn(); - } - cleanup(); - return AggOutcome.UPDATE_AGGREGATOR; - case OK: - resetIndex(); - if (incoming.getRecordCount() == 0) { - continue; - } else { - if (previousIndex != -1 && isSamePrev(previousIndex , previous, currentIndex)) { - if (EXTRA_DEBUG) { - logger.debug("New value was same as last value of previous batch, adding."); - } - addRecordInc(currentIndex); - previousIndex = currentIndex; - incIndex(); - if (EXTRA_DEBUG) { - logger.debug("Continuing outside"); - } - continue outside; - } else { // not the same + if (addedRecordCount > 0) { + outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value + // (output container full or not) as we are not going to insert anymore records. if (EXTRA_DEBUG) { - logger.debug("This is not the same as the previous, add record and continue outside."); + logger.debug("Wrote out end of previous batch, returning."); } - if (addedRecordCount > 0) { - if (outputToBatchPrev(previous, previousIndex, outputCount)) { - if (EXTRA_DEBUG) { - logger.debug("Output container is full. 
flushing it."); + newSchema = true; + return setOkAndReturn(); + } + cleanup(); + return AggOutcome.UPDATE_AGGREGATOR; + case OK: + resetIndex(); + if (incoming.getRecordCount() == 0) { + continue; + } else { + if (previousIndex != -1 && isSamePrev(previousIndex, previous, currentIndex)) { + if (EXTRA_DEBUG) { + logger.debug("New value was same as last value of previous batch, adding."); + } + addRecordInc(currentIndex); + previousIndex = currentIndex; + incIndex(); + if (EXTRA_DEBUG) { + logger.debug("Continuing outside"); + } + continue outside; + } else { // not the same + if (EXTRA_DEBUG) { + logger.debug("This is not the same as the previous, add record and continue outside."); + } + if (addedRecordCount > 0) { + if (outputToBatchPrev(previous, previousIndex, outputCount)) { + if (EXTRA_DEBUG) { + logger.debug("Output container is full. flushing it."); + } + previousIndex = -1; + return setOkAndReturn(); } - previousIndex = -1; - return setOkAndReturn(); } + previousIndex = -1; + continue outside; } - previousIndex = -1; - continue outside; } - } - case STOP: - default: - lastOutcome = out; - outcome = out; - return AggOutcome.CLEANUP_AND_RETURN; + case STOP: + default: + lastOutcome = out; + outcome = out; + return AggOutcome.CLEANUP_AND_RETURN; } } } finally { @@ -277,12 +336,63 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { } } finally { if (first) { - first = !first; + first = false; } } } + @Override + public boolean isDone() { + return done; + } + + /** + * Process the remaining records in the batch. Returns false if not all records are processed (if the output + * container gets full), true otherwise. + * @return Boolean indicating all records were processed + */ + private boolean processRemainingRecordsInBatch() { + for (; underlyingIndex < incoming.getRecordCount(); incIndex()) { + if (EXTRA_DEBUG) { + logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); + } + if (previousIndex == -1) { + if (EXTRA_DEBUG) { + logger.debug("Adding the initial row's keys and values."); + } + addRecordInc(currentIndex); + } + else if (isSame( previousIndex, currentIndex )) { + if (EXTRA_DEBUG) { + logger.debug("Values were found the same, adding."); + } + addRecordInc(currentIndex); + } else { + if (EXTRA_DEBUG) { + logger.debug("Values were different, outputting previous batch."); + } + if(!outputToBatch(previousIndex)) { + // There is still space in outgoing container, so proceed to the next input. + if (EXTRA_DEBUG) { + logger.debug("Output successful."); + } + addRecordInc(currentIndex); + } else { + if (EXTRA_DEBUG) { + logger.debug("Output container has reached its capacity. Flushing it."); + } + + // Update the indices to set the state for processing next record in incoming batch in subsequent doWork calls. 
+ previousIndex = -1; + return false; + } + } + previousIndex = currentIndex; + } + return true; + } + private final void incIndex() { underlyingIndex++; if (underlyingIndex >= incoming.getRecordCount()) { @@ -297,18 +407,51 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { incIndex(); } + /** + * Set the outcome to OK (or OK_NEW_SCHEMA) and return the AggOutcome parameter + * + * @return outcome + */ private final AggOutcome setOkAndReturn() { - if (first) { - this.outcome = IterOutcome.OK_NEW_SCHEMA; + IterOutcome outcomeToReturn; + firstBatchForDataSet = false; + if (firstBatchForSchema) { + outcomeToReturn = OK_NEW_SCHEMA; + firstBatchForSchema = false; } else { - this.outcome = IterOutcome.OK; + outcomeToReturn = OK; } + this.outcome = outcomeToReturn; + for (VectorWrapper<?> v : outgoing) { v.getValueVector().getMutator().setValueCount(outputCount); } return AggOutcome.RETURN_OUTCOME; } + /** + * setOkAndReturn (as above) if the iter outcome was EMIT + * + * @return outcome + */ + private final AggOutcome setOkAndReturnEmit() { + IterOutcome outcomeToReturn; + firstBatchForDataSet = true; + previousIndex = -1; + if (firstBatchForSchema) { + outcomeToReturn = OK_NEW_SCHEMA; + firstBatchForSchema = false; + } else { + outcomeToReturn = EMIT; + } + this.outcome = outcomeToReturn; + + for (VectorWrapper<?> v : outgoing) { + v.getValueVector().getMutator().setValueCount(outputCount); + } + return AggOutcome.RETURN_AND_RESET; + } + // Returns output container status after insertion of the given record. Caller must check the return value if it // plans to insert more records into outgoing container. private final boolean outputToBatch(int inIndex) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java index a300924..2a64b93 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java @@ -27,8 +27,30 @@ public interface StreamingAggregator { public static TemplateClassDefinition<StreamingAggregator> TEMPLATE_DEFINITION = new TemplateClassDefinition<StreamingAggregator>(StreamingAggregator.class, StreamingAggTemplate.class); + + /** + * The Aggregator can return one of the following outcomes: + * <p> + * <b>RETURN_OUTCOME:</b> The aggregation has seen a change in the group and should send data downstream. If + * complex writers are involved, then rebuild schema. + * <p> + * <b>CLEANUP_AND_RETURN:</b> End of all data. Return the data downstream, and cleanup. + * <p> + * <b>UPDATE_AGGREGATOR:</b> A schema change was encountered. The aggregator's generated code and (possibly) + * container need to be updated + * <p> + * <b>RETURN_AND_RESET:</b> If the aggregator encounters an EMIT, then that implies the end of a data set but + * not of all the data. Return the data (aggregated so far) downstream, reset the internal state variables and + * come back for the next data set. 
+ * <p> + * @see org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome HashAggregator.AggOutcome + */ public static enum AggOutcome { - RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR; + RETURN_OUTCOME, + CLEANUP_AND_RETURN, + UPDATE_AGGREGATOR, + RETURN_AND_RESET + ; } public abstract void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException; @@ -37,7 +59,11 @@ public interface StreamingAggregator { public abstract int getOutputCount(); - public abstract AggOutcome doWork(); + // Do the work. Also pass in the IterOutcome of the batch already read, in case it might be an EMIT. If the + // outerOutcome is EMIT, we need to do the work without reading any more batches. + public abstract AggOutcome doWork(IterOutcome outerOutcome); + + public abstract boolean isDone(); public abstract void cleanup(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java new file mode 100644 index 0000000..75c4598 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java @@ -0,0 +1,614 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/ +package org.apache.drill.exec.physical.impl.agg; + +import org.apache.drill.categories.OperatorTest; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.physical.config.StreamingAggregate; +import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome; +import org.apache.drill.exec.physical.impl.MockRecordBatch; +import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.test.rowSet.DirectRowSet; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.apache.drill.test.rowSet.schema.SchemaBuilder; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@Category(OperatorTest.class) +public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome { + //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestStreamingAggEmitOutcome.class); + protected static TupleMetadata resultSchema; + + @BeforeClass + public static void setUpBeforeClass2() throws Exception { + resultSchema = new SchemaBuilder() + .add("name", TypeProtos.MinorType.VARCHAR) + .addNullable("total_sum", TypeProtos.MinorType.BIGINT) + .buildSchema(); + } + + /** + * Verifies that if StreamingAggBatch receives empty batches with OK_NEW_SCHEMA and EMIT outcomes, then it correctly produces + * empty batches as output. The first empty batch is returned with OK_NEW_SCHEMA and the second with the EMIT outcome. + */ + @Test + public void t1_testStreamingAggrEmptyBatchEmitOutcome() { + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + + inputOutcomes.add(OK_NEW_SCHEMA); + inputOutcomes.add(OK_NEW_SCHEMA); + inputOutcomes.add(EMIT); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + parseExprs("name_left", "name"), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(0, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + } + + /** + * Verifies that if StreamingAgg receives a RecordBatch with EMIT outcome post build-schema phase, then it produces + * output for that input batch correctly. The first output batch will always be returned with OK_NEW_SCHEMA + * outcome, followed by an empty batch with EMIT. The test verifies the output order with the expected baseline.
+ */ + @Test + public void t2_testStreamingAggrNonEmptyBatchEmitOutcome() { + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(13, 130, "item13") + .addRow(13, 130, "item13") + .addRow(2, 20, "item2") + .addRow(2, 20, "item2") + .addRow(4, 40, "item4") + .build(); + + final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchema) + .addRow("item1", (long)11) + .addRow("item13", (long)286) + .addRow("item2", (long)44) + .addRow("item4", (long)44) + .build(); + + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + parseExprs("name_left", "name"), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + // Data before EMIT is returned with an OK_NEW_SCHEMA. + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertEquals(4, strAggBatch.getRecordCount()); + + RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet).verify(actualRowSet); + + // EMIT comes with an empty batch + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + + // Release memory for row sets + nonEmptyInputRowSet2.clear(); + expectedRowSet.clear(); + } + + @Test + public void t3_testStreamingAggrEmptyBatchFollowedByNonEmptyBatchEmitOutcome() { + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(13, 130, "item13") + .addRow(0, 1300, "item13") + .addRow(2, 20, "item2") + .addRow(0, 2000, "item2") + .addRow(4, 40, "item4") + .addRow(0, 4000, "item4") + .build(); + + final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchema) + .addRow("item13", (long)1443) + .addRow("item2", (long)2022) + .addRow("item4", (long)4044) + .build(); + + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + parseExprs("name_left", "name"), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + 
assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(0, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(3, strAggBatch.getRecordCount()); + + RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + + // Release memory for row sets + nonEmptyInputRowSet2.clear(); + expectedRowSet.clear(); + } + + @Test + public void t4_testStreamingAggrMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome() { + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(13, 130, "item13") + .addRow(0, 0, "item13") + .addRow(1, 33000, "item13") + .addRow(2, 20, "item2") + .addRow(0, 0, "item2") + .addRow(1, 11000, "item2") + .addRow(4, 40, "item4") + .build(); + + final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchema) + .addRow("item13", (long)33144) + .addRow("item2", (long)11023) + .addRow("item4", (long)44) + .build(); + + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + parseExprs("name_left", "name"), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(0, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(0, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(0, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(3, strAggBatch.getRecordCount()); + + RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + + // Release memory for row sets + nonEmptyInputRowSet2.clear(); + expectedRowSet.clear(); + } + + /** + * Verifies that if StreamingAggr receives multiple non-empty record batches with EMIT outcomes in between, then it produces + * output for those input batches correctly.
In this case it receives the first non-empty batch with OK_NEW_SCHEMA in + * the buildSchema phase, followed by an empty batch with the EMIT outcome. For this combination it produces output for the + * records received so far along with the EMIT outcome. Then it receives a second non-empty batch with OK outcome and + * produces output for it differently. The test validates that for each output received the order of the records is + * correct. + * @throws Exception + */ + @Test + public void t5_testStreamingAgrResetsAfterFirstEmitOutcome() { + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(2, 20, "item2") + .addRow(2, 20, "item2") + .addRow(3, 30, "item3") + .addRow(3, 30, "item3") + .addRow(3, 30, "item3") + .addRow(3, 30, "item3") + .addRow(3, 30, "item3") + .addRow(3, 30, "item3") + .addRow(3, 30, "item3") + .addRow(3, 30, "item3") + .addRow(3, 30, "item3") + .addRow(3, 30, "item3") + .build(); + + final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchema) + .addRow("item1", (long)11) + .build(); + + final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchema) + .addRow("item2", (long)44) + .addRow("item3", (long)330) + .build(); + + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + inputContainer.add(emptyInputRowSet.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + parseExprs("name_left", "name"), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertEquals(1, strAggBatch.getRecordCount()); + + RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet1).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(0, strAggBatch.getRecordCount()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(2, strAggBatch.getRecordCount()); + + actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet2).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + + // Release memory for row sets + nonEmptyInputRowSet2.clear(); + expectedRowSet2.clear(); + expectedRowSet1.clear(); + } + + /** + * Verifies that if StreamingAggr receives multiple non-empty record batches with EMIT outcomes in between, then it produces + * output for those input batches correctly. In this case it receives the first non-empty batch with OK_NEW_SCHEMA in + * the buildSchema phase, followed by an empty batch with the EMIT outcome.
For this combination it produces output for the + * records received so far along with the EMIT outcome. Then it receives a second non-empty batch with OK outcome and + * produces output for it differently. The test validates that for each output received the order of the records is + * correct. + * @throws Exception + */ + @Test + public void t6_testStreamingAggrOkFollowedByNone() { + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(2, 20, "item2") + .addRow(3, 30, "item3") + .addRow(4, 40, "item4") + .addRow(4, 40, "item4") + .addRow(5, 50, "item5") + .addRow(5, 50, "item5") + .build(); + + final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchema) + .addRow("item1", (long)11) + .build(); + + final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchema) + .addRow("item2", (long)22) + .addRow("item3", (long)33) + .addRow("item4", (long)88) + .addRow("item5", (long)110) + .build(); + + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + parseExprs("name_left", "name"), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertEquals(1, strAggBatch.getRecordCount()); + + RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet1).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(0, strAggBatch.getRecordCount()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK); + assertEquals(4, strAggBatch.getRecordCount()); + + actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet2).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + + // Release memory for row sets + nonEmptyInputRowSet2.clear(); + expectedRowSet2.clear(); + expectedRowSet1.clear(); + } + + /** + * Normal case + */ + @Test + public void t7_testStreamingAggrMultipleEMITOutcome() { + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(2, 20, "item2") + .addRow(3, 30, "item3") + .build(); + + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + inputContainer.add(emptyInputRowSet.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); +
inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + parseExprs("name_left", "name"), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertEquals(1, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(0, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(2, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(0, strAggBatch.getRecordCount()); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + + nonEmptyInputRowSet2.clear(); + } + + /** + * + */ + @Test + public void t8_testStreamingAggrMultipleInputToSingleOutputBatch() { + + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(2, 20, "item2") + .build(); + + final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchema) + .addRow("item1", (long)11) + .addRow("item2", (long)22) + .build(); + + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + inputContainer.add(emptyInputRowSet.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + parseExprs("name_left", "name"), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertEquals(2, strAggBatch.getRecordCount()); + + RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(0, strAggBatch.getRecordCount()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + + nonEmptyInputRowSet2.clear(); + } + + + /***************************************************************************************** + Tests for validating regular StreamingAggr behavior with no EMIT outcome + ******************************************************************************************/ + @Test + public void 
t9_testStreamingAgr_WithEmptyNonEmptyBatchesAndOKOutcome() { + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(2, 20, "item1") + .addRow(13, 130, "item13") + .addRow(13, 130, "item13") + .addRow(13, 130, "item13") + .addRow(130, 1300, "item130") + .addRow(0, 0, "item130") + .build(); + + final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(23, 230, "item23") + .addRow(3, 33, "item3") + .addRow(7, 70, "item7") + .addRow(17, 170, "item7") + .build(); + + final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchema) + .addRow("item1", (long)33) + .addRow("item13", (long)429) + .addRow("item130", (long)1430) + .addRow("item23", (long)253) + .addRow("item3", (long)36) + .addRow("item7", (long)264) + .build(); + + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet3.container()); + inputContainer.add(emptyInputRowSet.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + parseExprs("name_left", "name"), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertEquals(6, strAggBatch.getRecordCount()); + + RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + + nonEmptyInputRowSet2.clear(); + nonEmptyInputRowSet3.clear(); + expectedRowSet.clear(); + } + + @Test + public void t10_testStreamingAggrWithEmptyDataSet() { + inputContainer.add(emptyInputRowSet.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + parseExprs("name_left", "name"), + parseExprs("sum(id_left+cost_left)", "total_sum"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java index c57093c..17a9d33 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java @@ -449,6 +449,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest { + "dfs.`lateraljoin/multipleFiles/` customer) t1, LATERAL (SELECT CAST(MAX(t.ord.o_totalprice)" + " AS int) AS maxprice FROM UNNEST(t1.c_orders) t(ord) GROUP BY t.ord.o_orderstatus) t2"; + try { testBuilder() .optionSettingQueriesForTestQuery("alter session set `%s` = false", PlannerSettings.STREAMAGG.getOptionName()) @@ -462,6 +463,9 @@ public class TestE2EUnnestAndLateral extends ClusterTest { .baselineValues(235695) .baselineValues(177819) .build().run(); + } finally { + test("alter session set `" + PlannerSettings.STREAMAGG.getOptionName() + "` = true"); + } } @Test @@ -469,6 +473,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest { String sql = "SELECT key, t3.dsls FROM cp.`lateraljoin/with_nulls.json` t LEFT OUTER " + "JOIN LATERAL (SELECT DISTINCT t2.sls AS dsls FROM UNNEST(t.sales) t2(sls)) t3 ON TRUE"; + try { testBuilder() .optionSettingQueriesForTestQuery("alter session set `%s` = false", PlannerSettings.STREAMAGG.getOptionName()) @@ -484,5 +489,63 @@ public class TestE2EUnnestAndLateral extends ClusterTest { .baselineValues("dd",111L) .baselineValues("dd",222L) .build().run(); + } finally { + test("alter session set `" + PlannerSettings.STREAMAGG.getOptionName() + "` = true"); + } + } + + @Test + public void testMultipleBatchesLateral_WithStreamingAgg() throws Exception { + String sql = "SELECT t2.maxprice FROM (SELECT customer.c_orders AS c_orders FROM " + + "dfs.`lateraljoin/multipleFiles/` customer) t1, LATERAL (SELECT CAST(MAX(t.ord.o_totalprice)" + + " AS int) AS maxprice FROM UNNEST(t1.c_orders) t(ord) GROUP BY t.ord.o_orderstatus) t2"; + + testBuilder() + .sqlQuery(sql) + .unOrdered() + .baselineColumns("maxprice") + .baselineValues(367190) + .baselineValues(316347) + .baselineValues(146610) + .baselineValues(306996) + .baselineValues(235695) + .baselineValues(177819) + .build().run(); } + + @Test + public void testLateral_StreamingAgg_with_nulls() throws Exception { + String sql = "SELECT key, t3.dsls FROM cp.`lateraljoin/with_nulls.json` t LEFT OUTER " + + "JOIN LATERAL (SELECT DISTINCT t2.sls AS dsls FROM UNNEST(t.sales) t2(sls)) t3 ON TRUE"; + + testBuilder() + .sqlQuery(sql) + .unOrdered() + .baselineColumns("key","dsls") + .baselineValues("aa",null) + .baselineValues("bb",100L) + .baselineValues("bb",200L) + .baselineValues("bb",300L) + .baselineValues("bb",400L) + .baselineValues("cc",null) + .baselineValues("dd",111L) + .baselineValues("dd",222L) + .build().run(); + } + + @Test + public void testMultipleBatchesLateral_WithStreamingAggNoGroup() throws Exception { + String sql = "SELECT t2.maxprice FROM (SELECT customer.c_orders AS c_orders FROM " + + "dfs.`lateraljoin/multipleFiles/` customer) t1, LATERAL (SELECT CAST(MAX(t.ord.o_totalprice)" + + " AS int) AS maxprice FROM UNNEST(t1.c_orders) t(ord) ) t2"; + + testBuilder() + .sqlQuery(sql) + .unOrdered() + .baselineColumns("maxprice") + .baselineValues(367190) + .baselineValues(306996) + .build().run(); + } + }
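For readers new to the EMIT contract exercised above, here is a minimal sketch of how a downstream operator might drive a StreamingAggBatch under this protocol. It mirrors the outcome handling verified by the tests, but process() and resetForNextDataSet() are illustrative placeholders, not methods from this patch:

    // Drain aggregated results, treating each EMIT as the end of one data set.
    RecordBatch.IterOutcome outcome;
    while ((outcome = strAggBatch.next()) != RecordBatch.IterOutcome.NONE) {
      switch (outcome) {
        case OK_NEW_SCHEMA: // schema (re)established; the batch may also carry data
        case OK:            // more aggregated output for the current data set
          process(strAggBatch);
          break;
        case EMIT:          // current data set is complete; more data sets may follow
          process(strAggBatch); // may be empty if results already went out with OK_NEW_SCHEMA
          resetForNextDataSet();
          break;
        default:
          throw new IllegalStateException("Unexpected outcome " + outcome);
      }
    }

As tests t2 and t3 show, when a data set's results are returned with OK_NEW_SCHEMA, the operator follows up with an empty EMIT batch, so a consumer can always treat EMIT as the data set boundary.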