[ 
https://issues.apache.org/jira/browse/DRILL-6766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16645516#comment-16645516
 ] 

ASF GitHub Bot commented on DRILL-6766:
---------------------------------------

sohami closed pull request #1490: DRILL-6766: Lateral Unnest query : 
IllegalStateException - rowId in right batch of lateral is smaller than rowId 
in left batch being processed
URL: https://github.com/apache/drill/pull/1490
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 2b9b31783f5..ffcfa78a707 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.List;
 
+import 
org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
@@ -71,6 +72,7 @@
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
 import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 
 public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> 
{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(StreamingAggBatch.class);
@@ -104,7 +106,7 @@
                                  // call to inner next is made.
   private boolean sendEmit = false; // In the case where we see an 
OK_NEW_SCHEMA along with the end of a data set
                                     // we send out a batch with OK_NEW_SCHEMA 
first, then in the next iteration,
-                                    // we send out an emopty batch with EMIT.
+                                    // we send out an empty batch with EMIT.
   private IterOutcome lastKnownOutcome = OK; // keep track of the outcome from 
the previous call to incoming.next
   private boolean firstBatchForSchema = true; // true if the current batch 
came in with an OK_NEW_SCHEMA
   private boolean firstBatchForDataSet = true; // true if the current batch is 
the first batch in a data set
@@ -127,7 +129,11 @@
   private boolean specialBatchSent = false;
   private static final int SPECIAL_BATCH_COUNT = 1;
 
-  public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, 
FragmentContext context) throws OutOfMemoryException {
+  // TODO: Needs to adapt to batch sizing rather than hardcoded constant value
+  private int maxOutputRowCount = ValueVector.MAX_ROW_COUNT;
+
+  public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, 
FragmentContext context)
+    throws OutOfMemoryException {
     super(popConfig, context);
     this.incoming = incoming;
 
@@ -189,7 +195,7 @@ public IterOutcome innerNext() {
 
     // if a special batch has been sent, we have no data in the incoming so 
exit early
     if (done || specialBatchSent) {
-      assert (sendEmit != true); // if special batch sent with emit then flag 
will not be set
+      assert (!sendEmit); // if special batch sent with emit then flag will 
not be set
       return NONE;
     }
 
@@ -199,6 +205,7 @@ public IterOutcome innerNext() {
       first = false; // first is set only in the case when we see a NONE after 
an empty first (and only) batch
       sendEmit = false;
       firstBatchForDataSet = true;
+      firstBatchForSchema = false;
       recordCount = 0;
       specialBatchSent = false;
       return EMIT;
@@ -239,18 +246,17 @@ public IterOutcome innerNext() {
             done = true;
             return IterOutcome.STOP;
           }
+          firstBatchForSchema = true;
           break;
         case EMIT:
           // if we get an EMIT with an empty batch as the first (and therefore 
only) batch
           // we have to do the special handling
           if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && 
incoming.getRecordCount() == 0) {
             constructSpecialBatch();
-            firstBatchForDataSet = true; // reset on the next iteration
             // If outcome is NONE then we send the special batch in the first 
iteration and the NONE
             // outcome in the next iteration. If outcome is EMIT, we can send 
the special
             // batch and the EMIT outcome at the same time. (unless the 
finalOutcome is OK_NEW_SCHEMA)
-            IterOutcome finalOutcome =  getFinalOutcome();
-            return finalOutcome;
+            return  getFinalOutcome();
           }
           // else fall thru
         case OK:
