This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit a77fd142d86dd5648cda8866b8ff3af39c7b6b11
Author: Parth Chandra <par...@apache.org>
AuthorDate: Mon Jun 18 21:34:20 2018 -0700

    DRILL-6516: EMIT support in streaming agg
    
    This closes #1358
---
 .../physical/impl/aggregate/StreamingAggBatch.java | 274 ++++++---
 .../impl/aggregate/StreamingAggTemplate.java       | 425 +++++++++-----
 .../impl/aggregate/StreamingAggregator.java        |  30 +-
 .../impl/agg/TestStreamingAggEmitOutcome.java      | 614 +++++++++++++++++++++
 .../impl/lateraljoin/TestE2EUnnestAndLateral.java  |  63 +++
 5 files changed, 1198 insertions(+), 208 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index caeed50..882c36d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -67,15 +67,49 @@ import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JVar;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
 public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> 
{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(StreamingAggBatch.class);
 
   private StreamingAggregator aggregator;
   private final RecordBatch incoming;
   private List<BaseWriter.ComplexWriter> complexWriters;
-  private boolean done = false;
-  private boolean first = true;
-  private int recordCount = 0;
+  //
+  // Streaming agg can be in (a) a normal pipeline or (b) it may be in a 
pipeline that is part of a subquery involving
+  // lateral and unnest. In case(a), the aggregator proceeds normally until it 
sees a group change or a NONE. If a
+  // group has changed, the aggregated data is sent downstream and the 
aggregation continues with the next group. If
+  // a NONE is seen, the aggregator completes, sends data downstream and 
cleans up.
+  // In case (b), the aggregator behaves similarly to case (a) if a group change 
or NONE is observed. However, it will
+  // also encounter a new state EMIT, every time unnest processes a new row. 
In this case the aggregator must complete the
+  // aggregation, send out the results, AND reset to receive more data. To 
make the treatment of these two cases
+  // similar, we define the aggregation operation in terms of data sets.
+  //   Data Set = The set of data that the aggregator is currently 
aggregating. In a normal query, the entire data is
+  //   a single data set. In the case of a Lateral subquery, every row 
processed by unnest is a data set.  There can,
+  //   therefore, be one or more data sets in an aggregation.
+  //   Data Sets may have multiple batches and may contain one or more empty 
batches. A data set may consist entirely
+  //   of empty batches.
+  //   Schema may change across Data Sets.
+  //   A corner case is the case of a Data Set having many empty batches in 
the beginning. Such a data set may see a
+  //   schema change once the first non-empty batch is received.
+  //   Schema change within a Data Set is not supported.
+  //
+  //   We will define some states for internal management
+  //
+  private boolean done = false;  // END of all data
+  private boolean first = true;  // Beginning of new data set. True during the 
build schema phase. False once the first
+                                 // call to inner next is made.
+  private boolean sendEmit = false; // In the case where we see an 
OK_NEW_SCHEMA along with the end of a data set
+                                    // we send out a batch with OK_NEW_SCHEMA 
first, then in the next iteration,
+                                    // we send out an empty batch with EMIT.
+  private IterOutcome lastKnownOutcome = OK; // keep track of the outcome from 
the previous call to incoming.next
+  private boolean firstBatchForSchema = true; // true if the current batch 
came in with an OK_NEW_SCHEMA
+  private boolean firstBatchForDataSet = true; // true if the current batch is 
the first batch in a data set
+  private int recordCount = 0;  // number of records output in the current 
data set
+
   private BatchSchema incomingSchema;
 
   /*
@@ -154,83 +188,174 @@ public class StreamingAggBatch extends 
AbstractRecordBatch<StreamingAggregate> {
   public IterOutcome innerNext() {
 
     // if a special batch has been sent, we have no data in the incoming so 
exit early
-    if (specialBatchSent) {
-      return IterOutcome.NONE;
+    if ( done || specialBatchSent) {
+      return NONE;
+    }
+
+    // We sent an OK_NEW_SCHEMA and also encountered the end of a data set. So 
we need to send
+    // an EMIT with an empty batch now
+    if (sendEmit) {
+      sendEmit = false;
+      firstBatchForDataSet = true;
+      recordCount = 0;
+      return EMIT;
     }
 
     // this is only called on the first batch. Beyond this, the aggregator 
manages batches.
     if (aggregator == null || first) {
-      IterOutcome outcome;
       if (first && incoming.getRecordCount() > 0) {
         first = false;
-        outcome = IterOutcome.OK_NEW_SCHEMA;
+        lastKnownOutcome = OK_NEW_SCHEMA;
       } else {
-        outcome = next(incoming);
+        lastKnownOutcome = next(incoming);
       }
-      logger.debug("Next outcome of {}", outcome);
-      switch (outcome) {
-      case NONE:
-        if (first && popConfig.getKeys().size() == 0) {
+      logger.debug("Next outcome of {}", lastKnownOutcome);
+      switch (lastKnownOutcome) {
+        case NONE:
+          if (firstBatchForDataSet && popConfig.getKeys().size() == 0) {
+            // if we have a straight aggregate and empty input batch, we need 
to handle it in a different way
+            constructSpecialBatch();
+            // set state to indicate the fact that we have sent a special 
batch and input is empty
+            specialBatchSent = true;
+            // If outcome is NONE then we send the special batch in the first 
iteration and the NONE
+            // outcome in the next iteration. If outcome is EMIT, we can send 
the special
+            // batch and the EMIT outcome at the same time.
+            return getFinalOutcome();
+          }
+          // else fall thru
+        case OUT_OF_MEMORY:
+        case NOT_YET:
+        case STOP:
+          return lastKnownOutcome;
+        case OK_NEW_SCHEMA:
+          if (!createAggregator()) {
+            done = true;
+            return IterOutcome.STOP;
+          }
+          break;
+        case EMIT:
+          // if we get an EMIT with an empty batch as the first (and therefore 
only) batch
+          // we have to do the special handling
+          if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && 
incoming.getRecordCount() == 0) {
+            constructSpecialBatch();
+            // set state to indicate the fact that we have sent a special 
batch and input is empty
+            specialBatchSent = true;
+            firstBatchForDataSet = true; // reset on the next iteration
+            // If outcome is NONE then we send the special batch in the first 
iteration and the NONE
+            // outcome in the next iteration. If outcome is EMIT, we can send 
the special
+            // batch and the EMIT outcome at the same time.
+            return getFinalOutcome();
+          }
+          // else fall thru
+        case OK:
+          break;
+        default:
+          throw new IllegalStateException(String.format("unknown outcome %s", 
lastKnownOutcome));
+      }
+    } else {
+      if ( lastKnownOutcome != NONE && firstBatchForDataSet && 
!aggregator.isDone()) {
+        lastKnownOutcome = incoming.next();
+        if (!first ) {
+          //Setup needs to be called again. During setup, generated code saves 
a reference to the vectors
+          // pointed to by the incoming batch so that the dereferencing of the 
vector wrappers to get to
+          // the vectors  does not have to be done at each call to eval. 
However, after an EMIT is seen,
+          // the vectors are replaced and the reference to the old vectors is 
no longer valid
+          try {
+            aggregator.setup(oContext, incoming, this);
+          } catch (SchemaChangeException e) {
+            UserException.Builder exceptionBuilder = 
UserException.functionError(e)
+                .message("A Schema change exception occured in calling setup() 
in generated code.");
+            throw exceptionBuilder.build(logger);
+          }
+        }
+      }
+      // We sent an EMIT in the previous iteration, so we must be starting a 
new data set
+      if (firstBatchForDataSet) {
+        done = false;
+        sendEmit = false;
+        specialBatchSent = false;
+        firstBatchForDataSet = false;
+      }
+    }
+    AggOutcome aggOutcome = aggregator.doWork(lastKnownOutcome);
+    recordCount = aggregator.getOutputCount();
+    container.setRecordCount(recordCount);
+    logger.debug("Aggregator response {}, records {}", aggOutcome, 
aggregator.getOutputCount());
+    // overwrite the outcome variable since we no longer need to remember the 
first batch outcome
+    lastKnownOutcome = aggregator.getOutcome();
+    switch (aggOutcome) {
+      case CLEANUP_AND_RETURN:
+        if (!first) {
+          container.zeroVectors();
+        }
+        done = true;
+        ExternalSortBatch.releaseBatches(incoming);
+        return lastKnownOutcome;
+      case RETURN_AND_RESET:
+        // We could have got a string of batches, all empty, until we hit an 
EMIT
+        if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && 
recordCount == 0) {
           // if we have a straight aggregate and empty input batch, we need to 
handle it in a different way
           constructSpecialBatch();
-          first = false;
           // set state to indicate the fact that we have sent a special batch 
and input is empty
           specialBatchSent = true;
-          return IterOutcome.OK;
+          // If outcome is NONE then we send the special batch in the first 
iteration and the NONE
+          // outcome in the next iteration. If outcome is EMIT, we can send 
the special
+          // batch and the EMIT outcome at the same time.
+          return getFinalOutcome();
         }
-      case OUT_OF_MEMORY:
-      case NOT_YET:
-      case STOP:
-        return outcome;
-      case OK_NEW_SCHEMA:
-        if (!createAggregator()) {
-          done = true;
+        firstBatchForDataSet = true;
+        if(first) {
+          first = false;
+        }
+        if(lastKnownOutcome == OK_NEW_SCHEMA) {
+          sendEmit = true;
+        }
+        // Release external sort batches after EMIT is seen
+        ExternalSortBatch.releaseBatches(incoming);
+        return lastKnownOutcome;
+      case RETURN_OUTCOME:
+        // In case of complex writer expression, vectors would be added to 
batch run-time.
+        // We have to re-build the schema.
+        if (complexWriters != null) {
+          container.buildSchema(SelectionVectorMode.NONE);
+        }
+        if (lastKnownOutcome == IterOutcome.NONE ) {
+          // we will set the 'done' flag in the next call to innerNext and use 
the lastKnownOutcome
+          // to determine whether we should set the flag or not.
+          // This is so that if someone calls getRecordCount in between calls 
to innerNext, we will
+          // return the correct record count (if the done flag is set, we will 
return 0).
+          if (first) {
+            first = false;
+            return OK_NEW_SCHEMA;
+          } else {
+            return OK;
+          }
+        } else if (lastKnownOutcome == OK && first) {
+          lastKnownOutcome = OK_NEW_SCHEMA;
+        } else if (lastKnownOutcome != IterOutcome.OUT_OF_MEMORY) {
+          first = false;
+        }
+        return lastKnownOutcome;
+      case UPDATE_AGGREGATOR:
+        // We could get this either between data sets or within a data set.
+        // If the former, we can handle the change and so need to update the 
aggregator and
+        // continue. If the latter, we cannot (currently) handle the schema 
change, so throw
+        // an exception
+        // This case is not tested since there are no unit tests for this and 
there is no support
+        // from the sort operator for this case
+        if (lastKnownOutcome == EMIT) {
+          createAggregator();
+          return OK_NEW_SCHEMA;
+        } else {
+          
context.getExecutorState().fail(UserException.unsupportedError().message(SchemaChangeException
+              .schemaChanged("Streaming aggregate does not support schema 
changes", incomingSchema,
+                  incoming.getSchema()).getMessage()).build(logger));
+          close();
+          killIncoming(false);
           return IterOutcome.STOP;
         }
-        break;
-      case OK:
-        break;
       default:
-        throw new IllegalStateException(String.format("unknown outcome %s", 
outcome));
-      }
-    }
-    AggOutcome out = aggregator.doWork();
-    recordCount = aggregator.getOutputCount();
-    logger.debug("Aggregator response {}, records {}", out, 
aggregator.getOutputCount());
-    switch (out) {
-    case CLEANUP_AND_RETURN:
-      if (!first) {
-        container.zeroVectors();
-      }
-      done = true;
-      ExternalSortBatch.releaseBatches(incoming);
-      // fall through
-    case RETURN_OUTCOME:
-      IterOutcome outcome = aggregator.getOutcome();
-      // In case of complex writer expression, vectors would be added to batch 
run-time.
-      // We have to re-build the schema.
-      if (complexWriters != null) {
-        container.buildSchema(SelectionVectorMode.NONE);
-      }
-      if (outcome == IterOutcome.NONE && first) {
-        first = false;
-        done = true;
-        return IterOutcome.OK_NEW_SCHEMA;
-      } else if (outcome == IterOutcome.OK && first) {
-        outcome = IterOutcome.OK_NEW_SCHEMA;
-      } else if (outcome != IterOutcome.OUT_OF_MEMORY) {
-        first = false;
-      }
-      return outcome;
-    case UPDATE_AGGREGATOR:
-      context.getExecutorState().fail(UserException.unsupportedError()
-        .message(SchemaChangeException.schemaChanged("Streaming aggregate does 
not support schema changes", incomingSchema, incoming.getSchema()).getMessage())
-        .build(logger));
-      close();
-      killIncoming(false);
-      return IterOutcome.STOP;
-    default:
-      throw new IllegalStateException(String.format("Unknown state %s.", out));
+        throw new IllegalStateException(String.format("Unknown state %s.", 
aggOutcome));
     }
   }
 
@@ -309,7 +434,7 @@ public class StreamingAggBatch extends 
AbstractRecordBatch<StreamingAggregate> {
     ClassGenerator<StreamingAggregator> cg = 
CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, 
context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
-    //cg.getCodeGenerator().saveCodeForDebugging(true);
+    //  cg.getCodeGenerator().saveCodeForDebugging(true);
     container.clear();
 
     LogicalExpression[] keyExprs = new 
LogicalExpression[popConfig.getKeys().size()];
@@ -496,6 +621,25 @@ public class StreamingAggBatch extends 
AbstractRecordBatch<StreamingAggregate> {
     }
   }
 
+  private IterOutcome getFinalOutcome() {
+    IterOutcome outcomeToReturn;
+
+    if (firstBatchForDataSet) {
+      firstBatchForDataSet = false;
+    }
+    if (firstBatchForSchema) {
+      outcomeToReturn = OK_NEW_SCHEMA;
+      firstBatchForSchema = false;
+    } else if (lastKnownOutcome == EMIT) {
+      firstBatchForDataSet = true;
+      outcomeToReturn = EMIT;
+    } else {
+      // get the outcome to return before calling refresh since that resets 
the lastKnownOutcome to OK
+      outcomeToReturn = (recordCount == 0) ? NONE : OK;
+    }
+    return outcomeToReturn;
+  }
+
   @Override
   public void close() {
     super.close();
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index fb4d508..a752c7e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -25,26 +25,49 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorWrapper;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
 public abstract class StreamingAggTemplate implements StreamingAggregator {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(StreamingAggregator.class);
   private static final boolean EXTRA_DEBUG = false;
   private static final int OUTPUT_BATCH_SIZE = 32*1024;
 
+  // lastOutcome is set ONLY if the lastOutcome was NONE or STOP
   private IterOutcome lastOutcome = null;
+
+  // First batch after build schema phase
   private boolean first = true;
+  private boolean firstBatchForSchema = true; // true if the current batch 
came in with an OK_NEW_SCHEMA.
+  private boolean firstBatchForDataSet = true; // true if the current batch is 
the first batch in a data set
+
   private boolean newSchema = false;
-  private int previousIndex = -1;
+
+  // End of all data
+  private boolean done = false;
+
+  // index in the incoming (sv4/sv2/vector)
   private int underlyingIndex = 0;
-  private int currentIndex;
+  // The indexes below refer to the actual record indexes in input batch
+  // (i.e., if a selection vector is used, the sv4/sv2 entry has been dereferenced; for 
a plain vector, it is the record index itself)
+  private int previousIndex = -1;  // the last index that has been processed. 
Initialized to -1 every time a new
+                                   // aggregate group begins (including every 
time a new data set begins)
+  private int currentIndex; // current index being processed
   /**
    * Number of records added to the current aggregation group.
    */
   private long addedRecordCount = 0;
+  // There are two outcomes from the aggregator. One is the aggregator's 
outcome defined in
+  // StreamingAggregator.AggOutcome. The other is the outcome from the last 
call to incoming.next
   private IterOutcome outcome;
+  // Number of aggregation groups added into the output batch
   private int outputCount = 0;
   private RecordBatch incoming;
+  // the Streaming Agg Batch that this aggregator belongs to
   private StreamingAggBatch outgoing;
-  private boolean done = false;
+
   private OperatorContext context;
 
 
@@ -73,45 +96,67 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
   }
 
   @Override
