parthchandra closed pull request #1358:  DRILL-6516: EMIT support in streaming 
agg
URL: https://github.com/apache/drill/pull/1358
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index caeed50df3f..882c36d746d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -67,15 +67,49 @@
 import com.sun.codemodel.JVar;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
 public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> 
{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(StreamingAggBatch.class);
 
   private StreamingAggregator aggregator;
   private final RecordBatch incoming;
   private List<BaseWriter.ComplexWriter> complexWriters;
-  private boolean done = false;
-  private boolean first = true;
-  private int recordCount = 0;
+  //
+  // Streaming agg can be in (a) a normal pipeline or (b) it may be in a 
pipeline that is part of a subquery involving
+  // lateral and unnest. In case(a), the aggregator proceeds normally until it 
sees a group change or a NONE. If a
+  // group has changed, the aggregated data is sent downstream and the 
aggregation continues with the next group. If
+  // a NONE is seen, the aggregator completes, sends data downstream and 
cleans up.
+  // In case (b), the aggregator behaves similar to case(a) if a group change 
or NONE is observed. However it will
+  // also encounter a new state EMIT, every time unnest processes a new row. 
In this case the aggregator must complete the
+  // aggregation, send out the results, AND reset to receive more data. To 
make the treatment of these two cases
+  // similar, we define the aggregation operation in terms of data sets.
+  //   Data Set = The set of data that the aggregator is currently 
aggregating. In a normal query, the entire data is
+  //   a single data set. In the case of a Lateral subquery, every row 
processed by unnest is a data set.  There can,
+  //   therefore, be one or more data sets in an aggregation.
+  //   Data Sets may have multiple batches and may contain one or more empty 
batches. A data set may consist entirely
+  //   of empty batches.
+  //   Schema may change across Data Sets.
+  //   A corner case is the case of a Data Set having many empty batches in 
the beginning. Such a data set may see a
+  //   schema change once the first non-empty batch is received.
+  //   Schema change within a Data Set is not supported.
+  //
+  //   We will define some states for internal management
+  //
+  private boolean done = false;  // END of all data
+  private boolean first = true;  // Beginning of new data set. True during the 
build schema phase. False once the first
+                                 // call to inner next is made.
+  private boolean sendEmit = false; // In the case where we see an 
OK_NEW_SCHEMA along with the end of a data set
+                                    // we send out a batch with OK_NEW_SCHEMA 
first, then in the next iteration,
+                                    // we send out an emopty batch with EMIT.
+  private IterOutcome lastKnownOutcome = OK; // keep track of the outcome from 
the previous call to incoming.next
+  private boolean firstBatchForSchema = true; // true if the current batch 
came in with an OK_NEW_SCHEMA
+  private boolean firstBatchForDataSet = true; // true if the current batch is 
the first batch in a data set
+  private int recordCount = 0;  // number of records output in the current 
data set
+
   private BatchSchema incomingSchema;
 
   /*
@@ -154,83 +188,174 @@ public void buildSchema() throws SchemaChangeException {
   public IterOutcome innerNext() {
 
     // if a special batch has been sent, we have no data in the incoming so 
exit early
-    if (specialBatchSent) {
-      return IterOutcome.NONE;
+    if ( done || specialBatchSent) {
+      return NONE;
+    }
+
+    // We sent an OK_NEW_SCHEMA and also encountered the end of a data set. So 
we need to send
+    // an EMIT with an empty batch now
+    if (sendEmit) {
+      sendEmit = false;
+      firstBatchForDataSet = true;
+      recordCount = 0;
+      return EMIT;
     }
 
     // this is only called on the first batch. Beyond this, the aggregator 
manages batches.
     if (aggregator == null || first) {
-      IterOutcome outcome;
       if (first && incoming.getRecordCount() > 0) {
         first = false;
-        outcome = IterOutcome.OK_NEW_SCHEMA;
+        lastKnownOutcome = OK_NEW_SCHEMA;
       } else {
-        outcome = next(incoming);
+        lastKnownOutcome = next(incoming);
       }
-      logger.debug("Next outcome of {}", outcome);
-      switch (outcome) {
-      case NONE:
-        if (first && popConfig.getKeys().size() == 0) {
+      logger.debug("Next outcome of {}", lastKnownOutcome);
+      switch (lastKnownOutcome) {
+        case NONE:
+          if (firstBatchForDataSet && popConfig.getKeys().size() == 0) {
+            // if we have a straight aggregate and empty input batch, we need 
to handle it in a different way
+            constructSpecialBatch();
+            // set state to indicate the fact that we have sent a special 
batch and input is empty
+            specialBatchSent = true;
+            // If outcome is NONE then we send the special batch in the first 
iteration and the NONE
+            // outcome in the next iteration. If outcome is EMIT, we can send 
the special
+            // batch and the EMIT outcome at the same time.
+            return getFinalOutcome();
+          }
+          // else fall thru
+        case OUT_OF_MEMORY:
+        case NOT_YET:
+        case STOP:
+          return lastKnownOutcome;
+        case OK_NEW_SCHEMA:
+          if (!createAggregator()) {
+            done = true;
+            return IterOutcome.STOP;
+          }
+          break;
+        case EMIT:
+          // if we get an EMIT with an empty batch as the first (and therefore 
only) batch
+          // we have to do the special handling
+          if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && 
incoming.getRecordCount() == 0) {
+            constructSpecialBatch();
+            // set state to indicate the fact that we have sent a special 
batch and input is empty
+            specialBatchSent = true;
+            firstBatchForDataSet = true; // reset on the next iteration
+            // If outcome is NONE then we send the special batch in the first 
iteration and the NONE
+            // outcome in the next iteration. If outcome is EMIT, we can send 
the special
+            // batch and the EMIT outcome at the same time.
+            return getFinalOutcome();
+          }
+          // else fall thru
+        case OK:
+          break;
+        default:
+          throw new IllegalStateException(String.format("unknown outcome %s", 
lastKnownOutcome));
+      }
+    } else {
+      if ( lastKnownOutcome != NONE && firstBatchForDataSet && 
!aggregator.isDone()) {
+        lastKnownOutcome = incoming.next();
+        if (!first ) {
+          //Setup needs to be called again. During setup, generated code saves 
a reference to the vectors
+          // pointed to by the incoming batch so that the dereferencing of the 
vector wrappers to get to
+          // the vectors  does not have to be done at each call to eval. 
However, after an EMIT is seen,
+          // the vectors are replaced and the reference to the old vectors is 
no longer valid
+          try {
+            aggregator.setup(oContext, incoming, this);
+          } catch (SchemaChangeException e) {
+            UserException.Builder exceptionBuilder = 
UserException.functionError(e)
+                .message("A Schema change exception occured in calling setup() 
in generated code.");
+            throw exceptionBuilder.build(logger);
+          }
+        }
+      }
+      // We sent an EMIT in the previous iteration, so we must be starting a 
new data set
+      if (firstBatchForDataSet) {
+        done = false;
+        sendEmit = false;
+        specialBatchSent = false;
+        firstBatchForDataSet = false;
+      }
+    }
+    AggOutcome aggOutcome = aggregator.doWork(lastKnownOutcome);
+    recordCount = aggregator.getOutputCount();
+    container.setRecordCount(recordCount);
+    logger.debug("Aggregator response {}, records {}", aggOutcome, 
aggregator.getOutputCount());
+    // overwrite the outcome variable since we no longer need to remember the 
first batch outcome
+    lastKnownOutcome = aggregator.getOutcome();
+    switch (aggOutcome) {
+      case CLEANUP_AND_RETURN:
+        if (!first) {
+          container.zeroVectors();
+        }
+        done = true;
+        ExternalSortBatch.releaseBatches(incoming);
+        return lastKnownOutcome;
+      case RETURN_AND_RESET:
+        //WE could have got a string of batches, all empty, until we hit an 
emit
+        if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && 
recordCount == 0) {
           // if we have a straight aggregate and empty input batch, we need to 
handle it in a different way
           constructSpecialBatch();
-          first = false;
           // set state to indicate the fact that we have sent a special batch 
and input is empty
           specialBatchSent = true;
-          return IterOutcome.OK;
+          // If outcome is NONE then we send the special batch in the first 
iteration and the NONE
+          // outcome in the next iteration. If outcome is EMIT, we can send 
the special
+          // batch and the EMIT outcome at the same time.
+          return getFinalOutcome();
         }
-      case OUT_OF_MEMORY:
-      case NOT_YET:
-      case STOP:
-        return outcome;
-      case OK_NEW_SCHEMA:
-        if (!createAggregator()) {
-          done = true;
+        firstBatchForDataSet = true;
+        if(first) {
+          first = false;
+        }
+        if(lastKnownOutcome == OK_NEW_SCHEMA) {
+          sendEmit = true;
+        }
+        // Release external sort batches after EMIT is seen
+        ExternalSortBatch.releaseBatches(incoming);
+        return lastKnownOutcome;
+      case RETURN_OUTCOME:
+        // In case of complex writer expression, vectors would be added to 
batch run-time.
+        // We have to re-build the schema.
+        if (complexWriters != null) {
+          container.buildSchema(SelectionVectorMode.NONE);
+        }
+        if (lastKnownOutcome == IterOutcome.NONE ) {
+          // we will set the 'done' flag in the next call to innerNext and use 
the lastKnownOutcome
+          // to determine whether we should set the flag or not.
+          // This is so that if someone calls getRecordCount in between calls 
to innerNext, we will
+          // return the correct record count (if the done flag is set, we will 
return 0).
+          if (first) {
+            first = false;
+            return OK_NEW_SCHEMA;
+          } else {
+            return OK;
+          }
+        } else if (lastKnownOutcome == OK && first) {
+          lastKnownOutcome = OK_NEW_SCHEMA;
+        } else if (lastKnownOutcome != IterOutcome.OUT_OF_MEMORY) {
+          first = false;
+        }
+        return lastKnownOutcome;
+      case UPDATE_AGGREGATOR:
+        // We could get this either between data sets or within a data set.
+        // If the former, we can handle the change and so need to update the 
aggregator and
+        // continue. If the latter, we cannot (currently) handle the schema 
change, so throw
+        // and exception
+        // This case is not tested since there are no unit tests for this and 
there is no support
+        // from the sort operator for this case
+        if (lastKnownOutcome == EMIT) {
+          createAggregator();
+          return OK_NEW_SCHEMA;
+        } else {
+          
context.getExecutorState().fail(UserException.unsupportedError().message(SchemaChangeException
+              .schemaChanged("Streaming aggregate does not support schema 
changes", incomingSchema,
+                  incoming.getSchema()).getMessage()).build(logger));
+          close();
+          killIncoming(false);
           return IterOutcome.STOP;
         }
-        break;
-      case OK:
-        break;
       default:
-        throw new IllegalStateException(String.format("unknown outcome %s", 
outcome));
-      }
-    }
-    AggOutcome out = aggregator.doWork();
-    recordCount = aggregator.getOutputCount();
-    logger.debug("Aggregator response {}, records {}", out, 
aggregator.getOutputCount());
-    switch (out) {
-    case CLEANUP_AND_RETURN:
-      if (!first) {
-        container.zeroVectors();
-      }
-      done = true;
-      ExternalSortBatch.releaseBatches(incoming);
-      // fall through
-    case RETURN_OUTCOME:
-      IterOutcome outcome = aggregator.getOutcome();
-      // In case of complex writer expression, vectors would be added to batch 
run-time.
-      // We have to re-build the schema.
-      if (complexWriters != null) {
-        container.buildSchema(SelectionVectorMode.NONE);
-      }
-      if (outcome == IterOutcome.NONE && first) {
-        first = false;
-        done = true;
-        return IterOutcome.OK_NEW_SCHEMA;
-      } else if (outcome == IterOutcome.OK && first) {
-        outcome = IterOutcome.OK_NEW_SCHEMA;
-      } else if (outcome != IterOutcome.OUT_OF_MEMORY) {
-        first = false;
-      }
-      return outcome;
-    case UPDATE_AGGREGATOR:
-      context.getExecutorState().fail(UserException.unsupportedError()
-        .message(SchemaChangeException.schemaChanged("Streaming aggregate does 
not support schema changes", incomingSchema, incoming.getSchema()).getMessage())
-        .build(logger));
-      close();
-      killIncoming(false);
-      return IterOutcome.STOP;
-    default:
-      throw new IllegalStateException(String.format("Unknown state %s.", out));
+        throw new IllegalStateException(String.format("Unknown state %s.", 
aggOutcome));
     }
   }
 
@@ -309,7 +434,7 @@ private StreamingAggregator createAggregatorInternal() 
throws SchemaChangeExcept
     ClassGenerator<StreamingAggregator> cg = 
CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, 
context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
-    //cg.getCodeGenerator().saveCodeForDebugging(true);
+    //  cg.getCodeGenerator().saveCodeForDebugging(true);
     container.clear();
 
     LogicalExpression[] keyExprs = new 
LogicalExpression[popConfig.getKeys().size()];
@@ -496,6 +621,25 @@ private void getIndex(ClassGenerator<StreamingAggregator> 
g) {
     }
   }
 
+  private IterOutcome getFinalOutcome() {
+    IterOutcome outcomeToReturn;
+
+    if (firstBatchForDataSet) {
+      firstBatchForDataSet = false;
+    }
+    if (firstBatchForSchema) {
+      outcomeToReturn = OK_NEW_SCHEMA;
+      firstBatchForSchema = false;
+    } else if (lastKnownOutcome == EMIT) {
+      firstBatchForDataSet = true;
+      outcomeToReturn = EMIT;
+    } else {
+      // get the outcome to return before calling refresh since that resets 
the lastKnowOutcome to OK
+      outcomeToReturn = (recordCount == 0) ? NONE : OK;
+    }
+    return outcomeToReturn;
+  }
+
   @Override
   public void close() {
     super.close();
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index fb4d508e1df..a752c7e7fcf 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -25,26 +25,49 @@
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorWrapper;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
 public abstract class StreamingAggTemplate implements StreamingAggregator {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(StreamingAggregator.class);
   private static final boolean EXTRA_DEBUG = false;
   private static final int OUTPUT_BATCH_SIZE = 32*1024;
 
+  // lastOutcome is set ONLY if the lastOutcome was NONE or STOP
   private IterOutcome lastOutcome = null;
+
+  // First batch after build schema phase
   private boolean first = true;
+  private boolean firstBatchForSchema = true; // true if the current batch 
came in with an OK_NEW_SCHEMA.
+  private boolean firstBatchForDataSet = true; // true if the current batch is 
the first batch in a data set
+
   private boolean newSchema = false;
-  private int previousIndex = -1;
+
+  // End of all data
+  private boolean done = false;
+
+  // index in the incoming (sv4/sv2/vector)
   private int underlyingIndex = 0;
-  private int currentIndex;
+  // The indexes below refer to the actual record indexes in input batch
+  // (i.e if a selection vector the sv4/sv2 entry has been dereferenced or if 
a vector then the record index itself)
+  private int previousIndex = -1;  // the last index that has been processed. 
Initialized to -1 every time a new
+                                   // aggregate group begins (including every 
time a new data set begins)
+  private int currentIndex; // current index being processed
   /**
    * Number of records added to the current aggregation group.
    */
   private long addedRecordCount = 0;
+  // There are two outcomes from the aggregator. One is the aggregator's 
outcome defined in
+  // StreamingAggregator.AggOutcome. The other is the outcome from the last 
call to incoming.next
   private IterOutcome outcome;
+  // Number of aggregation groups added into the output batch
   private int outputCount = 0;
   private RecordBatch incoming;
+  // the Streaming Agg Batch that this aggregator belongs to
   private StreamingAggBatch outgoing;
-  private boolean done = false;
+
   private OperatorContext context;
 
 
@@ -73,45 +96,67 @@ public int getOutputCount() {
   }
 
   @Override
-  public AggOutcome doWork() {
-    if (done) {
+  public AggOutcome doWork(IterOutcome outerOutcome) {
+    if (done || outerOutcome == NONE) {
       outcome = IterOutcome.NONE;
       return AggOutcome.CLEANUP_AND_RETURN;
     }
-    try { // outside loop to ensure that first is set to false after the first 
run.
+
+    try { // outside block to ensure that first is set to false after the 
first run.
       outputCount = 0;
       // allocate outgoing since either this is the first time or if a 
subsequent time we would
       // have sent the previous outgoing batch to downstream operator
       allocateOutgoing();
 
-      if (first) {
+      if (firstBatchForDataSet) {
         this.currentIndex = incoming.getRecordCount() == 0 ? 0 : 
this.getVectorIndex(underlyingIndex);
 
-        // consume empty batches until we get one with data.
-        if (incoming.getRecordCount() == 0) {
-          outer: while (true) {
-            IterOutcome out = outgoing.next(0, incoming);
-            switch (out) {
-            case OK_NEW_SCHEMA:
-            case OK:
-              if (incoming.getRecordCount() == 0) {
-                continue;
-              } else {
-                currentIndex = this.getVectorIndex(underlyingIndex);
-                break outer;
-              }
-            case OUT_OF_MEMORY:
-              outcome = out;
-              return AggOutcome.RETURN_OUTCOME;
-            case NONE:
-              out = IterOutcome.OK_NEW_SCHEMA;
-            case STOP:
-            default:
-              lastOutcome = out;
-              outcome = out;
-              done = true;
-              return AggOutcome.CLEANUP_AND_RETURN;
-            }
+        if (outerOutcome == OK_NEW_SCHEMA) {
+          firstBatchForSchema = true;
+        }
+        // consume empty batches until we get one with data (unless we got an 
EMIT). If we got an emit
+        // then this is the first batch, it was empty and we also got an emit.
+        if (incoming.getRecordCount() == 0 ) {
+          if (outerOutcome != EMIT) {
+            outer:
+            while (true) {
+              IterOutcome out = outgoing.next(0, incoming);
+              switch (out) {
+                case OK_NEW_SCHEMA:
+                  //lastOutcome = out;
+                  firstBatchForSchema = true;
+                case OK:
+                  if (incoming.getRecordCount() == 0) {
+                    continue;
+                  } else {
+                    currentIndex = this.getVectorIndex(underlyingIndex);
+                    break outer;
+                  }
+                case OUT_OF_MEMORY:
+                  outcome = out;
+                  return AggOutcome.RETURN_OUTCOME;
+                case EMIT:
+                  if (incoming.getRecordCount() == 0) {
+                    // When we see an EMIT we let the  agg record batch know 
that it should either
+                    // send out an EMIT or an OK_NEW_SCHEMA, followed by an 
EMIT. To do that we simply return
+                    // RETURN_AND_RESET with the outcome so the record batch 
can take care of it.
+                    return setOkAndReturnEmit();
+                  } else {
+                    break outer;
+                  }
+
+                case NONE:
+                  out = IterOutcome.OK_NEW_SCHEMA;
+                case STOP:
+                default:
+                  lastOutcome = out;
+                  outcome = out;
+                  done = true;
+                  return AggOutcome.CLEANUP_AND_RETURN;
+              } // switch (outcome)
+            } // while empty batches are seen
+          } else {
+            return setOkAndReturnEmit();
           }
         }
       }
@@ -121,49 +166,24 @@ public AggOutcome doWork() {
         return AggOutcome.UPDATE_AGGREGATOR;
       }
 
-      if (lastOutcome != null) {
+      // if the previous iteration has an outcome that was terminal, don't do 
anything.
+      if (lastOutcome != null /*&& lastOutcome != IterOutcome.OK_NEW_SCHEMA*/) 
{
         outcome = lastOutcome;
         return AggOutcome.CLEANUP_AND_RETURN;
       }
 
       outside: while(true) {
-      // loop through existing records, adding as necessary.
-        for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
-          if (EXTRA_DEBUG) {
-            logger.debug("Doing loop with values underlying {}, current {}", 
underlyingIndex, currentIndex);
-          }
-          if (previousIndex == -1) {
-            if (EXTRA_DEBUG) {
-              logger.debug("Adding the initial row's keys and values.");
-            }
-            addRecordInc(currentIndex);
-          }
-          else if (isSame( previousIndex, currentIndex )) {
-            if (EXTRA_DEBUG) {
-              logger.debug("Values were found the same, adding.");
-            }
-            addRecordInc(currentIndex);
-          } else {
-            if (EXTRA_DEBUG) {
-              logger.debug("Values were different, outputting previous 
batch.");
-            }
-            if(!outputToBatch(previousIndex)) {
-              // There is still space in outgoing container, so proceed to the 
next input.
-              if (EXTRA_DEBUG) {
-                logger.debug("Output successful.");
-              }
-              addRecordInc(currentIndex);
-            } else {
-              if (EXTRA_DEBUG) {
-                logger.debug("Output container has reached its capacity. 
Flushing it.");
-              }
-
-              // Update the indices to set the state for processing next 
record in incoming batch in subsequent doWork calls.
-              previousIndex = -1;
-              return setOkAndReturn();
-            }
-          }
-          previousIndex = currentIndex;
+        // loop through existing records, adding as necessary.
+        if(!processRemainingRecordsInBatch()) {
+          // output batch is full. Return.
+          return setOkAndReturn();
+        }
+        // if the current batch came with an EMIT, we're done
+        if(outerOutcome == EMIT) {
+          // output the last record
+          outputToBatch(previousIndex);
+          resetIndex();
+          return setOkAndReturnEmit();
         }
 
         /**
@@ -189,83 +209,122 @@ else if (isSame( previousIndex, currentIndex )) {
               logger.debug("Received IterOutcome of {}", out);
             }
             switch (out) {
-            case NONE:
-              done = true;
-              lastOutcome = out;
-              if (first && addedRecordCount == 0) {
-                return setOkAndReturn();
-              } else if (addedRecordCount > 0) {
-                outputToBatchPrev(previous, previousIndex, outputCount); // No 
need to check the return value
-                // (output container full or not) as we are not going to 
insert any more records.
-                if (EXTRA_DEBUG) {
-                  logger.debug("Received no more batches, returning.");
+              case NONE:
+                done = true;
+                lastOutcome = out;
+                if (firstBatchForDataSet && addedRecordCount == 0) {
+                  return setOkAndReturn();
+                } else if (addedRecordCount > 0) {
+                  outputToBatchPrev(previous, previousIndex, outputCount); // 
No need to check the return value
+                  // (output container full or not) as we are not going to 
insert any more records.
+                  if (EXTRA_DEBUG) {
+                    logger.debug("Received no more batches, returning.");
+                  }
+                  return setOkAndReturn();
+                } else {
+                  // not first batch and record Count == 0
+                  outcome = out;
+                  return AggOutcome.CLEANUP_AND_RETURN;
                 }
-                return setOkAndReturn();
-              } else {
-                if (first && out == IterOutcome.OK) {
-                  out = IterOutcome.OK_NEW_SCHEMA;
+                // EMIT is handled like OK, except that we do not loop back to 
process the
+                // next incoming batch; we return instead
+              case EMIT:
+                if (incoming.getRecordCount() == 0) {
+                  if (addedRecordCount > 0) {
+                    outputToBatchPrev(previous, previousIndex, outputCount);
+                  }
+                } else {
+                  resetIndex();
+                  if (previousIndex != -1 && isSamePrev(previousIndex, 
previous, currentIndex)) {
+                    if (EXTRA_DEBUG) {
+                      logger.debug("New value was same as last value of 
previous batch, adding.");
+                    }
+                    addRecordInc(currentIndex);
+                    previousIndex = currentIndex;
+                    incIndex();
+                    if (EXTRA_DEBUG) {
+                      logger.debug("Continuing outside");
+                    }
+                  } else { // not the same
+                    if (EXTRA_DEBUG) {
+                      logger.debug("This is not the same as the previous, add 
record and continue outside.");
+                    }
+                    if (addedRecordCount > 0) {
+                      if (outputToBatchPrev(previous, previousIndex, 
outputCount)) {
+                        if (EXTRA_DEBUG) {
+                          logger.debug("Output container is full. flushing 
it.");
+                        }
+                        return setOkAndReturnEmit();
+                      }
+                    }
+                    // important to set the previous index to -1 since we 
start a new group
+                    previousIndex = -1;
+                  }
+                  processRemainingRecordsInBatch();
+                  outputToBatch(previousIndex); // currentIndex has been reset 
to int_max so use previous index.
                 }
-                outcome = out;
-                return AggOutcome.CLEANUP_AND_RETURN;
-              }
-
-            case NOT_YET:
-              this.outcome = out;
-              return AggOutcome.RETURN_OUTCOME;
-
-            case OK_NEW_SCHEMA:
-              if (EXTRA_DEBUG) {
-                logger.debug("Received new schema.  Batch has {} records.", 
incoming.getRecordCount());
-              }
-              if (addedRecordCount > 0) {
-                outputToBatchPrev(previous, previousIndex, outputCount); // No 
need to check the return value
-                // (output container full or not) as we are not going to 
insert anymore records.
+                resetIndex();
+                return setOkAndReturnEmit();
+
+              case NOT_YET:
+                this.outcome = out;
+                return AggOutcome.RETURN_OUTCOME;
+
+              case OK_NEW_SCHEMA:
+                firstBatchForSchema = true;
+                //lastOutcome = out;
                 if (EXTRA_DEBUG) {
-                  logger.debug("Wrote out end of previous batch, returning.");
+                  logger.debug("Received new schema.  Batch has {} records.", 
incoming.getRecordCount());
                 }
-                newSchema = true;
-                return setOkAndReturn();
-              }
-              cleanup();
-              return AggOutcome.UPDATE_AGGREGATOR;
-            case OK:
-              resetIndex();
-              if (incoming.getRecordCount() == 0) {
-                continue;
-              } else {
-                if (previousIndex != -1 && isSamePrev(previousIndex , 
previous, currentIndex)) {
-                  if (EXTRA_DEBUG) {
-                    logger.debug("New value was same as last value of previous 
batch, adding.");
-                  }
-                  addRecordInc(currentIndex);
-                  previousIndex = currentIndex;
-                  incIndex();
-                  if (EXTRA_DEBUG) {
-                    logger.debug("Continuing outside");
-                  }
-                  continue outside;
-                } else { // not the same
+                if (addedRecordCount > 0) {
+                  outputToBatchPrev(previous, previousIndex, outputCount); // 
No need to check the return value
+                  // (output container full or not) as we are not going to 
insert anymore records.
                   if (EXTRA_DEBUG) {
-                    logger.debug("This is not the same as the previous, add 
record and continue outside.");
+                    logger.debug("Wrote out end of previous batch, 
returning.");
                   }
-                  if (addedRecordCount > 0) {
-                    if (outputToBatchPrev(previous, previousIndex, 
outputCount)) {
-                      if (EXTRA_DEBUG) {
-                        logger.debug("Output container is full. flushing it.");
+                  newSchema = true;
+                  return setOkAndReturn();
+                }
+                cleanup();
+                return AggOutcome.UPDATE_AGGREGATOR;
+              case OK:
+                resetIndex();
+                if (incoming.getRecordCount() == 0) {
+                  continue;
+                } else {
+                  if (previousIndex != -1 && isSamePrev(previousIndex, 
previous, currentIndex)) {
+                    if (EXTRA_DEBUG) {
+                      logger.debug("New value was same as last value of 
previous batch, adding.");
+                    }
+                    addRecordInc(currentIndex);
+                    previousIndex = currentIndex;
+                    incIndex();
+                    if (EXTRA_DEBUG) {
+                      logger.debug("Continuing outside");
+                    }
+                    continue outside;
+                  } else { // not the same
+                    if (EXTRA_DEBUG) {
+                      logger.debug("This is not the same as the previous, add 
record and continue outside.");
+                    }
+                    if (addedRecordCount > 0) {
+                      if (outputToBatchPrev(previous, previousIndex, 
outputCount)) {
+                        if (EXTRA_DEBUG) {
+                          logger.debug("Output container is full. flushing 
it.");
+                        }
+                        previousIndex = -1;
+                        return setOkAndReturn();
                       }
-                      previousIndex = -1;
-                      return setOkAndReturn();
                     }
+                    previousIndex = -1;
+                    continue outside;
                   }
-                  previousIndex = -1;
-                  continue outside;
                 }
-              }
-            case STOP:
-            default:
-              lastOutcome = out;
-              outcome = out;
-              return AggOutcome.CLEANUP_AND_RETURN;
+              case STOP:
+              default:
+                lastOutcome = out;
+                outcome = out;
+                return AggOutcome.CLEANUP_AND_RETURN;
             }
           }
         } finally {
@@ -277,12 +336,63 @@ else if (isSame( previousIndex, currentIndex )) {
       }
     } finally {
       if (first) {
-        first = !first;
+        first = false;
       }
     }
 
   }
 
+  @Override
+  public boolean isDone() {
+    return done;
+  }
+
+  /**
+   * Process the remaining records in the batch. Returns false if not all 
records are processed (if the output
+   * container gets full), true otherwise.
+   * @return  Boolean indicating all records were processed
+   */
+  private boolean processRemainingRecordsInBatch() {
+    for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
+      if (EXTRA_DEBUG) {
+        logger.debug("Doing loop with values underlying {}, current {}", 
underlyingIndex, currentIndex);
+      }
+      if (previousIndex == -1) {
+        if (EXTRA_DEBUG) {
+          logger.debug("Adding the initial row's keys and values.");
+        }
+        addRecordInc(currentIndex);
+      }
+      else if (isSame( previousIndex, currentIndex )) {
+        if (EXTRA_DEBUG) {
+          logger.debug("Values were found the same, adding.");
+        }
+        addRecordInc(currentIndex);
+      } else {
+        if (EXTRA_DEBUG) {
+          logger.debug("Values were different, outputting previous batch.");
+        }
+        if(!outputToBatch(previousIndex)) {
+          // There is still space in outgoing container, so proceed to the 
next input.
+          if (EXTRA_DEBUG) {
+            logger.debug("Output successful.");
+          }
+          addRecordInc(currentIndex);
+        } else {
+          if (EXTRA_DEBUG) {
+            logger.debug("Output container has reached its capacity. Flushing 
it.");
+          }
+
+          // Update the indices to set the state for processing next record in 
incoming batch in subsequent doWork calls.
+          previousIndex = -1;
+          return false;
+        }
+      }
+      previousIndex = currentIndex;
+    }
+    return true;
+  }
+
   private final void incIndex() {
     underlyingIndex++;
     if (underlyingIndex >= incoming.getRecordCount()) {
@@ -297,18 +407,51 @@ private final void resetIndex() {
     incIndex();
   }
 
+  /**
+   * Set the outcome to OK (or OK_NEW_SCHEMA) and return the AggOutcome 
parameter
+   *
+   * @return outcome
+   */
   private final AggOutcome setOkAndReturn() {
-    if (first) {
-      this.outcome = IterOutcome.OK_NEW_SCHEMA;
+    IterOutcome outcomeToReturn;
+    firstBatchForDataSet = false;
+    if (firstBatchForSchema) {
+      outcomeToReturn = OK_NEW_SCHEMA;
+      firstBatchForSchema = false;
     } else {
-      this.outcome = IterOutcome.OK;
+      outcomeToReturn = OK;
     }
+    this.outcome = outcomeToReturn;
+
     for (VectorWrapper<?> v : outgoing) {
       v.getValueVector().getMutator().setValueCount(outputCount);
     }
     return AggOutcome.RETURN_OUTCOME;
   }
 
+  /**
+   * setOkAndReturn (as above) if the iter outcome was EMIT
+   *
+   * @return outcome
+   */
+  private final AggOutcome setOkAndReturnEmit() {
+    IterOutcome outcomeToReturn;
+    firstBatchForDataSet = true;
+    previousIndex = -1;
+    if (firstBatchForSchema) {
+      outcomeToReturn = OK_NEW_SCHEMA;
+      firstBatchForSchema = false;
+    } else {
+      outcomeToReturn = EMIT;
+    }
+    this.outcome = outcomeToReturn;
+
+    for (VectorWrapper<?> v : outgoing) {
+      v.getValueVector().getMutator().setValueCount(outputCount);
+    }
+    return AggOutcome.RETURN_AND_RESET;
+  }
+
   // Returns output container status after insertion of the given record. 
Caller must check the return value if it
   // plans to insert more records into outgoing container.
   private final boolean outputToBatch(int inIndex) {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
index a300924e4ca..2a64b930795 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
@@ -27,8 +27,30 @@
 
   public static TemplateClassDefinition<StreamingAggregator> 
TEMPLATE_DEFINITION = new 
TemplateClassDefinition<StreamingAggregator>(StreamingAggregator.class, 
StreamingAggTemplate.class);
 
+
+  /**
+   * The Aggregator can return one of the following outcomes:
+   * <p>
+   * <b>RETURN_OUTCOME:</b> The aggregation has seen a change in the group and 
should send data downstream. If
+   * complex writers are involved, then rebuild schema.
+   * <p>
+   * <b>CLEANUP_AND_RETURN:</b> End of all data. Return the data downstream, 
and cleanup.
+   * <p>
+   * <b>UPDATE_AGGREGATOR:</b> A schema change was encountered. The 
aggregator's generated  code and (possibly)
+   * container need to be updated
+   * <p>
+   * <b>RETURN_AND_RESET:</b> If the aggregator encounters an EMIT, then that 
implies the end of a data set but
+   * not of all the data. Return the data (aggregated so far) downstream, 
reset the internal state variables and
+   * come back for the next data set.
+   * <p>
+   * @see 
org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome 
HashAggregator.AggOutcome
+   */
   public static enum AggOutcome {
-    RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR;
+    RETURN_OUTCOME,
+    CLEANUP_AND_RETURN,
+    UPDATE_AGGREGATOR,
+    RETURN_AND_RESET
+    ;
   }
 
   public abstract void setup(OperatorContext context, RecordBatch incoming, 
StreamingAggBatch outgoing) throws SchemaChangeException;
@@ -37,7 +59,11 @@
 
   public abstract int getOutputCount();
 
-  public abstract AggOutcome doWork();
+  // do the work. Also pass in the Iteroutcome of the batch already read in 
case it might be an EMIT. If the
+  // outerOutcome is EMIT, we need to do the work without reading any more 
batches.
+  public abstract AggOutcome doWork(IterOutcome outerOutcome);
+
+  public abstract boolean isDone();
 
   public abstract void cleanup();
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index ea7f51f41e2..7db4d3bb120 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -315,7 +315,9 @@ public IterOutcome innerNext() {
     case START:
       return load();
     case LOAD:
-      resetSortState();
+      if (!this.retainInMemoryBatchesOnNone) {
+        resetSortState();
+      }
       return (sortState == SortState.DONE) ? NONE : load();
     case DELIVER:
       return nextOutputBatch();
@@ -578,36 +580,20 @@ public static void releaseBatches(RecordBatch incoming) {
     }
     if (incoming instanceof ExternalSortBatch) {
       ExternalSortBatch esb = (ExternalSortBatch) incoming;
-      esb.releaseResources();
+      esb.resetSortState();
     }
   }
 
   private void releaseResources() {
-    // This means if it has received NONE outcome and flag to retain is false 
OR if it has seen an EMIT
-    // then release the resources
-    if ((sortState == SortState.DONE && !this.retainInMemoryBatchesOnNone) ||
-      (sortState == SortState.LOAD)) {
-
-      // Close the iterator here to release any remaining resources such
-      // as spill files. This is important when a query has a join: the
-      // first branch sort may complete before the second branch starts;
-      // it may be quite a while after returning the last batch before the
-      // fragment executor calls this operator's close method.
-      //
-      // Note however, that the StreamingAgg operator REQUIRES that the sort
-      // retain the batches behind an SV4 when doing an in-memory sort because
-      // the StreamingAgg retains a reference to that data that it will use
-      // after receiving a NONE result code. See DRILL-5656.
-      //zeroResources();
-      if (resultsIterator != null) {
-        resultsIterator.close();
-      }
-      // We only zero vectors for actual output container
-      outputWrapperContainer.clear();
-      outputSV4.clear();
-      container.zeroVectors();
+    if (resultsIterator != null) {
+      resultsIterator.close();
     }
 
+    // We only zero vectors for actual output container
+    outputWrapperContainer.clear();
+    outputSV4.clear();
+    container.zeroVectors();
+
     // Close sortImpl for this boundary
     if (sortImpl != null) {
       sortImpl.close();
@@ -620,6 +606,20 @@ private void releaseResources() {
    */
   private void resetSortState() {
     sortState = (lastKnownOutcome == EMIT) ? SortState.LOAD : SortState.DONE;
+    // This means if it has received NONE/EMIT outcome and flag to retain is 
false which will be the case in presence of
+    // StreamingAggBatch only since it will explicitly call releaseBacthes on 
ExternalSort when its done consuming
+    // all the data buffer.
+
+    // Close the iterator here to release any remaining resources such
+    // as spill files. This is important when a query has a join: the
+    // first branch sort may complete before the second branch starts;
+    // it may be quite a while after returning the last batch before the
+    // fragment executor calls this operator's close method.
+    //
+    // Note however, that the StreamingAgg operator REQUIRES that the sort
+    // retain the batches behind an SV4 when doing an in-memory sort because
+    // the StreamingAgg retains a reference to that data that it will use
+    // after receiving a NONE result code. See DRILL-5656.
     releaseResources();
 
     if (lastKnownOutcome == EMIT) {
@@ -674,7 +674,9 @@ private IterOutcome getFinalOutcome() {
       sortState = SortState.DELIVER;
     } else if (getRecordCount() == 0) { // There is no record to send 
downstream
       outcomeToReturn = lastKnownOutcome == EMIT ? EMIT : NONE;
-      resetSortState();
+      if (!this.retainInMemoryBatchesOnNone) {
+        resetSortState();
+      }
     } else if (lastKnownOutcome == EMIT) {
       final boolean hasMoreRecords = outputSV4.hasNext();
       sortState = hasMoreRecords ? SortState.DELIVER : SortState.LOAD;
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
new file mode 100644
index 00000000000..75c4598baf2
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.agg;
+
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(OperatorTest.class)
+public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
+  //private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestStreamingAggEmitOutcome.class);
+  protected static TupleMetadata resultSchema;
+
+  @BeforeClass
+  public static void setUpBeforeClass2() throws Exception {
+    resultSchema = new SchemaBuilder()
+        .add("name", TypeProtos.MinorType.VARCHAR)
+        .addNullable("total_sum", TypeProtos.MinorType.BIGINT)
+        .buildSchema();
+  }
+
+  /**
+   * Verifies that if StreamingAggBatch receives empty batches with 
OK_NEW_SCHEMA and EMIT outcome then it correctly produces
+   * empty batches as output. First empty batch will be with OK_NEW_SCHEMA and 
second will be with EMIT outcome.
+   */
+  @Test
+  public void t1_testStreamingAggrEmptyBatchEmitOutcome() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new 
StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+  }
+
+  /**
+   * Verifies that if StreamingAgg receives a RecordBatch with EMIT outcome 
post build schema phase then it produces
+   * output for those input batch correctly. The first output batch will 
always be returned with OK_NEW_SCHEMA
+   * outcome followed by EMIT with empty batch. The test verifies the output 
order with the expected baseline.
+   */
+  @Test
+  public void t2_testStreamingAggrNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(13, 130, "item13")
+      .addRow(13, 130, "item13")
+      .addRow(2, 20, "item2")
+      .addRow(2, 20, "item2")
+      .addRow(4, 40, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = 
operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item1", (long)11)
+      .addRow("item13", (long)286)
+      .addRow("item2", (long)44)
+      .addRow("item4", (long)44)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new 
StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // Data before EMIT is returned with an OK_NEW_SCHEMA.
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(4, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = 
DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    // EMIT comes with an empty batch
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  @Test
+  public void 
t3_testStreamingAggrEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(13, 130, "item13")
+      .addRow(0, 1300, "item13")
+      .addRow(2, 20, "item2")
+      .addRow(0, 2000, "item2")
+      .addRow(4, 40, "item4")
+      .addRow(0, 4000, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = 
operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item13", (long)1443)
+      .addRow("item2", (long)2022)
+      .addRow("item4", (long)4044)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new 
StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(3, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = 
DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  @Test
+  public void 
t4_testStreamingAggrMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(13, 130, "item13")
+      .addRow(0, 0, "item13")
+      .addRow(1, 33000, "item13")
+      .addRow(2, 20, "item2")
+      .addRow(0, 0, "item2")
+      .addRow(1, 11000, "item2")
+      .addRow(4, 40, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = 
operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item13", (long)33144)
+      .addRow("item2", (long)11023)
+      .addRow("item4", (long)44)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new 
StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(3, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = 
DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   * Verifies that if StreamingAggr receives multiple non-empty record batch 
with EMIT outcome in between then it produces
+   * output for those input batch correctly. In this case it receives first 
non-empty batch with OK_NEW_SCHEMA in
+   * buildSchema phase followed by an empty batch with EMIT outcome. For this 
combination it produces output for the
+   * record received so far along with EMIT outcome. Then it receives second 
non-empty batch with OK outcome and
+   * produces output for it differently. The test validates that for each 
output received the order of the records are
+   * correct.
+   * @throws Exception
+   */
+  @Test
+  public void t5_testStreamingAgrResetsAfterFirstEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = 
operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item1", (long)11)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = 
operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item2", (long)44)
+      .addRow("item3", (long)330)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new 
StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = 
DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(2, strAggBatch.getRecordCount());
+
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet2.clear();
+    expectedRowSet1.clear();
+  }
+
+  /**
+   * Verifies that if StreamingAggr receives multiple non-empty record batch 
with EMIT outcome in between then it produces
+   * output for those input batch correctly. In this case it receives first 
non-empty batch with OK_NEW_SCHEMA in
+   * buildSchema phase followed by an empty batch with EMIT outcome. For this 
combination it produces output for the
+   * record received so far along with EMIT outcome. Then it receives second 
non-empty batch with OK outcome and
+   * produces output for it differently. The test validates that for each 
output received the order of the records are
+   * correct.
+   * @throws Exception
+   */
+  @Test
+  public void t6_testStreamingAggrOkFollowedByNone() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .addRow(4, 40, "item4")
+      .addRow(4, 40, "item4")
+      .addRow(5, 50, "item5")
+      .addRow(5, 50, "item5")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = 
operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item1", (long)11)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = 
operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item2", (long)22)
+      .addRow("item3", (long)33)
+      .addRow("item4", (long)88)
+      .addRow("item5", (long)110)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new 
StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = 
DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK);
+    assertEquals(4, strAggBatch.getRecordCount());
+
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet2.clear();
+    expectedRowSet1.clear();
+  }
+
+  /**
+   * Normal case
+   */
+  @Test
+  public void t7_testStreamingAggrMultipleEMITOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new 
StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(2, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+  }
+
+  /**
+   *
+   */
+  @Test
+  public void t8_testStreamingAggrMultipleInputToSingleOutputBatch() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = 
operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item1", (long)11)
+      .addRow("item2", (long)22)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new 
StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(2, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = 
DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+  }
+
+
+  
/*****************************************************************************************
+   Tests for validating regular StreamingAggr behavior with no EMIT outcome
+   
******************************************************************************************/
+  @Test
+  public void t9_testStreamingAgr_WithEmptyNonEmptyBatchesAndOKOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item1")
+      .addRow(13, 130, "item13")
+      .addRow(13, 130, "item13")
+      .addRow(13, 130, "item13")
+      .addRow(130, 1300, "item130")
+      .addRow(0, 0, "item130")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(23, 230, "item23")
+      .addRow(3, 33, "item3")
+      .addRow(7, 70, "item7")
+      .addRow(17, 170, "item7")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = 
operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item1", (long)33)
+      .addRow("item13", (long)429)
+      .addRow("item130", (long)1430)
+      .addRow("item23", (long)253)
+      .addRow("item3", (long)36)
+      .addRow("item7", (long)264)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new 
StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(6, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = 
DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    expectedRowSet.clear();
+  }
+
+  @Test
+  public void t10_testStreamingAggrWithEmptyDataSet() {
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        parseExprs("name_left", "name"),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new 
StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+  }
+}
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index 6bf3f9af088..a7c3a548ebf 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -358,6 +358,7 @@ public void testMultipleBatchesLateral_WithHashAgg() throws 
Exception {
       + "dfs.`lateraljoin/multipleFiles/` customer) t1, LATERAL (SELECT 
CAST(MAX(t.ord.o_totalprice)"
       + " AS int) AS maxprice FROM UNNEST(t1.c_orders) t(ord) GROUP BY 
t.ord.o_orderstatus) t2";
 
+    try {
     testBuilder()
       .optionSettingQueriesForTestQuery("alter session set `%s` = false",
         PlannerSettings.STREAMAGG.getOptionName())
@@ -371,6 +372,9 @@ public void testMultipleBatchesLateral_WithHashAgg() throws 
Exception {
       .baselineValues(235695)
       .baselineValues(177819)
       .build().run();
+    } finally {
+      test("alter session set `" + PlannerSettings.STREAMAGG.getOptionName() + 
"` = true");
+    }
   }
 
   @Test
@@ -378,6 +382,7 @@ public void testLateral_HashAgg_with_nulls() throws 
Exception {
     String sql = "SELECT key, t3.dsls FROM cp.`lateraljoin/with_nulls.json` t 
LEFT OUTER "
     + "JOIN LATERAL (SELECT DISTINCT t2.sls AS dsls FROM UNNEST(t.sales) 
t2(sls)) t3 ON TRUE";
 
+    try {
     testBuilder()
       .optionSettingQueriesForTestQuery("alter session set `%s` = false",
         PlannerSettings.STREAMAGG.getOptionName())
@@ -393,5 +398,63 @@ public void testLateral_HashAgg_with_nulls() throws 
Exception {
       .baselineValues("dd",111L)
       .baselineValues("dd",222L)
       .build().run();
+    } finally {
+      test("alter session set `" + PlannerSettings.STREAMAGG.getOptionName() + 
"` = true");
+    }
+  }
+
+  @Test
+  public void testMultipleBatchesLateral_WithStreamingAgg() throws Exception {
+    String sql = "SELECT t2.maxprice FROM (SELECT customer.c_orders AS 
c_orders FROM "
+        + "dfs.`lateraljoin/multipleFiles/` customer) t1, LATERAL (SELECT 
CAST(MAX(t.ord.o_totalprice)"
+        + " AS int) AS maxprice FROM UNNEST(t1.c_orders) t(ord) GROUP BY 
t.ord.o_orderstatus) t2";
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("maxprice")
+        .baselineValues(367190)
+        .baselineValues(316347)
+        .baselineValues(146610)
+        .baselineValues(306996)
+        .baselineValues(235695)
+        .baselineValues(177819)
+        .build().run();
   }
+
+  @Test
+  public void testLateral_StreamingAgg_with_nulls() throws Exception {
+    String sql = "SELECT key, t3.dsls FROM cp.`lateraljoin/with_nulls.json` t 
LEFT OUTER "
+        + "JOIN LATERAL (SELECT DISTINCT t2.sls AS dsls FROM UNNEST(t.sales) 
t2(sls)) t3 ON TRUE";
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("key","dsls")
+        .baselineValues("aa",null)
+        .baselineValues("bb",100L)
+        .baselineValues("bb",200L)
+        .baselineValues("bb",300L)
+        .baselineValues("bb",400L)
+        .baselineValues("cc",null)
+        .baselineValues("dd",111L)
+        .baselineValues("dd",222L)
+        .build().run();
+  }
+
+  @Test
+  public void testMultipleBatchesLateral_WithStreamingAggNoGroup() throws 
Exception {
+    String sql = "SELECT t2.maxprice FROM (SELECT customer.c_orders AS 
c_orders FROM "
+        + "dfs.`lateraljoin/multipleFiles/` customer) t1, LATERAL (SELECT 
CAST(MAX(t.ord.o_totalprice)"
+        + " AS int) AS maxprice FROM UNNEST(t1.c_orders) t(ord) ) t2";
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("maxprice")
+        .baselineValues(367190)
+        .baselineValues(306996)
+        .build().run();
+  }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to