[ https://issues.apache.org/jira/browse/DRILL-6766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16645516#comment-16645516 ]
ASF GitHub Bot commented on DRILL-6766: --------------------------------------- sohami closed pull request #1490: DRILL-6766: Lateral Unnest query : IllegalStateException - rowId in right batch of lateral is smaller than rowId in left batch being processed URL: https://github.com/apache/drill/pull/1490 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index 2b9b31783f5..ffcfa78a707 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.List; +import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.UserException; @@ -71,6 +72,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP; public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatch.class); @@ -104,7 +106,7 @@ // call to inner next is made. 
private boolean sendEmit = false; // In the case where we see an OK_NEW_SCHEMA along with the end of a data set // we send out a batch with OK_NEW_SCHEMA first, then in the next iteration, - // we send out an emopty batch with EMIT. + // we send out an empty batch with EMIT. private IterOutcome lastKnownOutcome = OK; // keep track of the outcome from the previous call to incoming.next private boolean firstBatchForSchema = true; // true if the current batch came in with an OK_NEW_SCHEMA private boolean firstBatchForDataSet = true; // true if the current batch is the first batch in a data set @@ -127,7 +129,11 @@ private boolean specialBatchSent = false; private static final int SPECIAL_BATCH_COUNT = 1; - public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException { + // TODO: Needs to adapt to batch sizing rather than hardcoded constant value + private int maxOutputRowCount = ValueVector.MAX_ROW_COUNT; + + public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context) + throws OutOfMemoryException { super(popConfig, context); this.incoming = incoming; @@ -189,7 +195,7 @@ public IterOutcome innerNext() { // if a special batch has been sent, we have no data in the incoming so exit early if (done || specialBatchSent) { - assert (sendEmit != true); // if special batch sent with emit then flag will not be set + assert (!sendEmit); // if special batch sent with emit then flag will not be set return NONE; } @@ -199,6 +205,7 @@ public IterOutcome innerNext() { first = false; // first is set only in the case when we see a NONE after an empty first (and only) batch sendEmit = false; firstBatchForDataSet = true; + firstBatchForSchema = false; recordCount = 0; specialBatchSent = false; return EMIT; @@ -239,18 +246,17 @@ public IterOutcome innerNext() { done = true; return IterOutcome.STOP; } + firstBatchForSchema = true; break; case EMIT: // if we get an EMIT with an 
empty batch as the first (and therefore only) batch // we have to do the special handling if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && incoming.getRecordCount() == 0) { constructSpecialBatch(); - firstBatchForDataSet = true; // reset on the next iteration // If outcome is NONE then we send the special batch in the first iteration and the NONE // outcome in the next iteration. If outcome is EMIT, we can send the special // batch and the EMIT outcome at the same time. (unless the finalOutcome is OK_NEW_SCHEMA) - IterOutcome finalOutcome = getFinalOutcome(); - return finalOutcome; + return getFinalOutcome(); } // else fall thru case OK: @@ -259,15 +265,18 @@ public IterOutcome innerNext() { throw new IllegalStateException(String.format("unknown outcome %s", lastKnownOutcome)); } } else { - if ( lastKnownOutcome != NONE && firstBatchForDataSet && !aggregator.isDone()) { + // If this is not the first batch and previous batch is fully processed with no error condition or NONE is not + // seen then it will call next() on upstream to get new batch. Otherwise just process the previous incoming batch + if ( lastKnownOutcome != NONE && firstBatchForDataSet && !aggregator.isDone() + && aggregator.previousBatchProcessed()) { lastKnownOutcome = incoming.next(); if (!first ) { //Setup needs to be called again. During setup, generated code saves a reference to the vectors - // pointed to by the incoming batch so that the dereferencing of the vector wrappers to get to + // pointed to by the incoming batch so that the de-referencing of the vector wrappers to get to // the vectors does not have to be done at each call to eval. 
However, after an EMIT is seen, // the vectors are replaced and the reference to the old vectors is no longer valid try { - aggregator.setup(oContext, incoming, this); + aggregator.setup(oContext, incoming, this, maxOutputRowCount); } catch (SchemaChangeException e) { UserException.Builder exceptionBuilder = UserException.functionError(e) .message("A Schema change exception occured in calling setup() in generated code."); @@ -280,8 +289,10 @@ public IterOutcome innerNext() { recordCount = aggregator.getOutputCount(); container.setRecordCount(recordCount); logger.debug("Aggregator response {}, records {}", aggOutcome, aggregator.getOutputCount()); - // overwrite the outcome variable since we no longer need to remember the first batch outcome - lastKnownOutcome = aggregator.getOutcome(); + // get the returned IterOutcome from aggregator and based on AggOutcome and returned IterOutcome update the + // lastKnownOutcome below. For example: if AggOutcome is RETURN_AND_RESET then lastKnownOutcome is always set to + // EMIT + IterOutcome returnOutcome = aggregator.getOutcome(); switch (aggOutcome) { case CLEANUP_AND_RETURN: if (!first) { @@ -289,7 +300,7 @@ public IterOutcome innerNext() { } done = true; ExternalSortBatch.releaseBatches(incoming); - return lastKnownOutcome; + return returnOutcome; case RETURN_AND_RESET: //WE could have got a string of batches, all empty, until we hit an emit if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && recordCount == 0) { @@ -298,28 +309,32 @@ public IterOutcome innerNext() { // If outcome is NONE then we send the special batch in the first iteration and the NONE // outcome in the next iteration. If outcome is EMIT, we can send the special // batch and the EMIT outcome at the same time. 
- - IterOutcome finalOutcome = getFinalOutcome(); - return finalOutcome; + return getFinalOutcome(); } firstBatchForDataSet = true; firstBatchForSchema = false; if(first) { first = false; } - if(lastKnownOutcome == OK_NEW_SCHEMA) { - sendEmit = true; + // Since AggOutcome is RETURN_AND_RESET and returned IterOutcome is OK_NEW_SCHEMA from Aggregator that means it + // has seen first batch with OK_NEW_SCHEMA and then last batch with EMIT outcome. In that case if all the input + // batch is processed to produce output batch it need to send and empty batch with EMIT outcome in subsequent + // next call. + if(returnOutcome == OK_NEW_SCHEMA) { + sendEmit = (aggregator == null) || aggregator.previousBatchProcessed(); } // Release external sort batches after EMIT is seen ExternalSortBatch.releaseBatches(incoming); - return lastKnownOutcome; + lastKnownOutcome = EMIT; + return returnOutcome; case RETURN_OUTCOME: // In case of complex writer expression, vectors would be added to batch run-time. // We have to re-build the schema. if (complexWriters != null) { container.buildSchema(SelectionVectorMode.NONE); } - if (lastKnownOutcome == IterOutcome.NONE ) { + if (returnOutcome == IterOutcome.NONE ) { + lastKnownOutcome = NONE; // we will set the 'done' flag in the next call to innerNext and use the lastKnownOutcome // to determine whether we should set the flag or not. // This is so that if someone calls getRecordCount in between calls to innerNext, we will @@ -330,11 +345,12 @@ public IterOutcome innerNext() { } else { return OK; } - } else if (lastKnownOutcome == OK && first) { + } else if (returnOutcome == OK && first) { lastKnownOutcome = OK_NEW_SCHEMA; + returnOutcome = OK_NEW_SCHEMA; } first = false; - return lastKnownOutcome; + return returnOutcome; case UPDATE_AGGREGATOR: // We could get this either between data sets or within a data set. 
// If the former, we can handle the change and so need to update the aggregator and @@ -342,8 +358,9 @@ public IterOutcome innerNext() { // and exception // This case is not tested since there are no unit tests for this and there is no support // from the sort operator for this case - if (lastKnownOutcome == EMIT) { + if (returnOutcome == EMIT) { createAggregator(); + lastKnownOutcome = EMIT; return OK_NEW_SCHEMA; } else { context.getExecutorState().fail(UserException.unsupportedError().message(SchemaChangeException @@ -351,6 +368,7 @@ public IterOutcome innerNext() { incoming.getSchema()).getMessage()).build(logger)); close(); killIncoming(false); + lastKnownOutcome = STOP; return IterOutcome.STOP; } default: @@ -433,7 +451,7 @@ private StreamingAggregator createAggregatorInternal() throws SchemaChangeExcept ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); // Uncomment out this line to debug the generated code. 
- // cg.getCodeGenerator().saveCodeForDebugging(true); + //cg.getCodeGenerator().saveCodeForDebugging(true); container.clear(); LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().size()]; @@ -506,7 +524,7 @@ private StreamingAggregator createAggregatorInternal() throws SchemaChangeExcept container.buildSchema(SelectionVectorMode.NONE); StreamingAggregator agg = context.getImplementationClass(cg); - agg.setup(oContext, incoming, this); + agg.setup(oContext, incoming, this, maxOutputRowCount); allocateComplexWriters(); return agg; } @@ -651,7 +669,11 @@ protected void killIncoming(boolean sendUpstream) { @Override public void dump() { - logger.error("StreamingAggBatch[container={}, popConfig={}, aggregator={}, incomingSchema={}]", - container, popConfig, aggregator, incomingSchema); + logger.error("StreamingAggBatch[container={}, popConfig={}, aggregator={}, incomingSchema={}]", container, popConfig, aggregator, incomingSchema); + } + + @VisibleForTesting + public void setMaxOutputRowCount(int maxOutputRowCount) { + this.maxOutputRowCount = maxOutputRowCount; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java index 4bde7ab1708..cc89f23b55d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java @@ -24,6 +24,7 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.ValueVector; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; @@ -33,7 +34,7 @@ public abstract class 
StreamingAggTemplate implements StreamingAggregator { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggregator.class); private static final boolean EXTRA_DEBUG = false; - private static final int OUTPUT_BATCH_SIZE = 32*1024; + private int maxOutputRows = ValueVector.MAX_ROW_COUNT; // lastOutcome is set ONLY if the lastOutcome was NONE or STOP private IterOutcome lastOutcome = null; @@ -54,7 +55,7 @@ // (i.e if a selection vector the sv4/sv2 entry has been dereferenced or if a vector then the record index itself) private int previousIndex = -1; // the last index that has been processed. Initialized to -1 every time a new // aggregate group begins (including every time a new data set begins) - private int currentIndex; // current index being processed + private int currentIndex = Integer.MAX_VALUE; // current index being processed /** * Number of records added to the current aggregation group. */ @@ -72,10 +73,12 @@ @Override - public void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException { + public void setup(OperatorContext context, RecordBatch incoming, + StreamingAggBatch outgoing, int outputRowCount) throws SchemaChangeException { this.context = context; this.incoming = incoming; this.outgoing = outgoing; + this.maxOutputRows = outputRowCount; setupInterior(incoming, outgoing); } @@ -109,7 +112,7 @@ public AggOutcome doWork(IterOutcome outerOutcome) { allocateOutgoing(); if (firstBatchForDataSet) { - this.currentIndex = incoming.getRecordCount() == 0 ? 0 : this.getVectorIndex(underlyingIndex); + this.currentIndex = incoming.getRecordCount() == 0 ? Integer.MAX_VALUE : this.getVectorIndex(underlyingIndex); if (outerOutcome == OK_NEW_SCHEMA) { firstBatchForSchema = true; @@ -178,9 +181,10 @@ public AggOutcome doWork(IterOutcome outerOutcome) { // loop through existing records, adding as necessary. if(!processRemainingRecordsInBatch()) { // output batch is full. 
Return. - return setOkAndReturn(); + return setOkAndReturn(outerOutcome); } - // if the current batch came with an EMIT, we're done + // if the current batch came with an EMIT, we're done since if we are here it means output batch consumed all + // the rows in incoming batch if(outerOutcome == EMIT) { // output the last record outputToBatch(previousIndex); @@ -215,14 +219,14 @@ public AggOutcome doWork(IterOutcome outerOutcome) { done = true; lastOutcome = out; if (firstBatchForDataSet && addedRecordCount == 0) { - return setOkAndReturn(); + return setOkAndReturn(NONE); } else if (addedRecordCount > 0) { outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value // (output container full or not) as we are not going to insert any more records. if (EXTRA_DEBUG) { logger.debug("Received no more batches, returning."); } - return setOkAndReturn(); + return setOkAndReturn(NONE); } else { // not first batch and record Count == 0 outcome = out; @@ -237,6 +241,7 @@ public AggOutcome doWork(IterOutcome outerOutcome) { } } else { resetIndex(); + currentIndex = this.getVectorIndex(underlyingIndex); if (previousIndex != -1 && isSamePrev(previousIndex, previous, currentIndex)) { if (EXTRA_DEBUG) { logger.debug("New value was same as last value of previous batch, adding."); @@ -256,13 +261,16 @@ public AggOutcome doWork(IterOutcome outerOutcome) { if (EXTRA_DEBUG) { logger.debug("Output container is full. flushing it."); } - return setOkAndReturnEmit(); + return setOkAndReturn(EMIT); } } // important to set the previous index to -1 since we start a new group previousIndex = -1; } - processRemainingRecordsInBatch(); + if (!processRemainingRecordsInBatch()) { + // output batch is full. Return. + return setOkAndReturn(EMIT); + } outputToBatch(previousIndex); // currentIndex has been reset to int_max so use previous index. 
} resetIndex(); @@ -285,7 +293,7 @@ public AggOutcome doWork(IterOutcome outerOutcome) { logger.debug("Wrote out end of previous batch, returning."); } newSchema = true; - return setOkAndReturn(); + return setOkAndReturn(OK_NEW_SCHEMA); } cleanup(); return AggOutcome.UPDATE_AGGREGATOR; @@ -294,6 +302,7 @@ public AggOutcome doWork(IterOutcome outerOutcome) { if (incoming.getRecordCount() == 0) { continue; } else { + currentIndex = this.getVectorIndex(underlyingIndex); if (previousIndex != -1 && isSamePrev(previousIndex, previous, currentIndex)) { if (EXTRA_DEBUG) { logger.debug("New value was same as last value of previous batch, adding."); @@ -315,7 +324,7 @@ public AggOutcome doWork(IterOutcome outerOutcome) { logger.debug("Output container is full. flushing it."); } previousIndex = -1; - return setOkAndReturn(); + return setOkAndReturn(OK); } } previousIndex = -1; @@ -405,8 +414,8 @@ private final void incIndex() { } private final void resetIndex() { - underlyingIndex = -1; - incIndex(); + underlyingIndex = 0; + currentIndex = Integer.MAX_VALUE; } /** @@ -414,7 +423,7 @@ private final void resetIndex() { * * @return outcome */ - private final AggOutcome setOkAndReturn() { + private final AggOutcome setOkAndReturn(IterOutcome seenOutcome) { IterOutcome outcomeToReturn; firstBatchForDataSet = false; if (firstBatchForSchema) { @@ -428,7 +437,7 @@ private final AggOutcome setOkAndReturn() { for (VectorWrapper<?> v : outgoing) { v.getValueVector().getMutator().setValueCount(outputCount); } - return AggOutcome.RETURN_OUTCOME; + return (seenOutcome == EMIT) ? AggOutcome.RETURN_AND_RESET : AggOutcome.RETURN_OUTCOME; } /** @@ -457,7 +466,7 @@ private final AggOutcome setOkAndReturnEmit() { // Returns output container status after insertion of the given record. Caller must check the return value if it // plans to insert more records into outgoing container. 
private final boolean outputToBatch(int inIndex) { - assert outputCount < OUTPUT_BATCH_SIZE: + assert outputCount < maxOutputRows : "Outgoing RecordBatch is not flushed. It reached its max capacity in the last update"; outputRecordKeys(inIndex, outputCount); @@ -470,14 +479,13 @@ private final boolean outputToBatch(int inIndex) { resetValues(); outputCount++; addedRecordCount = 0; - - return outputCount == OUTPUT_BATCH_SIZE; + return outputCount == maxOutputRows; } // Returns output container status after insertion of the given record. Caller must check the return value if it // plans to inserts more record into outgoing container. private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex) { - assert outputCount < OUTPUT_BATCH_SIZE: + assert outputCount < maxOutputRows : "Outgoing RecordBatch is not flushed. It reached its max capacity in the last update"; outputRecordKeysPrev(b1, inIndex, outIndex); @@ -485,8 +493,7 @@ private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIn resetValues(); outputCount++; addedRecordCount = 0; - - return outputCount == OUTPUT_BATCH_SIZE; + return outputCount == maxOutputRows; } private void addRecordInc(int index) { @@ -508,6 +515,11 @@ public String toString() { + "]"; } + @Override + public boolean previousBatchProcessed() { + return (currentIndex == Integer.MAX_VALUE); + } + public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException; public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2); public abstract boolean isSamePrev(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int b2Index); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java index 23fdcc1d24c..57caa9f058e 
100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java @@ -25,7 +25,8 @@ public interface StreamingAggregator { - public static TemplateClassDefinition<StreamingAggregator> TEMPLATE_DEFINITION = new TemplateClassDefinition<StreamingAggregator>(StreamingAggregator.class, StreamingAggTemplate.class); + TemplateClassDefinition<StreamingAggregator> TEMPLATE_DEFINITION = + new TemplateClassDefinition<StreamingAggregator>(StreamingAggregator.class, StreamingAggTemplate.class); /** @@ -45,25 +46,27 @@ * <p> * @see org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome HashAggregator.AggOutcome */ - public static enum AggOutcome { + enum AggOutcome { RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR, RETURN_AND_RESET; } - public abstract void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException; + void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing, int outputRowCount) + throws SchemaChangeException; - public abstract IterOutcome getOutcome(); + IterOutcome getOutcome(); - public abstract int getOutputCount(); + int getOutputCount(); // do the work. Also pass in the Iteroutcome of the batch already read in case it might be an EMIT. If the // outerOutcome is EMIT, we need to do the work without reading any more batches. 
- public abstract AggOutcome doWork(IterOutcome outerOutcome); + AggOutcome doWork(IterOutcome outerOutcome); - public abstract boolean isDone(); + boolean isDone(); - public abstract void cleanup(); + void cleanup(); + boolean previousBatchProcessed(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java index 242687f040d..735f11f365b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java @@ -389,7 +389,7 @@ protected void killIncoming(boolean sendUpstream) { private boolean handleSchemaChange() { try { stats.startSetup(); - logger.debug("Setting up new schema based on incoming batch. Old output schema: %s", container.getSchema()); + logger.debug("Setting up new schema based on incoming batch. Old output schema: {}", container.getSchema()); setupNewSchema(); return true; } catch (SchemaChangeException ex) { @@ -805,7 +805,7 @@ private BatchSchema batchSchemaWithNoExcludedCols(BatchSchema originSchema, bool */ private void setupNewSchema() throws SchemaChangeException { - logger.debug("Setting up new schema based on incoming batch. New left schema: %s and New right schema: %s", + logger.debug("Setting up new schema based on incoming batch. 
New left schema: {} and New right schema: {}", left.getSchema(), right.getSchema()); // Clear up the container diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java index 508999f333b..a9c95983460 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java @@ -118,6 +118,7 @@ public final int unnestRecords(final int recordCount) { Preconditions.checkArgument(svMode == NONE, "Unnest does not support selection vector inputs."); final int initialInnerValueIndex = runningInnerValueIndex; + int nonEmptyArray = 0; outer: { @@ -126,8 +127,12 @@ public final int unnestRecords(final int recordCount) { for (; valueIndex < valueCount; valueIndex++) { final int innerValueCount = accessor.getInnerValueCountAt(valueIndex); - logger.debug("Unnest: currentRecord: {}, innerValueCount: {}, record count: {}, output limit: {}", - innerValueCount, recordCount, outputLimit); + logger.trace("Unnest: CurrentRowId: {}, innerValueCount: {}, outputIndex: {}, output limit: {}", + valueIndex, innerValueCount, outputIndex, outputLimit); + + if (innerValueCount > 0) { + ++nonEmptyArray; + } for (; innerValueIndex < innerValueCount; innerValueIndex++) { // If we've hit the batch size limit, stop and flush what we've got so far. @@ -148,6 +153,9 @@ public final int unnestRecords(final int recordCount) { } // forevery value in the array } // for every incoming record final int delta = runningInnerValueIndex - initialInnerValueIndex; + logger.debug("Unnest: Finished processing current batch. 
[Details: LastProcessedRowIndex: {}, " + + "RowsWithNonEmptyArrays: {}, outputIndex: {}, outputLimit: {}, TotalIncomingRecords: {}]", + valueIndex, nonEmptyArray, delta, outputLimit, accessor.getValueCount()); final SchemaChangeCallBack callBack = new SchemaChangeCallBack(); for (TransferPair t : transfers) { t.splitAndTransfer(initialInnerValueIndex, delta); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index 362ea29924b..eb6112dfc1b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -135,13 +135,15 @@ public final IterOutcome next(final int inputIndex, final RecordBatch b){ return next; } - switch(next) { + boolean isNewSchema = false; + logger.debug("Received next batch for index: {} with outcome: {}", inputIndex, next); + switch (next) { case OK_NEW_SCHEMA: - stats.batchReceived(inputIndex, b.getRecordCount(), true); - break; + isNewSchema = true; case OK: case EMIT: - stats.batchReceived(inputIndex, b.getRecordCount(), false); + stats.batchReceived(inputIndex, b.getRecordCount(), isNewSchema); + logger.debug("Number of records in received batch: {}", b.getRecordCount()); break; default: break; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java index cead984cc5c..37a44eaa3e7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java @@ -527,6 +527,232 @@ public void t8_testStreamingAggrMultipleInputToSingleOutputBatch() { nonEmptyInputRowSet2.clear(); } + /** + * 
Verifies scenario where multiple incoming batches received with OK_NEW_SCHEMA, OK, OK, EMIT whose output is split + * into multiple output batches is handled correctly such that first output is produced with OK_NEW_SCHEMA and then + * followed by EMIT outcome + */ + @Test + public void t8_1_testStreamingAggr_InputSplitToMultipleOutputBatch() { + + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(1, 20, "item1") + .build(); + final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(2, 30, "item2") + .build(); + + final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(2, 40, "item2") + .addRow(2, 50, "item2") + .addRow(2, 60, "item2") + .addRow(2, 70, "item2") + .addRow(3, 100, "item3") + .addRow(3, 200, "item3") + .addRow(3, 300, "item3") + .addRow(3, 400, "item3") + .build(); + + TupleMetadata resultSchema2 = new SchemaBuilder() + .add("name", TypeProtos.MinorType.VARCHAR) + .add("id", TypeProtos.MinorType.INT) + .add("total_count", TypeProtos.MinorType.BIGINT) + .buildSchema(); + + final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchema2) + .addRow("item1", 1, (long)2) + .addRow("item2", 2, (long)5) + .build(); + + final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchema2) + .addRow("item3", 3, (long)4) + .build(); + + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + inputContainer.add(nonEmptyInputRowSet3.container()); + inputContainer.add(nonEmptyInputRowSet4.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final 
MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + parseExprs("name_left", "name", "id_left", "id"), + parseExprs("count(cost_left)", "total_count"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + strAggBatch.setMaxOutputRowCount(2); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + // Expect OK_NEW_SCHEMA first for all the input batch from second batch onwards since output batch is full after + // producing 2 groups as output + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertEquals(2, strAggBatch.getRecordCount()); + + RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet1).verify(actualRowSet); + + // The last group was produced in different output batch with EMIT outcome + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(1, strAggBatch.getRecordCount()); + actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet2).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + + nonEmptyInputRowSet2.clear(); + nonEmptyInputRowSet3.clear(); + nonEmptyInputRowSet4.clear(); + + expectedRowSet1.clear(); + expectedRowSet2.clear(); + } + + /** + * Verifies scenario where multiple incoming batches received with OK_NEW_SCHEMA, OK, OK, EMIT whose output is split + * into multiple output batches and incoming batches received with OK,OK,EMIT whose output is also split across + * multiple output batches is handled correctly. 
+ */ + @Test + public void t8_2_testStreamingAggr_Inputs_OK_EMIT_SplitToMultipleOutputBatch() { + + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(1, 20, "item1") + .build(); + final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(2, 30, "item2") + .build(); + + final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(2, 40, "item2") + .addRow(2, 50, "item2") + .addRow(2, 60, "item2") + .addRow(2, 70, "item2") + .addRow(3, 100, "item3") + .addRow(3, 200, "item3") + .addRow(3, 300, "item3") + .addRow(3, 400, "item3") + .build(); + + final RowSet.SingleRowSet nonEmptyInputRowSet5 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(2, 40, "item2") + .build(); + + final RowSet.SingleRowSet nonEmptyInputRowSet6 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(2, 50, "item2") + .build(); + + final RowSet.SingleRowSet nonEmptyInputRowSet7 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(3, 130, "item3") + .addRow(3, 130, "item3") + .addRow(4, 140, "item4") + .addRow(4, 140, "item4") + .build(); + + TupleMetadata resultSchema2 = new SchemaBuilder() + .add("name", TypeProtos.MinorType.VARCHAR) + .add("id", TypeProtos.MinorType.INT) + .add("total_count", TypeProtos.MinorType.BIGINT) + .buildSchema(); + + final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchema2) + .addRow("item1", 1, (long)2) + .addRow("item2", 2, (long)5) + .build(); + + final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchema2) + .addRow("item3", 3, (long)4) + .build(); + + final RowSet.SingleRowSet expectedRowSet3 = operatorFixture.rowSetBuilder(resultSchema2) + .addRow("item2", 2, (long)2) + .addRow("item3", 3, (long)2) + .build(); + + final RowSet.SingleRowSet expectedRowSet4 = operatorFixture.rowSetBuilder(resultSchema2) + .addRow("item4", 4, (long)2) + .build(); + + 
inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + inputContainer.add(nonEmptyInputRowSet3.container()); + inputContainer.add(nonEmptyInputRowSet4.container()); + inputContainer.add(nonEmptyInputRowSet5.container()); + inputContainer.add(nonEmptyInputRowSet6.container()); + inputContainer.add(nonEmptyInputRowSet7.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.EMIT); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + parseExprs("name_left", "name", "id_left", "id"), + parseExprs("count(cost_left)", "total_count"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + strAggBatch.setMaxOutputRowCount(2); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + // Output batches for input batch 2 to 5 + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertEquals(2, strAggBatch.getRecordCount()); + + RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet1).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(1, strAggBatch.getRecordCount()); + actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new 
RowSetComparison(expectedRowSet2).verify(actualRowSet); + + // Output batches for input batch 6 to 8 + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK); + // output batch is full after producing 2 rows + assertEquals(2, strAggBatch.getRecordCount()); + actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet3).verify(actualRowSet); + + // output batch with pending rows + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); + assertEquals(1, strAggBatch.getRecordCount()); + actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet4).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + + nonEmptyInputRowSet2.clear(); + nonEmptyInputRowSet3.clear(); + nonEmptyInputRowSet4.clear(); + nonEmptyInputRowSet5.clear(); + nonEmptyInputRowSet6.clear(); + nonEmptyInputRowSet7.clear(); + + expectedRowSet1.clear(); + expectedRowSet2.clear(); + expectedRowSet3.clear(); + expectedRowSet4.clear(); + } /***************************************************************************************** Tests for validating regular StreamingAggr behavior with no EMIT outcome @@ -620,6 +846,88 @@ public void t10_testStreamingAggrWithEmptyDataSet() { assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); } + @Test + public void t10_1_testStreamingAggr_InputSplitToMultipleOutputBatch() { + + final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(1, 20, "item1") + .build(); + final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(2, 30, "item2") + .build(); + + final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema) + .addRow(2, 40, "item2") + .addRow(2, 50, "item2") + .addRow(2, 60, "item2") + .addRow(2, 70, "item2") + .addRow(3, 100, "item3") + .addRow(3, 200, "item3") + .addRow(3, 300, "item3") + 
.addRow(3, 400, "item3") + .build(); + + TupleMetadata resultSchema2 = new SchemaBuilder() + .add("name", TypeProtos.MinorType.VARCHAR) + .add("id", TypeProtos.MinorType.INT) + .add("total_count", TypeProtos.MinorType.BIGINT) + .buildSchema(); + + final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchema2) + .addRow("item1", 1, (long)2) + .addRow("item2", 2, (long)5) + .build(); + + final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchema2) + .addRow("item3", 3, (long)4) + .build(); + + inputContainer.add(emptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet.container()); + inputContainer.add(nonEmptyInputRowSet2.container()); + inputContainer.add(nonEmptyInputRowSet3.container()); + inputContainer.add(nonEmptyInputRowSet4.container()); + + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + inputOutcomes.add(RecordBatch.IterOutcome.OK); + + final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + + final StreamingAggregate streamAggrConfig = new StreamingAggregate(null, + parseExprs("name_left", "name", "id_left", "id"), + parseExprs("count(cost_left)", "total_count"), + 1.0f); + + final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch, + operatorFixture.getFragmentContext()); + strAggBatch.setMaxOutputRowCount(2); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertEquals(2, strAggBatch.getRecordCount()); + + RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet1).verify(actualRowSet); + + 
assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK); + assertEquals(1, strAggBatch.getRecordCount()); + actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); + new RowSetComparison(expectedRowSet2).verify(actualRowSet); + + assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE); + + nonEmptyInputRowSet2.clear(); + nonEmptyInputRowSet3.clear(); + nonEmptyInputRowSet4.clear(); + + expectedRowSet1.clear(); + expectedRowSet2.clear(); + } + /******************************************************* * Tests for EMIT with empty batches and no group by * (Tests t1-t8 are repeated with no group by) @@ -813,14 +1121,15 @@ public void t14_testStreamingAggrMultipleEmptyBatchFollowedByNonEmptyBatchEmitOu assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + assertEquals(1, strAggBatch.getRecordCount()); // special batch assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); assertEquals(0, strAggBatch.getRecordCount()); assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); - assertEquals(1, strAggBatch.getRecordCount()); + assertEquals(1, strAggBatch.getRecordCount()); // special batch assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); - assertEquals(1, strAggBatch.getRecordCount()); + assertEquals(1, strAggBatch.getRecordCount()); // special batch assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT); - assertEquals(1, strAggBatch.getRecordCount()); + assertEquals(1, strAggBatch.getRecordCount()); // data batch RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer()); new RowSetComparison(expectedRowSet).verify(actualRowSet); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Lateral Unnest query : IllegalStateException - rowId in right batch of > lateral is smaller than rowId in left batch being processed > ----------------------------------------------------------------------------------------------------------------------------------- > > Key: DRILL-6766 > URL: https://issues.apache.org/jira/browse/DRILL-6766 > Project: Apache Drill > Issue Type: Bug > Components: Execution - Relational Operators > Affects Versions: 1.14.0 > Reporter: Kedar Sankar Behera > Assignee: Sorabh Hamirwasia > Priority: Major > Labels: ready-to-commit > Fix For: 1.15.0 > > > The error occurs when the output for one incoming batch of streaming agg is split across multiple > output batches. In that case the first output batch is sent downstream, and on the > subsequent next() StreamingAgg discards the unprocessed rows in the previous > incoming batch and calls next() on upstream again. At this point lateral is > waiting to receive the rows for the unprocessed ones from the right side, and instead > it sees a batch with a lower rowId from streaming agg, resulting in the > exception. > This is also a regression from previous behavior: without EMIT outcome > support, StreamingAgg handled this case properly. 
> {code:java} > SELECT ttt.number_segments_in_group from BGFsmall t, LATERAL (select > l.o.`element`.geo_segments.list as ot from unnest(t.geo_onds.list) l(o)) tt, > LATERAL (select count(*) as number_segments_in_group from unnest(tt.ot) > ll(o)) ttt limit 3; > {code} > Log:- > {code:java} > [Error Id: cfe63aaa-7115-4fab-b0fb-d5098d3b6350 on drill182:31010] > org.apache.drill.common.exceptions.UserException: SYSTEM ERROR: > IllegalStateException: Unexpected case where rowId 0 in right batch of > lateral is smaller than rowId 32768 in left batch being processed > > Fragment 0:0 > > [Error Id: cfe63aaa-7115-4fab-b0fb-d5098d3b6350 on drill182:31010] > at > org.apache.drill.common.exceptions.UserException$Builder.build(UserException.java:633) > ~[drill-common-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.work.fragment.FragmentExecutor.sendFinalState(FragmentExecutor.java:360) > [drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.work.fragment.FragmentExecutor.cleanup(FragmentExecutor.java:215) > [drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:326) > [drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.common.SelfCleaningRunnable.run(SelfCleaningRunnable.java:38) > [drill-common-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > [na:1.8.0_161] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > [na:1.8.0_161] > at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161] > Caused by: java.lang.IllegalStateException: Unexpected case where rowId 0 in > right batch of lateral is smaller than rowId 32768 in left batch being > processed > at > org.apache.drill.shaded.guava.com.google.common.base.Preconditions.checkState(Preconditions.java:609) > ~[drill-shaded-guava-23.0.jar:23.0] > at > 
org.apache.drill.exec.physical.impl.join.LateralJoinBatch.crossJoinAndOutputRecords(LateralJoinBatch.java:1024) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.physical.impl.join.LateralJoinBatch.produceOutputBatch(LateralJoinBatch.java:575) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.physical.impl.join.LateralJoinBatch.innerNext(LateralJoinBatch.java:238) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:175) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:122) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:112) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.record.AbstractUnaryRecordBatch.innerNext(AbstractUnaryRecordBatch.java:63) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.physical.impl.limit.LimitRecordBatch.innerNext(LimitRecordBatch.java:101) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:175) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:122) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:112) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.record.AbstractUnaryRecordBatch.innerNext(AbstractUnaryRecordBatch.java:63) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.physical.impl.limit.LimitRecordBatch.innerNext(LimitRecordBatch.java:101) > 
~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:175) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:122) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:112) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.record.AbstractUnaryRecordBatch.innerNext(AbstractUnaryRecordBatch.java:63) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:175) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:122) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:112) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.record.AbstractUnaryRecordBatch.innerNext(AbstractUnaryRecordBatch.java:69) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.physical.impl.project.ProjectRecordBatch.innerNext(ProjectRecordBatch.java:143) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:175) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.physical.impl.BaseRootExec.next(BaseRootExec.java:103) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.physical.impl.ScreenCreator$ScreenRoot.innerNext(ScreenCreator.java:83) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.physical.impl.BaseRootExec.next(BaseRootExec.java:93) > 
~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.work.fragment.FragmentExecutor$1.run(FragmentExecutor.java:293) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at > org.apache.drill.exec.work.fragment.FragmentExecutor$1.run(FragmentExecutor.java:280) > ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > at java.security.AccessController.doPrivileged(Native Method) > ~[na:1.8.0_161] > at javax.security.auth.Subject.doAs(Subject.java:422) ~[na:1.8.0_161] > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1595) > ~[hadoop-common-2.7.0-mapr-1707.jar:na] > at > org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:280) > [drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT] > ... 4 common frames omitted > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)