-  public AggOutcome doWork() {
-    if (done) {
+  public AggOutcome doWork(IterOutcome outerOutcome) {
+    if (done || outerOutcome == NONE) {
       outcome = IterOutcome.NONE;
       return AggOutcome.CLEANUP_AND_RETURN;
     }
-    try { // outside loop to ensure that first is set to false after the first 
run.
+
+    try { // outside block to ensure that first is set to false after the 
first run.
       outputCount = 0;
       // allocate outgoing since either this is the first time or if a 
subsequent time we would
       // have sent the previous outgoing batch to downstream operator
       allocateOutgoing();
 
-      if (first) {
+      if (firstBatchForDataSet) {
         this.currentIndex = incoming.getRecordCount() == 0 ? 0 : 
this.getVectorIndex(underlyingIndex);
 
-        // consume empty batches until we get one with data.
-        if (incoming.getRecordCount() == 0) {
-          outer: while (true) {
-            IterOutcome out = outgoing.next(0, incoming);
-            switch (out) {
-            case OK_NEW_SCHEMA:
-            case OK:
-              if (incoming.getRecordCount() == 0) {
-                continue;
-              } else {
-                currentIndex = this.getVectorIndex(underlyingIndex);
-                break outer;
-              }
-            case OUT_OF_MEMORY:
-              outcome = out;
-              return AggOutcome.RETURN_OUTCOME;
-            case NONE:
-              out = IterOutcome.OK_NEW_SCHEMA;
-            case STOP:
-            default:
-              lastOutcome = out;
-              outcome = out;
-              done = true;
-              return AggOutcome.CLEANUP_AND_RETURN;
-            }
+        if (outerOutcome == OK_NEW_SCHEMA) {
+          firstBatchForSchema = true;
+        }
+        // consume empty batches until we get one with data (unless we got an 
EMIT). If we got an emit
+        // then this is the first batch, it was empty and we also got an emit.
+        if (incoming.getRecordCount() == 0 ) {
+          if (outerOutcome != EMIT) {
+            outer:
+            while (true) {
+              IterOutcome out = outgoing.next(0, incoming);
+              switch (out) {
+                case OK_NEW_SCHEMA:
+                  //lastOutcome = out;
+                  firstBatchForSchema = true;
+                case OK:
+                  if (incoming.getRecordCount() == 0) {
+                    continue;
+                  } else {
+                    currentIndex = this.getVectorIndex(underlyingIndex);
+                    break outer;
+                  }
+                case OUT_OF_MEMORY:
+                  outcome = out;
+                  return AggOutcome.RETURN_OUTCOME;
+                case EMIT:
+                  if (incoming.getRecordCount() == 0) {
+                    // When we see an EMIT we let the  agg record batch know 
that it should either
+                    // send out an EMIT or an OK_NEW_SCHEMA, followed by an 
EMIT. To do that we simply return
+                    // RETURN_AND_RESET with the outcome so the record batch 
can take care of it.
+                    return setOkAndReturnEmit();
+                  } else {
+                    break outer;
+                  }
+
+                case NONE:
+                  out = IterOutcome.OK_NEW_SCHEMA;
+                case STOP:
+                default:
+                  lastOutcome = out;
+                  outcome = out;
+                  done = true;
+                  return AggOutcome.CLEANUP_AND_RETURN;
+              } // switch (out)
+            } // while empty batches are seen
+          } else {
+            return setOkAndReturnEmit();
           }
         }
       }
@@ -121,49 +166,24 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
         return AggOutcome.UPDATE_AGGREGATOR;
       }
 
-      if (lastOutcome != null) {
+      // if the previous iteration has an outcome that was terminal, don't do 
anything.
+      if (lastOutcome != null /*&& lastOutcome != IterOutcome.OK_NEW_SCHEMA*/) 
{
         outcome = lastOutcome;
         return AggOutcome.CLEANUP_AND_RETURN;
       }
 
       outside: while(true) {
-      // loop through existing records, adding as necessary.
-        for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
-          if (EXTRA_DEBUG) {
-            logger.debug("Doing loop with values underlying {}, current {}", 
underlyingIndex, currentIndex);
-          }
-          if (previousIndex == -1) {
-            if (EXTRA_DEBUG) {
-              logger.debug("Adding the initial row's keys and values.");
-            }
-            addRecordInc(currentIndex);
-          }
-          else if (isSame( previousIndex, currentIndex )) {
-            if (EXTRA_DEBUG) {
-              logger.debug("Values were found the same, adding.");
-            }
-            addRecordInc(currentIndex);
-          } else {
-            if (EXTRA_DEBUG) {
-              logger.debug("Values were different, outputting previous 
batch.");
-            }
-            if(!outputToBatch(previousIndex)) {
-              // There is still space in outgoing container, so proceed to the 
next input.
-              if (EXTRA_DEBUG) {
-                logger.debug("Output successful.");
-              }
-              addRecordInc(currentIndex);
-            } else {
-              if (EXTRA_DEBUG) {
-                logger.debug("Output container has reached its capacity. 
Flushing it.");
-              }
-
-              // Update the indices to set the state for processing next 
record in incoming batch in subsequent doWork calls.
-              previousIndex = -1;
-              return setOkAndReturn();
-            }
-          }
-          previousIndex = currentIndex;
+        // loop through existing records, adding as necessary.
+        if(!processRemainingRecordsInBatch()) {
+          // output batch is full. Return.
+          return setOkAndReturn();
+        }
+        // if the current batch came with an EMIT, we're done
+        if(outerOutcome == EMIT) {
+          // output the last record
+          outputToBatch(previousIndex);
+          resetIndex();
+          return setOkAndReturnEmit();
         }
 
         /**
@@ -189,83 +209,122 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
               logger.debug("Received IterOutcome of {}", out);
             }
             switch (out) {
-            case NONE:
-              done = true;
-              lastOutcome = out;
-              if (first && addedRecordCount == 0) {
-                return setOkAndReturn();
-              } else if (addedRecordCount > 0) {
-                outputToBatchPrev(previous, previousIndex, outputCount); // No 
need to check the return value
-                // (output container full or not) as we are not going to 
insert any more records.
-                if (EXTRA_DEBUG) {
-                  logger.debug("Received no more batches, returning.");
+              case NONE:
+                done = true;
+                lastOutcome = out;
+                if (firstBatchForDataSet && addedRecordCount == 0) {
+                  return setOkAndReturn();
+                } else if (addedRecordCount > 0) {
+                  outputToBatchPrev(previous, previousIndex, outputCount); // 
No need to check the return value
+                  // (output container full or not) as we are not going to 
insert any more records.
+                  if (EXTRA_DEBUG) {
+                    logger.debug("Received no more batches, returning.");
+                  }
+                  return setOkAndReturn();
+                } else {
+                  // not first batch and record Count == 0
+                  outcome = out;
+                  return AggOutcome.CLEANUP_AND_RETURN;
                 }
-                return setOkAndReturn();
-              } else {
-                if (first && out == IterOutcome.OK) {
-                  out = IterOutcome.OK_NEW_SCHEMA;
+                // EMIT is handled like OK, except that we do not loop back to 
process the
+                // next incoming batch; we return instead
+              case EMIT:
+                if (incoming.getRecordCount() == 0) {
+                  if (addedRecordCount > 0) {
+                    outputToBatchPrev(previous, previousIndex, outputCount);
+                  }
+                } else {
+                  resetIndex();
+                  if (previousIndex != -1 && isSamePrev(previousIndex, 
previous, currentIndex)) {
+                    if (EXTRA_DEBUG) {
+                      logger.debug("New value was same as last value of 
previous batch, adding.");
+                    }
+                    addRecordInc(currentIndex);
+                    previousIndex = currentIndex;
+                    incIndex();
+                    if (EXTRA_DEBUG) {
+                      logger.debug("Continuing outside");
+                    }
+                  } else { // not the same
+                    if (EXTRA_DEBUG) {
+                      logger.debug("This is not the same as the previous, add 
record and continue outside.");
+                    }
+                    if (addedRecordCount > 0) {
+                      if (outputToBatchPrev(previous, previousIndex, 
outputCount)) {
+                        if (EXTRA_DEBUG) {
+                          logger.debug("Output container is full. flushing 
it.");
+                        }
+                        return setOkAndReturnEmit();
+                      }
+                    }
+                    // important to set the previous index to -1 since we 
start a new group
+                    previousIndex = -1;
+                  }
+                  processRemainingRecordsInBatch();
+                  outputToBatch(previousIndex); // currentIndex has been reset 
to int_max so use previous index.
                 }
-                outcome = out;
-                return AggOutcome.CLEANUP_AND_RETURN;
-              }
-
-            case NOT_YET:
-              this.outcome = out;
-              return AggOutcome.RETURN_OUTCOME;
-
-            case OK_NEW_SCHEMA:
-              if (EXTRA_DEBUG) {
-                logger.debug("Received new schema.  Batch has {} records.", 
incoming.getRecordCount());
-              }
-              if (addedRecordCount > 0) {
-                outputToBatchPrev(previous, previousIndex, outputCount); // No 
need to check the return value
-                // (output container full or not) as we are not going to 
insert anymore records.
+                resetIndex();
+                return setOkAndReturnEmit();
+
+              case NOT_YET:
+                this.outcome = out;
+                return AggOutcome.RETURN_OUTCOME;
+
+              case OK_NEW_SCHEMA:
+                firstBatchForSchema = true;
+                //lastOutcome = out;
                 if (EXTRA_DEBUG) {
-                  logger.debug("Wrote out end of previous batch, returning.");
+                  logger.debug("Received new schema.  Batch has {} records.", 
incoming.getRecordCount());
                 }
-                newSchema = true;
-                return setOkAndReturn();
-              }
-              cleanup();
-              return AggOutcome.UPDATE_AGGREGATOR;
-            case OK:
-              resetIndex();
-              if (incoming.getRecordCount() == 0) {
-                continue;
-              } else {
-                if (previousIndex != -1 && isSamePrev(previousIndex , 
previous, currentIndex)) {
-                  if (EXTRA_DEBUG) {
-                    logger.debug("New value was same as last value of previous 
batch, adding.");
-                  }
-                  addRecordInc(currentIndex);
-                  previousIndex = currentIndex;
-                  incIndex();
-                  if (EXTRA_DEBUG) {
-                    logger.debug("Continuing outside");
-                  }
-                  continue outside;
-                } else { // not the same
+                if (addedRecordCount > 0) {
+                  outputToBatchPrev(previous, previousIndex, outputCount); // 
No need to check the return value
+                  // (output container full or not) as we are not going to 
insert anymore records.
                   if (EXTRA_DEBUG) {
-                    logger.debug("This is not the same as the previous, add 
record and continue outside.");
+                    logger.debug("Wrote out end of previous batch, 
returning.");
                   }
-                  if (addedRecordCount > 0) {
-                    if (outputToBatchPrev(previous, previousIndex, 
outputCount)) {
-                      if (EXTRA_DEBUG) {
-                        logger.debug("Output container is full. flushing it.");
+                  newSchema = true;
+                  return setOkAndReturn();
+                }
+                cleanup();
+                return AggOutcome.UPDATE_AGGREGATOR;
+              case OK:
+                resetIndex();
+                if (incoming.getRecordCount() == 0) {
+                  continue;
+                } else {
+                  if (previousIndex != -1 && isSamePrev(previousIndex, 
previous, currentIndex)) {
+                    if (EXTRA_DEBUG) {
+                      logger.debug("New value was same as last value of 
previous batch, adding.");
+                    }
+                    addRecordInc(currentIndex);
+                    previousIndex = currentIndex;
+                    incIndex();
+                    if (EXTRA_DEBUG) {
+                      logger.debug("Continuing outside");
+                    }
+                    continue outside;
+                  } else { // not the same
+                    if (EXTRA_DEBUG) {
+                      logger.debug("This is not the same as the previous, add 
record and continue outside.");
+                    }
+                    if (addedRecordCount > 0) {
+                      if (outputToBatchPrev(previous, previousIndex, 
outputCount)) {
+                        if (EXTRA_DEBUG) {
+                          logger.debug("Output container is full. flushing 
it.");
+                        }
+                        previousIndex = -1;
+                        return setOkAndReturn();
                       }
-                      previousIndex = -1;
-                      return setOkAndReturn();
                     }
+                    previousIndex = -1;
+                    continue outside;
                   }
-                  previousIndex = -1;
-                  continue outside;
                 }
-              }
-            case STOP:
-            default:
-              lastOutcome = out;
-              outcome = out;
-              return AggOutcome.CLEANUP_AND_RETURN;
+              case STOP:
+              default:
+                lastOutcome = out;
+                outcome = out;
+                return AggOutcome.CLEANUP_AND_RETURN;
             }
           }
         } finally {
@@ -277,12 +336,63 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
       }
     } finally {
       if (first) {
-        first = !first;
+        first = false;
       }
     }
 
   }
 
+  /**
+   * Reports the current value of this template's {@code done} flag.
+   *
+   * @return the value of {@code done}
+   */
+  @Override
+  public boolean isDone() {
+    return this.done;
+  }
+
+  /**
+   * Aggregates the records of the current incoming batch that have not been consumed yet, starting at
+   * {@code underlyingIndex} and advancing with {@code incIndex()}.
+   * <p>
+   * For each record: if no previous record is being tracked ({@code previousIndex == -1}) or the record has the
+   * same key as the previous one (per {@code isSame}), it is added to the current group. Otherwise the finished
+   * group is written with {@code outputToBatch(previousIndex)} and, if the output container still has room, the
+   * record is added as the start of the next group.
+   * <p>
+   * If {@code outputToBatch} reports that the output container is full, {@code previousIndex} is reset to -1 and
+   * the method returns immediately; the record that triggered the output has not been added and the index has
+   * not been advanced past it.
+   *
+   * @return true if every remaining record in the batch was processed, false if processing stopped early
+   *         because the output container became full
+   */
+  private boolean processRemainingRecordsInBatch() {
+    for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
+      if (EXTRA_DEBUG) {
+        logger.debug("Doing loop with values underlying {}, current {}", 
underlyingIndex, currentIndex);
+      }
+      if (previousIndex == -1) {
+        if (EXTRA_DEBUG) {
+          logger.debug("Adding the initial row's keys and values.");
+        }
+        addRecordInc(currentIndex);
+      }
+      else if (isSame( previousIndex, currentIndex )) {
+        if (EXTRA_DEBUG) {
+          logger.debug("Values were found the same, adding.");
+        }
+        addRecordInc(currentIndex);
+      } else {
+        if (EXTRA_DEBUG) {
+          logger.debug("Values were different, outputting previous batch.");
+        }
+        if(!outputToBatch(previousIndex)) {
+          // There is still space in outgoing container, so proceed to the 
next input.
+          if (EXTRA_DEBUG) {
+            logger.debug("Output successful.");
+          }
+          addRecordInc(currentIndex);
+        } else {
+          if (EXTRA_DEBUG) {
+            logger.debug("Output container has reached its capacity. Flushing 
it.");
+          }
+
+          // Update the indices to set the state for processing next record in 
incoming batch in subsequent doWork calls.
+          previousIndex = -1;
+          return false;
+        }
+      }
+      previousIndex = currentIndex;
+    }
+    return true;
+  }
+
   private final void incIndex() {
     underlyingIndex++;
     if (underlyingIndex >= incoming.getRecordCount()) {
@@ -297,18 +407,51 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
     incIndex();
   }
 
+  /**
+   * Marks the current output as ready to be returned. Sets the iteration outcome to OK_NEW_SCHEMA if
+   * {@code firstBatchForSchema} is still set (clearing that flag), otherwise to OK; clears
+   * {@code firstBatchForDataSet}; and sets the value count of every outgoing vector to {@code outputCount}.
+   *
+   * @return always {@code AggOutcome.RETURN_OUTCOME}
+   */
   private final AggOutcome setOkAndReturn() {
-    if (first) {
-      this.outcome = IterOutcome.OK_NEW_SCHEMA;
+    IterOutcome outcomeToReturn;
+    firstBatchForDataSet = false;
+    if (firstBatchForSchema) {
+      outcomeToReturn = OK_NEW_SCHEMA;
+      firstBatchForSchema = false;
     } else {
-      this.outcome = IterOutcome.OK;
+      outcomeToReturn = OK;
     }
+    this.outcome = outcomeToReturn;
+
     for (VectorWrapper<?> v : outgoing) {
       v.getValueVector().getMutator().setValueCount(outputCount);
     }
     return AggOutcome.RETURN_OUTCOME;
   }
 
+  /**
+   * Counterpart of {@code setOkAndReturn()} used when the incoming outcome was EMIT. Sets
+   * {@code firstBatchForDataSet} to true and resets {@code previousIndex} to -1, then sets the iteration outcome
+   * to OK_NEW_SCHEMA if {@code firstBatchForSchema} is still set (clearing that flag) or to EMIT otherwise, and
+   * finally sets the value count of every outgoing vector to {@code outputCount}.
+   *
+   * @return always {@code AggOutcome.RETURN_AND_RESET}
+   */
+  private final AggOutcome setOkAndReturnEmit() {
+    firstBatchForDataSet = true;
+    previousIndex = -1;
+
+    // The very first batch for a schema must still be announced as OK_NEW_SCHEMA rather than EMIT.
+    final boolean schemaNotYetSent = firstBatchForSchema;
+    firstBatchForSchema = false;
+    this.outcome = schemaNotYetSent ? OK_NEW_SCHEMA : EMIT;
+
+    for (VectorWrapper<?> wrapper : outgoing) {
+      wrapper.getValueVector().getMutator().setValueCount(outputCount);
+    }
+    return AggOutcome.RETURN_AND_RESET;
+  }
+
   // Returns output container status after insertion of the given record. 
Caller must check the return value if it
   // plans to insert more records into outgoing container.
   private final boolean outputToBatch(int inIndex) {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
index a300924..2a64b93 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
@@ -27,8 +27,30 @@ public interface StreamingAggregator {
 
   public static TemplateClassDefinition<StreamingAggregator> 
TEMPLATE_DEFINITION = new 
TemplateClassDefinition<StreamingAggregator>(StreamingAggregator.class, 
StreamingAggTemplate.class);
 
+
+  /**
+   * The Aggregator can return one of the following outcomes:
+   * <p>
+   * <b>RETURN_OUTCOME:</b> The aggregation has seen a change in the group and should send data downstream. If
+   * complex writers are involved, then rebuild schema.
+   * <p>
+   * <b>CLEANUP_AND_RETURN:</b> End of all data. Return the data downstream, and cleanup.
+   * <p>
+   * <b>UPDATE_AGGREGATOR:</b> A schema change was encountered. The aggregator's generated code and (possibly)
+   * container need to be updated.
+   * <p>
+   * <b>RETURN_AND_RESET:</b> If the aggregator encounters an EMIT, then that implies the end of a data set but
+   * not of all the data. Return the data (aggregated so far) downstream, reset the internal state variables and
+   * come back for the next data set.
+   *
+   * @see org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome HashAggregator.AggOutcome
+   */
   public static enum AggOutcome {
-    RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR;
+    RETURN_OUTCOME,
+    CLEANUP_AND_RETURN,
+    UPDATE_AGGREGATOR,
+    RETURN_AND_RESET
+    ;
   }
 
   public abstract void setup(OperatorContext context, RecordBatch incoming, 
StreamingAggBatch outgoing) throws SchemaChangeException;
@@ -37,7 +59,11 @@ public interface StreamingAggregator {
 
   public abstract int getOutputCount();
 
-  public abstract AggOutcome doWork();
+  // Do the work. The IterOutcome of the batch already read is also passed in, in case it is an EMIT. If
+  // outerOutcome is EMIT, the work must be done without reading any more batches.
+  public abstract AggOutcome doWork(IterOutcome outerOutcome);
+
+  public abstract boolean isDone();
 
   public abstract void cleanup();
 
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
new file mode 100644
index 0000000..75c4598
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.agg;
+
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(OperatorTest.class)
+public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
+  //private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestStreamingAggEmitOutcome.class);
+  protected static TupleMetadata resultSchema;
+
+  /**
+   * Builds the expected output schema shared by the tests in this class: a VARCHAR column "name" and a nullable
+   * BIGINT column "total_sum".
+   */
+  @BeforeClass
+  public static void setUpBeforeClass2() throws Exception {
+    final TupleMetadata aggOutputSchema = new SchemaBuilder()
+        .add("name", TypeProtos.MinorType.VARCHAR)
+        .addNullable("total_sum", TypeProtos.MinorType.BIGINT)
+        .buildSchema();
+    resultSchema = aggOutputSchema;
+  }
+
+  /**
+   * Verifies that if StreamingAggBatch receives empty batches with OK_NEW_SCHEMA and EMIT outcome then it
+   * correctly produces empty batches as output. The first empty batch will be with OK_NEW_SCHEMA and the second
+   * will be with EMIT outcome.
+   */
+  @Test
+  public void t1_testStreamingAggrEmptyBatchEmitOutcome() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    // assertEquals reports the expected and the actual outcome when a check fails.
+    assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, strAggBatch.next());
+    assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, strAggBatch.next());
+    assertEquals(RecordBatch.IterOutcome.EMIT, strAggBatch.next());
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertEquals(RecordBatch.IterOutcome.NONE, strAggBatch.next());
+  }
+
+  /**
+   * Verifies that if StreamingAgg receives a RecordBatch with EMIT outcome post build schema phase then it
+   * produces output for those input batches correctly. The first output batch will always be returned with
+   * OK_NEW_SCHEMA outcome followed by EMIT with an empty batch. The test verifies the output order against the
+   * expected baseline.
+   */
+  @Test
+  public void t2_testStreamingAggrNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(13, 130, "item13")
+      .addRow(13, 130, "item13")
+      .addRow(2, 20, "item2")
+      .addRow(2, 20, "item2")
+      .addRow(4, 40, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item1", (long)11)
+      .addRow("item13", (long)286)
+      .addRow("item2", (long)44)
+      .addRow("item4", (long)44)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    // assertEquals reports the expected and the actual outcome when a check fails.
+    assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, strAggBatch.next());
+    // Data before EMIT is returned with an OK_NEW_SCHEMA.
+    assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, strAggBatch.next());
+    assertEquals(4, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    // EMIT comes with an empty batch
+    assertEquals(RecordBatch.IterOutcome.EMIT, strAggBatch.next());
+
+    assertEquals(RecordBatch.IterOutcome.NONE, strAggBatch.next());
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   * Verifies that an empty batch with EMIT outcome followed by a non-empty batch with EMIT outcome produces an
+   * empty EMIT output first and then the aggregated groups of the non-empty batch, also with EMIT.
+   */
+  @Test
+  public void t3_testStreamingAggrEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(13, 130, "item13")
+      .addRow(0, 1300, "item13")
+      .addRow(2, 20, "item2")
+      .addRow(0, 2000, "item2")
+      .addRow(4, 40, "item4")
+      .addRow(0, 4000, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item13", (long)1443)
+      .addRow("item2", (long)2022)
+      .addRow("item4", (long)4044)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    // assertEquals reports the expected and the actual outcome when a check fails.
+    assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, strAggBatch.next());
+    assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, strAggBatch.next());
+    assertEquals(RecordBatch.IterOutcome.EMIT, strAggBatch.next());
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertEquals(RecordBatch.IterOutcome.EMIT, strAggBatch.next());
+    assertEquals(3, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertEquals(RecordBatch.IterOutcome.NONE, strAggBatch.next());
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   * Verifies that several consecutive empty batches with EMIT outcome each produce an empty EMIT output, and
+   * that a following non-empty batch with EMIT outcome produces its aggregated groups with EMIT.
+   */
+  @Test
+  public void t4_testStreamingAggrMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(13, 130, "item13")
+      .addRow(0, 0, "item13")
+      .addRow(1, 33000, "item13")
+      .addRow(2, 20, "item2")
+      .addRow(0, 0, "item2")
+      .addRow(1, 11000, "item2")
+      .addRow(4, 40, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item13", (long)33144)
+      .addRow("item2", (long)11023)
+      .addRow("item4", (long)44)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    // assertEquals reports the expected and the actual outcome when a check fails.
+    assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, strAggBatch.next());
+    assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, strAggBatch.next());
+    assertEquals(RecordBatch.IterOutcome.EMIT, strAggBatch.next());
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertEquals(RecordBatch.IterOutcome.EMIT, strAggBatch.next());
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertEquals(RecordBatch.IterOutcome.EMIT, strAggBatch.next());
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertEquals(RecordBatch.IterOutcome.EMIT, strAggBatch.next());
+    assertEquals(3, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertEquals(RecordBatch.IterOutcome.NONE, strAggBatch.next());
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   * Verifies that StreamingAggr starts a fresh aggregation after an EMIT. The first non-empty batch arrives with
+   * OK_NEW_SCHEMA and its aggregate is returned with OK_NEW_SCHEMA; the following empty batch with EMIT outcome
+   * produces an empty EMIT output. A second non-empty batch then arrives with OK outcome followed by an empty
+   * batch with EMIT outcome, and its groups are returned separately with EMIT. The test validates the content and
+   * order of the records in each non-empty output.
+   */
+  @Test
+  public void t5_testStreamingAgrResetsAfterFirstEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item1", (long)11)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item2", (long)44)
+      .addRow("item3", (long)330)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    // assertEquals reports the expected and the actual outcome when a check fails.
+    assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, strAggBatch.next());
+    assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, strAggBatch.next());
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertEquals(RecordBatch.IterOutcome.EMIT, strAggBatch.next());
+    assertEquals(0, strAggBatch.getRecordCount());
+
+    assertEquals(RecordBatch.IterOutcome.EMIT, strAggBatch.next());
+    assertEquals(2, strAggBatch.getRecordCount());
+
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    assertEquals(RecordBatch.IterOutcome.NONE, strAggBatch.next());
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet2.clear();
+    expectedRowSet1.clear();
+  }
+
+  /**
+   * Verifies the case where, after an EMIT, the remaining input ends with a regular OK batch followed by NONE.
+   * The first non-empty batch arrives with OK_NEW_SCHEMA and its aggregate is returned with OK_NEW_SCHEMA; the
+   * following empty batch with EMIT outcome produces an empty EMIT output. A second non-empty batch then arrives
+   * with OK outcome and the input ends, so its groups are returned with OK (not EMIT) before NONE. The test
+   * validates the content and order of the records in each non-empty output.
+   */
+  @Test
+  public void t6_testStreamingAggrOkFollowedByNone() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .addRow(4, 40, "item4")
+      .addRow(4, 40, "item4")
+      .addRow(5, 50, "item5")
+      .addRow(5, 50, "item5")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item1", (long)11)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item2", (long)22)
+      .addRow("item3", (long)33)
+      .addRow("item4", (long)88)
+      .addRow("item5", (long)110)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    // assertEquals reports the expected and the actual outcome when a check fails.
+    assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, strAggBatch.next());
+    assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, strAggBatch.next());
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertEquals(RecordBatch.IterOutcome.EMIT, strAggBatch.next());
+    assertEquals(0, strAggBatch.getRecordCount());
+
+    assertEquals(RecordBatch.IterOutcome.OK, strAggBatch.next());
+    assertEquals(4, strAggBatch.getRecordCount());
+
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    assertEquals(RecordBatch.IterOutcome.NONE, strAggBatch.next());
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet2.clear();
+    expectedRowSet1.clear();
+  }
+
+  /**
+   * Normal case with several EMIT outcomes: after the first non-empty batch is returned with OK_NEW_SCHEMA, an
+   * empty EMIT batch, a non-empty EMIT batch and another empty EMIT batch each produce their own EMIT output
+   * (with 0, 2 and 0 records respectively) before NONE. Only outcomes and record counts are checked.
+   */
+  @Test
+  public void t7_testStreamingAggrMultipleEMITOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    // assertEquals reports the expected and the actual outcome when a check fails.
+    assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, strAggBatch.next());
+    assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, strAggBatch.next());
+    assertEquals(1, strAggBatch.getRecordCount());
+    assertEquals(RecordBatch.IterOutcome.EMIT, strAggBatch.next());
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertEquals(RecordBatch.IterOutcome.EMIT, strAggBatch.next());
+    assertEquals(2, strAggBatch.getRecordCount());
+    assertEquals(RecordBatch.IterOutcome.EMIT, strAggBatch.next());
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertEquals(RecordBatch.IterOutcome.NONE, strAggBatch.next());
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+  }
+
+  /**
+   * Verifies that several input batches arriving with OK outcome before an EMIT are aggregated into a single
+   * output batch. Two non-empty batches separated by an empty OK batch yield one output with two groups,
+   * returned with OK_NEW_SCHEMA; the trailing empty batch with EMIT outcome then yields an empty EMIT output.
+   */
+  @Test
+  public void t8_testStreamingAggrMultipleInputToSingleOutputBatch() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item1", (long)11)
+      .addRow("item2", (long)22)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    // assertEquals reports the expected and the actual outcome when a check fails.
+    assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, strAggBatch.next());
+    assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, strAggBatch.next());
+    assertEquals(2, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertEquals(RecordBatch.IterOutcome.EMIT, strAggBatch.next());
+    assertEquals(0, strAggBatch.getRecordCount());
+
+    assertEquals(RecordBatch.IterOutcome.NONE, strAggBatch.next());
+
+    // Release memory for row sets; the expected row set is released too, as in the sibling tests.
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+
+  
/*****************************************************************************************
+   Tests for validating regular StreamingAggr behavior with no EMIT outcome
+   
******************************************************************************************/
+  @Test
+  public void t9_testStreamingAgr_WithEmptyNonEmptyBatchesAndOKOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item1")
+      .addRow(13, 130, "item13")
+      .addRow(13, 130, "item13")
+      .addRow(13, 130, "item13")
+      .addRow(130, 1300, "item130")
+      .addRow(0, 0, "item130")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(23, 230, "item23")
+      .addRow(3, 33, "item3")
+      .addRow(7, 70, "item7")
+      .addRow(17, 170, "item7")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = 
operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item1", (long)33)
+      .addRow("item13", (long)429)
+      .addRow("item130", (long)1430)
+      .addRow("item23", (long)253)
+      .addRow("item3", (long)36)
+      .addRow("item7", (long)264)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new 
StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(6, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = 
DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   * Verifies that a single empty input batch with OK_NEW_SCHEMA outcome produces OK_NEW_SCHEMA followed by NONE.
+   */
+  @Test
+  public void t10_testStreamingAggrWithEmptyDataSet() {
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        parseExprs("name_left", "name"),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    // assertEquals reports the expected and the actual outcome when a check fails.
+    assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, strAggBatch.next());
+    assertEquals(RecordBatch.IterOutcome.NONE, strAggBatch.next());
+  }
+}
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index c57093c..17a9d33 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -449,6 +449,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
       + "dfs.`lateraljoin/multipleFiles/` customer) t1, LATERAL (SELECT CAST(MAX(t.ord.o_totalprice)"
       + " AS int) AS maxprice FROM UNNEST(t1.c_orders) t(ord) GROUP BY t.ord.o_orderstatus) t2";
 
