DRILL-6327: Update unary operators to handle IterOutcome.EMIT
            Note: Handles the EMIT IterOutcome for non-blocking unary operators
(such as Filter, Project, etc.)

closes #1240


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2f275d17
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2f275d17
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2f275d17

Branch: refs/heads/master
Commit: 2f275d1723a9e91acb94a359c4db2770385aac93
Parents: f563f38
Author: Sorabh Hamirwasia <[email protected]>
Authored: Wed Apr 4 17:54:58 2018 -0700
Committer: Vitalii Diravka <[email protected]>
Committed: Sun Apr 29 23:20:55 2018 +0300

----------------------------------------------------------------------
 .../physical/impl/filter/FilterRecordBatch.java |   5 +-
 .../physical/impl/filter/FilterTemplate2.java   |   1 +
 .../impl/flatten/FlattenRecordBatch.java        |   8 +-
 .../physical/impl/join/LateralJoinBatch.java    |   2 +-
 .../physical/impl/limit/LimitRecordBatch.java   | 203 ++++++++++-----
 .../impl/project/ProjectRecordBatch.java        |  61 +++--
 .../impl/svremover/RemovingRecordBatch.java     |   7 +-
 .../physical/impl/trace/TraceRecordBatch.java   |   2 +-
 .../IteratorValidatorBatchIterator.java         |   2 +
 .../exec/record/AbstractSingleRecordBatch.java  |  18 ++
 .../exec/record/AbstractUnaryRecordBatch.java   |  25 +-
 .../impl/BaseTestOpBatchEmitOutcome.java        |  84 ++++++
 .../exec/physical/impl/MockRecordBatch.java     |  13 +-
 .../impl/filter/TestFilterBatchEmitOutcome.java | 218 ++++++++++++++++
 .../impl/limit/TestLimitBatchEmitOutcome.java   | 258 +++++++++++++++++++
 .../physical/impl/limit/TestLimitOperator.java  | 131 ++++++++++
 .../impl/project/TestProjectEmitOutcome.java    | 220 ++++++++++++++++
 17 files changed, 1157 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index f0b832a..ac6d99f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -81,7 +81,7 @@ public class FilterRecordBatch extends 
AbstractSingleRecordBatch<Filter> {
       throw new UnsupportedOperationException(e);
     }
 
-    return IterOutcome.OK;
+    return getFinalOutcome(false);
   }
 
   @Override
@@ -168,6 +168,9 @@ public class FilterRecordBatch extends 
AbstractSingleRecordBatch<Filter> {
     final ErrorCollector collector = new ErrorCollectorImpl();
     final List<TransferPair> transfers = Lists.newArrayList();
     final ClassGenerator<Filterer> cg = 
CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION2, context.getOptions());
+    // Uncomment below lines to enable saving generated code file for debugging
+    // cg.getCodeGenerator().plainJavaCapable(true);
+    // cg.getCodeGenerator().saveCodeForDebugging(true);
 
     final LogicalExpression expr = 
ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector,
             context.getFunctionRegistry(), false, unionTypeEnabled);

