This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit d5146c43986f09f132f4e96966082732a3740181
Author: Sorabh Hamirwasia <[email protected]>
AuthorDate: Mon Oct 1 14:15:33 2018 -0700

    DRILL-6766: Lateral Unnest query: IllegalStateException - rowId in right batch of lateral is smaller than rowId in left batch being processed
    Note: The issue was in StreamingAgg. When the output for one or more input batches was split across multiple
    output batches, the remaining input records were discarded after the first output batch was produced.
    closes #1490
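
The fix hinges on a simple invariant: when an operator's output batch fills up
mid-input, it must remember its position in the current input batch and resume
from there on the next call, rather than pulling fresh input and silently
dropping the unconsumed remainder. A minimal, self-contained Java sketch of
that pattern follows; it is illustrative only (all names are hypothetical, not
Drill code):

    import java.util.ArrayList;
    import java.util.List;

    class ResumableProcessor {
      // Sentinel meaning "no row of the current input is pending", mirroring
      // the currentIndex == Integer.MAX_VALUE convention used in the patch.
      private static final int DONE = Integer.MAX_VALUE;
      private int resumeIndex = DONE;
      private int[] currentInput = new int[0];

      boolean previousBatchProcessed() {
        return resumeIndex == DONE;
      }

      // Emits at most maxOutputRows values per call; callers keep calling
      // until previousBatchProcessed() before offering a new input batch.
      List<Integer> next(int[] input, int maxOutputRows) {
        if (previousBatchProcessed()) {   // only now is new input accepted
          currentInput = input;
          resumeIndex = 0;
        }
        List<Integer> out = new ArrayList<>();
        while (resumeIndex < currentInput.length && out.size() < maxOutputRows) {
          out.add(currentInput[resumeIndex++]);
        }
        if (resumeIndex >= currentInput.length) {
          resumeIndex = DONE;             // fully consumed; safe to fetch more
        }
        return out;
      }
    }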
---
 .../physical/impl/aggregate/StreamingAggBatch.java |  74 +++--
 .../impl/aggregate/StreamingAggTemplate.java       |  56 ++--
 .../impl/aggregate/StreamingAggregator.java        |  19 +-
 .../exec/physical/impl/join/LateralJoinBatch.java  |   4 +-
 .../exec/physical/impl/unnest/UnnestImpl.java      |  12 +-
 .../drill/exec/record/AbstractRecordBatch.java     |  10 +-
 .../impl/agg/TestStreamingAggEmitOutcome.java      | 315 ++++++++++++++++++++-
 7 files changed, 423 insertions(+), 67 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 2b9b317..ffcfa78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
@@ -71,6 +72,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 
 public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatch.class);
@@ -104,7 +106,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
                                  // call to inner next is made.
   private boolean sendEmit = false; // In the case where we see an OK_NEW_SCHEMA along with the end of a data set
                                     // we send out a batch with OK_NEW_SCHEMA first, then in the next iteration,
-                                    // we send out an emopty batch with EMIT.
+                                    // we send out an empty batch with EMIT.
   private IterOutcome lastKnownOutcome = OK; // keep track of the outcome from the previous call to incoming.next
   private boolean firstBatchForSchema = true; // true if the current batch came in with an OK_NEW_SCHEMA
   private boolean firstBatchForDataSet = true; // true if the current batch is the first batch in a data set
@@ -127,7 +129,11 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   private boolean specialBatchSent = false;
   private static final int SPECIAL_BATCH_COUNT = 1;
 
-  public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
+  // TODO: Needs to adapt to batch sizing rather than hardcoded constant value
+  private int maxOutputRowCount = ValueVector.MAX_ROW_COUNT;
+
+  public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context)
+    throws OutOfMemoryException {
     super(popConfig, context);
     this.incoming = incoming;
 
@@ -189,7 +195,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
     // if a special batch has been sent, we have no data in the incoming so exit early
     if (done || specialBatchSent) {
-      assert (sendEmit != true); // if special batch sent with emit then flag will not be set
+      assert (!sendEmit); // if special batch sent with emit then flag will not be set
       return NONE;
     }
 
@@ -199,6 +205,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       first = false; // first is set only in the case when we see a NONE after an empty first (and only) batch
       sendEmit = false;
       firstBatchForDataSet = true;
+      firstBatchForSchema = false;
       recordCount = 0;
       specialBatchSent = false;
       return EMIT;
@@ -239,18 +246,17 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
             done = true;
             return IterOutcome.STOP;
           }
+          firstBatchForSchema = true;
           break;
         case EMIT:
          // if we get an EMIT with an empty batch as the first (and therefore only) batch
          // we have to do the special handling
          if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && incoming.getRecordCount() == 0) {
             constructSpecialBatch();
-            firstBatchForDataSet = true; // reset on the next iteration
            // If outcome is NONE then we send the special batch in the first iteration and the NONE
            // outcome in the next iteration. If outcome is EMIT, we can send the special
            // batch and the EMIT outcome at the same time. (unless the finalOutcome is OK_NEW_SCHEMA)
-            IterOutcome finalOutcome =  getFinalOutcome();
-            return finalOutcome;
+            return getFinalOutcome();
           }
           // else fall thru
         case OK:
@@ -259,15 +265,18 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
          throw new IllegalStateException(String.format("unknown outcome %s", lastKnownOutcome));
       }
     } else {
-      if ( lastKnownOutcome != NONE && firstBatchForDataSet && !aggregator.isDone()) {
+      // If this is not the first batch, the previous batch was fully processed with no error condition, and NONE
+      // has not been seen, then call next() on the upstream to get a new batch. Otherwise just process the
+      // previous incoming batch.
+      if ( lastKnownOutcome != NONE && firstBatchForDataSet && !aggregator.isDone()
+        && aggregator.previousBatchProcessed()) {
         lastKnownOutcome = incoming.next();
         if (!first ) {
          //Setup needs to be called again. During setup, generated code saves a reference to the vectors
-          // pointed to by the incoming batch so that the dereferencing of the vector wrappers to get to
+          // pointed to by the incoming batch so that the de-referencing of the vector wrappers to get to
          // the vectors  does not have to be done at each call to eval. However, after an EMIT is seen,
          // the vectors are replaced and the reference to the old vectors is no longer valid
           try {
-            aggregator.setup(oContext, incoming, this);
+            aggregator.setup(oContext, incoming, this, maxOutputRowCount);
           } catch (SchemaChangeException e) {
            UserException.Builder exceptionBuilder = UserException.functionError(e)
                .message("A Schema change exception occured in calling setup() in generated code.");
@@ -280,8 +289,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     recordCount = aggregator.getOutputCount();
     container.setRecordCount(recordCount);
     logger.debug("Aggregator response {}, records {}", aggOutcome, 
aggregator.getOutputCount());
-    // overwrite the outcome variable since we no longer need to remember the first batch outcome
-    lastKnownOutcome = aggregator.getOutcome();
+    // Get the IterOutcome returned by the aggregator and, based on the AggOutcome and the returned IterOutcome,
+    // update lastKnownOutcome below. For example, if the AggOutcome is RETURN_AND_RESET, then lastKnownOutcome is
+    // always set to EMIT.
+    IterOutcome returnOutcome = aggregator.getOutcome();
     switch (aggOutcome) {
       case CLEANUP_AND_RETURN:
         if (!first) {
@@ -289,7 +300,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
         }
         done = true;
         ExternalSortBatch.releaseBatches(incoming);
-        return lastKnownOutcome;
+        return returnOutcome;
       case RETURN_AND_RESET:
        //We could have got a string of batches, all empty, until we hit an emit
        if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && recordCount == 0) {
@@ -298,28 +309,32 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
          // If outcome is NONE then we send the special batch in the first iteration and the NONE
          // outcome in the next iteration. If outcome is EMIT, we can send the special
          // batch and the EMIT outcome at the same time.
-
-          IterOutcome finalOutcome =  getFinalOutcome();
-          return finalOutcome;
+          return getFinalOutcome();
         }
         firstBatchForDataSet = true;
         firstBatchForSchema = false;
         if(first) {
           first = false;
         }
-        if(lastKnownOutcome == OK_NEW_SCHEMA) {
-          sendEmit = true;
+        // Since the AggOutcome is RETURN_AND_RESET and the IterOutcome returned by the aggregator is
+        // OK_NEW_SCHEMA, the aggregator has seen a first batch with OK_NEW_SCHEMA and then a last batch with the
+        // EMIT outcome. In that case, if all of the input batch was consumed to produce this output batch, an
+        // empty batch with the EMIT outcome needs to be sent in the subsequent next() call.
+        if(returnOutcome == OK_NEW_SCHEMA) {
+          sendEmit = (aggregator == null) || aggregator.previousBatchProcessed();
         }
         // Release external sort batches after EMIT is seen
         ExternalSortBatch.releaseBatches(incoming);
-        return lastKnownOutcome;
+        lastKnownOutcome = EMIT;
+        return returnOutcome;
       case RETURN_OUTCOME:
         // In case of complex writer expression, vectors would be added to 
batch run-time.
         // We have to re-build the schema.
         if (complexWriters != null) {
           container.buildSchema(SelectionVectorMode.NONE);
         }
-        if (lastKnownOutcome == IterOutcome.NONE ) {
+        if (returnOutcome == IterOutcome.NONE ) {
+          lastKnownOutcome = NONE;
          // we will set the 'done' flag in the next call to innerNext and use the lastKnownOutcome
          // to determine whether we should set the flag or not.
          // This is so that if someone calls getRecordCount in between calls to innerNext, we will
@@ -330,11 +345,12 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
           } else {
             return OK;
           }
-        } else if (lastKnownOutcome == OK && first) {
+        } else if (returnOutcome == OK && first) {
           lastKnownOutcome = OK_NEW_SCHEMA;
+          returnOutcome = OK_NEW_SCHEMA;
         }
         first = false;
-        return lastKnownOutcome;
+        return returnOutcome;
       case UPDATE_AGGREGATOR:
         // We could get this either between data sets or within a data set.
        // If the former, we can handle the change and so need to update the aggregator and
@@ -342,8 +358,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
         // and exception
        // This case is not tested since there are no unit tests for this and there is no support
        // from the sort operator for this case
-        if (lastKnownOutcome == EMIT) {
+        if (returnOutcome == EMIT) {
           createAggregator();
+          lastKnownOutcome = EMIT;
           return OK_NEW_SCHEMA;
        } else {
          context.getExecutorState().fail(UserException.unsupportedError().message(SchemaChangeException
@@ -351,6 +368,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
                   incoming.getSchema()).getMessage()).build(logger));
           close();
           killIncoming(false);
+          lastKnownOutcome = STOP;
           return IterOutcome.STOP;
         }
       default:
@@ -433,7 +451,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
    ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
-    //  cg.getCodeGenerator().saveCodeForDebugging(true);
+    //cg.getCodeGenerator().saveCodeForDebugging(true);
     container.clear();
 
    LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().size()];
@@ -506,7 +524,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
     container.buildSchema(SelectionVectorMode.NONE);
     StreamingAggregator agg = context.getImplementationClass(cg);
-    agg.setup(oContext, incoming, this);
+    agg.setup(oContext, incoming, this, maxOutputRowCount);
     allocateComplexWriters();
     return agg;
   }
@@ -651,7 +669,11 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
   @Override
   public void dump() {
-    logger.error("StreamingAggBatch[container={}, popConfig={}, aggregator={}, incomingSchema={}]",
-        container, popConfig, aggregator, incomingSchema);
+    logger.error("StreamingAggBatch[container={}, popConfig={}, aggregator={}, incomingSchema={}]", container, popConfig, aggregator, incomingSchema);
+  }
+
+  @VisibleForTesting
+  public void setMaxOutputRowCount(int maxOutputRowCount) {
+    this.maxOutputRowCount = maxOutputRowCount;
   }
 }
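
The StreamingAggBatch changes above boil down to two pieces of bookkeeping: on
RETURN_AND_RESET the operator now records EMIT as the last known outcome, and
when the aggregator returned OK_NEW_SCHEMA it schedules the follow-up empty
EMIT batch only if the incoming batch was fully drained
(previousBatchProcessed()). A hedged, simplified sketch of that decision (the
enums and names below are stand-ins, not the real Drill types):

    class OutcomeBookkeeping {
      enum AggOutcome { RETURN_OUTCOME, RETURN_AND_RESET }
      enum IterOutcome { OK, OK_NEW_SCHEMA, EMIT, NONE }

      IterOutcome lastKnownOutcome = IterOutcome.OK;
      boolean sendEmit = false;

      IterOutcome onAggregatorReturn(AggOutcome agg, IterOutcome returned, boolean inputDrained) {
        if (agg == AggOutcome.RETURN_AND_RESET) {
          lastKnownOutcome = IterOutcome.EMIT;  // a data-set boundary was reached
          if (returned == IterOutcome.OK_NEW_SCHEMA) {
            // Defer an empty EMIT batch to the next call only once every row
            // of the current input batch has been consumed.
            sendEmit = inputDrained;
          }
        }
        return returned;                        // forwarded to the caller as-is
      }
    }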
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 4bde7ab..cc89f23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.ValueVector;
 
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
@@ -33,7 +34,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA
 public abstract class StreamingAggTemplate implements StreamingAggregator {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggregator.class);
   private static final boolean EXTRA_DEBUG = false;
-  private static final int OUTPUT_BATCH_SIZE = 32*1024;
+  private int maxOutputRows = ValueVector.MAX_ROW_COUNT;
 
   // lastOutcome is set ONLY if the lastOutcome was NONE or STOP
   private IterOutcome lastOutcome = null;
@@ -54,7 +55,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
  // (i.e if a selection vector the sv4/sv2 entry has been dereferenced or if a vector then the record index itself)
  private int previousIndex = -1;  // the last index that has been processed. Initialized to -1 every time a new
                                   // aggregate group begins (including every time a new data set begins)
-  private int currentIndex; // current index being processed
+  private int currentIndex = Integer.MAX_VALUE; // current index being processed
   /**
    * Number of records added to the current aggregation group.
    */
@@ -72,10 +73,12 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
 
 
   @Override
-  public void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException {
+  public void setup(OperatorContext context, RecordBatch incoming,
+                    StreamingAggBatch outgoing, int outputRowCount) throws SchemaChangeException {
     this.context = context;
     this.incoming = incoming;
     this.outgoing = outgoing;
+    this.maxOutputRows = outputRowCount;
     setupInterior(incoming, outgoing);
   }
 
@@ -109,7 +112,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
       allocateOutgoing();
 
       if (firstBatchForDataSet) {
-        this.currentIndex = incoming.getRecordCount() == 0 ? 0 : this.getVectorIndex(underlyingIndex);
+        this.currentIndex = incoming.getRecordCount() == 0 ? Integer.MAX_VALUE : this.getVectorIndex(underlyingIndex);
 
         if (outerOutcome == OK_NEW_SCHEMA) {
           firstBatchForSchema = true;
@@ -178,9 +181,10 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
         // loop through existing records, adding as necessary.
         if(!processRemainingRecordsInBatch()) {
           // output batch is full. Return.
-          return setOkAndReturn();
+          return setOkAndReturn(outerOutcome);
         }
-        // if the current batch came with an EMIT, we're done
+        // if the current batch came with an EMIT, we're done, since reaching this point means the output batch
+        // consumed all the rows in the incoming batch
         if(outerOutcome == EMIT) {
           // output the last record
           outputToBatch(previousIndex);
@@ -215,14 +219,14 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                 done = true;
                 lastOutcome = out;
                 if (firstBatchForDataSet && addedRecordCount == 0) {
-                  return setOkAndReturn();
+                  return setOkAndReturn(NONE);
                 } else if (addedRecordCount > 0) {
                  outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
                  // (output container full or not) as we are not going to insert any more records.
                   if (EXTRA_DEBUG) {
                     logger.debug("Received no more batches, returning.");
                   }
-                  return setOkAndReturn();
+                  return setOkAndReturn(NONE);
                 } else {
                   // not first batch and record Count == 0
                   outcome = out;
@@ -237,6 +241,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                   }
                 } else {
                   resetIndex();
+                  currentIndex = this.getVectorIndex(underlyingIndex);
                  if (previousIndex != -1 && isSamePrev(previousIndex, previous, currentIndex)) {
                    if (EXTRA_DEBUG) {
                      logger.debug("New value was same as last value of previous batch, adding.");
@@ -256,13 +261,16 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                         if (EXTRA_DEBUG) {
                           logger.debug("Output container is full. flushing 
it.");
                         }
-                        return setOkAndReturnEmit();
+                        return setOkAndReturn(EMIT);
                       }
                     }
                    // important to set the previous index to -1 since we start a new group
                     previousIndex = -1;
                   }
-                  processRemainingRecordsInBatch();
+                  if (!processRemainingRecordsInBatch()) {
+                    // output batch is full. Return.
+                    return setOkAndReturn(EMIT);
+                  }
                  outputToBatch(previousIndex); // currentIndex has been reset to int_max so use previous index.
                 }
                 resetIndex();
@@ -285,7 +293,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                     logger.debug("Wrote out end of previous batch, 
returning.");
                   }
                   newSchema = true;
-                  return setOkAndReturn();
+                  return setOkAndReturn(OK_NEW_SCHEMA);
                 }
                 cleanup();
                 return AggOutcome.UPDATE_AGGREGATOR;
@@ -294,6 +302,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                 if (incoming.getRecordCount() == 0) {
                   continue;
                 } else {
+                  currentIndex = this.getVectorIndex(underlyingIndex);
                  if (previousIndex != -1 && isSamePrev(previousIndex, previous, currentIndex)) {
                    if (EXTRA_DEBUG) {
                      logger.debug("New value was same as last value of previous batch, adding.");
@@ -315,7 +324,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                           logger.debug("Output container is full. flushing 
it.");
                         }
                         previousIndex = -1;
-                        return setOkAndReturn();
+                        return setOkAndReturn(OK);
                       }
                     }
                     previousIndex = -1;
@@ -405,8 +414,8 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
   }
 
   private final void resetIndex() {
-    underlyingIndex = -1;
-    incIndex();
+    underlyingIndex = 0;
+    currentIndex = Integer.MAX_VALUE;
   }
 
   /**
@@ -414,7 +423,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
    *
    * @return outcome
    */
-  private final AggOutcome setOkAndReturn() {
+  private final AggOutcome setOkAndReturn(IterOutcome seenOutcome) {
     IterOutcome outcomeToReturn;
     firstBatchForDataSet = false;
     if (firstBatchForSchema) {
@@ -428,7 +437,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     for (VectorWrapper<?> v : outgoing) {
       v.getValueVector().getMutator().setValueCount(outputCount);
     }
-    return AggOutcome.RETURN_OUTCOME;
+    return (seenOutcome == EMIT) ? AggOutcome.RETURN_AND_RESET : AggOutcome.RETURN_OUTCOME;
   }
 
   /**
@@ -457,7 +466,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
  // Returns output container status after insertion of the given record. Caller must check the return value if it
  // plans to insert more records into outgoing container.
   private final boolean outputToBatch(int inIndex) {
-    assert outputCount < OUTPUT_BATCH_SIZE:
+    assert outputCount < maxOutputRows :
         "Outgoing RecordBatch is not flushed. It reached its max capacity in 
the last update";
 
     outputRecordKeys(inIndex, outputCount);
@@ -470,14 +479,13 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     resetValues();
     outputCount++;
     addedRecordCount = 0;
-
-    return outputCount == OUTPUT_BATCH_SIZE;
+    return outputCount == maxOutputRows;
   }
 
  // Returns output container status after insertion of the given record. Caller must check the return value if it
  // plans to insert more records into the outgoing container.
  private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex) {
-    assert outputCount < OUTPUT_BATCH_SIZE:
+    assert outputCount < maxOutputRows :
         "Outgoing RecordBatch is not flushed. It reached its max capacity in 
the last update";
 
     outputRecordKeysPrev(b1, inIndex, outIndex);
@@ -485,8 +493,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     resetValues();
     outputCount++;
     addedRecordCount = 0;
-
-    return outputCount == OUTPUT_BATCH_SIZE;
+    return outputCount == maxOutputRows;
   }
 
   private void addRecordInc(int index) {
@@ -508,6 +515,11 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
         + "]";
   }
 
+  @Override
+  public boolean previousBatchProcessed() {
+    return (currentIndex == Integer.MAX_VALUE);
+  }
+
  public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
  public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2);
  public abstract boolean isSamePrev(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int b2Index);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
index 23fdcc1..57caa9f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
@@ -25,7 +25,8 @@ import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 
 public interface StreamingAggregator {
 
-  public static TemplateClassDefinition<StreamingAggregator> TEMPLATE_DEFINITION = new TemplateClassDefinition<StreamingAggregator>(StreamingAggregator.class, StreamingAggTemplate.class);
+  TemplateClassDefinition<StreamingAggregator> TEMPLATE_DEFINITION =
+    new TemplateClassDefinition<StreamingAggregator>(StreamingAggregator.class, StreamingAggTemplate.class);
 
 
   /**
@@ -45,25 +46,27 @@ public interface StreamingAggregator {
    * <p>
   * @see org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome HashAggregator.AggOutcome
    */
-  public static enum AggOutcome {
+  enum AggOutcome {
     RETURN_OUTCOME,
     CLEANUP_AND_RETURN,
     UPDATE_AGGREGATOR,
     RETURN_AND_RESET;
   }
 
-  public abstract void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException;
+  void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing, int outputRowCount)
+    throws SchemaChangeException;
 
-  public abstract IterOutcome getOutcome();
+  IterOutcome getOutcome();
 
-  public abstract int getOutputCount();
+  int getOutputCount();
 
  // do the work. Also pass in the IterOutcome of the batch already read in case it might be an EMIT. If the
  // outerOutcome is EMIT, we need to do the work without reading any more batches.
-  public abstract AggOutcome doWork(IterOutcome outerOutcome);
+  AggOutcome doWork(IterOutcome outerOutcome);
 
-  public abstract boolean isDone();
+  boolean isDone();
 
-  public abstract void cleanup();
+  void cleanup();
 
+  boolean previousBatchProcessed();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 242687f..735f11f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -389,7 +389,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
   private boolean handleSchemaChange() {
     try {
       stats.startSetup();
-      logger.debug("Setting up new schema based on incoming batch. Old output 
schema: %s", container.getSchema());
+      logger.debug("Setting up new schema based on incoming batch. Old output 
schema: {}", container.getSchema());
       setupNewSchema();
       return true;
     } catch (SchemaChangeException ex) {
@@ -805,7 +805,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
    */
   private void setupNewSchema() throws SchemaChangeException {
 
-    logger.debug("Setting up new schema based on incoming batch. New left 
schema: %s and New right schema: %s",
+    logger.debug("Setting up new schema based on incoming batch. New left 
schema: {} and New right schema: {}",
       left.getSchema(), right.getSchema());
 
     // Clear up the container
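
The two LateralJoinBatch hunks are pure logging fixes: SLF4J substitutes "{}"
placeholders, not printf-style "%s", so the old calls logged the format string
verbatim and ignored the schema arguments. A minimal demonstration (the class
name is illustrative):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class PlaceholderDemo {
      private static final Logger logger = LoggerFactory.getLogger(PlaceholderDemo.class);

      void demo(Object schema) {
        logger.debug("Old output schema: %s", schema); // "%s" logged literally, schema dropped
        logger.debug("Old output schema: {}", schema); // "{}" replaced with schema.toString()
      }
    }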
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
index 508999f..a9c9598 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
@@ -118,6 +118,7 @@ public class UnnestImpl implements Unnest {
    Preconditions.checkArgument(svMode == NONE, "Unnest does not support selection vector inputs.");
 
     final int initialInnerValueIndex = runningInnerValueIndex;
+    int nonEmptyArray = 0;
 
     outer:
     {
@@ -126,8 +127,12 @@ public class UnnestImpl implements Unnest {
 
       for (; valueIndex < valueCount; valueIndex++) {
         final int innerValueCount = accessor.getInnerValueCountAt(valueIndex);
-        logger.debug("Unnest: currentRecord: {}, innerValueCount: {}, record 
count: {}, output limit: {}",
-            innerValueCount, recordCount, outputLimit);
+        logger.trace("Unnest: CurrentRowId: {}, innerValueCount: {}, 
outputIndex: {},  output limit: {}",
+            valueIndex, innerValueCount, outputIndex, outputLimit);
+
+        if (innerValueCount > 0) {
+          ++nonEmptyArray;
+        }
 
         for (; innerValueIndex < innerValueCount; innerValueIndex++) {
          // If we've hit the batch size limit, stop and flush what we've got so far.
@@ -148,6 +153,9 @@ public class UnnestImpl implements Unnest {
      }  // for every value in the array
     }  // for every incoming record
     final int delta = runningInnerValueIndex - initialInnerValueIndex;
+    logger.debug("Unnest: Finished processing current batch. [Details: 
LastProcessedRowIndex: {}, " +
+      "RowsWithNonEmptyArrays: {}, outputIndex: {}, outputLimit: {}, 
TotalIncomingRecords: {}]",
+      valueIndex, nonEmptyArray, delta, outputLimit, accessor.getValueCount());
     final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
     for (TransferPair t : transfers) {
       t.splitAndTransfer(initialInnerValueIndex, delta);
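
The UnnestImpl change only adds diagnostics (a per-call count of rows with
non-empty arrays plus trace/debug detail), but the loop it instruments has the
same resumable shape as the aggregator fix: valueIndex and innerValueIndex
survive across calls, so a batch cut off at the output limit resumes mid-row.
A simplified sketch under that assumption (hypothetical names, not Drill code):

    class UnnestSketch {
      private int valueIndex;      // current incoming row, kept across calls
      private int innerValueIndex; // current element within that row's array

      // Flattens rows into out, stopping at outputLimit; returns values written.
      int unnest(int[][] rows, int[] out, int outputLimit) {
        int written = 0;
        int nonEmptyArray = 0;     // mirrors the diagnostic counter added above
        for (; valueIndex < rows.length; valueIndex++) {
          int[] array = rows[valueIndex];
          if (array.length > 0) {
            nonEmptyArray++;
          }
          for (; innerValueIndex < array.length; innerValueIndex++) {
            if (written == outputLimit) {
              return written;      // flush now; resume exactly here next call
            }
            out[written++] = array[innerValueIndex];
          }
          innerValueIndex = 0;     // the next row starts at its first element
        }
        return written;
      }
    }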
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 362ea29..eb6112d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -135,13 +135,15 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
       return next;
     }
 
-    switch(next) {
+    boolean isNewSchema = false;
+    logger.debug("Received next batch for index: {} with outcome: {}", 
inputIndex, next);
+    switch (next) {
       case OK_NEW_SCHEMA:
-        stats.batchReceived(inputIndex, b.getRecordCount(), true);
-        break;
+        isNewSchema = true;
       case OK:
       case EMIT:
-        stats.batchReceived(inputIndex, b.getRecordCount(), false);
+        stats.batchReceived(inputIndex, b.getRecordCount(), isNewSchema);
+        logger.debug("Number of records in received batch: {}", 
b.getRecordCount());
         break;
       default:
         break;
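
The AbstractRecordBatch hunk folds three stats calls into one by letting
OK_NEW_SCHEMA set a flag and then fall through to the shared OK/EMIT path. The
same pattern in isolation (hypothetical names, not the Drill code):

    class FallThroughDemo {
      int batchesReceived;
      int batchesWithNewSchema;

      void onOutcome(String next) {
        boolean isNewSchema = false;
        switch (next) {
          case "OK_NEW_SCHEMA":
            isNewSchema = true;
            // intentional fall-through: shares the bookkeeping below
          case "OK":
          case "EMIT":
            batchesReceived++;            // stands in for stats.batchReceived(...)
            if (isNewSchema) {
              batchesWithNewSchema++;
            }
            break;
          default:
            break;
        }
      }
    }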
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
index cead984..37a44ea 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
@@ -527,6 +527,232 @@ public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
     nonEmptyInputRowSet2.clear();
   }
 
+  /**
+   * Verifies the scenario where multiple incoming batches received with OK_NEW_SCHEMA, OK, OK, EMIT, whose output
+   * is split into multiple output batches, are handled correctly such that the first output batch is produced with
+   * OK_NEW_SCHEMA and is then followed by the EMIT outcome.
+   */
+  @Test
+  public void t8_1_testStreamingAggr_InputSplitToMultipleOutputBatch() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 20, "item1")
+      .build();
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 30, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 40, "item2")
+      .addRow(2, 50, "item2")
+      .addRow(2, 60, "item2")
+      .addRow(2, 70, "item2")
+      .addRow(3, 100, "item3")
+      .addRow(3, 200, "item3")
+      .addRow(3, 300, "item3")
+      .addRow(3, 400, "item3")
+      .build();
+
+    TupleMetadata resultSchema2 = new SchemaBuilder()
+      .add("name", TypeProtos.MinorType.VARCHAR)
+      .add("id", TypeProtos.MinorType.INT)
+      .add("total_count", TypeProtos.MinorType.BIGINT)
+      .buildSchema();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item1", 1, (long)2)
+      .addRow("item2", 2, (long)5)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item3", 3, (long)4)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name", "id_left", "id"),
+      parseExprs("count(cost_left)", "total_count"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+    strAggBatch.setMaxOutputRowCount(2);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // Expect OK_NEW_SCHEMA first for the output of the second and later input batches, since the output batch is
+    // full after producing 2 groups
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(2, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    // The last group is produced in a different output batch with the EMIT outcome
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    nonEmptyInputRowSet4.clear();
+
+    expectedRowSet1.clear();
+    expectedRowSet2.clear();
+  }
+
+  /**
+   * Verifies the scenario where multiple incoming batches received with OK_NEW_SCHEMA, OK, OK, EMIT, whose output
+   * is split into multiple output batches, and incoming batches received with OK, OK, EMIT, whose output is also
+   * split across multiple output batches, are handled correctly.
+   */
+  @Test
+  public void t8_2_testStreamingAggr_Inputs_OK_EMIT_SplitToMultipleOutputBatch() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 20, "item1")
+      .build();
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 30, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 40, "item2")
+      .addRow(2, 50, "item2")
+      .addRow(2, 60, "item2")
+      .addRow(2, 70, "item2")
+      .addRow(3, 100, "item3")
+      .addRow(3, 200, "item3")
+      .addRow(3, 300, "item3")
+      .addRow(3, 400, "item3")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 40, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet6 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 50, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet7 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 130, "item3")
+      .addRow(3, 130, "item3")
+      .addRow(4, 140, "item4")
+      .addRow(4, 140, "item4")
+      .build();
+
+    TupleMetadata resultSchema2 = new SchemaBuilder()
+      .add("name", TypeProtos.MinorType.VARCHAR)
+      .add("id", TypeProtos.MinorType.INT)
+      .add("total_count", TypeProtos.MinorType.BIGINT)
+      .buildSchema();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item1", 1, (long)2)
+      .addRow("item2", 2, (long)5)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item3", 3, (long)4)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet3 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item2", 2, (long)2)
+      .addRow("item3", 3, (long)2)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet4 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item4", 4, (long)2)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+    inputContainer.add(nonEmptyInputRowSet5.container());
+    inputContainer.add(nonEmptyInputRowSet6.container());
+    inputContainer.add(nonEmptyInputRowSet7.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name", "id_left", "id"),
+      parseExprs("count(cost_left)", "total_count"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+    strAggBatch.setMaxOutputRowCount(2);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Output batches for input batch 2 to 5
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(2, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    // Output batches for input batch 6 to 8
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK);
+    // output batch is full after producing 2 rows
+    assertEquals(2, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet3).verify(actualRowSet);
+
+    // output batch with pending rows
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet4).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    nonEmptyInputRowSet4.clear();
+    nonEmptyInputRowSet5.clear();
+    nonEmptyInputRowSet6.clear();
+    nonEmptyInputRowSet7.clear();
+
+    expectedRowSet1.clear();
+    expectedRowSet2.clear();
+    expectedRowSet3.clear();
+    expectedRowSet4.clear();
+  }
 
   
/*****************************************************************************************
    Tests for validating regular StreamingAggr behavior with no EMIT outcome
@@ -620,6 +846,88 @@ public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
   }
 
+  @Test
+  public void t10_1_testStreamingAggr_InputSplitToMultipleOutputBatch() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 20, "item1")
+      .build();
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 30, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 40, "item2")
+      .addRow(2, 50, "item2")
+      .addRow(2, 60, "item2")
+      .addRow(2, 70, "item2")
+      .addRow(3, 100, "item3")
+      .addRow(3, 200, "item3")
+      .addRow(3, 300, "item3")
+      .addRow(3, 400, "item3")
+      .build();
+
+    TupleMetadata resultSchema2 = new SchemaBuilder()
+      .add("name", TypeProtos.MinorType.VARCHAR)
+      .add("id", TypeProtos.MinorType.INT)
+      .add("total_count", TypeProtos.MinorType.BIGINT)
+      .buildSchema();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item1", 1, (long)2)
+      .addRow("item2", 2, (long)5)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item3", 3, (long)4)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name", "id_left", "id"),
+      parseExprs("count(cost_left)", "total_count"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+    strAggBatch.setMaxOutputRowCount(2);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(2, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK);
+    assertEquals(1, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    nonEmptyInputRowSet4.clear();
+
+    expectedRowSet1.clear();
+    expectedRowSet2.clear();
+  }
+
   /*******************************************************
    * Tests for EMIT with empty batches and no group by
    * (Tests t1-t8 are repeated with no group by)
@@ -813,14 +1121,15 @@ public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
 
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount()); // special batch
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
     assertEquals(0, strAggBatch.getRecordCount());
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
-    assertEquals(1, strAggBatch.getRecordCount());
+    assertEquals(1, strAggBatch.getRecordCount()); // special batch
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
-    assertEquals(1, strAggBatch.getRecordCount());
+    assertEquals(1, strAggBatch.getRecordCount()); // special batch
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
-    assertEquals(1, strAggBatch.getRecordCount());
+    assertEquals(1, strAggBatch.getRecordCount()); // data batch
 
    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
     new RowSetComparison(expectedRowSet).verify(actualRowSet);