+    try {
     testBuilder()
       .optionSettingQueriesForTestQuery("alter session set `%s` = false",
         PlannerSettings.STREAMAGG.getOptionName())
@@ -462,6 +463,9 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
       .baselineValues(235695)
       .baselineValues(177819)
       .build().run();
+    } finally {
+      test("alter session set `" + PlannerSettings.STREAMAGG.getOptionName() + "` = true");
+    }
   }
 
   @Test
@@ -469,6 +473,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
     String sql = "SELECT key, t3.dsls FROM cp.`lateraljoin/with_nulls.json` t LEFT OUTER "
     + "JOIN LATERAL (SELECT DISTINCT t2.sls AS dsls FROM UNNEST(t.sales) t2(sls)) t3 ON TRUE";
 
+    try {
     testBuilder()
       .optionSettingQueriesForTestQuery("alter session set `%s` = false",
         PlannerSettings.STREAMAGG.getOptionName())
@@ -484,5 +489,63 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
       .baselineValues("dd",111L)
       .baselineValues("dd",222L)
       .build().run();
+    } finally {
+      test("alter session set `" + PlannerSettings.STREAMAGG.getOptionName() + "` = true");
+    }
+  }
+
+  @Test
+  public void testMultipleBatchesLateral_WithStreamingAgg() throws Exception {
+    String sql = "SELECT t2.maxprice FROM (SELECT customer.c_orders AS c_orders FROM "
+        + "dfs.`lateraljoin/multipleFiles/` customer) t1, LATERAL (SELECT CAST(MAX(t.ord.o_totalprice)"
+        + " AS int) AS maxprice FROM UNNEST(t1.c_orders) t(ord) GROUP BY t.ord.o_orderstatus) t2";
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("maxprice")
+        .baselineValues(367190)
+        .baselineValues(316347)
+        .baselineValues(146610)
+        .baselineValues(306996)
+        .baselineValues(235695)
+        .baselineValues(177819)
+        .build().run();
   }