http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
index 52533bd..6d1f034 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
@@ -61,6 +61,7 @@ public abstract class FilterTemplate2 implements Filterer {
   @Override
   public void filterBatch(int recordCount) throws SchemaChangeException{
     if (recordCount == 0) {
+      outgoingSelectionVector.setRecordCount(0);
       return;
     }
     if (! outgoingSelectionVector.allocateNewSafe(recordCount)) {

http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index bbe9f76..d57246d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -191,7 +191,8 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
   public IterOutcome innerNext() {
     if (hasRemainder) {
       handleRemainder();
-      return IterOutcome.OK;
+      // Check if we are supposed to return EMIT outcome and have consumed 
entire batch
+      return getFinalOutcome(hasRemainder);
     }
     return super.innerNext();
   }
@@ -261,7 +262,10 @@ public class FlattenRecordBatch extends 
AbstractSingleRecordBatch<FlattenPOP> {
     }
 
     flattenMemoryManager.updateOutgoingStats(outputRecords);
-    return IterOutcome.OK;
+
+    // Get the final outcome based on hasRemainder since that will determine 
if all the incoming records were
+    // consumed in current output batch or not
+    return getFinalOutcome(hasRemainder);
   }
 
   private void handleRemainder() {

http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 295ee78..6425b29 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -718,7 +718,7 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
   /**
    * Simple method to allocate space for all the vectors in the container.
    */
-  private void allocateVectors() {;
+  private void allocateVectors() {
     for (VectorWrapper w : container) {
       RecordBatchSizer.ColumnSize colSize = 
batchMemoryManager.getColumnSize(w.getField().getName());
       colSize.allocateVector(w.getValueVector(), maxOutputRowCount);

http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index f5443da..5888c34 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.limit;
 
 import java.util.List;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -32,15 +33,18 @@ import 
org.apache.drill.exec.record.selection.SelectionVector2;
 
 import com.google.common.collect.Lists;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+
 public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   // private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
 
   private SelectionVector2 outgoingSv;
   private SelectionVector2 incomingSv;
-  private int recordsToSkip;
-  private int recordsLeft;
-  private final boolean noEndLimit;
-  private boolean skipBatch;
+
+  // Start offset of the records
+  private int recordStartOffset;
+  private int numberOfRecords;
   private boolean first = true;
   private final List<TransferPair> transfers = Lists.newArrayList();
 
@@ -48,12 +52,55 @@ public class LimitRecordBatch extends 
AbstractSingleRecordBatch<Limit> {
       throws OutOfMemoryException {
     super(popConfig, context, incoming);
     outgoingSv = new SelectionVector2(oContext.getAllocator());
-    recordsToSkip = popConfig.getFirst();
-    noEndLimit = popConfig.getLast() == null;
-    if(!noEndLimit) {
-      recordsLeft = popConfig.getLast() - recordsToSkip;
-    }
-    skipBatch = false;
+    refreshLimitState();
+  }
+
+  @Override
+  public IterOutcome innerNext() {
+    if (!first && !needMoreRecords(numberOfRecords)) {
+        outgoingSv.setRecordCount(0);
+        incoming.kill(true);
+
+        IterOutcome upStream = next(incoming);
+        if (upStream == IterOutcome.OUT_OF_MEMORY) {
+          return upStream;
+        }
+
+        while (upStream == IterOutcome.OK || upStream == 
IterOutcome.OK_NEW_SCHEMA) {
+          // Clear the memory for the incoming batch
+          for (VectorWrapper<?> wrapper : incoming) {
+            wrapper.getValueVector().clear();
+          }
+          upStream = next(incoming);
+          if (upStream == IterOutcome.OUT_OF_MEMORY) {
+            return upStream;
+          }
+        }
+        // If EMIT that means leaf operator is UNNEST, in this case refresh 
the limit states and return EMIT.
+        if (upStream == EMIT) {
+          refreshLimitState();
+          return upStream;
+        }
+        // other leaf operator behave as before.
+        return NONE;
+      }
+    return super.innerNext();
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    return outgoingSv;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return outgoingSv.getCount();
+  }
+
+  @Override
+  public void close() {
+    outgoingSv.clear();
+    super.close();
   }
 
   @Override
@@ -61,10 +108,9 @@ public class LimitRecordBatch extends 
AbstractSingleRecordBatch<Limit> {
     container.zeroVectors();
     transfers.clear();
 
-
     for(final VectorWrapper<?> v : incoming) {
       final TransferPair pair = v.getValueVector().makeTransferPair(
-          container.addOrGet(v.getField(), callBack));
+        container.addOrGet(v.getField(), callBack));
       transfers.add(pair);
     }
 
@@ -88,36 +134,22 @@ public class LimitRecordBatch extends 
AbstractSingleRecordBatch<Limit> {
     return false;
   }
 
+  /**
+   * Gets the outcome to return from super implementation and then in case of 
EMIT outcome it refreshes the state of
+   * operator. Refresh is done to again apply limit on all the future incoming 
batches which will be part of next
+   * record boundary.
+   * @param hasRemainder
+   * @return - IterOutcome to send downstream
+   */
   @Override
-  public IterOutcome innerNext() {
-    if(!first && !noEndLimit && recordsLeft <= 0) {
-      incoming.kill(true);
-
-      IterOutcome upStream = next(incoming);
-      if (upStream == IterOutcome.OUT_OF_MEMORY) {
-        return upStream;
-      }
+  protected IterOutcome getFinalOutcome(boolean hasRemainder) {
+    final IterOutcome outcomeToReturn = super.getFinalOutcome(hasRemainder);
 
-      while (upStream == IterOutcome.OK || upStream == 
IterOutcome.OK_NEW_SCHEMA) {
-        // Clear the memory for the incoming batch
-        for (VectorWrapper<?> wrapper : incoming) {
-          wrapper.getValueVector().clear();
-        }
-        upStream = next(incoming);
-        if (upStream == IterOutcome.OUT_OF_MEMORY) {
-          return upStream;
-        }
-      }
-
-      return IterOutcome.NONE;
+    // EMIT outcome means leaf operator is UNNEST, hence refresh the state no 
matter limit is reached or not.
+    if (outcomeToReturn == EMIT) {
+      refreshLimitState();
     }
-
-    return super.innerNext();
-  }
-
-  @Override
-  public SelectionVector2 getSelectionVector2() {
-    return outgoingSv;
+    return outcomeToReturn;
   }
 
   @Override
@@ -125,40 +157,47 @@ public class LimitRecordBatch extends 
AbstractSingleRecordBatch<Limit> {
     if (first) {
       first = false;
     }
-    skipBatch = false;
-    final int recordCount = incoming.getRecordCount();
-    if (recordCount == 0) {
-      skipBatch = true;
-      return IterOutcome.OK;
+    final int inputRecordCount = incoming.getRecordCount();
+    if (inputRecordCount == 0) {
+      setOutgoingRecordCount(0);
+      return getFinalOutcome(false);
     }
+
     for(final TransferPair tp : transfers) {
       tp.transfer();
     }
-    if (recordCount <= recordsToSkip) {
-      recordsToSkip -= recordCount;
-      skipBatch = true;
+    // Check if current input record count is less than start offset. If yes 
then adjust the start offset since we
+    // have to ignore all these records and return empty batch.
+    if (inputRecordCount <= recordStartOffset) {
+      recordStartOffset -= inputRecordCount;
+      setOutgoingRecordCount(0);
     } else {
-      outgoingSv.allocateNew(recordCount);
-      limit(recordCount);
+      // Allocate SV2 vectors for the record count size since we transfer all 
the vectors buffer from input record
+      // batch to output record batch and later an SV2Remover copies the 
needed records.
+      outgoingSv.allocateNew(inputRecordCount);
+      limit(inputRecordCount);
     }
-
-    return IterOutcome.OK;
+    return getFinalOutcome(false);
   }
 
-  private void limit(int recordCount) {
-    final int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip));
-    recordsToSkip -= offset;
-    int fetch;
-
-    if(noEndLimit) {
-      fetch = recordCount;
+  /**
+   * limit call when incoming batch has number of records more than the start 
offset such that it can produce some
+   * output records. After first call of this method recordStartOffset should 
be 0 since we have already skipped the
+   * required number of records as part of first incoming record batch.
+   * @param inputRecordCount - number of records in incoming batch
+   */
+  private void limit(int inputRecordCount) {
+    int endRecordIndex;
+
+    if (numberOfRecords == Integer.MIN_VALUE) {
+      endRecordIndex = inputRecordCount;
     } else {
-      fetch = Math.min(recordCount, offset + recordsLeft);
-      recordsLeft -= Math.max(0, fetch - offset);
+      endRecordIndex = Math.min(inputRecordCount, recordStartOffset + 
numberOfRecords);
+      numberOfRecords -= Math.max(0, endRecordIndex - recordStartOffset);
     }
 
     int svIndex = 0;
-    for(int i = offset; i < fetch; svIndex++, i++) {
+    for(int i = recordStartOffset; i < endRecordIndex; svIndex++, i++) {
       if (incomingSv != null) {
         outgoingSv.setIndex(svIndex, incomingSv.getIndex(i));
       } else {
@@ -166,16 +205,44 @@ public class LimitRecordBatch extends 
AbstractSingleRecordBatch<Limit> {
       }
     }
     outgoingSv.setRecordCount(svIndex);
+    // Update the start offset
+    recordStartOffset = 0;
   }
 
-  @Override
-  public int getRecordCount() {
-    return skipBatch ? 0 : outgoingSv.getCount();
+  private void setOutgoingRecordCount(int outputCount) {
+    outgoingSv.setRecordCount(outputCount);
   }
 
-  @Override
-  public void close() {
-    outgoingSv.clear();
-    super.close();
+  /**
+   * Method which returns if more output records are needed from LIMIT 
operator. When numberOfRecords is set to
+   * {@link Integer#MIN_VALUE} that means there is no end bound on LIMIT, so 
get all the records past start offset.
+   * @return - true - more output records is expected.
+   *           false - limit bound is reached and no more record is expected
+   */
+  private boolean needMoreRecords(int recordsToRead) {
+    boolean readMore = true;
+
+    Preconditions.checkState(recordsToRead == Integer.MIN_VALUE || 
recordsToRead >= 0,
+      String.format("Invalid value of numberOfRecords %d inside 
LimitRecordBatch", recordsToRead));
+
+    // Above check makes sure that either numberOfRecords has no bound or if 
it has bounds then either we have read
+    // all the records or still left to read some.
+    // Below check just verifies if there is bound on numberOfRecords and we 
have read all of it.
+    if (recordsToRead == 0) {
+      readMore = false;
+    }
+    return readMore;
+  }
+
+  /**
+   * Reset the states for recordStartOffset and numberOfRecords based on the 
popConfig passed to the operator.
+   * This method is called for the outcome EMIT no matter if limit is reached 
or not.
+   */
+  private void refreshLimitState() {
+    // Make sure startOffset is non-negative
+    recordStartOffset = Math.max(0, popConfig.getFirst());
+    numberOfRecords = (popConfig.getLast() == null) ?
+      Integer.MIN_VALUE : Math.max(0, popConfig.getLast()) - recordStartOffset;
+    first = true;
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index a96dfe1..eab9007 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -71,6 +71,8 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+
 public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
   private Projector projector;
@@ -129,7 +131,8 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
     recordCount = 0;
     if (hasRemainder) {
       handleRemainder();
-      return IterOutcome.OK;
+      // Check if we are supposed to return EMIT outcome and have consumed 
entire batch
+      return getFinalOutcome(hasRemainder);
     }
     return super.innerNext();
   }
@@ -151,7 +154,14 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
       if (complexWriters != null) {
         IterOutcome next = null;
         while (incomingRecordCount == 0) {
+          if (getLastKnownOutcome() == EMIT) {
+            throw new UnsupportedOperationException("Currently functions 
producing complex types as output is not " +
+              "supported in project list for subquery between LATERAL and 
UNNEST. Please re-write the query using this " +
+              "function in the projection list of outermost query.");
+          }
+
           next = next(incoming);
+          setLastKnownOutcome(next);
           if (next == IterOutcome.OUT_OF_MEMORY) {
             outOfMemory = true;
             return next;
@@ -166,28 +176,34 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
             // Only need to add the schema for the complex exprs because 
others should already have
             // been setup during setupNewSchema
             for (FieldReference fieldReference : complexFieldReferencesList) {
-              MaterializedField field = 
MaterializedField.create(fieldReference.getAsNamePart().getName(), 
UntypedNullHolder.TYPE);
+              MaterializedField field = 
MaterializedField.create(fieldReference.getAsNamePart().getName(),
+                UntypedNullHolder.TYPE);
               container.add(new UntypedNullVector(field, 
container.getAllocator()));
             }
             container.buildSchema(SelectionVectorMode.NONE);
             wasNone = true;
             return IterOutcome.OK_NEW_SCHEMA;
-          } else if (next != IterOutcome.OK && next != 
IterOutcome.OK_NEW_SCHEMA) {
+          } else if (next != IterOutcome.OK && next != 
IterOutcome.OK_NEW_SCHEMA && next != EMIT) {
             return next;
+          } else if (next == IterOutcome.OK_NEW_SCHEMA) {
+            try {
+              setupNewSchema();
+            } catch (final SchemaChangeException e) {
+              throw new RuntimeException(e);
+            }
           }
           incomingRecordCount = incoming.getRecordCount();
         }
-        if (next == IterOutcome.OK_NEW_SCHEMA) {
-          try {
-            setupNewSchema();
-          } catch (final SchemaChangeException e) {
-            throw new RuntimeException(e);
-          }
-        }
       }
     }
-    first = false;
 
+    if (complexWriters != null && getLastKnownOutcome() == EMIT) {
+      throw new UnsupportedOperationException("Currently functions producing 
complex types as output is not " +
+        "supported in project list for subquery between LATERAL and UNNEST. 
Please re-write the query using this " +
+        "function in the projection list of outermost query.");
+    }
+
+    first = false;
     container.zeroVectors();
 
     if (!doAlloc(incomingRecordCount)) {
@@ -214,7 +230,9 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
       container.buildSchema(SelectionVectorMode.NONE);
     }
 
-    return IterOutcome.OK;
+    // Get the final outcome based on hasRemainder since that will determine 
if all the incoming records were
+    // consumed in current output batch or not
+    return getFinalOutcome(hasRemainder);
   }
 
   private void handleRemainder() {
@@ -310,11 +328,18 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
       }
     }
     this.allocationVectors = Lists.newArrayList();
+
     if (complexWriters != null) {
       container.clear();
     } else {
+      // Not clearing the container here is fine since Project output schema 
is not determined solely based on incoming
+      // batch. It is defined by the expressions it has to evaluate.
+      //
+      // If there is a case where only the type of ValueVector already present 
in container is changed then addOrGet
+      // method takes care of it by replacing the vectors.
       container.zeroVectors();
     }
+
     final List<NamedExpression> exprs = getExpressionList();
     final ErrorCollector collector = new ErrorCollectorImpl();
     final List<TransferPair> transfers = Lists.newArrayList();
@@ -357,7 +382,8 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
               }
 
               final FieldReference ref = new FieldReference(name);
-              final ValueVector vvOut = 
container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(), 
vvIn.getField().getType()), callBack);
+              final ValueVector vvOut = 
container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(),
+                vvIn.getField().getType()), callBack);
               final TransferPair tp = vvIn.makeTransferPair(vvOut);
               transfers.add(tp);
             }
@@ -436,8 +462,9 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
         Preconditions.checkNotNull(incomingBatch);
 
         final FieldReference ref = getRef(namedExpression);
-        final ValueVector vvOut = 
container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(),
-                                                                              
vectorRead.getMajorType()), callBack);
+        final ValueVector vvOut =
+          
container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(),
+            vectorRead.getMajorType()), callBack);
         final TransferPair tp = vvIn.makeTransferPair(vvOut);
         transfers.add(tp);
         transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
@@ -456,7 +483,10 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
         cg.addExpr(expr, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
         if (complexFieldReferencesList == null) {
           complexFieldReferencesList = Lists.newArrayList();
+        } else {
+          complexFieldReferencesList.clear();
         }
+
         // save the field reference for later for getting schema when input is 
empty
         complexFieldReferencesList.add(namedExpression.getRef());
       } else {
@@ -813,5 +843,4 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project> {
     wasNone = true;
     return IterOutcome.OK_NEW_SCHEMA;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 08ca029..a4207b0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -76,11 +76,6 @@ public class RemovingRecordBatch extends 
AbstractSingleRecordBatch<SelectionVect
   }
 
   @Override
-  public IterOutcome innerNext() {
-    return super.innerNext();
-  }
-
-  @Override
   protected IterOutcome doWork() {
     try {
       copier.copyRecords(0, incoming.getRecordCount());
@@ -99,7 +94,7 @@ public class RemovingRecordBatch extends 
AbstractSingleRecordBatch<SelectionVect
 
     logger.debug("doWork(): {} records copied out of {}, incoming schema {} ",
       container.getRecordCount(), container.getRecordCount(), 
incoming.getSchema());
-    return IterOutcome.OK;
+    return getFinalOutcome(false);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 61d3214..50cb26b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -122,7 +122,7 @@ public class TraceRecordBatch extends 
AbstractSingleRecordBatch<Trace> {
     if (incomingHasSv2) {
       sv = wrap.getSv2();
     }
-    return IterOutcome.OK;
+    return getFinalOutcome(false);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index e75619e..05eb545 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -140,6 +140,7 @@ public class IteratorValidatorBatchIterator implements 
CloseableRecordBatch {
     case OK:
     case OK_NEW_SCHEMA:
     case NONE:
+    case EMIT:
       return;
     default:
       throw new IllegalStateException(
@@ -240,6 +241,7 @@ public class IteratorValidatorBatchIterator implements 
CloseableRecordBatch {
           validateBatch();
           break;
         case OK:
+        case EMIT:
           // OK is allowed as long as OK_NEW_SCHEMA was seen, except if 
terminated
           // (checked above).
           if (validationState != ValidationState.HAVE_SCHEMA) {

http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 1a04b40..c8e2bda 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -21,6 +21,7 @@ import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 
+
 /**
  * Implements an AbstractUnaryRecordBatch where the inoming record batch is 
known at the time of creation
  * @param <T>
@@ -40,4 +41,21 @@ public abstract class AbstractSingleRecordBatch<T extends 
PhysicalOperator> exte
     return incoming;
   }
 
+  /**
+   * Based on lastKnownOutcome and if there are more records to be output for 
current record boundary detected by
+   * EMIT outcome, this method returns EMIT or OK outcome.
+   * @param hasMoreRecordInBoundary
+   * @return - EMIT - If the lastknownOutcome was EMIT and output records 
corresponding to all the incoming records in
+   * current record boundary is already produced.
+   *         - OK - otherwise
+   */
+  protected IterOutcome getFinalOutcome(boolean hasMoreRecordInBoundary) {
+    final IterOutcome lastOutcome = getLastKnownOutcome();
+    final boolean isLastOutcomeEmit = (IterOutcome.EMIT == lastOutcome);
+    if (isLastOutcomeEmit && !hasMoreRecordInBoundary) {
+      setLastKnownOutcome(IterOutcome.OK);
+      return IterOutcome.EMIT;
+    }
+    return IterOutcome.OK;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
index e941405..ec34344 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.record;
 
-import com.google.common.base.Preconditions;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -39,6 +38,7 @@ public abstract class AbstractUnaryRecordBatch<T extends 
PhysicalOperator> exten
 
   protected boolean outOfMemory = false;
   protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
+  private IterOutcome lastKnownOutcome;
 
   public AbstractUnaryRecordBatch(T popConfig, FragmentContext context) throws 
OutOfMemoryException {
     super(popConfig, context, false);
@@ -68,9 +68,16 @@ public abstract class AbstractUnaryRecordBatch<T extends 
PhysicalOperator> exten
         }
       } while ((upstream = next(incoming)) == IterOutcome.OK && 
incoming.getRecordCount() == 0);
     }
-    if ((state == BatchState.FIRST) && upstream == IterOutcome.OK) {
-      upstream = IterOutcome.OK_NEW_SCHEMA;
+    if (state == BatchState.FIRST) {
+      if (upstream == IterOutcome.OK) {
+        upstream = IterOutcome.OK_NEW_SCHEMA;
+      } else if (upstream == IterOutcome.EMIT) {
+        throw new IllegalStateException("Received first batch with unexpected 
EMIT IterOutcome");
+      }
     }
+
+    // update the last outcome seen
+    lastKnownOutcome = upstream;
     switch (upstream) {
       case NONE:
         if (state == BatchState.FIRST) {
@@ -104,6 +111,7 @@ public abstract class AbstractUnaryRecordBatch<T extends 
PhysicalOperator> exten
         }
         // fall through.
       case OK:
+      case EMIT:
         assert state != BatchState.FIRST : "First batch should be 
OK_NEW_SCHEMA";
         container.zeroVectors();
         IterOutcome out = doWork();
@@ -164,4 +172,15 @@ public abstract class AbstractUnaryRecordBatch<T extends 
PhysicalOperator> exten
     return IterOutcome.NONE;
   }
 
+  protected IterOutcome getLastKnownOutcome() {
+    return lastKnownOutcome; // outcome most recently stored in the lastKnownOutcome field
+  }
+
+  /**
+   * Sets the outcome received with the current input batch being processed.
+   * @param outcome outcome to store as the last known outcome
+   */
+  protected void setLastKnownOutcome(IterOutcome outcome) {
+    lastKnownOutcome = outcome;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
new file mode 100644
index 0000000..cd24640
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.mock.MockStorePOP;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class BaseTestOpBatchEmitOutcome extends PhysicalOpUnitTestBase {
+  //private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BaseTestOpBatchEmitOutcome.class);
+
+  // input batch schema
+  protected static TupleMetadata inputSchema;
+
+  // default Empty input RowSet
+  protected RowSet.SingleRowSet emptyInputRowSet;
+
+  // default Non-Empty input RowSet
+  protected RowSet.SingleRowSet nonEmptyInputRowSet;
+
+  // List of incoming containers
+  protected final List<VectorContainer> inputContainer = new ArrayList<>(5);
+
+  // List of incoming IterOutcomes
+  protected final List<RecordBatch.IterOutcome> inputOutcomes = new 
ArrayList<>(5);
+
+  // output record count
+  protected int outputRecordCount;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    inputSchema = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.INT)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+  }
+
+  @Before
+  public void beforeTest() throws Exception {
+    emptyInputRowSet = operatorFixture.rowSetBuilder(inputSchema).build();
+    nonEmptyInputRowSet = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+    final PhysicalOperator mockPopConfig = new MockStorePOP(null);
+    mockOpContext(mockPopConfig, 0, 0);
+  }
+
+  @After
+  public void afterTest() throws Exception {
+    emptyInputRowSet.clear();
+    nonEmptyInputRowSet.clear();
+    inputContainer.clear();
+    inputOutcomes.clear();
+    outputRecordCount = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 9ed9848..5463974 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -41,6 +41,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
   private int currentContainerIndex;
   private int currentOutcomeIndex;
   private boolean isDone;
+  private boolean limitWithUnnest;
 
   // All the below resources are owned by caller
   private final List<VectorContainer> allTestContainers;
@@ -98,9 +99,11 @@ public class MockRecordBatch implements CloseableRecordBatch 
{
 
   @Override
   public void kill(boolean sendUpstream) {
-    isDone = true;
-    container.clear();
-    container.setRecordCount(0);
+    if (!limitWithUnnest) {
+      isDone = true;
+      container.clear();
+      container.setRecordCount(0);
+    }
   }
 
   @Override
@@ -182,4 +185,8 @@ public class MockRecordBatch implements 
CloseableRecordBatch {
   public boolean isCompleted() {
     return isDone;
   }
+
+  public void useUnnestKillHandlingForLimit(boolean limitWithUnnest) {
+    this.limitWithUnnest = limitWithUnnest;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestFilterBatchEmitOutcome.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestFilterBatchEmitOutcome.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestFilterBatchEmitOutcome.java
new file mode 100644
index 0000000..21043ef
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestFilterBatchEmitOutcome.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.test.rowSet.RowSet;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(OperatorTest.class)
+public class TestFilterBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
+
+  /**
+   * Test to show if an empty batch is accompanied with EMIT outcome then 
Filter operator is not ignoring it and
+   * asking for next batch with data. Instead it is just returning the empty 
batch along with EMIT outcome right away.
+   *
+   * This test also shows that if the first batch accompanied with OK_NEW_SCHEMA is empty then it is also passed through by
+   * Filter operator rather than ignoring it and waiting for a batch with some data in it.
+   * @throws Throwable
+   */
+  @Test
+  public void testFilterEmptyBatchEmitOutcome() throws Throwable {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final Filter filterConf = new Filter(null, parseExpr("id_left=5"), 1.0f);
+    final FilterRecordBatch filterRecordBatch = new 
FilterRecordBatch(filterConf, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(filterRecordBatch.next() == 
RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    outputRecordCount += filterRecordBatch.getRecordCount();
+    assertTrue(filterRecordBatch.next() == RecordBatch.IterOutcome.EMIT);
+    outputRecordCount += filterRecordBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+  }
+
+  /**
+   * Test to show if a non-empty batch is accompanied with EMIT outcome then 
Filter operator produces output for
+   * that batch with data matching filter condition and return the output 
using EMIT outcome.
+   * @throws Throwable
+   */
+  @Test
+  public void testFilterNonEmptyBatchEmitOutcome() throws Throwable {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final Filter filterConf = new Filter(null, parseExpr("id_left=1"), 1.0f);
+    final FilterRecordBatch filterRecordBatch = new 
FilterRecordBatch(filterConf, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(filterRecordBatch.next() == 
RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    outputRecordCount += filterRecordBatch.getRecordCount();
+    assertTrue(filterRecordBatch.next() == RecordBatch.IterOutcome.EMIT);
+    outputRecordCount += filterRecordBatch.getRecordCount();
+    assertEquals(1, outputRecordCount);
+  }
+
+  /**
+   * Test to show if a non-empty batch is accompanied with EMIT outcome then 
Filter operator produces empty output
+   * batch since filter condition is not satisfied by any data in incoming 
batch. This empty output batch is
+   * accompanied with EMIT outcome.
+   * @throws Throwable
+   */
+  @Test
+  public void testFilterNonEmptyBatchEmitOutcome_WithNonMatchingCondition() 
throws Throwable {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final Filter filterConf = new Filter(null, parseExpr("id_left=2"), 1.0f);
+    final FilterRecordBatch filterRecordBatch = new 
FilterRecordBatch(filterConf, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(filterRecordBatch.next() == 
RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    outputRecordCount += filterRecordBatch.getRecordCount();
+    assertTrue(filterRecordBatch.next() == RecordBatch.IterOutcome.EMIT);
+    outputRecordCount += filterRecordBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+  }
+
+  /**
+   * Test to show that non-empty first batch produces output for that batch 
with OK_NEW_SCHEMA and later empty batch
+   * with EMIT outcome is also passed through rather than getting ignored.
+   * @throws Throwable
+   */
+  @Test
+  public void testFilterNonEmptyFirst_EmptyBatchEmitOutcome() throws Throwable 
{
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final Filter filterConf = new Filter(null, parseExpr("id_left=1"), 1.0f);
+    final FilterRecordBatch filterRecordBatch = new 
FilterRecordBatch(filterConf, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(filterRecordBatch.next() == 
RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    outputRecordCount += filterRecordBatch.getRecordCount();
+    assertTrue(filterRecordBatch.next() == RecordBatch.IterOutcome.EMIT);
+    outputRecordCount += filterRecordBatch.getRecordCount();
+    assertEquals(1, outputRecordCount);
+  }
+
+  /**
+   * Test to show if an empty batch is accompanied with OK outcome then that batch is ignored by Filter operator and
+   * it doesn't return anything; instead it calls next() to get another batch. If the subsequent next() call returns an empty
+   * batch with EMIT outcome then Filter returns the EMIT outcome correctly rather than ignoring it because of the empty
+   * batch.
+   * @throws Throwable
+   */
+  @Test
+  public void testFilterNonEmptyFirst_EmptyOK_EmptyBatchEmitOutcome() throws 
Throwable {
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final Filter filterConf = new Filter(null, parseExpr("id_left=1"), 1.0f);
+    final FilterRecordBatch filterRecordBatch = new 
FilterRecordBatch(filterConf, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(filterRecordBatch.next() == 
RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    outputRecordCount += filterRecordBatch.getRecordCount();
+    // OK will not be received since it was accompanied with an empty batch
+    assertTrue(filterRecordBatch.next() == RecordBatch.IterOutcome.EMIT);
+    outputRecordCount += filterRecordBatch.getRecordCount();
+    assertTrue(filterRecordBatch.next() == RecordBatch.IterOutcome.NONE);
+    assertEquals(1, outputRecordCount);
+  }
+
+  /**
+   * Test to show empty batch with OK outcome is ignored and later non-empty batch with OK outcome produces an output
+   * batch. Whereas an empty batch with EMIT outcome is not ignored and an empty output batch is returned with EMIT
+   * outcome.
+   * @throws Throwable
+   */
+  @Test
+  public void testFilterNonEmptyFirst_NonEmptyOK_EmptyBatchEmitOutcome() 
throws Throwable {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final Filter filterConf = new Filter(null, parseExpr("id_left>=1"), 1.0f);
+    final FilterRecordBatch filterRecordBatch = new 
FilterRecordBatch(filterConf, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(filterRecordBatch.next() == 
RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    outputRecordCount += filterRecordBatch.getRecordCount();
+    assertTrue(filterRecordBatch.next() == RecordBatch.IterOutcome.OK);
+    outputRecordCount += filterRecordBatch.getRecordCount();
+    assertTrue(filterRecordBatch.next() == RecordBatch.IterOutcome.EMIT);
+    outputRecordCount += filterRecordBatch.getRecordCount();
+    assertEquals(2, outputRecordCount);
+
+    // free up resources
+    nonEmptyInputRowSet2.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
new file mode 100644
index 0000000..4757488
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.limit;
+
+import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.test.rowSet.RowSet;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
+
+  /**
+   * Test to show empty batch with both OK_NEW_SCHEMA and EMIT outcome is not ignored by Limit and is passed through to
+   * the downstream operator.
+   * @throws Throwable
+   */
+  @Test
+  public void testLimitEmptyBatchEmitOutcome() throws Throwable {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    // Only set for this Test class
+    mockInputBatch.useUnnestKillHandlingForLimit(true);
+
+    final Limit limitConf = new Limit(null, 0, 1);
+    final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf, 
operatorFixture.getFragmentContext(),
+      mockInputBatch);
+
+    assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    outputRecordCount += limitBatch.getRecordCount();
+    assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
+    outputRecordCount += limitBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+  }
+
+  /**
+   * Test to validate limit considers all the data until it sees EMIT outcome 
and return output batch with data that
+   * meets the limit criteria.
+   * @throws Throwable
+   */
+  @Test
+  public void testLimitNonEmptyBatchEmitOutcome() throws Throwable {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    // Only set for this Test class
+    mockInputBatch.useUnnestKillHandlingForLimit(true);
+
+    final Limit limitConf = new Limit(null, 0, 1);
+    final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf, 
operatorFixture.getFragmentContext(),
+      mockInputBatch);
+
+    assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    outputRecordCount += limitBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+    assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
+    outputRecordCount += limitBatch.getRecordCount();
+    assertEquals(1, outputRecordCount);
+  }
+
+  /**
+   * Test to show that once a limit number of records is produced using first 
set of batches then on getting a batch
+   * with EMIT outcome, the limit state is again refreshed and applied to next 
set of batches with data.
+   * @throws Throwable
+   */
+  @Test
+  public void testLimitResetsAfterFirstEmitOutcome() throws Throwable {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    // Only set for this Test class
+    mockInputBatch.useUnnestKillHandlingForLimit(true);
+
+    final Limit limitConf = new Limit(null, 0, 1);
+    final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf, 
operatorFixture.getFragmentContext(),
+      mockInputBatch);
+
+    assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, limitBatch.getRecordCount());
+    assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
+
+    // State refresh happens and limit again works on new data batches
+    assertEquals(0, limitBatch.getRecordCount());
+    assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK);
+    assertEquals(1, limitBatch.getRecordCount());
+    assertTrue(limitBatch.next() == RecordBatch.IterOutcome.NONE);
+  }
+
+  /**
+   * Test to show that when the limit number of records is found with first 
incoming batch, then next empty incoming
+   * batch with OK outcome is ignored, but the empty EMIT outcome batch is not 
ignored. Empty incoming batch with
+   * EMIT outcome produces empty output batch with EMIT outcome.
+   * @throws Throwable
+   */
+  @Test
+  public void testLimitNonEmptyFirst_EmptyOKEmitOutcome() throws Throwable {
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    // Only set for this Test class
+    mockInputBatch.useUnnestKillHandlingForLimit(true);
+
+    final Limit limitConf = new Limit(null, 0, 1);
+    final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf, 
operatorFixture.getFragmentContext(),
+      mockInputBatch);
+
+    assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, limitBatch.getRecordCount());
+    // OK will not be received since it was accompanied with an empty batch
+    assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, limitBatch.getRecordCount());
+    assertTrue(limitBatch.next() == RecordBatch.IterOutcome.NONE);
+  }
+
+  /**
+   * Test to show that limit refreshes its state after seeing first EMIT outcome and works on data batches following
+   * it as a new set of incoming batches, applying the limit rule afresh on those. So for first set of batches with
+   * OK_NEW_SCHEMA and EMIT outcome but total number of records received being less than limit condition, it still
+   * produces an output with that many records (in this case 1 even though limit number of records is 2).
+   *
+   * After seeing EMIT, it refreshes its state and operates on next input batches to again return limit number of
+   * records. So for 3rd batch with 2 records but with EMIT outcome it produces an output batch with 2 records not
+   * with 1 since state is refreshed.
+   * @throws Throwable
+   */
+  @Test
+  public void testMultipleLimitWithEMITOutcome() throws Throwable {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    // Only set for this Test class
+    mockInputBatch.useUnnestKillHandlingForLimit(true);
+
+    final Limit limitConf = new Limit(null, 0, 2);
+    final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf, 
operatorFixture.getFragmentContext(),
+      mockInputBatch);
+
+    // first limit evaluation
+    assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, limitBatch.getRecordCount());
+    assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, limitBatch.getRecordCount());
+
+    // After seeing EMIT limit will refresh its state and again evaluate limit on next set of input batches
+    assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(2, limitBatch.getRecordCount());
+
+    // Since limit is hit it will return NONE
+    assertTrue(limitBatch.next() == RecordBatch.IterOutcome.NONE);
+  }
+
+  /**
+   * Test shows that limit operates on multiple input batches until it finds limit number of records or it sees an
+   * EMIT outcome to refresh its state.
+   * @throws Throwable
+   */
+  @Test
+  public void testLimitNonEmptyFirst_NonEmptyOK_EmptyBatchEmitOutcome() throws 
Throwable {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+    // Only set for this Test class
+    mockInputBatch.useUnnestKillHandlingForLimit(true);
+
+    final Limit limitConf = new Limit(null, 0, 2);
+    final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf, 
operatorFixture.getFragmentContext(),
+      mockInputBatch);
+
+    assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, limitBatch.getRecordCount());
+    assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK);
+    assertEquals(1, limitBatch.getRecordCount());
+    assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, limitBatch.getRecordCount());
+
+    nonEmptyInputRowSet2.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
new file mode 100644
index 0000000..22c0013
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.limit;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestLimitOperator extends PhysicalOpUnitTestBase {
+
+  @Test
+  public void testLimitMoreRecords() {
+    Limit limitConf = new Limit(null, 0, 10);
+    List<String> inputJsonBatches = Lists.newArrayList(
+      "[{\"a\": 5, \"b\" : 1 }]",
+      "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
+    opTestBuilder()
+      .physicalOperator(limitConf)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b")
+      .baselineValues(5l, 1l)
+      .baselineValues(5l, 5l)
+      .baselineValues(3l, 8l)
+      .go();
+  }
+
+  @Test
+  public void testLimitLessRecords() {
+    Limit limitConf = new Limit(null, 0, 1);
+    List<String> inputJsonBatches = Lists.newArrayList(
+      "[{\"a\": 5, \"b\" : 1 }]",
+      "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
+    opTestBuilder()
+      .physicalOperator(limitConf)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b")
+      .baselineValues(5l, 1l)
+      .go();
+  }
+
+  @Test
+  public void testLimitWithOffset() {
+    Limit limitConf = new Limit(null, 2, 3);
+    List<String> inputJsonBatches = Lists.newArrayList(
+      "[{\"a\": 5, \"b\" : 1 }]",
+      "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
+    opTestBuilder()
+      .physicalOperator(limitConf)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b")
+      .baselineValues(3l, 8l)
+      .go();
+  }
+
+  @Test
+  public void testLimitWithNoLastRecord() {
+    Limit limitConf = new Limit(null, 1, null);
+    List<String> inputJsonBatches = Lists.newArrayList(
+      "[{\"a\": 5, \"b\" : 1 }]",
+      "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
+    opTestBuilder()
+      .physicalOperator(limitConf)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b")
+      .baselineValues(5l, 5l)
+      .baselineValues(3l, 8l)
+      .go();
+  }
+
+  @Test
+  public void testLimitWithNegativeOffset() {
+    Limit limitConf = new Limit(null, -1, null);
+    List<String> inputJsonBatches = Lists.newArrayList(
+      "[{\"a\": 5, \"b\" : 1 }]",
+      "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
+    opTestBuilder()
+      .physicalOperator(limitConf)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b")
+      .baselineValues(5l, 1l)
+      .baselineValues(5l, 5l)
+      .baselineValues(3l, 8l)
+      .go();
+  }
+
+  @Test
+  public void testLimitWithNegativeFirstLast() {
+    Limit limitConf = new Limit(null, -1, -1);
+    List<String> inputJsonBatches = Lists.newArrayList(
+      "[{\"a\": 5, \"b\" : 1 }]",
+      "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
+    opTestBuilder()
+      .physicalOperator(limitConf)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b")
+      .expectZeroRows()
+      .go();
+  }
+
+  @Test
+  public void testLimitWithOffsetOutOfRange() {
+    Limit limitConf = new Limit(null, 10, 20);
+    List<String> inputJsonBatches = Lists.newArrayList(
+      "[{\"a\": 5, \"b\" : 1 }]",
+      "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
+    opTestBuilder()
+      .physicalOperator(limitConf)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b")
+      .expectZeroRows()
+      .go();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
new file mode 100644
index 0000000..b3099d0
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.project;
+
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.test.rowSet.RowSet;
+import org.junit.Test;
+
+import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestProjectEmitOutcome extends BaseTestOpBatchEmitOutcome {
+
+  /**
+   * Test that if an empty input batch is received with OK_NEW_SCHEMA or EMIT 
outcome, then Project doesn't ignore
+   * these empty batches and instead returns them downstream with correct 
outcomes.
+   * @throws Throwable
+   */
+  @Test
+  public void testProjectEmptyBatchEmitOutcome() throws Throwable {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final Project projectConf = new Project(parseExprs("id_left+5", 
"id_left"), null);
+    final ProjectRecordBatch projectBatch = new 
ProjectRecordBatch(projectConf, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(projectBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    outputRecordCount += projectBatch.getRecordCount();
+    assertTrue(projectBatch.next() == RecordBatch.IterOutcome.EMIT);
+    outputRecordCount += projectBatch.getRecordCount();
+    assertEquals(0, outputRecordCount);
+  }
+
+  /**
+   * Test to show that if a non-empty batch is accompanied by EMIT outcome then 
Project operator produces output for
+   * that batch and returns the output using EMIT outcome.
+   * @throws Throwable
+   */
+  @Test
+  public void testProjectNonEmptyBatchEmitOutcome() throws Throwable {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final Project projectConf = new Project(parseExprs("id_left+5", 
"id_left"), null);
+    final ProjectRecordBatch projectBatch = new 
ProjectRecordBatch(projectConf, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(projectBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    outputRecordCount += projectBatch.getRecordCount();
+    assertTrue(projectBatch.next() == RecordBatch.IterOutcome.EMIT);
+    outputRecordCount += projectBatch.getRecordCount();
+    assertEquals(1, outputRecordCount);
+  }
+
+  /**
+   * Test to show that non-empty first batch produces output for that batch 
with OK_NEW_SCHEMA and later empty batch
+   * with EMIT outcome is also passed through rather than getting ignored.
+   * @throws Throwable
+   */
+  @Test
+  public void testProjectNonEmptyFirst_EmptyBatchEmitOutcome() throws 
Throwable {
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final Project projectConf = new Project(parseExprs("id_left+5", 
"id_left"), null);
+    final ProjectRecordBatch projectBatch = new 
ProjectRecordBatch(projectConf, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(projectBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    outputRecordCount += projectBatch.getRecordCount();
+    assertTrue(projectBatch.next() == RecordBatch.IterOutcome.EMIT);
+    outputRecordCount += projectBatch.getRecordCount();
+    assertEquals(1, outputRecordCount);
+  }
+
+  /**
+   * Test to show that if an empty batch is accompanied by OK outcome then that 
batch is ignored by Project operator and
+   * it doesn't return anything but instead calls next() to get another batch. If 
the subsequent next() call returns an empty
+   * batch with EMIT outcome then Project returns the EMIT outcome correctly 
rather than ignoring it because of the empty
+   * batch.
+   * @throws Throwable
+   */
+  @Test
+  public void testProjectNonEmptyFirst_EmptyOK_EmptyBatchEmitOutcome() throws 
Throwable {
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final Project projectConf = new Project(parseExprs("id_left+5", 
"id_left"), null);
+    final ProjectRecordBatch projectBatch = new 
ProjectRecordBatch(projectConf, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(projectBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    outputRecordCount += projectBatch.getRecordCount();
+    // OK will not be received since it was accompanied by an empty batch
+    assertTrue(projectBatch.next() == RecordBatch.IterOutcome.EMIT);
+    outputRecordCount += projectBatch.getRecordCount();
+    assertTrue(projectBatch.next() == RecordBatch.IterOutcome.NONE);
+    assertEquals(1, outputRecordCount);
+  }
+
+  /**
+   * Test to show that in cases with functions in project list whose output is 
complex types, if Project sees an EMIT
+   * outcome then it fails. This scenario can happen when complex functions 
are used in subquery between LATERAL and
+   * UNNEST. In which case guidance is to use those functions in project list 
of outermost query.
+   * Below test passes first batch as non-empty with OK_NEW_SCHEMA during 
which complex writers are cached for
+   * projected columns and later when an empty batch arrives with EMIT outcome 
the exception is thrown.
+   * @throws Throwable
+   */
+  @Test
+  public void testProjectWithComplexWritersAndEmitOutcome_NonEmptyFirstBatch() 
throws Throwable {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "{ \"a\" : 1 }")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final Project projectConf = new 
Project(parseExprs("convert_fromJSON(name_left)", "name_columns"), null);
+    final ProjectRecordBatch projectBatch = new 
ProjectRecordBatch(projectConf, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    try {
+      assertTrue(projectBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+      assertTrue(projectBatch.next() == RecordBatch.IterOutcome.EMIT);
+      fail();
+    } catch (Exception e) {
+      // exception is expected because of complex writers case
+      assertTrue(e instanceof UnsupportedOperationException);
+    }
+  }
+
+  /**
+   * Test to show that in cases with functions in project list whose output is 
complex types, if Project sees an EMIT
+   * outcome then it fails. This scenario can happen when complex functions 
are used in subquery between LATERAL and
+   * UNNEST. In which case guidance is to use those functions in project list 
of outermost query.
+   *
+   * Below test passes first batch as empty with OK_NEW_SCHEMA during which 
complex writers are not known so far
+   * and Project calls next() on upstream to get a batch with data. But 
when that non-empty batch arrives with EMIT
+   * outcome the exception is thrown as the scenario is not supported.
+   * @throws Throwable
+   */
+  @Test
+  public void testProjectWithComplexWritersAndEmitOutcome_EmptyFirstBatch() 
throws Throwable {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = 
operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "{ \"a\" : 1 }")
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new 
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final Project projectConf = new 
Project(parseExprs("convert_fromJSON(name_left)", "name_columns"), null);
+    final ProjectRecordBatch projectBatch = new 
ProjectRecordBatch(projectConf, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    try {
+      assertTrue(projectBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+      fail();
+    } catch (Exception e) {
+      // exception is expected because of complex writers case
+      assertTrue(e instanceof UnsupportedOperationException);
+    }
+  }
+}

Reply via email to