[ 
https://issues.apache.org/jira/browse/DRILL-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16505454#comment-16505454
 ] 

ASF GitHub Bot commented on DRILL-6446:
---------------------------------------

sohami closed pull request #1293: DRILL-6446: Support for EMIT outcome in TopN
URL: https://github.com/apache/drill/pull/1293
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java
index e398f47485..3744b94200 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java
@@ -66,6 +66,13 @@
    */
   SelectionVector4 getFinalSv4();
 
+  /**
+   * Cleanup the old state of queue and recreate a new one with HyperContainer 
containing vectors in input container
+   * and the corresponding indexes (in SV4 format) from input SelectionVector4
+   * @param container
+   * @param vector4
+   * @throws SchemaChangeException
+   */
   void resetQueue(VectorContainer container, SelectionVector4 vector4) throws 
SchemaChangeException;
 
   /**
@@ -73,5 +80,7 @@
    */
   void cleanup();
 
+  boolean isInitialized();
+
   TemplateClassDefinition<PriorityQueue> TEMPLATE_DEFINITION = new 
TemplateClassDefinition<>(PriorityQueue.class, PriorityQueueTemplate.class);
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
index 97e26b662c..e39dce3a10 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
@@ -40,10 +40,18 @@
 public abstract class PriorityQueueTemplate implements PriorityQueue {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PriorityQueueTemplate.class);
 
-  private SelectionVector4 heapSv4; //This holds the heap
+  // This holds the min heap of the record indexes. Heapify condition is based 
on the actual records though. Only records
+  // meeting the heap condition have their indexes in this heap. Actual records 
are stored inside the hyperBatch. Since
+  // hyperBatch contains ValueVectors from all the incoming batches, the 
indexes here consider both BatchNumber and
+  // RecordNumber.
+  private SelectionVector4 heapSv4;
   private SelectionVector4 finalSv4; //This is for final sorted output
+
+  // This stores the actual incoming record batches
   private ExpandableHyperContainer hyperBatch;
   private BufferAllocator allocator;
+
+  // Limit determines the number of records to output and hold in queue.
   private int limit;
   private int queueSize = 0;
   private int batchCount = 0;
@@ -54,7 +62,11 @@ public void init(int limit, BufferAllocator allocator,  
boolean hasSv2) throws S
     this.limit = limit;
     this.allocator = allocator;
     @SuppressWarnings("resource")
+    // It's allocating memory to store (limit+1) indexes. When first limit 
number of record indexes are stored then all
+    // the other record indexes are kept at (limit+1) and evaluated with the 
root element of heap to determine if
+    // this new element will reside in heap or not.
     final DrillBuf drillBuf = allocator.buffer(4 * (limit + 1));
+    // Heap is a SelectionVector4 since it stores indexes for record relative 
to their batches.
     heapSv4 = new SelectionVector4(drillBuf, limit, Character.MAX_VALUE);
     this.hasSv2 = hasSv2;
   }