+
+  @Test
+  public void testLateral_StreamingAgg_with_nulls() throws Exception {
+    String sql = "SELECT key, t3.dsls FROM cp.`lateraljoin/with_nulls.json` t LEFT OUTER "
+        + "JOIN LATERAL (SELECT DISTINCT t2.sls AS dsls FROM UNNEST(t.sales) t2(sls)) t3 ON TRUE";
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("key","dsls")
+        .baselineValues("aa",null)
+        .baselineValues("bb",100L)
+        .baselineValues("bb",200L)
+        .baselineValues("bb",300L)
+        .baselineValues("bb",400L)
+        .baselineValues("cc",null)
+        .baselineValues("dd",111L)
+        .baselineValues("dd",222L)
+        .build().run();
+  }
+
+  @Test
+  public void testMultipleBatchesLateral_WithStreamingAggNoGroup() throws Exception {
+    String sql = "SELECT t2.maxprice FROM (SELECT customer.c_orders AS c_orders FROM "
+        + "dfs.`lateraljoin/multipleFiles/` customer) t1, LATERAL (SELECT CAST(MAX(t.ord.o_totalprice)"
+        + " AS int) AS maxprice FROM UNNEST(t1.c_orders) t(ord) ) t2";
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("maxprice")
+        .baselineValues(367190)
+        .baselineValues(306996)
+        .build().run();
+  }
+
 }

Reply via email to