@@ -259,15 +265,18 @@ public IterOutcome innerNext() {
           throw new IllegalStateException(String.format("unknown outcome %s", 
lastKnownOutcome));
       }
     } else {
-      if ( lastKnownOutcome != NONE && firstBatchForDataSet && 
!aggregator.isDone()) {
+      // If this is not the first batch and previous batch is fully processed 
with no error condition or NONE is not
+      // seen then it will call next() on upstream to get new batch. Otherwise 
just process the previous incoming batch
+      if ( lastKnownOutcome != NONE && firstBatchForDataSet && 
!aggregator.isDone()
+        && aggregator.previousBatchProcessed()) {
         lastKnownOutcome = incoming.next();
         if (!first ) {
           //Setup needs to be called again. During setup, generated code saves 
a reference to the vectors
-          // pointed to by the incoming batch so that the dereferencing of the 
vector wrappers to get to
+          // pointed to by the incoming batch so that the de-referencing of 
the vector wrappers to get to
           // the vectors  does not have to be done at each call to eval. 
However, after an EMIT is seen,
           // the vectors are replaced and the reference to the old vectors is 
no longer valid
           try {
-            aggregator.setup(oContext, incoming, this);
+            aggregator.setup(oContext, incoming, this, maxOutputRowCount);
           } catch (SchemaChangeException e) {
             UserException.Builder exceptionBuilder = 
UserException.functionError(e)
                 .message("A Schema change exception occured in calling setup() 
in generated code.");
@@ -280,8 +289,10 @@ public IterOutcome innerNext() {
     recordCount = aggregator.getOutputCount();
     container.setRecordCount(recordCount);
     logger.debug("Aggregator response {}, records {}", aggOutcome, 
aggregator.getOutputCount());
-    // overwrite the outcome variable since we no longer need to remember the 
first batch outcome
-    lastKnownOutcome = aggregator.getOutcome();
+    // get the returned IterOutcome from aggregator and based on AggOutcome 
and returned IterOutcome update the
+    // lastKnownOutcome below. For example: if AggOutcome is RETURN_AND_RESET 
then lastKnownOutcome is always set to
+    // EMIT
+    IterOutcome returnOutcome = aggregator.getOutcome();
     switch (aggOutcome) {
       case CLEANUP_AND_RETURN:
         if (!first) {
@@ -289,7 +300,7 @@ public IterOutcome innerNext() {
         }
         done = true;
         ExternalSortBatch.releaseBatches(incoming);
-        return lastKnownOutcome;
+        return returnOutcome;
       case RETURN_AND_RESET:
         //WE could have got a string of batches, all empty, until we hit an 
emit
         if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && 
recordCount == 0) {
@@ -298,28 +309,32 @@ public IterOutcome innerNext() {
           // If outcome is NONE then we send the special batch in the first 
iteration and the NONE
           // outcome in the next iteration. If outcome is EMIT, we can send 
the special
           // batch and the EMIT outcome at the same time.
-
-          IterOutcome finalOutcome =  getFinalOutcome();
-          return finalOutcome;
+          return getFinalOutcome();
         }
         firstBatchForDataSet = true;
         firstBatchForSchema = false;
         if(first) {
           first = false;
         }
-        if(lastKnownOutcome == OK_NEW_SCHEMA) {
-          sendEmit = true;
+        // Since AggOutcome is RETURN_AND_RESET and returned IterOutcome is 
OK_NEW_SCHEMA from Aggregator that means it
+        // has seen first batch with OK_NEW_SCHEMA and then last batch with 
EMIT outcome. In that case if all the input
+        // batch is processed to produce output batch it need to send and 
empty batch with EMIT outcome in subsequent
+        // next call.
+        if(returnOutcome == OK_NEW_SCHEMA) {
+          sendEmit = (aggregator == null) || 
aggregator.previousBatchProcessed();
         }
         // Release external sort batches after EMIT is seen
         ExternalSortBatch.releaseBatches(incoming);
-        return lastKnownOutcome;
+        lastKnownOutcome = EMIT;
+        return returnOutcome;
       case RETURN_OUTCOME:
         // In case of complex writer expression, vectors would be added to 
batch run-time.
         // We have to re-build the schema.
         if (complexWriters != null) {
           container.buildSchema(SelectionVectorMode.NONE);
         }
-        if (lastKnownOutcome == IterOutcome.NONE ) {
+        if (returnOutcome == IterOutcome.NONE ) {
+          lastKnownOutcome = NONE;
           // we will set the 'done' flag in the next call to innerNext and use 
the lastKnownOutcome
           // to determine whether we should set the flag or not.
           // This is so that if someone calls getRecordCount in between calls 
to innerNext, we will
@@ -330,11 +345,12 @@ public IterOutcome innerNext() {
           } else {
             return OK;
           }
-        } else if (lastKnownOutcome == OK && first) {
+        } else if (returnOutcome == OK && first) {
           lastKnownOutcome = OK_NEW_SCHEMA;
+          returnOutcome = OK_NEW_SCHEMA;
         }
         first = false;
-        return lastKnownOutcome;
+        return returnOutcome;
       case UPDATE_AGGREGATOR:
         // We could get this either between data sets or within a data set.
         // If the former, we can handle the change and so need to update the 
aggregator and
@@ -342,8 +358,9 @@ public IterOutcome innerNext() {
         // and exception
         // This case is not tested since there are no unit tests for this and 
there is no support
         // from the sort operator for this case
-        if (lastKnownOutcome == EMIT) {
+        if (returnOutcome == EMIT) {
           createAggregator();
+          lastKnownOutcome = EMIT;
           return OK_NEW_SCHEMA;
         } else {
           
context.getExecutorState().fail(UserException.unsupportedError().message(SchemaChangeException
@@ -351,6 +368,7 @@ public IterOutcome innerNext() {
                   incoming.getSchema()).getMessage()).build(logger));
           close();
           killIncoming(false);
+          lastKnownOutcome = STOP;
           return IterOutcome.STOP;
         }
       default:
@@ -433,7 +451,7 @@ private StreamingAggregator createAggregatorInternal() 
throws SchemaChangeExcept
     ClassGenerator<StreamingAggregator> cg = 
CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, 
context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
-    //  cg.getCodeGenerator().saveCodeForDebugging(true);
+    //cg.getCodeGenerator().saveCodeForDebugging(true);
     container.clear();
 
     LogicalExpression[] keyExprs = new 
LogicalExpression[popConfig.getKeys().size()];
@@ -506,7 +524,7 @@ private StreamingAggregator createAggregatorInternal() 
throws SchemaChangeExcept
 
     container.buildSchema(SelectionVectorMode.NONE);
     StreamingAggregator agg = context.getImplementationClass(cg);
-    agg.setup(oContext, incoming, this);
+    agg.setup(oContext, incoming, this, maxOutputRowCount);
     allocateComplexWriters();
     return agg;
   }
@@ -651,7 +669,11 @@ protected void killIncoming(boolean sendUpstream) {
 
   @Override
   public void dump() {
-    logger.error("StreamingAggBatch[container={}, popConfig={}, aggregator={}, 
incomingSchema={}]",
-        container, popConfig, aggregator, incomingSchema);
+    logger.error("StreamingAggBatch[container={}, popConfig={}, aggregator={}, 
incomingSchema={}]", container, popConfig, aggregator, incomingSchema);
+  }
+
+  @VisibleForTesting
+  public void setMaxOutputRowCount(int maxOutputRowCount) {
+    this.maxOutputRowCount = maxOutputRowCount;
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 4bde7ab1708..cc89f23b55d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -24,6 +24,7 @@
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.ValueVector;
 
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
@@ -33,7 +34,7 @@
 public abstract class StreamingAggTemplate implements StreamingAggregator {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(StreamingAggregator.class);
   private static final boolean EXTRA_DEBUG = false;
-  private static final int OUTPUT_BATCH_SIZE = 32*1024;
+  private int maxOutputRows = ValueVector.MAX_ROW_COUNT;
 
   // lastOutcome is set ONLY if the lastOutcome was NONE or STOP
   private IterOutcome lastOutcome = null;
@@ -54,7 +55,7 @@
   // (i.e if a selection vector the sv4/sv2 entry has been dereferenced or if 
a vector then the record index itself)
   private int previousIndex = -1;  // the last index that has been processed. 
Initialized to -1 every time a new
                                    // aggregate group begins (including every 
time a new data set begins)
-  private int currentIndex; // current index being processed
+  private int currentIndex = Integer.MAX_VALUE; // current index being 
processed
   /**
    * Number of records added to the current aggregation group.
    */
@@ -72,10 +73,12 @@
 
 
   @Override
-  public void setup(OperatorContext context, RecordBatch incoming, 
StreamingAggBatch outgoing) throws SchemaChangeException {
+  public void setup(OperatorContext context, RecordBatch incoming,
+                    StreamingAggBatch outgoing, int outputRowCount) throws 
SchemaChangeException {
     this.context = context;
     this.incoming = incoming;
     this.outgoing = outgoing;
+    this.maxOutputRows = outputRowCount;
     setupInterior(incoming, outgoing);
   }
 
@@ -109,7 +112,7 @@ public AggOutcome doWork(IterOutcome outerOutcome) {
       allocateOutgoing();
 
       if (firstBatchForDataSet) {
-        this.currentIndex = incoming.getRecordCount() == 0 ? 0 : 
this.getVectorIndex(underlyingIndex);
+        this.currentIndex = incoming.getRecordCount() == 0 ? Integer.MAX_VALUE 
: this.getVectorIndex(underlyingIndex);
 
         if (outerOutcome == OK_NEW_SCHEMA) {
           firstBatchForSchema = true;
@@ -178,9 +181,10 @@ public AggOutcome doWork(IterOutcome outerOutcome) {
         // loop through existing records, adding as necessary.
         if(!processRemainingRecordsInBatch()) {
           // output batch is full. Return.
-          return setOkAndReturn();
+          return setOkAndReturn(outerOutcome);
         }
-        // if the current batch came with an EMIT, we're done
+        // if the current batch came with an EMIT, we're done since if we are 
here it means output batch consumed all
+        // the rows in incoming batch
         if(outerOutcome == EMIT) {
           // output the last record
           outputToBatch(previousIndex);
@@ -215,14 +219,14 @@ public AggOutcome doWork(IterOutcome outerOutcome) {
                 done = true;
                 lastOutcome = out;
                 if (firstBatchForDataSet && addedRecordCount == 0) {
-                  return setOkAndReturn();
+                  return setOkAndReturn(NONE);
                 } else if (addedRecordCount > 0) {
                   outputToBatchPrev(previous, previousIndex, outputCount); // 
No need to check the return value
                   // (output container full or not) as we are not going to 
insert any more records.
                   if (EXTRA_DEBUG) {
                     logger.debug("Received no more batches, returning.");
                   }
-                  return setOkAndReturn();
+                  return setOkAndReturn(NONE);
                 } else {
                   // not first batch and record Count == 0
                   outcome = out;
@@ -237,6 +241,7 @@ public AggOutcome doWork(IterOutcome outerOutcome) {
                   }
                 } else {
                   resetIndex();
+                  currentIndex = this.getVectorIndex(underlyingIndex);
                   if (previousIndex != -1 && isSamePrev(previousIndex, 
previous, currentIndex)) {
                     if (EXTRA_DEBUG) {
                       logger.debug("New value was same as last value of 
previous batch, adding.");
@@ -256,13 +261,16 @@ public AggOutcome doWork(IterOutcome outerOutcome) {
                         if (EXTRA_DEBUG) {
                           logger.debug("Output container is full. flushing 
it.");
                         }
-                        return setOkAndReturnEmit();
+                        return setOkAndReturn(EMIT);
                       }
                     }
                     // important to set the previous index to -1 since we 
start a new group
                     previousIndex = -1;
                   }
-                  processRemainingRecordsInBatch();
+                  if (!processRemainingRecordsInBatch()) {
+                    // output batch is full. Return.
+                    return setOkAndReturn(EMIT);
+                  }
                   outputToBatch(previousIndex); // currentIndex has been reset 
to int_max so use previous index.
                 }
                 resetIndex();
@@ -285,7 +293,7 @@ public AggOutcome doWork(IterOutcome outerOutcome) {
                     logger.debug("Wrote out end of previous batch, 
returning.");
                   }
                   newSchema = true;
-                  return setOkAndReturn();
+                  return setOkAndReturn(OK_NEW_SCHEMA);
                 }
                 cleanup();
                 return AggOutcome.UPDATE_AGGREGATOR;
@@ -294,6 +302,7 @@ public AggOutcome doWork(IterOutcome outerOutcome) {
                 if (incoming.getRecordCount() == 0) {
                   continue;
                 } else {
+                  currentIndex = this.getVectorIndex(underlyingIndex);
                   if (previousIndex != -1 && isSamePrev(previousIndex, 
previous, currentIndex)) {
                     if (EXTRA_DEBUG) {
                       logger.debug("New value was same as last value of 
previous batch, adding.");
@@ -315,7 +324,7 @@ public AggOutcome doWork(IterOutcome outerOutcome) {
                           logger.debug("Output container is full. flushing 
it.");
                         }
                         previousIndex = -1;
-                        return setOkAndReturn();
+                        return setOkAndReturn(OK);
                       }
                     }
                     previousIndex = -1;
@@ -405,8 +414,8 @@ private final void incIndex() {
   }
 
   private final void resetIndex() {
-    underlyingIndex = -1;
-    incIndex();
+    underlyingIndex = 0;
+    currentIndex = Integer.MAX_VALUE;
   }
 
   /**
@@ -414,7 +423,7 @@ private final void resetIndex() {
    *
    * @return outcome
    */
-  private final AggOutcome setOkAndReturn() {
+  private final AggOutcome setOkAndReturn(IterOutcome seenOutcome) {
     IterOutcome outcomeToReturn;
     firstBatchForDataSet = false;
     if (firstBatchForSchema) {
@@ -428,7 +437,7 @@ private final AggOutcome setOkAndReturn() {
     for (VectorWrapper<?> v : outgoing) {
       v.getValueVector().getMutator().setValueCount(outputCount);
     }
-    return AggOutcome.RETURN_OUTCOME;
+    return (seenOutcome == EMIT) ? AggOutcome.RETURN_AND_RESET : 
AggOutcome.RETURN_OUTCOME;
   }
 
   /**
@@ -457,7 +466,7 @@ private final AggOutcome setOkAndReturnEmit() {
   // Returns output container status after insertion of the given record. 
Caller must check the return value if it
   // plans to insert more records into outgoing container.
   private final boolean outputToBatch(int inIndex) {
-    assert outputCount < OUTPUT_BATCH_SIZE:
+    assert outputCount < maxOutputRows :
         "Outgoing RecordBatch is not flushed. It reached its max capacity in 
the last update";
 
     outputRecordKeys(inIndex, outputCount);
@@ -470,14 +479,13 @@ private final boolean outputToBatch(int inIndex) {
     resetValues();
     outputCount++;
     addedRecordCount = 0;
-
-    return outputCount == OUTPUT_BATCH_SIZE;
+    return outputCount == maxOutputRows;
   }
 
   // Returns output container status after insertion of the given record. 
Caller must check the return value if it
   // plans to inserts more record into outgoing container.
   private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int 
outIndex) {
-    assert outputCount < OUTPUT_BATCH_SIZE:
+    assert outputCount < maxOutputRows :
         "Outgoing RecordBatch is not flushed. It reached its max capacity in 
the last update";
 
     outputRecordKeysPrev(b1, inIndex, outIndex);
@@ -485,8 +493,7 @@ private final boolean outputToBatchPrev(InternalBatch b1, 
int inIndex, int outIn
     resetValues();
     outputCount++;
     addedRecordCount = 0;
-
-    return outputCount == OUTPUT_BATCH_SIZE;
+    return outputCount == maxOutputRows;
   }
 
   private void addRecordInc(int index) {
@@ -508,6 +515,11 @@ public String toString() {
         + "]";
   }
 
+  @Override
+  public boolean previousBatchProcessed() {
+    return (currentIndex == Integer.MAX_VALUE);
+  }
+
   public abstract void setupInterior(@Named("incoming") RecordBatch incoming, 
@Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
   public abstract boolean isSame(@Named("index1") int index1, @Named("index2") 
int index2);
   public abstract boolean isSamePrev(@Named("b1Index") int b1Index, 
@Named("b1") InternalBatch b1, @Named("b2Index") int b2Index);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
index 23fdcc1d24c..57caa9f058e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
@@ -25,7 +25,8 @@
 
 public interface StreamingAggregator {
 
-  public static TemplateClassDefinition<StreamingAggregator> 
TEMPLATE_DEFINITION = new 
TemplateClassDefinition<StreamingAggregator>(StreamingAggregator.class, 
StreamingAggTemplate.class);
+  TemplateClassDefinition<StreamingAggregator> TEMPLATE_DEFINITION =
+    new 
TemplateClassDefinition<StreamingAggregator>(StreamingAggregator.class, 
StreamingAggTemplate.class);
 
 
   /**
@@ -45,25 +46,27 @@
    * <p>
    * @see 
org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome 
HashAggregator.AggOutcome
    */
-  public static enum AggOutcome {
+  enum AggOutcome {
     RETURN_OUTCOME,
     CLEANUP_AND_RETURN,
     UPDATE_AGGREGATOR,
     RETURN_AND_RESET;
   }
 
-  public abstract void setup(OperatorContext context, RecordBatch incoming, 
StreamingAggBatch outgoing) throws SchemaChangeException;
+  void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch 
outgoing, int outputRowCount)
+    throws SchemaChangeException;
 
-  public abstract IterOutcome getOutcome();
+  IterOutcome getOutcome();
 
-  public abstract int getOutputCount();
+  int getOutputCount();
 
   // do the work. Also pass in the Iteroutcome of the batch already read in 
case it might be an EMIT. If the
   // outerOutcome is EMIT, we need to do the work without reading any more 
batches.
-  public abstract AggOutcome doWork(IterOutcome outerOutcome);
+  AggOutcome doWork(IterOutcome outerOutcome);
 
-  public abstract boolean isDone();
+  boolean isDone();
 
-  public abstract void cleanup();
+  void cleanup();
 
+  boolean previousBatchProcessed();
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 242687f040d..735f11f365b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -389,7 +389,7 @@ protected void killIncoming(boolean sendUpstream) {
   private boolean handleSchemaChange() {
     try {
       stats.startSetup();
-      logger.debug("Setting up new schema based on incoming batch. Old output 
schema: %s", container.getSchema());
+      logger.debug("Setting up new schema based on incoming batch. Old output 
schema: {}", container.getSchema());
       setupNewSchema();
       return true;
     } catch (SchemaChangeException ex) {
@@ -805,7 +805,7 @@ private BatchSchema 
batchSchemaWithNoExcludedCols(BatchSchema originSchema, bool
    */
   private void setupNewSchema() throws SchemaChangeException {
 
-    logger.debug("Setting up new schema based on incoming batch. New left 
schema: %s and New right schema: %s",
+    logger.debug("Setting up new schema based on incoming batch. New left 
schema: {} and New right schema: {}",
       left.getSchema(), right.getSchema());
 
     // Clear up the container
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
index 508999f333b..a9c95983460 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
@@ -118,6 +118,7 @@ public final int unnestRecords(final int recordCount) {
     Preconditions.checkArgument(svMode == NONE, "Unnest does not support 
selection vector inputs.");
 
     final int initialInnerValueIndex = runningInnerValueIndex;
+    int nonEmptyArray = 0;
 
     outer:
     {
@@ -126,8 +127,12 @@ public final int unnestRecords(final int recordCount) {
 
       for (; valueIndex < valueCount; valueIndex++) {
         final int innerValueCount = accessor.getInnerValueCountAt(valueIndex);
-        logger.debug("Unnest: currentRecord: {}, innerValueCount: {}, record 
count: {}, output limit: {}",
-            innerValueCount, recordCount, outputLimit);
+        logger.trace("Unnest: CurrentRowId: {}, innerValueCount: {}, 
outputIndex: {},  output limit: {}",
+            valueIndex, innerValueCount, outputIndex, outputLimit);
+
+        if (innerValueCount > 0) {
+          ++nonEmptyArray;
+        }
 
         for (; innerValueIndex < innerValueCount; innerValueIndex++) {
           // If we've hit the batch size limit, stop and flush what we've got 
so far.
@@ -148,6 +153,9 @@ public final int unnestRecords(final int recordCount) {
       }  // forevery value in the array
     }  // for every incoming record
     final int delta = runningInnerValueIndex - initialInnerValueIndex;
+    logger.debug("Unnest: Finished processing current batch. [Details: 
LastProcessedRowIndex: {}, " +
+      "RowsWithNonEmptyArrays: {}, outputIndex: {}, outputLimit: {}, 
TotalIncomingRecords: {}]",
+      valueIndex, nonEmptyArray, delta, outputLimit, accessor.getValueCount());
     final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
     for (TransferPair t : transfers) {
       t.splitAndTransfer(initialInnerValueIndex, delta);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 362ea29924b..eb6112dfc1b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -135,13 +135,15 @@ public final IterOutcome next(final int inputIndex, final 
RecordBatch b){
       return next;
     }
 
-    switch(next) {
+    boolean isNewSchema = false;
+    logger.debug("Received next batch for index: {} with outcome: {}", 
inputIndex, next);
+    switch (next) {
       case OK_NEW_SCHEMA:
-        stats.batchReceived(inputIndex, b.getRecordCount(), true);
-        break;
+        isNewSchema = true;
       case OK:
       case EMIT:
-        stats.batchReceived(inputIndex, b.getRecordCount(), false);
+        stats.batchReceived(inputIndex, b.getRecordCount(), isNewSchema);
+        logger.debug("Number of records in received batch: {}", 
b.getRecordCount());
         break;
       default:
         break;
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
index cead984cc5c..37a44eaa3e7 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
@@ -527,6 +527,232 @@ public void 
t8_testStreamingAggrMultipleInputToSingleOutputBatch() {
     nonEmptyInputRowSet2.clear();
   }
 
+  /**
+   * Verifies scenario where multiple incoming batches received with 
OK_NEW_SCHEMA, OK, OK, EMIT whose output is split
+   * into multiple output batches is handled correctly such that first output 
is produced with OK_NEW_SCHEMA and then
+   * followed by EMIT outcome
+   */
+  @Test
+  public void t8_1_testStreamingAggr_InputSplitToMultipleOutputBatch() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 20, "item1")
+      .build();
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 30, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 40, "item2")
+      .addRow(2, 50, "item2")
+      .addRow(2, 60, "item2")
+      .addRow(2, 70, "item2")
+      .addRow(3, 100, "item3")
+      .addRow(3, 200, "item3")
+      .addRow(3, 300, "item3")
+      .addRow(3, 400, "item3")
+      .build();
+
+    TupleMetadata resultSchema2 = new SchemaBuilder()
+      .add("name", TypeProtos.MinorType.VARCHAR)
+      .add("id", TypeProtos.MinorType.INT)
+      .add("total_count", TypeProtos.MinorType.BIGINT)
+      .buildSchema();
+
+    final RowSet.SingleRowSet expectedRowSet1 = 
operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item1", 1, (long)2)
+      .addRow("item2", 2, (long)5)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = 
operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item3", 3, (long)4)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name", "id_left", "id"),
+      parseExprs("count(cost_left)", "total_count"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new 
StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+    strAggBatch.setMaxOutputRowCount(2);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // Expect OK_NEW_SCHEMA first for all the input batch from second batch 
onwards since output batch is full after
+    // producing 2 groups as output
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(2, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = 
DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    // The last group was produced in different output batch with EMIT outcome
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    nonEmptyInputRowSet4.clear();
+
+    expectedRowSet1.clear();
+    expectedRowSet2.clear();
+  }
+
+  /**
+   * Verifies scenario where multiple incoming batches received with 
OK_NEW_SCHEMA, OK, OK, EMIT whose output is split
+   * into multiple output batches and incoming batches received with 
OK,OK,EMIT whose output is also split across
+   * multiple output batches is handled correctly.
+   */
+  @Test
+  public void 
t8_2_testStreamingAggr_Inputs_OK_EMIT_SplitToMultipleOutputBatch() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 20, "item1")
+      .build();
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 30, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 40, "item2")
+      .addRow(2, 50, "item2")
+      .addRow(2, 60, "item2")
+      .addRow(2, 70, "item2")
+      .addRow(3, 100, "item3")
+      .addRow(3, 200, "item3")
+      .addRow(3, 300, "item3")
+      .addRow(3, 400, "item3")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet5 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 40, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet6 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 50, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet7 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 130, "item3")
+      .addRow(3, 130, "item3")
+      .addRow(4, 140, "item4")
+      .addRow(4, 140, "item4")
+      .build();
+
+    TupleMetadata resultSchema2 = new SchemaBuilder()
+      .add("name", TypeProtos.MinorType.VARCHAR)
+      .add("id", TypeProtos.MinorType.INT)
+      .add("total_count", TypeProtos.MinorType.BIGINT)
+      .buildSchema();
+
+    final RowSet.SingleRowSet expectedRowSet1 = 
operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item1", 1, (long)2)
+      .addRow("item2", 2, (long)5)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = 
operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item3", 3, (long)4)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet3 = 
operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item2", 2, (long)2)
+      .addRow("item3", 3, (long)2)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet4 = 
operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item4", 4, (long)2)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+    inputContainer.add(nonEmptyInputRowSet5.container());
+    inputContainer.add(nonEmptyInputRowSet6.container());
+    inputContainer.add(nonEmptyInputRowSet7.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name", "id_left", "id"),
+      parseExprs("count(cost_left)", "total_count"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new 
StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+    strAggBatch.setMaxOutputRowCount(2);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Output batches for input batch 2 to 5
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(2, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = 
DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    // Output batches for input batch 6 to 8
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK);
+    // output batch is full after producing 2 rows
+    assertEquals(2, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet3).verify(actualRowSet);
+
+    // output batch with pending rows
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet4).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    nonEmptyInputRowSet4.clear();
+    nonEmptyInputRowSet5.clear();
+    nonEmptyInputRowSet6.clear();
+    nonEmptyInputRowSet7.clear();
+
+    expectedRowSet1.clear();
+    expectedRowSet2.clear();
+    expectedRowSet3.clear();
+    expectedRowSet4.clear();
+  }
 
   
/*****************************************************************************************
    Tests for validating regular StreamingAggr behavior with no EMIT outcome
@@ -620,6 +846,88 @@ public void t10_testStreamingAggrWithEmptyDataSet() {
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
   }
 
+  @Test
+  public void t10_1_testStreamingAggr_InputSplitToMultipleOutputBatch() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 20, "item1")
+      .build();
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 30, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 40, "item2")
+      .addRow(2, 50, "item2")
+      .addRow(2, 60, "item2")
+      .addRow(2, 70, "item2")
+      .addRow(3, 100, "item3")
+      .addRow(3, 200, "item3")
+      .addRow(3, 300, "item3")
+      .addRow(3, 400, "item3")
+      .build();
+
+    TupleMetadata resultSchema2 = new SchemaBuilder()
+      .add("name", TypeProtos.MinorType.VARCHAR)
+      .add("id", TypeProtos.MinorType.INT)
+      .add("total_count", TypeProtos.MinorType.BIGINT)
+      .buildSchema();
+
+    final RowSet.SingleRowSet expectedRowSet1 = 
operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item1", 1, (long)2)
+      .addRow("item2", 2, (long)5)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = 
operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item3", 3, (long)4)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name", "id_left", "id"),
+      parseExprs("count(cost_left)", "total_count"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new 
StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+    strAggBatch.setMaxOutputRowCount(2);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(2, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = 
DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK);
+    assertEquals(1, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    nonEmptyInputRowSet4.clear();
+
+    expectedRowSet1.clear();
+    expectedRowSet2.clear();
+  }
+
   /*******************************************************
    * Tests for EMIT with empty batches and no group by
    * (Tests t1-t8 are repeated with no group by)
@@ -813,14 +1121,15 @@ public void 
t14_testStreamingAggrMultipleEmptyBatchFollowedByNonEmptyBatchEmitOu
 
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount()); // special batch
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
     assertEquals(0, strAggBatch.getRecordCount());
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
-    assertEquals(1, strAggBatch.getRecordCount());
+    assertEquals(1, strAggBatch.getRecordCount()); // special batch
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
-    assertEquals(1, strAggBatch.getRecordCount());
+    assertEquals(1, strAggBatch.getRecordCount()); // special batch
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
-    assertEquals(1, strAggBatch.getRecordCount());
+    assertEquals(1, strAggBatch.getRecordCount()); // data batch
 
     RowSet actualRowSet = 
DirectRowSet.fromContainer(strAggBatch.getContainer());
     new RowSetComparison(expectedRowSet).verify(actualRowSet);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Lateral Unnest query : IllegalStateException -  rowId in right batch of 
> lateral is smaller than rowId in left batch being processed
> -----------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: DRILL-6766
>                 URL: https://issues.apache.org/jira/browse/DRILL-6766
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Execution - Relational Operators
>    Affects Versions: 1.14.0
>            Reporter: Kedar Sankar Behera
>            Assignee: Sorabh Hamirwasia
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 1.15.0
>
>
> The error is coming when one batch of streaming agg is split across multiple 
> output batches. In that case the first output batch is sent downstream and in 
> subsequent next() StreamingAgg discards the unprocessed rows in previous 
> incoming and call's next() to upstream again. At this point lateral is 
> waiting to receive the rows for unprocessed ones from right side and instead 
> it see's a batch with lower rowId from streaming agg resulting in the 
> exception.
> This is also a regression in previous behavior where without EMIT outcome 
> support StreamingAgg was handling this case properly. 
> {code:java}
> SELECT ttt.number_segments_in_group from BGFsmall t, LATERAL (select 
> l.o.`element`.geo_segments.list as ot from unnest(t.geo_onds.list) l(o)) tt, 
> LATERAL (select count(*) as number_segments_in_group from unnest(tt.ot) 
> ll(o)) ttt limit 3;
> {code}
> Log:-
> {code:java}
> [Error Id: cfe63aaa-7115-4fab-b0fb-d5098d3b6350 on drill182:31010]
> org.apache.drill.common.exceptions.UserException: SYSTEM ERROR: 
> IllegalStateException: Unexpected case where rowId 0 in right batch of 
> lateral is smaller than rowId 32768 in left batch being processed
>  
> Fragment 0:0
>  
> [Error Id: cfe63aaa-7115-4fab-b0fb-d5098d3b6350 on drill182:31010]
>         at 
> org.apache.drill.common.exceptions.UserException$Builder.build(UserException.java:633)
>  ~[drill-common-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.work.fragment.FragmentExecutor.sendFinalState(FragmentExecutor.java:360)
>  [drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.work.fragment.FragmentExecutor.cleanup(FragmentExecutor.java:215)
>  [drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:326)
>  [drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.common.SelfCleaningRunnable.run(SelfCleaningRunnable.java:38)
>  [drill-common-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [na:1.8.0_161]
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [na:1.8.0_161]
>         at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
> Caused by: java.lang.IllegalStateException: Unexpected case where rowId 0 in 
> right batch of lateral is smaller than rowId 32768 in left batch being 
> processed
>         at 
> org.apache.drill.shaded.guava.com.google.common.base.Preconditions.checkState(Preconditions.java:609)
>  ~[drill-shaded-guava-23.0.jar:23.0]
>         at 
> org.apache.drill.exec.physical.impl.join.LateralJoinBatch.crossJoinAndOutputRecords(LateralJoinBatch.java:1024)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.physical.impl.join.LateralJoinBatch.produceOutputBatch(LateralJoinBatch.java:575)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.physical.impl.join.LateralJoinBatch.innerNext(LateralJoinBatch.java:238)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:175)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:122)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:112)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.record.AbstractUnaryRecordBatch.innerNext(AbstractUnaryRecordBatch.java:63)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.physical.impl.limit.LimitRecordBatch.innerNext(LimitRecordBatch.java:101)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:175)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:122)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:112)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.record.AbstractUnaryRecordBatch.innerNext(AbstractUnaryRecordBatch.java:63)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.physical.impl.limit.LimitRecordBatch.innerNext(LimitRecordBatch.java:101)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:175)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:122)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:112)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.record.AbstractUnaryRecordBatch.innerNext(AbstractUnaryRecordBatch.java:63)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:175)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:122)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:112)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.record.AbstractUnaryRecordBatch.innerNext(AbstractUnaryRecordBatch.java:69)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.physical.impl.project.ProjectRecordBatch.innerNext(ProjectRecordBatch.java:143)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:175)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.physical.impl.BaseRootExec.next(BaseRootExec.java:103) 
> ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.physical.impl.ScreenCreator$ScreenRoot.innerNext(ScreenCreator.java:83)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.physical.impl.BaseRootExec.next(BaseRootExec.java:93) 
> ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.work.fragment.FragmentExecutor$1.run(FragmentExecutor.java:293)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at 
> org.apache.drill.exec.work.fragment.FragmentExecutor$1.run(FragmentExecutor.java:280)
>  ~[drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         at java.security.AccessController.doPrivileged(Native Method) 
> ~[na:1.8.0_161]
>         at javax.security.auth.Subject.doAs(Subject.java:422) ~[na:1.8.0_161]
>         at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1595)
>  ~[hadoop-common-2.7.0-mapr-1707.jar:na]
>         at 
> org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:280)
>  [drill-java-exec-1.15.0-SNAPSHOT.jar:1.15.0-SNAPSHOT]
>         ... 4 common frames omitted
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to