@@ -103,11 +115,17 @@ public void add(RecordBatchData batch) throws 
SchemaChangeException{
     if (hasSv2) {
       sv2 = batch.getSv2();
     }
+    // Will only be called until queueSize has reached the limit which means 
it has seen limit number of records in
+    // one or many batches. For each new record siftUp (or heapify) to adjust 
min heap property is called.
     for (; queueSize < limit && count < batch.getRecordCount();  count++) {
       heapSv4.set(queueSize, batchCount, hasSv2 ? sv2.getIndex(count) : count);
       queueSize++;
       siftUp();
     }
+
+    // For all the other records which fall beyond limit, it compares them 
with the root element in the current heap
+    // and performs heapify if need be. Note: Even though heapSv4 stores only 
limit+1 indexes, in hyper batch we
+    // are still keeping all the records unless purge is called.
     for (; count < batch.getRecordCount(); count++) {
       heapSv4.set(limit, batchCount, hasSv2 ? sv2.getIndex(count) : count);
       if (compare(limit, 0) < 0) {
@@ -153,15 +171,35 @@ public SelectionVector4 getFinalSv4() {
   public void cleanup() {
     if (heapSv4 != null) {
       heapSv4.clear();
+      heapSv4 = null;
     }
     if (hyperBatch != null) {
       hyperBatch.clear();
+      hyperBatch = null;
     }
     if (finalSv4 != null) {
       finalSv4.clear();
+      finalSv4 = null;
     }
+    batchCount = 0;
   }
 
+  /**
+   * When cleanup is called then heapSv4 is cleared and set to null and is 
only initialized during init call. Hence
+   * this is used to determine if priority queue is initialized or not.
+   * @return - true - queue is still initialized
+   *           false - queue is not yet initialized and before using queue 
init should be called
+   */
+  public boolean isInitialized() {
+    return (heapSv4 != null);
+  }
+
+  /**
+   * Perform Heapify for the record stored at index which was added as leaf 
node in the array. The new record is
+   * compared with the record stored at parent index. Since the new record 
index will flow up in the array hence the
+   * name siftUp
+   * @throws SchemaChangeException
+   */
   private void siftUp() throws SchemaChangeException {
     int p = queueSize - 1;
     while (p > 0) {
@@ -174,6 +212,14 @@ private void siftUp() throws SchemaChangeException {
     }
   }
 
+  /**
+   * Compares the record stored at the index of 0th index element of heapSv4 
(or root element) with the record
+   * stored at index of limit index element of heapSv4 (or new element). If 
the root element is greater than new element
+   * then new element is discarded else root element is replaced with new 
element and again heapify is performed on
+   * new root element.
+   * This is done for all the records which are seen after the queue is filled 
with limit number of record indexes.
+   * @throws SchemaChangeException
+   */
   private void siftDown() throws SchemaChangeException {
     int p = 0;
     int next;
@@ -196,6 +242,12 @@ private void siftDown() throws SchemaChangeException {
     }
   }
 
+  /**
+   * Pop the root element which holds the minimum value in heap. In this case 
root element will be the index of
+   * record with minimum value. After extracting the root element it swaps the 
root element with last element in
+   * heapSv4 and does heapify (by calling siftDown) again.
+   * @return - Index entry (in SV4 format) that was stored at the root of heapSv4 before the swap
+   */
   public int pop() {
     int value = heapSv4.get(0);
     swap(0, queueSize - 1);
@@ -220,9 +272,25 @@ public int compare(int leftIndex, int rightIndex) throws 
SchemaChangeException {
     return doEval(sv1, sv2);
   }
 
+  /**
+   * Stores the reference to the hyperBatch container which holds all the 
records across incoming batches in it. This
+   * is used in doEval function to compare records in this hyper batch at 
given indexes.
+   * @param incoming - reference to hyperBatch
+   * @param outgoing - null
+   * @throws SchemaChangeException
+   */
   public abstract void doSetup(@Named("incoming") VectorContainer incoming,
                                @Named("outgoing") RecordBatch outgoing)
                        throws SchemaChangeException;
+
+  /**
+   * Evaluates the value of record at leftIndex and rightIndex w.r.t min heap 
condition. It is used in
+   * {@link PriorityQueueTemplate#compare(int, int)} method while Heapifying 
the queue.
+   * @param leftIndex
+   * @param rightIndex
+   * @return
+   * @throws SchemaChangeException
+   */
   public abstract int doEval(@Named("leftIndex") int leftIndex,
                              @Named("rightIndex") int rightIndex)
                       throws SchemaChangeException;
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 366e4e8888..a8c6804fd8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -52,6 +52,7 @@
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
+import org.apache.drill.exec.record.HyperVectorWrapper;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.SimpleRecordBatch;
@@ -69,6 +70,16 @@
 import com.sun.codemodel.JConditional;
 import com.sun.codemodel.JExpr;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
+/**
+ * Operator Batch which implements the TopN functionality. It is more 
efficient than (sort + limit) since unlike sort
+ * it doesn't have to store all the input data to sort it first and then apply 
limit on the sorted data. Instead
+ * internally it maintains a priority queue backed by a heap with the size 
being same as limit value.
+ */
 public class TopNBatch extends AbstractRecordBatch<TopN> {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TopNBatch.class);
 
@@ -84,12 +95,15 @@
   private boolean schemaChanged = false;
   private PriorityQueue priorityQueue;
   private TopN config;
-  SelectionVector4 sv4;
+  private SelectionVector4 sv4;
   private long countSincePurge;
   private int batchCount;
   private Copier copier;
   private boolean first = true;
   private int recordCount = 0;
+  private IterOutcome lastKnownOutcome = OK;
+  private boolean firstBatchForSchema = true;
+  private boolean hasOutputRecords = false;
 
   public TopNBatch(TopN popConfig, FragmentContext context, RecordBatch 
incoming) throws OutOfMemoryException {
     super(popConfig, context);
@@ -117,12 +131,7 @@ public SelectionVector4 getSelectionVector4() {
 
   @Override
   public void close() {
-    if (sv4 != null) {
-      sv4.clear();
-    }
-    if (priorityQueue != null) {
-      priorityQueue.cleanup();
-    }
+    releaseResource();
     super.close();
   }
 
@@ -134,6 +143,9 @@ public void buildSchema() throws SchemaChangeException {
       case OK:
       case OK_NEW_SCHEMA:
         for (VectorWrapper<?> w : incoming) {
+          // TODO: Not sure why the special handling for 
AbstractContainerVector is needed since creation of child
+          // vectors is taken care correctly if the field is retrieved from 
incoming vector and passed to it rather than
+          // creating a new Field instance just based on name and type.
           @SuppressWarnings("resource")
           ValueVector v = c.addOrGet(w.getField());
           if (v instanceof AbstractContainerVector) {
@@ -162,8 +174,10 @@ public void buildSchema() throws SchemaChangeException {
         return;
       case NONE:
         state = BatchState.DONE;
+      case EMIT:
+        throw new IllegalStateException("Unexpected EMIT outcome received in 
buildSchema phase");
       default:
-        return;
+        throw new IllegalStateException("Unexpected outcome received in 
buildSchema phase");
     }
   }
 
@@ -171,47 +185,69 @@ public void buildSchema() throws SchemaChangeException {
   public IterOutcome innerNext() {
     recordCount = 0;
     if (state == BatchState.DONE) {
-      return IterOutcome.NONE;
+      return NONE;
     }
-    if (schema != null) {
-      if (getSelectionVector4().next()) {
-        recordCount = sv4.getCount();
-        return IterOutcome.OK;
-      } else {
-        recordCount = 0;
-        return IterOutcome.NONE;
-      }
+
+    // Check if anything is remaining from previous record boundary
+    if (hasOutputRecords) {
+      return handleRemainingOutput();
     }
 
+    // Reset the TopN state for next iteration
+    resetTopNState();
+
     try{
+      boolean incomingHasSv2 = false;
+      switch (incoming.getSchema().getSelectionVectorMode()) {
+        case NONE: {
+          break;
+        }
+        case TWO_BYTE: {
+          incomingHasSv2 = true;
+          break;
+        }
+        case FOUR_BYTE: {
+          throw new SchemaChangeException("TopN doesn't support incoming with 
SV4 mode");
+        }
+        default:
+          throw new UnsupportedOperationException("Unsupported SV mode 
detected in TopN incoming batch");
+      }
+
       outer: while (true) {
         Stopwatch watch = Stopwatch.createStarted();
-        IterOutcome upstream;
         if (first) {
-          upstream = IterOutcome.OK_NEW_SCHEMA;
+          lastKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
+          // Create the SV4 object upfront to be used for both empty and 
non-empty incoming batches at EMIT boundary
+          sv4 = new SelectionVector4(context.getAllocator(), 0);
           first = false;
         } else {
-          upstream = next(incoming);
+          lastKnownOutcome = next(incoming);
         }
-        if (upstream == IterOutcome.OK && schema == null) {
-          upstream = IterOutcome.OK_NEW_SCHEMA;
+        if (lastKnownOutcome == OK && schema == null) {
+          lastKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
           container.clear();
         }
         logger.debug("Took {} us to get next", 
watch.elapsed(TimeUnit.MICROSECONDS));
-        switch (upstream) {
+        switch (lastKnownOutcome) {
         case NONE:
           break outer;
         case NOT_YET:
           throw new UnsupportedOperationException();
         case OUT_OF_MEMORY:
         case STOP:
-          return upstream;
+          return lastKnownOutcome;
         case OK_NEW_SCHEMA:
           // only change in the case that the schema truly changes.  
Artificial schema changes are ignored.
+          // schema change handling in case when EMIT is also seen is same as 
without EMIT. i.e. only if union type
+          // is enabled it will be handled.
+          container.clear();
+          firstBatchForSchema = true;
           if (!incoming.getSchema().equals(schema)) {
             if (schema != null) {
               if (!unionTypeEnabled) {
-                throw new UnsupportedOperationException("Sort doesn't 
currently support sorts with changing schemas.");
+                throw new UnsupportedOperationException(String.format("TopN 
currently doesn't support changing " +
+                  "schemas with union type disabled. Please try enabling union 
type: %s and re-execute the query",
+                  ExecConstants.ENABLE_UNION_TYPE_KEY));
               } else {
                 this.schema = SchemaUtil.mergeSchemas(this.schema, 
incoming.getSchema());
                 purgeAndResetPriorityQueue();
@@ -223,10 +259,15 @@ public IterOutcome innerNext() {
           }
           // fall through.
         case OK:
+        case EMIT:
           if (incoming.getRecordCount() == 0) {
             for (VectorWrapper<?> w : incoming) {
               w.clear();
             }
+            // Release memory for incoming SV2 vector
+            if (incomingHasSv2) {
+              incoming.getSelectionVector2().clear();
+            }
             break;
           }
           countSincePurge += incoming.getRecordCount();
@@ -240,10 +281,16 @@ public IterOutcome innerNext() {
           boolean success = false;
           try {
             if (priorityQueue == null) {
-              assert !schemaChanged;
               priorityQueue = createNewPriorityQueue(new 
ExpandableHyperContainer(batch.getContainer()), config.getLimit());
+            } else if (!priorityQueue.isInitialized()) {
+              // means priority queue is cleaned up after producing output for 
first record boundary. We should
+              // initialize it for next record boundary
+              priorityQueue.init(config.getLimit(), oContext.getAllocator(),
+                schema.getSelectionVectorMode() == 
SelectionVectorMode.TWO_BYTE);
             }
             priorityQueue.add(batch);
+            // Based on static threshold of number of batches, perform purge 
operation to release the memory for
+            // RecordBatches which are of no use or don't fall under TopN 
category
             if (countSincePurge > config.getLimit() && batchCount > 
batchPurgeThreshold) {
               purge();
               countSincePurge = 0;
@@ -259,25 +306,29 @@ public IterOutcome innerNext() {
         default:
           throw new UnsupportedOperationException();
         }
+
+        // If the last seen outcome is EMIT then break the loop. We do it here 
since we want to process the batch
+        // with records and EMIT outcome in above case statements
+        if (lastKnownOutcome == EMIT) {
+          break;
+        }
       }
 
-      if (schema == null || priorityQueue == null) {
+      // PriorityQueue can be null here if first batch is received with 
OK_NEW_SCHEMA and is empty and second next()
+      // call returned NONE or EMIT.
+      // PriorityQueue can be uninitialized here if only empty batches are 
received between 2 EMIT outcomes.
+      if (schema == null || (priorityQueue == null || 
!priorityQueue.isInitialized())) {
         // builder may be null at this point if the first incoming batch is 
empty
-        state = BatchState.DONE;
-        return IterOutcome.NONE;
+        return handleEmptyBatches(lastKnownOutcome);
       }
 
       priorityQueue.generate();
+      prepareOutputContainer(priorityQueue.getHyperBatch(), 
priorityQueue.getFinalSv4());
 
-      this.sv4 = priorityQueue.getFinalSv4();
-      container.clear();
-      for (VectorWrapper<?> w : priorityQueue.getHyperBatch()) {
-        container.add(w.getValueVectors());
-      }
-      container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
-      recordCount = sv4.getCount();
-      return IterOutcome.OK_NEW_SCHEMA;
-
+      // With EMIT outcome control will come here multiple times whereas 
without EMIT outcome control will only come
+      // here once. In EMIT outcome case if there is schema change in any 
iteration then that will be handled by
+      // lastKnownOutcome.
+      return getFinalOutcome();
     } catch(SchemaChangeException | ClassTransformationException | IOException 
ex) {
       kill(false);
       logger.error("Failure during query", ex);
@@ -286,14 +337,27 @@ public IterOutcome innerNext() {
     }
   }
 
+  /**
+   * When PriorityQueue is built up then it stores the list of limit number of 
record indexes (in heapSv4) which fall
+   * under TopN category. But it also stores all the incoming RecordBatches 
with all records inside a HyperContainer
+   * (hyperBatch). When a certain threshold of batches is reached then this 
method is called which copies the limit
+   * number of records whose indexes are stored in heapSv4 out of HyperBatch 
to a new VectorContainer and releases
+   * all other records and their batches. Later this new VectorContainer is 
stored inside the HyperBatch and its
+   * corresponding indexes are stored in the heapSv4 vector. This is done to 
avoid holding up lots of Record Batches
+   * which can create an OutOfMemory condition.
+   * @throws SchemaChangeException
+   */
   private void purge() throws SchemaChangeException {
     Stopwatch watch = Stopwatch.createStarted();
     VectorContainer c = priorityQueue.getHyperBatch();
+
+    // Simple VectorContainer which stores limit number of records only. Only the 
records whose indexes are stored inside
+    // selectionVector4 below are copied from Hyper container to this 
simple container.
     VectorContainer newContainer = new VectorContainer(oContext);
     @SuppressWarnings("resource")
+    // SV4 storing the limit number of indexes
     SelectionVector4 selectionVector4 = priorityQueue.getSv4();
     SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, 
context);
-    SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, 
null, context);
     if (copier == null) {
       copier = GenericSV4Copier.createCopier(batch, newContainer, null);
     } else {
@@ -308,25 +372,14 @@ private void purge() throws SchemaChangeException {
     @SuppressWarnings("resource")
     SortRecordBatchBuilder builder = new 
SortRecordBatchBuilder(oContext.getAllocator());
     try {
-      do {
-        int count = selectionVector4.getCount();
-        int copiedRecords = copier.copyRecords(0, count);
-        assert copiedRecords == count;
-        for (VectorWrapper<?> v : newContainer) {
-          ValueVector.Mutator m = v.getValueVector().getMutator();
-          m.setValueCount(count);
-        }
-        newContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-        newContainer.setRecordCount(count);
-        builder.add(newBatch);
-      } while (selectionVector4.next());
-      selectionVector4.clear();
-      c.clear();
+      // Purge all the existing batches to a new batch which only holds the 
selected records
+      copyToPurge(newContainer, builder);
+      // New VectorContainer that contains only limit number of records and is 
later passed to resetQueue to create a
+      // HyperContainer backing the priority queue out of it
       VectorContainer newQueue = new VectorContainer();
       builder.build(newQueue);
       priorityQueue.resetQueue(newQueue, 
builder.getSv4().createNewWrapperCurrent());
       builder.getSv4().clear();
-      selectionVector4.clear();
     } finally {
       DrillAutoCloseables.closeNoChecked(builder);
     }
@@ -414,25 +467,12 @@ public void purgeAndResetPriorityQueue() throws 
SchemaChangeException, ClassTran
     @SuppressWarnings("resource")
     final SelectionVector4 selectionVector4 = priorityQueue.getSv4();
     final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, 
selectionVector4, context);
-    final SimpleSV4RecordBatch newBatch = new 
SimpleSV4RecordBatch(newContainer, null, context);
     copier = GenericSV4Copier.createCopier(batch, newContainer, null);
     @SuppressWarnings("resource")
     SortRecordBatchBuilder builder = new 
SortRecordBatchBuilder(oContext.getAllocator());
     try {
-      do {
-        final int count = selectionVector4.getCount();
-        final int copiedRecords = copier.copyRecords(0, count);
-        assert copiedRecords == count;
-        for (VectorWrapper<?> v : newContainer) {
-          ValueVector.Mutator m = v.getValueVector().getMutator();
-          m.setValueCount(count);
-        }
-        newContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-        newContainer.setRecordCount(count);
-        builder.add(newBatch);
-      } while (selectionVector4.next());
-      selectionVector4.clear();
-      c.clear();
+      // Purge all the existing batches to a new batch which only holds the 
selected records
+      copyToPurge(newContainer, builder);
       final VectorContainer oldSchemaContainer = new VectorContainer(oContext);
       builder.build(oldSchemaContainer);
       oldSchemaContainer.setRecordCount(builder.getSv4().getCount());
@@ -458,6 +498,190 @@ protected void killIncoming(boolean sendUpstream) {
     incoming.kill(sendUpstream);
   }
 
+  /**
+   * Resets TopNBatch state to process next incoming batches independent of 
already seen incoming batches.
+   */
+  private void resetTopNState() {
+    lastKnownOutcome = OK;
+    countSincePurge = 0;
+    batchCount = 0;
+    hasOutputRecords = false;
+    releaseResource();
+  }
+
+  /**
+   * Cleanup resources held by TopN Batch such as sv4, priority queue and 
outgoing container
+   */
+  private void releaseResource() {
+    if (sv4 != null) {
+      sv4.clear();
+    }
+
+    if (priorityQueue != null) {
+      priorityQueue.cleanup();
+    }
+    container.zeroVectors();
+  }
+
+  /**
+   * Returns the final IterOutcome which TopN should return for this next 
call. Return OK_NEW_SCHEMA with first output
+   * batch after a new schema is seen. This is indicated by firstBatchForSchema 
flag. It is also true for very first
+   * output batch after buildSchema() phase too since in buildSchema() a dummy 
schema was returned downstream without
+   * correct SelectionVectorMode.
+   * In other cases when there is no schema change then either OK or EMIT is 
returned with output batches depending upon
+   * if EMIT is seen or not. In cases when EMIT is not seen then OK is always 
returned with an output batch. When all
+   * the data is returned then NONE is sent in the end.
+   *
+   * @return - IterOutcome - outcome to send downstream
+   */
+  private IterOutcome getFinalOutcome() {
+    IterOutcome outcomeToReturn;
+
+    if (firstBatchForSchema) {
+      outcomeToReturn = OK_NEW_SCHEMA;
+      firstBatchForSchema = false;
+    } else if (recordCount == 0) {
+      // get the outcome to return before calling resetTopNState since that resets 
lastKnownOutcome to OK
+      outcomeToReturn = lastKnownOutcome == EMIT ? EMIT : NONE;
+      resetTopNState();
+    } else if (lastKnownOutcome == EMIT) {
+      // in case of EMIT check if this output batch returns all the data or 
not. If yes then return EMIT along with this
+      // output batch else return OK. Remaining data will be sent downstream 
in subsequent next() call.
+      final boolean hasMoreRecords = sv4.hasNext();
+      outcomeToReturn = (hasMoreRecords) ? OK : EMIT;
+      hasOutputRecords = hasMoreRecords;
+    } else {
+      outcomeToReturn = OK;
+    }
+
+    return outcomeToReturn;
+  }
+
+  /**
+   * Copies all the selected records into the new container to purge all the 
incoming batches into a single batch.
+   * @param newContainer - New container holding the ValueVectors with 
selected records
+   * @param batchBuilder - Builder to build hyper vectors batches
+   * @throws SchemaChangeException
+   */
+  private void copyToPurge(VectorContainer newContainer, 
SortRecordBatchBuilder batchBuilder)
+    throws SchemaChangeException {
+    final VectorContainer c = priorityQueue.getHyperBatch();
+    final SelectionVector4 queueSv4 = priorityQueue.getSv4();
+    final SimpleSV4RecordBatch newBatch = new 
SimpleSV4RecordBatch(newContainer, null, context);
+
+    do {
+      // count is the limit number of records required by TopN batch
+      final int count = queueSv4.getCount();
+      // Transfers count number of records from hyperBatch to simple container
+      final int copiedRecords = copier.copyRecords(0, count);
+      assert copiedRecords == count;
+      for (VectorWrapper<?> v : newContainer) {
+        ValueVector.Mutator m = v.getValueVector().getMutator();
+        m.setValueCount(count);
+      }
+      newContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+      newContainer.setRecordCount(count);
+      // Store all the batches containing limit number of records
+      batchBuilder.add(newBatch);
+    } while (queueSv4.next());
+    // Release the memory stored for the priority queue heap to store indexes
+    queueSv4.clear();
+    // Release the memory from HyperBatch container
+    c.clear();
+  }
+
+  /**
+   * Prepares an output container with batches from Priority Queue for each 
record boundary. In case when this is the
+   * first batch for the known schema (indicated by true value of 
firstBatchForSchema) the output container is cleared
+   * and recreated with new HyperVectorWrapper objects and ValueVectors from 
PriorityQueue. In cases when the schema
+   * has not changed then it prepares the container keeping the VectorWrapper 
and SV4 references as is since that is
+   * what is needed by downstream operator.
+   */
+  private void prepareOutputContainer(VectorContainer dataContainer, 
SelectionVector4 dataSv4) {
+    container.zeroVectors();
+    hasOutputRecords = true;
+    // Check if this is the first output batch for the new known schema. If 
yes then prepare the output container
+    // with the proper vectors, otherwise re-use the previous vectors.
+    if (firstBatchForSchema) {
+      container.clear();
+      for (VectorWrapper<?> w : dataContainer) {
+        container.add(w.getValueVectors());
+      }
+      container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
+      sv4 = dataSv4;
+    } else {
+      // Schema didn't change so we should keep the reference of 
HyperVectorWrapper in outgoing container intact and
+      // populate the HyperVectorWrapper with new list of vectors. Here the 
assumption is order of ValueVectors is same
+      // across multiple record boundaries unless a new schema is observed
+      int index = 0;
+      for (VectorWrapper<?> w : dataContainer) {
+        HyperVectorWrapper wrapper = (HyperVectorWrapper<?>) 
container.getValueVector(index++);
+        wrapper.updateVectorList(w.getValueVectors());
+      }
+      // Since the reference of SV4 is held by downstream operator and there 
is no schema change, so just copy the
+      // underlying buffer from priority queue sv4.
+      this.sv4.copy(dataSv4);
+    }
+    recordCount = sv4.getCount();
+    container.setRecordCount(recordCount);
+  }
+
+  /**
+   * Method handles returning correct outcome and setting recordCount for 
output container when next() is called
+   * multiple times for single record boundary. It handles cases when some 
output was already returned at current record
+   * boundary but either there is more left to return or proper outcome with 
empty batch is left to return.
+   * Example: For first EMIT record boundary if all the records were returned 
in previous call with OK_NEW_SCHEMA
+   * outcome, then this method will handle returning empty output batch with 
EMIT outcome in subsequent next() call.
+   * @return - Outcome to return downstream
+   */
+  private IterOutcome handleRemainingOutput() {
+    // if priority queue is not null that means the incoming batches were 
non-empty. In that case check if there are more records
+    // to send downstream for this record boundary
+    if (priorityQueue != null && sv4.next()) {
+      recordCount = sv4.getCount();
+      container.setRecordCount(recordCount);
+    } else { // This means that either:
+      // 1) Priority Queue was not null and all records have been sent 
downstream for this record boundary
+      // 2) Or Priority Queue is null, since all the incoming batches were 
empty for current record boundary (or EMIT
+      // outcome). In the previous call we must have returned OK_NEW_SCHEMA 
along with SV4 container, so it will
+      // return EMIT outcome now
+      recordCount = 0;
+      container.setRecordCount(0);
+    }
+    return getFinalOutcome();
+  }
+
+  /**
+   * Method to handle preparing output container and returning proper outcome 
to downstream when either NONE or only
+   * empty batches have been seen but with EMIT outcome. In either of the case 
PriorityQueue is not created yet since no
+   * actual records have been received so far.
+   * @param incomingOutcome - outcome received from upstream. Either NONE or 
EMIT
+   * @return - outcome to return downstream. NONE when incomingOutcome is 
NONE. OK_NEW_SCHEMA/EMIT when incomingOutcome
+   * is EMIT and is first/non-first empty input batch respectively.
+   */
+  private IterOutcome handleEmptyBatches(IterOutcome incomingOutcome) {
+    IterOutcome outcomeToReturn = incomingOutcome;
+
+    // In case of NONE it will change state to DONE and return NONE whereas in 
case of
+    // EMIT it has to still continue working for future records.
+    if (incomingOutcome == NONE) { // this means we saw NONE
+      state = BatchState.DONE;
+      container.clear();
+      recordCount = 0;
+      container.setRecordCount(recordCount);
+    } else if (incomingOutcome == EMIT) {
+      // since priority queue is null that means it has not seen any batch 
with data
+      assert (countSincePurge == 0 && batchCount == 0);
+      final VectorContainer hyperContainer = new 
ExpandableHyperContainer(incoming.getContainer());
+      prepareOutputContainer(hyperContainer, sv4);
+
+      // update the outcome to return
+      outcomeToReturn = getFinalOutcome();
+    }
+
+    return outcomeToReturn;
+  }
+
   public static class SimpleSV4RecordBatch extends SimpleRecordBatch {
     private SelectionVector4 sv4;
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
index d9f1c8ea12..321d9a87d6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
@@ -34,7 +34,6 @@ public void setup(RecordBatch incoming, VectorContainer 
outgoing) throws SchemaC
     this.sv2 = incoming.getSelectionVector2();
 
     final int count = outgoing.getNumberOfColumns();
-
     vvIn = new ValueVector[count];
 
     {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
index 4f3afc36ce..cd6af07b37 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
@@ -22,10 +22,12 @@
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
 
 public abstract class AbstractSV4Copier extends AbstractCopier {
-  protected ValueVector[][] vvIn;
+  // Storing VectorWrapper reference instead of ValueVector[]. With EMIT outcome support the underlying
+  // operator can generate multiple output batches with no schema changes which will change the ValueVector[]
+  // reference but not VectorWrapper reference.
+  protected VectorWrapper<?>[] vvIn;
   private SelectionVector4 sv4;
 
   @Override
@@ -34,14 +36,13 @@ public void setup(RecordBatch incoming, VectorContainer 
outgoing) throws SchemaC
     this.sv4 = incoming.getSelectionVector4();
 
     final int count = outgoing.getNumberOfColumns();
-
-    vvIn = new ValueVector[count][];
+    vvIn = new VectorWrapper[count];
 
     {
       int index = 0;
 
       for (VectorWrapper vectorWrapper: incoming) {
-        vvIn[index] = vectorWrapper.getValueVectors();
+        vvIn[index] = vectorWrapper;
         index++;
       }
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
index f9b153d050..1f3d28bdc4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
@@ -30,7 +30,8 @@ public void copyEntry(int inIndex, int outIndex) throws 
SchemaChangeException {
     int inOffset = inIndex & 0xFFFF;
     int inVector = inIndex >>> 16;
     for ( int i = 0;  i < vvIn.length;  i++ ) {
-      vvOut[i].copyEntry(outIndex, vvIn[i][inVector], inOffset);
+      ValueVector[] vectorsFromIncoming = vvIn[i].getValueVectors();
+      vvOut[i].copyEntry(outIndex, vectorsFromIncoming[inVector], inOffset);
     }
   }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
index 4e47051253..eddc9dff1d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -26,6 +26,8 @@
 
 import com.google.common.base.Preconditions;
 
+import java.lang.reflect.Array;
+
 
 public class HyperVectorWrapper<T extends ValueVector> implements 
VectorWrapper<T>{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HyperVectorWrapper.class);
@@ -138,7 +140,7 @@ public void addVector(ValueVector v) {
 
   @SuppressWarnings("unchecked")
   public void addVectors(ValueVector[] vv) {
-    vectors = (T[]) ArrayUtils.add(vectors, vv);
+    vectors = (T[]) ArrayUtils.addAll(vectors, vv);
   }
 
   /**
@@ -157,4 +159,22 @@ public void transfer(VectorWrapper<?> destination) {
       vectors[i].makeTransferPair(destionationVectors[i]).transfer();
     }
   }
+
+  /**
+   * Replaces the vectors held by this HyperVectorWrapper with the supplied ones. {@link #clear()} is invoked on
+   * this wrapper before the new array is installed.
+   * @param vv - replacement ValueVectors; must be non-empty, and its first entry must match both the field type
+   *           of this wrapper and the class of the first vector currently held
+   */
+  @SuppressWarnings("unchecked")
+  public void updateVectorList(ValueVector[] vv) {
+    Preconditions.checkArgument(vv.length > 0);
+    final ValueVector firstIncoming = vv[0];
+    Preconditions.checkArgument(getField().getType().equals(firstIncoming.getField().getType()));
+    // vectors.length will always be > 0 since in constructor that is enforced
+    Preconditions.checkArgument(firstIncoming.getClass().equals(vectors[0].getClass()));
+    clear();
+
+    // Allocate the array with the concrete vector class as component type before casting it to T[]
+    final ValueVector[] replacement = (ValueVector[]) Array.newInstance(firstIncoming.getClass(), vv.length);
+    System.arraycopy(vv, 0, replacement, 0, vv.length);
+    vectors = (T[]) replacement;
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index 2c10d6d437..a0b47ed89d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -60,6 +60,10 @@ public int getCount() {
     return length;
   }
 
+  /** Returns the buffer currently referenced by the {@code data} field of this selection vector. */
+  private ByteBuf getData() {
+    return this.data;
+  }
+
   public void setCount(int length) {
     this.length = length;
     this.recordCount = length;
@@ -104,8 +108,7 @@ public SelectionVector4 createNewWrapperCurrent() {
   public boolean next() {
 //    logger.debug("Next called. Start: {}, Length: {}, recordCount: " + 
recordCount, start, length);
 
-    if (start + length >= recordCount) {
-
+    if (!hasNext()) {
       start = recordCount;
       length = 0;
 //      logger.debug("Setting count to zero.");
@@ -119,15 +122,33 @@ public boolean next() {
     return true;
   }
 
+  /**
+   * Reports whether the position just past the current window, i.e. {@code start + length}, is still below
+   * {@code recordCount}.
+   * @return true when {@code start + length < recordCount}, false otherwise
+   */
+  public boolean hasNext() {
+    return start + length < recordCount;
+  }
+
   public void clear() {
     start = 0;
     length = 0;
+    recordCount = 0;
     if (data != DeadBuf.DEAD_BUFFER) {
       data.release();
       data = DeadBuf.DEAD_BUFFER;
     }
   }
 
+  /**
+   * Makes this SelectionVector4 reference the data buffer of {@code fromSV4} and take over its total count and
+   * current count. The previous state of this vector is dropped through {@link #clear()} first.
+   * @param fromSV4 - vector to copy from; passing this very instance is a no-op
+   */
+  public void copy(SelectionVector4 fromSV4) {
+    // Copying a vector onto itself must not go through clear(): that would release the buffer and zero the
+    // counts before they are read back below.
+    if (fromSV4 == this) {
+      return;
+    }
+
+    clear();
+    this.recordCount = fromSV4.getTotalCount();
+    this.length = fromSV4.getCount();
+    this.data = fromSV4.getData();
+    // Need to retain the data buffer since if fromSV4 clears out the buffer it's not actually released unless the
+    // copied SV4 has also released it
+    if (data != DeadBuf.DEAD_BUFFER) {
+      this.data.retain();
+    }
+  }
+
   public static int getBatchIndex(int sv4Index) {
     return (sv4Index >> 16) & 0xFFFF;
   }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java
new file mode 100644
index 0000000000..6066572558
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java
@@ -0,0 +1,710 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.TopN;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.exec.physical.config.TopN;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.test.rowSet.RowSet;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(OperatorTest.class)
+public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
+  //private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestTopNEmitOutcome.class);
+
+  /**
+   * Verifies count of column in the received batch is same as expected count of columns. Column names are read
+   * from the first vector of each wrapper when the batch carries an sv4, and from the single vector otherwise.
+   * @param batch - Incoming record batch
+   * @param expectedColCount - Expected count of columns in the record batch
+   */
+  private void verifyColumnCount(VectorAccessible batch, int expectedColCount) {
+    List<String> columns = Lists.newArrayList();
+    SelectionVector4 sv4 = batch.getSelectionVector4();
+    for (VectorWrapper<?> vw : batch) {
+      if (sv4 != null) {
+        columns.add(vw.getValueVectors()[0].getField().getName());
+      } else {
+        columns.add(vw.getValueVector().getField().getName());
+      }
+    }
+    // JUnit's assertEquals takes (message, expected, actual)
+    assertEquals(String.format("Actual number of columns: %d is different than expected count: %d",
+      columns.size(), expectedColCount), expectedColCount, columns.size());
+  }
+
+  /**
+   * Verifies the data received in incoming record batch with the expected data stored inside the expected batch.
+   * Assumes input record batch has associated sv4 with it. Row j of the expected batch is compared with the row
+   * that entry j of the sv4 points at.
+   * @param batch - incoming record batch
+   * @param sv4 - associated sv4 with incoming record batch
+   * @param expectedBatch - expected record batch with expected data
+   */
+  private void verifyBaseline(VectorAccessible batch, SelectionVector4 sv4, VectorContainer expectedBatch) {
+    assertTrue(sv4 != null);
+    List<String> columns = Lists.newArrayList();
+    for (VectorWrapper<?> vw : batch) {
+      columns.add(vw.getValueVectors()[0].getField().getName());
+    }
+
+    for (int j = 0; j < sv4.getCount(); j++) {
+      List<String> eValue = new ArrayList<>(columns.size());
+      List<String> value = new ArrayList<>(columns.size());
+
+      // Upper 16 bits of an sv4 entry select the vector, lower 16 bits the row inside that vector
+      final int sv4Entry = sv4.get(j);
+      final int vectorIndex = sv4Entry >>> 16;
+      final int rowIndex = sv4Entry & 0xFFFF;
+
+      for (VectorWrapper<?> vw : batch) {
+        Object o = vw.getValueVectors()[vectorIndex].getAccessor().getObject(rowIndex);
+        decodeAndAddValue(o, value);
+      }
+
+      for (VectorWrapper<?> vw : expectedBatch) {
+        Object e = vw.getValueVector().getAccessor().getObject(j);
+        decodeAndAddValue(e, eValue);
+      }
+      // assertEquals prints both rows when they differ
+      assertEquals("Some of expected value didn't matches with actual value", eValue, value);
+    }
+  }
+
+  /**
+   * Appends the string form of a value to the given list: null becomes the text "null", a byte[] is decoded into
+   * a String and anything else is added through toString().
+   * @param currentValue - value to convert, may be null
+   * @param listToAdd - list receiving the converted value
+   */
+  private void decodeAndAddValue(Object currentValue, List<String> listToAdd) {
+    if (currentValue == null) {
+      listToAdd.add("null");
+    } else if (currentValue instanceof byte[]) {
+      // Explicit charset so the result does not depend on the platform default.
+      // NOTE(review): assumes the bytes are UTF-8 encoded text - confirm against the vector type in use.
+      listToAdd.add(new String((byte[]) currentValue, java.nio.charset.StandardCharsets.UTF_8));
+    } else {
+      listToAdd.add(currentValue.toString());
+    }
+  }
+
+  /**
+   * Verifies that if TopNBatch receives empty batches with OK_NEW_SCHEMA and EMIT outcome then it correctly produces
+   * empty batches as output. First empty batch will be with OK_NEW_SCHEMA and second will be with EMIT outcome.
+   */
+  @Test
+  public void testTopNEmptyBatchEmitOutcome() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.ASCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 10);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // assertEquals reports the outcome actually returned when an expectation is not met
+    assertEquals(OK_NEW_SCHEMA, topNBatch.next());
+    outputRecordCount += topNBatch.getRecordCount();
+    assertEquals(OK_NEW_SCHEMA, topNBatch.next());
+    assertEquals(EMIT, topNBatch.next());
+    outputRecordCount += topNBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+    assertEquals(NONE, topNBatch.next());
+  }
+
+  /**
+   * Verifies that if TopNBatch receives a RecordBatch with EMIT outcome post build schema phase then it produces
+   * output for that input batch correctly. The first output batch is expected with OK_NEW_SCHEMA outcome,
+   * followed by EMIT with an empty batch. The test verifies the output order against the expected baseline.
+   */
+  @Test
+  public void testTopNNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(13, 130, "item13")
+      .addRow(4, 40, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(13, 130, "item13")
+      .addRow(4, 40, "item4")
+      .addRow(2, 20, "item2")
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 10);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // assertEquals reports the outcome actually returned when an expectation is not met
+    assertEquals(OK_NEW_SCHEMA, topNBatch.next());
+    outputRecordCount += topNBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+    assertEquals(OK_NEW_SCHEMA, topNBatch.next());
+    outputRecordCount += topNBatch.getRecordCount();
+    assertEquals(3, outputRecordCount);
+
+    // verify results
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+
+    assertEquals(EMIT, topNBatch.next());
+    outputRecordCount += topNBatch.getRecordCount();
+    assertEquals(3, outputRecordCount);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   * Feeds TopNBatch an empty batch with OK_NEW_SCHEMA, an empty batch with EMIT and then a non-empty batch with
+   * EMIT, and checks the sequence of outcomes plus the order of the three records produced for the last batch.
+   */
+  @Test
+  public void testTopNEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(13, 130, "item13")
+      .addRow(4, 40, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(13, 130, "item13")
+      .addRow(4, 40, "item4")
+      .addRow(2, 20, "item2")
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 10);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // assertEquals reports the outcome actually returned when an expectation is not met
+    assertEquals(OK_NEW_SCHEMA, topNBatch.next());
+    outputRecordCount += topNBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+    assertEquals(OK_NEW_SCHEMA, topNBatch.next());
+    assertEquals(EMIT, topNBatch.next());
+
+    assertEquals(EMIT, topNBatch.next());
+    outputRecordCount += topNBatch.getRecordCount();
+    assertEquals(3, outputRecordCount);
+
+    // verify results
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   * Feeds TopNBatch an empty batch with OK_NEW_SCHEMA, three empty batches with EMIT and finally a non-empty batch
+   * with EMIT, and checks the sequence of outcomes plus the order of the three records produced for the last batch.
+   */
+  @Test
+  public void testTopNMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(13, 130, "item13")
+      .addRow(4, 40, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(13, 130, "item13")
+      .addRow(4, 40, "item4")
+      .addRow(2, 20, "item2")
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 10);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // assertEquals reports the outcome actually returned when an expectation is not met
+    assertEquals(OK_NEW_SCHEMA, topNBatch.next());
+    outputRecordCount += topNBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+    assertEquals(OK_NEW_SCHEMA, topNBatch.next());
+    assertEquals(EMIT, topNBatch.next());
+    assertEquals(EMIT, topNBatch.next());
+    assertEquals(EMIT, topNBatch.next());
+
+    assertEquals(EMIT, topNBatch.next());
+    outputRecordCount += topNBatch.getRecordCount();
+    assertEquals(3, outputRecordCount);
+
+    // verify results
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   * Verifies that if TopNBatch receives multiple non-empty record batches with EMIT outcome in between then it
+   * produces output for those input batches correctly. In this case it receives the first non-empty batch with
+   * OK_NEW_SCHEMA in buildSchema phase followed by an empty batch with EMIT outcome. For this combination it
+   * produces output for the records received so far. Then it receives a second non-empty batch with OK outcome
+   * followed by an empty batch with EMIT, and produces a separate output for it. The test validates that for each
+   * output received the order of the records is correct.
+   */
+  @Test
+  public void testTopNResetsAfterFirstEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 30, "item3")
+      .addRow(2, 20, "item2")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 10);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // assertEquals reports the outcome actually returned when an expectation is not met
+    assertEquals(OK_NEW_SCHEMA, topNBatch.next());
+    assertEquals(OK_NEW_SCHEMA, topNBatch.next());
+    assertEquals(1, topNBatch.getRecordCount());
+
+    // verify results with baseline
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet1.container());
+
+    assertEquals(EMIT, topNBatch.next());
+    assertEquals(0, topNBatch.getRecordCount());
+
+    // State refresh is expected here: the next output only covers the batches received after the first EMIT
+    assertEquals(EMIT, topNBatch.next());
+    assertEquals(2, topNBatch.getRecordCount());
+
+    // verify results with baseline
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet2.container());
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet2.clear();
+    expectedRowSet1.clear();
+  }
+
+  /**
+   * Verifies TopNBatch correctness for the case where it receives a non-empty batch in build schema phase followed
+   * by empty batches with OK, EMIT and NONE outcomes.
+   */
+  @Test
+  public void testTopN_NonEmptyFirst_EmptyOKEmitOutcome() {
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(NONE);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 10);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // assertEquals reports the outcome actually returned when an expectation is not met
+    assertEquals(OK_NEW_SCHEMA, topNBatch.next());
+    assertEquals(0, topNBatch.getRecordCount());
+    assertEquals(OK_NEW_SCHEMA, topNBatch.next());
+    assertEquals(1, topNBatch.getRecordCount());
+
+    // verify results with baseline
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet1.container());
+
+    assertEquals(EMIT, topNBatch.next());
+    assertEquals(0, topNBatch.getRecordCount());
+    assertEquals(NONE, topNBatch.next());
+    // Release memory for row set
+    expectedRowSet1.clear();
+  }
+
+  /**
+   * Runs TopNBatch with a limit of 1 over a non-empty batch with OK_NEW_SCHEMA, an empty batch with EMIT and a
+   * second non-empty batch with OK. The first output is expected to hold the single record of the first batch;
+   * the second output is expected to hold only the top record (descending on id_left) of the second batch. The
+   * test validates the outcome sequence and the content of each output.
+   */
+  @Test
+  public void testTopNMultipleOutputBatchWithLowerLimits() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(4, 40, "item4")
+      .addRow(2, 20, "item2")
+      .addRow(5, 50, "item5")
+      .addRow(3, 30, "item3")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(5, 50, "item5")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 1);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // assertEquals reports the outcome actually returned when an expectation is not met
+    assertEquals(OK_NEW_SCHEMA, topNBatch.next());
+    assertEquals(OK_NEW_SCHEMA, topNBatch.next());
+    assertEquals(1, topNBatch.getRecordCount());
+
+    // verify results with baseline
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet1.container());
+
+    assertEquals(EMIT, topNBatch.next());
+    assertEquals(0, topNBatch.getRecordCount());
+
+    // State refresh is expected here: the next output only covers the batches received after the EMIT
+    assertEquals(RecordBatch.IterOutcome.OK, topNBatch.next());
+    assertEquals(1, topNBatch.getRecordCount());
+
+    // verify results with baseline
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet2.container());
+
+    assertEquals(NONE, topNBatch.next());
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet2.clear();
+    expectedRowSet1.clear();
+  }
+
+  /**
+   * Feeds TopNBatch a non-empty batch with OK_NEW_SCHEMA followed by an empty, a non-empty and another empty batch,
+   * each with EMIT, and checks the outcome and record count reported after every call to next().
+   */
+  @Test
+  public void testTopNMultipleEMITOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(EMIT);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 10);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // first evaluation; assertEquals reports the outcome actually returned when an expectation is not met
+    assertEquals(OK_NEW_SCHEMA, topNBatch.next());
+    assertEquals(OK_NEW_SCHEMA, topNBatch.next());
+    assertEquals(1, topNBatch.getRecordCount());
+    assertEquals(EMIT, topNBatch.next());
+    assertEquals(0, topNBatch.getRecordCount());
+
+    // After seeing EMIT the operator is expected to refresh its state and evaluate the next set of input batches
+    assertEquals(EMIT, topNBatch.next());
+    assertEquals(2, topNBatch.getRecordCount());
+    assertEquals(EMIT, topNBatch.next());
+    assertEquals(0, topNBatch.getRecordCount());
+
+    nonEmptyInputRowSet2.clear();
+  }
+
+  /**
+   * Feeds TopNBatch two single-record batches (OK_NEW_SCHEMA and OK, with an empty OK batch in between) followed
+   * by an empty batch with EMIT, and checks that both records come out together in one output in descending
+   * order of id_left.
+   */
+  @Test
+  public void testTopNMultipleInputToSingleOutputBatch() throws Exception {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(1, 10, "item1")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 10);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    // assertEquals reports the outcome actually returned when an expectation is not met
+    assertEquals(OK_NEW_SCHEMA, topNBatch.next());
+    assertEquals(OK_NEW_SCHEMA, topNBatch.next());
+    assertEquals(2, topNBatch.getRecordCount());
+
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), expectedRowSet.container());
+
+    assertEquals(EMIT, topNBatch.next());
+    assertEquals(0, topNBatch.getRecordCount());
+
+    // Release memory for row sets, including the expected baseline built above
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  @Test
+  public void testTopNMultipleInputToMultipleOutputBatch_LowerLimits() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(7, 70, "item7")
+      .addRow(3, 30, "item3")
+      .addRow(13, 130, "item13")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(17, 170, "item17")
+      .addRow(3, 30, "item3")
+      .addRow(13, 130, "item13")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(13, 130, "item13")
+      .addRow(7, 70, "item7")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(17, 170, "item17")
+      .addRow(13, 130, "item13")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", 
RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 2);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, 
operatorFixture.getFragmentContext(), mockInputBatch);
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(2, topNBatch.getRecordCount());
+
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), 
expectedRowSet1.container());
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, topNBatch.getRecordCount());
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(2, topNBatch.getRecordCount());
+
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), 
expectedRowSet2.container());
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    expectedRowSet1.clear();
+    expectedRowSet2.clear();
+  }
+
+  
/*****************************************************************************************
+   Tests for validating regular TopN behavior with no EMIT outcome
+  
******************************************************************************************/
+  @Test
+  public void testTopN_WithEmptyNonEmptyBatchesAndOKOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(7, 70, "item7")
+      .addRow(3, 30, "item3")
+      .addRow(13, 130, "item13")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(17, 170, "item17")
+      .addRow(23, 230, "item23")
+      .addRow(130, 1300, "item130")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(130, 1300, "item130")
+      .addRow(23, 230, "item23")
+      .addRow(17, 170, "item17")
+      .addRow(13, 130, "item13")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", 
RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 4);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, 
operatorFixture.getFragmentContext(), mockInputBatch);
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(4, topNBatch.getRecordCount());
+
+    verifyColumnCount(topNBatch, inputSchema.size());
+    verifyBaseline(topNBatch, topNBatch.getSelectionVector4(), 
expectedRowSet.container());
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    expectedRowSet.clear();
+  }
+
+  @Test
+  public void testRegularTopNWithEmptyDataSet() {
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", 
RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 4);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, 
operatorFixture.getFragmentContext(), mockInputBatch);
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE);
+  }
+}
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index 3a17ef5c3f..f2cc366279 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -69,6 +69,24 @@ public void testLateral_WithFilterAndLimitInSubQuery() 
throws Exception {
     test(Sql);
   }
 
+  @Test
+  public void testLateral_WithTopNInSubQuery() throws Exception {
+    String Sql = "SELECT customer.c_name, orders.o_id, orders.o_amount " +
+      "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " +
+      "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM 
UNNEST(customer.orders) t(ord) ORDER BY " +
+      "o_amount DESC LIMIT 1) orders";
+
+    testBuilder()
+      .sqlQuery(Sql)
+      .unOrdered()
+      .baselineColumns("c_name", "o_id", "o_amount")
+      .baselineValues("customer1", 3.0,  294.5)
+      .baselineValues("customer2", 10.0,  724.5)
+      .baselineValues("customer3", 23.0,  772.2)
+      .baselineValues("customer4", 32.0,  1030.1)
+      .go();
+  }
+
   @Test
   public void testOuterApply_WithFilterAndLimitInSubQuery() throws Exception {
     String Sql = "SELECT customer.c_name, customer.c_address, orders.o_id, 
orders.o_amount " +
@@ -122,6 +140,22 @@ public void 
testMultipleBatchesLateral_WithLimitInSubQuery() throws Exception {
     test(sql);
   }
 
+  @Test
+  public void testMultipleBatchesLateral_WithTopNInSubQuery() throws Exception 
{
+    String sql = "SELECT customer.c_name, orders.o_orderkey, 
orders.o_totalprice " +
+      "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
+      "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as 
o_totalprice FROM UNNEST(customer.c_orders) t(ord)" +
+      " ORDER BY o_totalprice DESC LIMIT 1) orders";
+
+    testBuilder()
+      .sqlQuery(sql)
+      .unOrdered()
+      .baselineColumns("c_name", "o_orderkey", "o_totalprice")
+      .baselineValues("Customer#000951313", (long)47035683, 306996.2)
+      .baselineValues("Customer#000007180", (long)54646821, 367189.55)
+      .go();
+  }
+
   @Test
   public void testMultipleBatchesLateral_WithLimitFilterInSubQuery() throws 
Exception {
     String sql = "SELECT customer.c_name, customer.c_address, 
orders.o_orderkey, orders.o_totalprice " +


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Support for EMIT outcome in TopN
> --------------------------------
>
>                 Key: DRILL-6446
>                 URL: https://issues.apache.org/jira/browse/DRILL-6446
>             Project: Apache Drill
>          Issue Type: Task
>          Components: Execution - Relational Operators
>            Reporter: Sorabh Hamirwasia
>            Assignee: Sorabh Hamirwasia
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 1.14.0
>
>
> With Lateral and Unnest, if TopN is present in the sub-query, then it needs to 
> handle the EMIT outcome correctly. This means that when an EMIT is received, TopN 
> should perform the TopN operation on the records buffered so far and produce 
> output from them. After EMIT, TopN should refresh its state and again work on the 
> next batches of incoming records until an EMIT is seen again.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to