This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit f8593997ec2ab5da96906e0e26df04e6ef73776e
Author: Sorabh Hamirwasia <shamirwa...@maprtech.com>
AuthorDate: Thu Jul 26 11:47:33 2018 -0700

    DRILL-6635: PartitionLimit for Lateral/Unnest
    PartitionLimitBatch initial implementation
    Add unit tests for PartitionLimitBatch
---
 .../drill/exec/physical/config/PartitionLimit.java |   62 ++
 .../exec/physical/impl/limit/LimitRecordBatch.java |   18 +-
 .../impl/limit/PartitionLimitBatchCreator.java     |   36 +
 ...rdBatch.java => PartitionLimitRecordBatch.java} |  193 ++--
 .../physical/impl/BaseTestOpBatchEmitOutcome.java  |    5 +
 .../drill/exec/physical/impl/MockRecordBatch.java  |   36 +-
 .../PartitionLimit/TestPartitionLimitBatch.java    | 1022 ++++++++++++++++++++
 7 files changed, 1272 insertions(+), 100 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
new file mode 100644
index 0000000..29f8bb2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+
+@JsonTypeName("partition-limit")
+public class PartitionLimit extends Limit {
+  private final String partitionColumn;
+
+  @JsonCreator
+  public PartitionLimit(@JsonProperty("child") PhysicalOperator child, @JsonProperty("first") Integer first,
+                        @JsonProperty("last") Integer last, @JsonProperty("partitionColumn") String partitionColumn) {
+    super(child, first, last);
+    this.partitionColumn = partitionColumn;
+  }
+
+  public String getPartitionColumn() {
+    return partitionColumn;
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new PartitionLimit(child, getFirst(), getLast(), getPartitionColumn());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitLimit(this, value);
+  }
+
+  @Override
+  public SelectionVectorMode getSVMode() {
+    return SelectionVectorMode.TWO_BYTE;
+  }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.PARTITION_LIMIT_VALUE;
+  }
+}
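
For reference, the JSON property names above map one-to-one onto the constructor arguments; a minimal sketch of building the new config directly (the "rowId" partition column name is only a placeholder for the implicit Unnest row-id column; the unit tests further below construct it the same way, with a null child):

    import org.apache.drill.exec.physical.base.PhysicalOperator;
    import org.apache.drill.exec.physical.config.PartitionLimit;

    class PartitionLimitConfigExample {
      // Keep at most 2 rows per distinct value of the "rowId" partition column,
      // starting at offset 0 within each partition.
      static PartitionLimit twoRowsPerPartition(PhysicalOperator child) {
        return new PartitionLimit(child, 0, 2, "rowId");
      }
    }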
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index d28fd47..06f0fdb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -71,6 +71,10 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
           for (VectorWrapper<?> wrapper : incoming) {
             wrapper.getValueVector().clear();
           }
+          // clear memory for incoming sv (if any)
+          if (incomingSv != null) {
+            incomingSv.clear();
+          }
           upStream = next(incoming);
           if (upStream == IterOutcome.OUT_OF_MEMORY) {
             return upStream;
@@ -82,6 +86,12 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
           for (VectorWrapper<?> wrapper : incoming) {
             wrapper.getValueVector().clear();
           }
+
+          // clear memory for incoming sv (if any)
+          if (incomingSv != null) {
+            incomingSv.clear();
+          }
+
           refreshLimitState();
           return upStream;
         }
@@ -109,7 +119,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
 
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
-    container.zeroVectors();
+    container.clear();
     transfers.clear();
 
     for(final VectorWrapper<?> v : incoming) {
@@ -181,6 +191,12 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
       outgoingSv.allocateNew(inputRecordCount);
       limit(inputRecordCount);
     }
+
+    // clear memory for incoming sv (if any)
+    if (incomingSv != null) {
+      incomingSv.clear();
+    }
+
     return getFinalOutcome(false);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitBatchCreator.java
new file mode 100644
index 0000000..9c7ebd2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitBatchCreator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.limit;
+
+import com.google.common.collect.Iterables;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.config.PartitionLimit;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+public class PartitionLimitBatchCreator implements BatchCreator<PartitionLimit> {
+  @Override
+  public PartitionLimitRecordBatch getBatch(ExecutorFragmentContext context, PartitionLimit config,
+                                            List<RecordBatch> children)
+      throws ExecutionSetupException {
+    return new PartitionLimitRecordBatch(config, context, Iterables.getOnlyElement(children));
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
similarity index 55%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
index d28fd47..0409980 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
@@ -17,26 +17,29 @@
  */
 package org.apache.drill.exec.physical.impl.limit;
 
-import java.util.List;
-
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.config.PartitionLimit;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.IntVector;
 
-import com.google.common.collect.Lists;
+import java.util.List;
 
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 
-public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
+/**
+ * Applies a limit within each partition of a record batch. Currently only an integer partition column is
+ * supported. This is mainly used for the Lateral/Unnest subquery, where each output batch from Unnest
+ * contains an implicit rowId column for each row.
+ */
+public class PartitionLimitRecordBatch extends AbstractSingleRecordBatch<PartitionLimit> {
   // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
 
   private SelectionVector2 outgoingSv;
@@ -45,10 +48,14 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   // Start offset of the records
   private int recordStartOffset;
   private int numberOfRecords;
-  private boolean first = true;
   private final List<TransferPair> transfers = Lists.newArrayList();
 
-  public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming)
+  // Partition rowId which is currently being processed; this will help to handle cases when rows for a partition id
+  // flow across 2 batches
+  private int partitionId;
+  private IntVector partitionColumn;
+
+  public PartitionLimitRecordBatch(PartitionLimit popConfig, FragmentContext context, RecordBatch incoming)
       throws OutOfMemoryException {
     super(popConfig, context, incoming);
     outgoingSv = new SelectionVector2(oContext.getAllocator());
@@ -56,42 +63,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   }
 
   @Override
-  public IterOutcome innerNext() {
-    if (!first && !needMoreRecords(numberOfRecords)) {
-        outgoingSv.setRecordCount(0);
-        incoming.kill(true);
-
-        IterOutcome upStream = next(incoming);
-        if (upStream == IterOutcome.OUT_OF_MEMORY) {
-          return upStream;
-        }
-
-        while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
-          // Clear the memory for the incoming batch
-          for (VectorWrapper<?> wrapper : incoming) {
-            wrapper.getValueVector().clear();
-          }
-          upStream = next(incoming);
-          if (upStream == IterOutcome.OUT_OF_MEMORY) {
-            return upStream;
-          }
-        }
-        // If EMIT that means leaf operator is UNNEST, in this case refresh the limit states and return EMIT.
-        if (upStream == EMIT) {
-          // Clear the memory for the incoming batch
-          for (VectorWrapper<?> wrapper : incoming) {
-            wrapper.getValueVector().clear();
-          }
-          refreshLimitState();
-          return upStream;
-        }
-        // other leaf operator behave as before.
-        return NONE;
-      }
-    return super.innerNext();
-  }
-
-  @Override
   public SelectionVector2 getSelectionVector2() {
     return outgoingSv;
   }
@@ -104,18 +75,26 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   @Override
   public void close() {
     outgoingSv.clear();
+    transfers.clear();
     super.close();
   }
 
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
-    container.zeroVectors();
+    container.clear();
     transfers.clear();
 
     for(final VectorWrapper<?> v : incoming) {
       final TransferPair pair = v.getValueVector().makeTransferPair(
         container.addOrGet(v.getField(), callBack));
       transfers.add(pair);
+
+      // Hold the transfer pair's target vector for partitionColumn, since before applying the limit all rows are
+      // transferred from the incoming to the outgoing batch
+      String fieldName = v.getField().getName();
+      if (fieldName.equals(popConfig.getPartitionColumn())) {
+        partitionColumn = (IntVector) pair.getTo();
+      }
     }
 
     final BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
@@ -158,28 +137,31 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
 
   @Override
   protected IterOutcome doWork() {
-    if (first) {
-      first = false;
-    }
     final int inputRecordCount = incoming.getRecordCount();
     if (inputRecordCount == 0) {
       setOutgoingRecordCount(0);
+      for (VectorWrapper vw : incoming) {
+        vw.clear();
+      }
+      // Release buffer for sv2 (if any)
+      if (incomingSv != null) {
+        incomingSv.clear();
+      }
       return getFinalOutcome(false);
     }
 
-    for(final TransferPair tp : transfers) {
+    for (final TransferPair tp : transfers) {
       tp.transfer();
     }
-    // Check if current input record count is less than start offset. If yes then adjust the start offset since we
-    // have to ignore all these records and return empty batch.
-    if (inputRecordCount <= recordStartOffset) {
-      recordStartOffset -= inputRecordCount;
-      setOutgoingRecordCount(0);
-    } else {
-      // Allocate SV2 vectors for the record count size since we transfer all the vectors buffer from input record
-      // batch to output record batch and later an SV2Remover copies the needed records.
-      outgoingSv.allocateNew(inputRecordCount);
-      limit(inputRecordCount);
+
+    // Allocate SV2 vectors for the record count size since we transfer all the vector buffers from the input record
+    // batch to the output record batch and later an SV2Remover copies the needed records.
+    outgoingSv.allocateNew(inputRecordCount);
+    limit(inputRecordCount);
+
+    // Release memory for incoming sv (if any)
+    if (incomingSv != null) {
+      incomingSv.clear();
     }
     return getFinalOutcome(false);
   }
@@ -191,62 +173,81 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
    * @param inputRecordCount - number of records in incoming batch
    */
   private void limit(int inputRecordCount) {
-    int endRecordIndex;
+    boolean outputAllRecords = (numberOfRecords == Integer.MIN_VALUE);
+
+    int svIndex = 0;
+    // If partitionId is not -1, that means it is set to the last partitionId of the previous batch
+    partitionId = (partitionId == -1) ? getCurrentRowId(0) : partitionId;
+
+    for (int i=0; i < inputRecordCount;) {
+      // Get rowId from current right row
+      int currentRowId = getCurrentRowId(i);
+
+      if (partitionId == currentRowId) {
+        // Check if there is any start offset set for each partition and skip those records
+        if (recordStartOffset > 0) {
+          --recordStartOffset;
+          ++i;
+          continue;
+        }
+
+        // Once the start offset records are skipped then consider rows until numberOfRecords is reached for that
+        // partition
+        if (outputAllRecords) {
+          updateOutputSV2(svIndex++, i);
+        } else if (numberOfRecords > 0) {
+          updateOutputSV2(svIndex++, i);
+          --numberOfRecords;
+        }
+        ++i;
+      } else { // now a new row with different partition id is found
+        refreshConfigParameter();
+        partitionId = currentRowId;
+      }
+    }
+
+    setOutgoingRecordCount(svIndex);
+  }
 
-    if (numberOfRecords == Integer.MIN_VALUE) {
-      endRecordIndex = inputRecordCount;
+  private void updateOutputSV2(int svIndex, int incomingIndex) {
+    if (incomingSv != null) {
+      outgoingSv.setIndex(svIndex, incomingSv.getIndex(incomingIndex));
     } else {
-      endRecordIndex = Math.min(inputRecordCount, recordStartOffset + numberOfRecords);
-      numberOfRecords -= Math.max(0, endRecordIndex - recordStartOffset);
+      outgoingSv.setIndex(svIndex, (char) incomingIndex);
     }
+  }
 
-    int svIndex = 0;
-    for(int i = recordStartOffset; i < endRecordIndex; svIndex++, i++) {
-      if (incomingSv != null) {
-        outgoingSv.setIndex(svIndex, incomingSv.getIndex(i));
-      } else {
-        outgoingSv.setIndex(svIndex, (char) i);
-      }
+  private int getCurrentRowId(int incomingIndex) {
+    if (incomingSv != null) {
+      return partitionColumn.getAccessor().get(incomingSv.getIndex(incomingIndex));
+    } else {
+      return partitionColumn.getAccessor().get(incomingIndex);
     }
-    outgoingSv.setRecordCount(svIndex);
-    // Update the start offset
-    recordStartOffset = 0;
   }
 
   private void setOutgoingRecordCount(int outputCount) {
     outgoingSv.setRecordCount(outputCount);
+    container.setRecordCount(outputCount);
   }
 
   /**
-   * Method which returns if more output records are needed from LIMIT operator. When numberOfRecords is set to
-   * {@link Integer#MIN_VALUE} that means there is no end bound on LIMIT, so get all the records past start offset.
-   * @return - true - more output records is expected.
-   *           false - limit bound is reached and no more record is expected
+   * Resets the states for recordStartOffset and numberOfRecords based on the {@link PartitionLimit} passed to the
+   * operator. It also resets the partitionId since after an EMIT outcome there will be a new partitionId to consider.
+   * This method is called for each EMIT outcome received, no matter if the limit is reached or not.
    */
-  private boolean needMoreRecords(int recordsToRead) {
-    boolean readMore = true;
-
-    Preconditions.checkState(recordsToRead == Integer.MIN_VALUE || recordsToRead >= 0,
-      String.format("Invalid value of numberOfRecords %d inside LimitRecordBatch", recordsToRead));
-
-    // Above check makes sure that either numberOfRecords has no bound or if it has bounds then either we have read
-    // all the records or still left to read some.
-    // Below check just verifies if there is bound on numberOfRecords and we have read all of it.
-    if (recordsToRead == 0) {
-      readMore = false;
-    }
-    return readMore;
+  private void refreshLimitState() {
+    refreshConfigParameter();
+    partitionId = -1;
   }
 
   /**
-   * Reset the states for recordStartOffset and numberOfRecords based on the popConfig passed to the operator.
-   * This method is called for the outcome EMIT no matter if limit is reached or not.
+   * Only resets the recordStartOffset and numberOfRecords based on the {@link PartitionLimit} passed to the operator.
+   * It is explicitly called after the limit for each partitionId is met or the partitionId changes within an EMIT
+   * boundary.
    */
-  private void refreshLimitState() {
+  private void refreshConfigParameter() {
     // Make sure startOffset is non-negative
     recordStartOffset = Math.max(0, popConfig.getFirst());
     numberOfRecords = (popConfig.getLast() == null) ?
       Integer.MIN_VALUE : Math.max(0, popConfig.getLast()) - recordStartOffset;
-    first = true;
   }
 }
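
To make the per-partition selection in limit() concrete, here is a standalone sketch (not part of the patch) of the same logic over a plain rowId array, assuming a non-null last offset:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    class PartitionLimitSketch {
      // For each run of equal rowIds: skip `first` rows, then emit at most
      // `last - first` rows, resetting both counters whenever the rowId changes.
      static int[] partitionLimit(int[] rowIds, int first, int last) {
        final int start = Math.max(0, first);
        final int perPartition = Math.max(0, last) - start;
        int skip = start;
        int remaining = perPartition;
        int partitionId = rowIds.length > 0 ? rowIds[0] : -1;
        final List<Integer> selected = new ArrayList<>();
        for (int i = 0; i < rowIds.length; i++) {
          if (rowIds[i] != partitionId) {          // new partition: reset state
            partitionId = rowIds[i];
            skip = start;
            remaining = perPartition;
          }
          if (skip > 0) { --skip; continue; }      // honor the start offset
          if (remaining > 0) { selected.add(i); --remaining; }  // honor the limit
        }
        return selected.stream().mapToInt(Integer::intValue).toArray();
      }

      public static void main(String[] args) {
        // rowIds as emitted by Unnest: 3 rows for partition 1, 1 for 2, 2 for 3
        int[] rowIds = {1, 1, 1, 2, 3, 3};
        // PartitionLimit(first=0, last=2) keeps at most 2 rows per partition
        System.out.println(Arrays.toString(partitionLimit(rowIds, 0, 2)));  // [0, 1, 3, 4, 5]
      }
    }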
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
index cd24640..4eaca2b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.store.mock.MockStorePOP;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.schema.SchemaBuilder;
@@ -48,6 +49,9 @@ public class BaseTestOpBatchEmitOutcome extends PhysicalOpUnitTestBase {
   // List of incoming containers
   protected final List<VectorContainer> inputContainer = new ArrayList<>(5);
 
+  // List of SV2's
+  protected final List<SelectionVector2> inputContainerSv2 = new ArrayList<>(5);
+
   // List of incoming IterOutcomes
   protected final List<RecordBatch.IterOutcome> inputOutcomes = new ArrayList<>(5);
 
@@ -79,6 +83,7 @@ public class BaseTestOpBatchEmitOutcome extends PhysicalOpUnitTestBase {
     nonEmptyInputRowSet.clear();
     inputContainer.clear();
     inputOutcomes.clear();
+    inputContainerSv2.clear();
     outputRecordCount = 0;
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 0c43ab2..ed7af4c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -38,6 +38,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
 
   // These resources are owned by this RecordBatch
   protected VectorContainer container;
+  protected SelectionVector2 sv2;
   private int currentContainerIndex;
   private int currentOutcomeIndex;
   private boolean isDone;
@@ -45,6 +46,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
 
   // All the below resources are owned by caller
   private final List<VectorContainer> allTestContainers;
+  private List<SelectionVector2> allTestContainersSv2;
   private final List<IterOutcome> allOutcomes;
   private final FragmentContext context;
   protected final OperatorContext oContext;
@@ -62,19 +64,32 @@ public class MockRecordBatch implements CloseableRecordBatch {
     this.currentContainerIndex = 0;
     this.currentOutcomeIndex = 0;
     this.isDone = false;
+    this.allTestContainersSv2 = null;
+    this.sv2 = null;
+  }
+
+  public MockRecordBatch(FragmentContext context, OperatorContext oContext,
+                         List<VectorContainer> testContainers, 
List<IterOutcome> iterOutcomes,
+                         List<SelectionVector2> testContainersSv2, BatchSchema 
schema) {
+    this(context, oContext, testContainers, iterOutcomes, schema);
+    allTestContainersSv2 = testContainersSv2;
+    sv2 = (allTestContainersSv2 != null && allTestContainersSv2.size() > 0) ? 
new SelectionVector2(allocator) : null;
   }
 
   @Override
-  public void close() throws Exception {
+  public void close() {
     container.clear();
     container.setRecordCount(0);
     currentContainerIndex = 0;
     currentOutcomeIndex = 0;
+    if (sv2 != null) {
+      sv2.clear();
+    }
   }
 
   @Override
   public SelectionVector2 getSelectionVector2() {
-    return null;
+    return sv2;
   }
 
   @Override
@@ -94,7 +109,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
 
   @Override
   public int getRecordCount() {
-    return container.getRecordCount();
+    return (sv2 == null) ? container.getRecordCount() : sv2.getCount();
   }
 
   @Override
@@ -103,6 +118,9 @@ public class MockRecordBatch implements CloseableRecordBatch {
       isDone = true;
       container.clear();
       container.setRecordCount(0);
+      if (sv2 != null) {
+        sv2.clear();
+      }
     }
   }
 
@@ -142,6 +160,18 @@ public class MockRecordBatch implements CloseableRecordBatch {
       }
       container.transferIn(input);
       container.setRecordCount(recordCount);
+
+      // Transfer the sv2 as well
+      final SelectionVector2 inputSv2 =
+        (allTestContainersSv2 != null && allTestContainersSv2.size() > 0)
+          ? allTestContainersSv2.get(currentContainerIndex) : null;
+      if (inputSv2 != null) {
+        sv2.allocateNewSafe(inputSv2.getCount());
+        for (int i=0; i<inputSv2.getCount(); ++i) {
+          sv2.setIndex(i, inputSv2.getIndex(i));
+        }
+        sv2.setRecordCount(inputSv2.getCount());
+      }
     }
 
     if (currentOutcomeIndex < allOutcomes.size()) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PartitionLimit/TestPartitionLimitBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PartitionLimit/TestPartitionLimitBatch.java
new file mode 100644
index 0000000..574ff76
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PartitionLimit/TestPartitionLimitBatch.java
@@ -0,0 +1,1022 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.PartitionLimit;
+
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.exec.physical.config.PartitionLimit;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.limit.PartitionLimitRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.test.rowSet.IndirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+@Category(OperatorTest.class)
+public class TestPartitionLimitBatch extends BaseTestOpBatchEmitOutcome {
+
+  private static String PARTITION_COLUMN;
+
+  // Holds reference to the actual operator instance created for each test
+  private static PartitionLimitRecordBatch limitBatch;
+
+  // List of expected outcomes populated by each test. Used to verify the actual IterOutcome returned by each
+  // next() call on the operator against the expected outcome
+  private final List<RecordBatch.IterOutcome> expectedOutcomes = new ArrayList<>();
+
+  // List of expected row counts populated by each test. Used to verify the actual output row count against the
+  // expected row count
+  private final List<Integer> expectedRecordCounts = new ArrayList<>();
+
+  // List of expected row sets populated by each test. Used to verify the actual output from the operator against
+  // the expected output
+  private final List<RowSet> expectedRowSets = new ArrayList<>();
+
+  @BeforeClass
+  public static void partitionLimitSetup() {
+    PARTITION_COLUMN = inputSchema.column(0).getName();
+  }
+
+  /**
+   * Cleanup method executed after each test
+   */
+  @After
+  public void afterTestCleanup() {
+    // close limitBatch
+    limitBatch.close();
+
+    // Release memory from expectedRowSets
+    for (RowSet expectedRowSet : expectedRowSets) {
+      expectedRowSet.clear();
+    }
+    expectedOutcomes.clear();
+    expectedRecordCounts.clear();
+    expectedRowSets.clear();
+  }
+
+  /**
+   * Common method used by all the tests for {@link PartitionLimitRecordBatch} below. It creates the MockRecordBatch
+   * and {@link PartitionLimitRecordBatch} with the containers and outcomes list populated in the test. It also
+   * verifies the expected outcomes list and record count populated by each test against each next() call to
+   * {@link PartitionLimitRecordBatch}. For cases when the expected record count is >0 it verifies the actual output
+   * returned by {@link PartitionLimitRecordBatch} against the expected output rows.
+   * @param start - Start offset for {@link PartitionLimit} PopConfig
+   * @param end - End offset for {@link PartitionLimit} PopConfig
+   */
+  private void testPartitionLimitCommon(Integer start, Integer end) {
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, inputContainerSv2, inputContainer.get(0).getSchema());
+
+    final PartitionLimit limitConf = new PartitionLimit(null, start, end, PARTITION_COLUMN);
+    limitBatch = new PartitionLimitRecordBatch(limitConf, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    int i=0;
+    int expectedRowSetIndex = 0;
+    while (i < expectedOutcomes.size()) {
+      try {
+        assertTrue(expectedOutcomes.get(i) == limitBatch.next());
+        assertTrue(expectedRecordCounts.get(i++) == limitBatch.getRecordCount());
+
+        if (limitBatch.getRecordCount() > 0) {
+          final RowSet actualRowSet = IndirectRowSet.fromSv2(limitBatch.getContainer(),
+            limitBatch.getSelectionVector2());
+          new RowSetComparison(expectedRowSets.get(expectedRowSetIndex++)).verify(actualRowSet);
+        }
+      } finally {
+        limitBatch.getSelectionVector2().clear();
+        limitBatch.getContainer().zeroVectors();
+      }
+    }
+  }
+
+  /**
+   * Verifies that empty batch with both OK_NEW_SCHEMA and EMIT outcome is not ignored by
+   * {@link PartitionLimitRecordBatch} and is passed to the downstream operator.
+   */
+  @Test
+  public void testPartitionLimit_EmptyBatchEmitOutcome() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+
+    testPartitionLimitCommon(0, 1);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} considers all the batches until it sees an EMIT outcome and returns an
+   * output batch with data that meets the {@link PartitionLimitRecordBatch} criteria.
+   */
+  @Test
+  public void testPartitionLimit_NonEmptyBatchEmitOutcome() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(1);
+
+    RowSet expectedBatch =  operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+    expectedRowSets.add(expectedBatch);
+
+    testPartitionLimitCommon(0, 1);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} operates on batches across an EMIT boundary with fresh
+   * configuration. That is, it considers partition column data separately for batches across the EMIT boundary.
+   */
+  @Test
+  public void testPartitionLimit_ResetsAfterFirstEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(2, 200, "item200")
+      .build();
+
+    final RowSet expectedRowSet1 =  operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(2, 200, "item200")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    expectedRecordCounts.add(1);
+    expectedRecordCounts.add(0);
+    // Since in this input batch there are 2 different partitionIds
+    expectedRecordCounts.add(2);
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(0, 1);
+  }
+
+  /**
+   * Verifies that when the {@link PartitionLimitRecordBatch} number of records is found with the first incoming
+   * batch, then the next empty incoming batch with OK outcome is ignored, but the empty EMIT outcome batch is not
+   * ignored. An empty incoming batch with EMIT outcome produces an empty output batch with EMIT outcome.
+   */
+  @Test
+  public void testPartitionLimit_NonEmptyFirst_EmptyOKEmitOutcome() {
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    expectedRecordCounts.add(1);
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+
+    final RowSet expectedRowSet1 =  operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+    expectedRowSets.add(expectedRowSet1);
+
+    testPartitionLimitCommon(0, 1);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} refreshes its state after seeing the first EMIT outcome and works
+   * on the data batches following it as a new set of incoming batches, applying the partition limit rule afresh on
+   * those. So for the first set of batches with OK_NEW_SCHEMA and EMIT outcome, where the total number of records
+   * received is less than the limit condition, it still produces an output with that many records for each partition
+   * key (in this case 1, even though the limit number of records is 2).
+   *
+   * After seeing EMIT, it refreshes its state and operates on the next input batches to again return the limit number
+   * of records per partition id. So for the 3rd batch with 6 records and 3 partition ids and with EMIT outcome it
+   * produces an output batch with <=2 records for each partition id.
+   */
+  @Test
+  public void testPartitionLimit_AcrossEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(2, 200, "item200")
+      .addRow(3, 300, "item300")
+      .addRow(3, 301, "item301")
+      .build();
+
+    final RowSet expectedRows1 =  operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet.SingleRowSet expectedRows2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(2, 200, "item200")
+      .addRow(3, 300, "item300")
+      .addRow(3, 301, "item301")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    expectedRecordCounts.add(expectedRows1.rowCount());
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRows2.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRows1);
+    expectedRowSets.add(expectedRows2);
+
+    testPartitionLimitCommon(0, 2);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} considers the same partition id across batches, but within an
+   * EMIT boundary, to impose the limit condition.
+   */
+  @Test
+  public void testPartitionLimit_PartitionIdSpanningAcrossBatches() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 200, "item200")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(1);
+    expectedRecordCounts.add(1);
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(0 ,1);
+  }
+
+  @Test
+  public void testPartitionLimit_PartitionIdSpanningAcrossBatches_WithOffset() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 101, "item101")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(2);
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+
+    testPartitionLimitCommon(2 ,3);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} works correctly in cases where a partition id spans across batches and
+   * the limit condition is met by picking records from multiple batches for the same partition id.
+   */
+  @Test
+  public void testPartitionLimit_PartitionIdSelectedAcrossBatches() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(0 ,5);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} works correctly in cases where the start offset is such that all the
+   * records of one partition id are ignored but records of another partition id are selected.
+   */
+  @Test
+  public void testPartitionLimit_IgnoreOnePartitionIdWithOffset() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+
+    testPartitionLimitCommon(3, 5);
+  }
+
+  @Test
+  public void testPartitionLimit_LargeOffsetIgnoreAllRecords() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+
+    testPartitionLimitCommon(5, 6);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} works correctly when the start and end offset are the same. In this
+   * case it works as a Limit 0 scenario where it will not output any rows for any partition id across batches.
+   */
+  @Test
+  public void testPartitionLimit_Limit0() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+
+    testPartitionLimitCommon(0, 0);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} works correctly for cases where no end offset is mentioned. This
+   * necessarily means selecting all the records in a partition.
+   */
+  @Test
+  public void testPartitionLimit_NoLimit() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(0, null);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} handles a provided negative start offset correctly
+   */
+  @Test
+  public void testPartitionLimit_NegativeOffset() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by abstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(-5, 2);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} behaves correctly across EMIT boundaries with single or multiple
+   * batches within each EMIT boundary. It resets its states correctly across each EMIT boundary and then operates
+   * on all the batches within an EMIT boundary at a time.
+   */
+  @Test
+  public void testPartitionLimit_MultipleEmit_SingleMultipleBatch() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    // Second EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1001, "item1001")
+      .addRow(1, 1002, "item1002")
+      .addRow(1, 1003, "item1003")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2001, "item2001")
+      .addRow(2, 2002, "item2002")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3002, "item3002")
+      .addRow(3, 3003, "item3003")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .build();
+
+    // Third EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .addRow(1, 10003, "item10003")
+      .build();
+
+    // First EMIT boundary expected rowsets
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .build();
+
+    // Second EMIT boundary expected rowsets
+    final RowSet expectedRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1001, "item1001")
+      .addRow(1, 1002, "item1002")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2001, "item2001")
+      .build();
+
+    final RowSet expectedRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3002, "item3002")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .build();
+
+    // Third EMIT boundary expected rowsets
+    final RowSet expectedRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+    inputContainer.add(nonEmptyInputRowSet5.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(expectedRowSet3.rowCount());
+    expectedRecordCounts.add(expectedRowSet4.rowCount());
+    expectedRecordCounts.add(expectedRowSet5.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+    expectedRowSets.add(expectedRowSet3);
+    expectedRowSets.add(expectedRowSet4);
+    expectedRowSets.add(expectedRowSet5);
+
+    testPartitionLimitCommon(-5, 2);
+  }
+
+  /**
+   * Verifies {@link PartitionLimitRecordBatch} behaves correctly across EMIT boundaries with single or multiple
+   * batches (with sv2) within each EMIT boundary. It resets its states correctly across each EMIT boundary and then
+   * operates on all the batches within an EMIT boundary at a time.
+   */
+  @Test
+  public void testPartitionLimit_MultipleEmit_SingleMultipleBatch_WithSV2() {
+    final RowSet.SingleRowSet emptyWithSv2 = operatorFixture.rowSetBuilder(inputSchema)
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .withSv2()
+      .build();
+
+    // Second EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1001, "item1001")
+      .addRow(1, 1002, "item1002")
+      .addRow(1, 1003, "item1003")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2001, "item2001")
+      .addRow(2, 2002, "item2002")
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3002, "item3002")
+      .addRow(3, 3003, "item3003")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .withSv2()
+      .build();
+
+    // Third EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .addRow(1, 10003, "item10003")
+      .withSv2()
+      .build();
+
+    // First EMIT boundary expected row sets
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .build();
+
+    // Second EMIT boundary expected row sets
+    final RowSet expectedRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1001, "item1001")
+      .addRow(1, 1002, "item1002")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2001, "item2001")
+      .build();
+
+    final RowSet expectedRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3002, "item3002")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .build();
+
+    // Third EMIT boundary expected row sets
+    final RowSet expectedRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .build();
+
+    inputContainer.add(emptyWithSv2.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+    inputContainer.add(nonEmptyInputRowSet5.container());
+    inputContainer.add(emptyWithSv2.container());
+
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet3.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet4.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet5.getSv2());
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(expectedRowSet3.rowCount());
+    expectedRecordCounts.add(expectedRowSet4.rowCount());
+    expectedRecordCounts.add(expectedRowSet5.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet2);
+    expectedRowSets.add(expectedRowSet3);
+    expectedRowSets.add(expectedRowSet4);
+    expectedRowSets.add(expectedRowSet5);
+
+    testPartitionLimitCommon(-5, 2);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} behaves correctly across EMIT boundaries with single or
+   * multiple batches (with sv2 that filters out some rows) within each EMIT boundary. It resets its state
+   * correctly at each EMIT boundary and then operates on all the batches within an EMIT boundary at a time.
+   */
+  @Test
+  public void testPartitionLimit_MultipleEmit_SingleMultipleBatch_WithSV2_FilteredRows() {
+    final RowSet.SingleRowSet emptyWithSv2 = operatorFixture.rowSetBuilder(inputSchema)
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(false, 1, 100, "item100")
+      .addSelection(true, 1, 101, "item101")
+      .addSelection(false, 1, 102, "item102")
+      .addSelection(true, 1, 103, "item103")
+      .addSelection(false, 2, 200, "item200")
+      .addSelection(true, 2, 201, "item201")
+      .addSelection(true, 2, 202, "item202")
+      .withSv2()
+      .build();
+
+    // Second EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(false, 1, 1001, "item1001")
+      .addSelection(true, 1, 1002, "item1002")
+      .addSelection(true, 1, 1003, "item1003")
+      .addSelection(true, 2, 2000, "item2000")
+      .addSelection(false, 2, 2001, "item2001")
+      .addSelection(true, 2, 2002, "item2002")
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(true, 3, 3001, "item3001")
+      .addSelection(false, 3, 3002, "item3002")
+      .addSelection(true, 3, 3003, "item3003")
+      .addSelection(true, 4, 4000, "item4000")
+      .addSelection(true, 4, 4001, "item4001")
+      .withSv2()
+      .build();
+
+    // Third EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(true, 1, 10001, "item10001")
+      .addSelection(true, 1, 10002, "item10002")
+      .addSelection(false, 1, 10003, "item10003")
+      .withSv2()
+      .build();
+
+    // First EMIT boundary expected row sets
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 101, "item101")
+      .addRow(1, 103, "item103")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    // Second EMIT boundary expected row sets
+    final RowSet expectedRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1002, "item1002")
+      .addRow(1, 1003, "item1003")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2002, "item2002")
+      .build();
+
+    final RowSet expectedRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3003, "item3003")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .build();
+
+    // Third EMIT boundary expected row sets
+    final RowSet expectedRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .build();
+
+    inputContainer.add(emptyWithSv2.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+    inputContainer.add(nonEmptyInputRowSet5.container());
+    inputContainer.add(emptyWithSv2.container());
+
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet3.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet4.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet5.getSv2());
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(expectedRowSet3.rowCount());
+    expectedRecordCounts.add(expectedRowSet4.rowCount());
+    expectedRecordCounts.add(expectedRowSet5.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet2);
+    expectedRowSets.add(expectedRowSet3);
+    expectedRowSets.add(expectedRowSet4);
+    expectedRowSets.add(expectedRowSet5);
+
+    testPartitionLimitCommon(-5, 2);
+  }
+}
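
Note on the expected row sets above: every call uses testPartitionLimitCommon(-5, 2), and in each EMIT boundary the output keeps at most two rows per distinct partition column value, starting from the first row of that partition. This suggests a negative "first" offset is effectively treated as zero and "last" bounds the per-partition window. The sketch below only illustrates that assumed windowing; it is not the PartitionLimitRecordBatch implementation, and the helper name applyPartitionLimit is hypothetical.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class PartitionLimitSketch {

      /**
       * Keeps, for each distinct partition key, only the rows whose per-partition
       * index falls in [max(first, 0), last). In the operator the per-partition
       * counters would be reset at every EMIT boundary, which is what the tests
       * above exercise.
       */
      static <T> List<T> applyPartitionLimit(List<T> rows, List<Integer> partitionKeys,
                                             int first, int last) {
        int start = Math.max(first, 0);           // negative offset assumed to clamp to 0
        Map<Integer, Integer> seenPerKey = new HashMap<>();
        List<T> out = new ArrayList<>();
        for (int i = 0; i < rows.size(); i++) {
          int key = partitionKeys.get(i);
          int seen = seenPerKey.getOrDefault(key, 0);
          if (seen >= start && seen < last) {     // row is inside its partition's window
            out.add(rows.get(i));
          }
          seenPerKey.put(key, seen + 1);
        }
        return out;
      }

      public static void main(String[] args) {
        // Mirrors one batch of the second EMIT boundary above: partitions 1 and 2 have
        // three rows each, and a (-5, 2) limit keeps the first two rows of each partition.
        List<String> rows = Arrays.asList("item1001", "item1002", "item1003",
                                          "item2000", "item2001", "item2002");
        List<Integer> keys = Arrays.asList(1, 1, 1, 2, 2, 2);
        System.out.println(applyPartitionLimit(rows, keys, -5, 2));
        // Prints: [item1001, item1002, item2000, item2001]
      }
    }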
