This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit f8593997ec2ab5da96906e0e26df04e6ef73776e
Author:     Sorabh Hamirwasia <shamirwa...@maprtech.com>
AuthorDate: Thu Jul 26 11:47:33 2018 -0700

    DRILL-6635: PartitionLimit for Lateral/Unnest

    PartitionLimitBatch initial implementation

    Add unit tests for PartitionLimitBatch
---
 .../drill/exec/physical/config/PartitionLimit.java |   62 ++
 .../exec/physical/impl/limit/LimitRecordBatch.java |   18 +-
 .../impl/limit/PartitionLimitBatchCreator.java     |   36 +
 ...rdBatch.java => PartitionLimitRecordBatch.java} |  193 ++--
 .../physical/impl/BaseTestOpBatchEmitOutcome.java  |    5 +
 .../drill/exec/physical/impl/MockRecordBatch.java  |   36 +-
 .../PartitionLimit/TestPartitionLimitBatch.java    | 1022 ++++++++++++++++++++
 7 files changed, 1272 insertions(+), 100 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
new file mode 100644
index 0000000..29f8bb2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+
+@JsonTypeName("partition-limit")
+public class PartitionLimit extends Limit {
+  private final String partitionColumn;
+
+  @JsonCreator
+  public PartitionLimit(@JsonProperty("child") PhysicalOperator child, @JsonProperty("first") Integer first,
+                        @JsonProperty("last") Integer last, @JsonProperty("partitionColumn") String partitionColumn) {
+    super(child, first, last);
+    this.partitionColumn = partitionColumn;
+  }
+
+  public String getPartitionColumn() {
+    return partitionColumn;
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new PartitionLimit(child, getFirst(), getLast(), getPartitionColumn());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitLimit(this, value);
+  }
+
+  @Override
+  public SelectionVectorMode getSVMode() {
+    return SelectionVectorMode.TWO_BYTE;
+  }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.PARTITION_LIMIT_VALUE;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index d28fd47..06f0fdb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -71,6 +71,10 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
       for (VectorWrapper<?> wrapper : incoming) {
         wrapper.getValueVector().clear();
       }
+      // clear memory for incoming sv (if any)
+      if (incomingSv != null) {
+        incomingSv.clear();
+      }
       upStream = next(incoming);
       if (upStream == IterOutcome.OUT_OF_MEMORY) {
         return upStream;
@@ -82,6 +86,12 @@
         for (VectorWrapper<?> wrapper : incoming) {
           wrapper.getValueVector().clear();
         }
+
+        // clear memory for incoming sv (if any)
+        if (incomingSv != null) {
+          incomingSv.clear();
+        }
+
         refreshLimitState();
         return upStream;
       }
@@ -109,7 +119,7 @@
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
-    container.zeroVectors();
+    container.clear();
     transfers.clear();
 
     for(final VectorWrapper<?> v : incoming) {
@@ -181,6 +191,12 @@
       outgoingSv.allocateNew(inputRecordCount);
       limit(inputRecordCount);
     }
+
+    // clear memory for incoming sv (if any)
+    if (incomingSv != null) {
+      incomingSv.clear();
+    }
+
     return getFinalOutcome(false);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitBatchCreator.java
new file mode 100644
index 0000000..9c7ebd2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitBatchCreator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.limit;
+
+import com.google.common.collect.Iterables;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.config.PartitionLimit;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+public class PartitionLimitBatchCreator implements BatchCreator<PartitionLimit> {
+  @Override
+  public PartitionLimitRecordBatch getBatch(ExecutorFragmentContext context, PartitionLimit config,
+                                            List<RecordBatch> children)
+      throws ExecutionSetupException {
+    return new PartitionLimitRecordBatch(config, context, Iterables.getOnlyElement(children));
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
similarity index 55%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
index d28fd47..0409980 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
@@ -17,26 +17,29 @@
  */
 package org.apache.drill.exec.physical.impl.limit;
 
-import java.util.List;
-
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.config.PartitionLimit;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.IntVector;
 
-import com.google.common.collect.Lists;
+import java.util.List;
 
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 
-public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
+/**
+ * Helps to enforce the limit within each partition of a record batch. Currently only an integer partition column
+ * is supported. This is mainly used for Lateral/Unnest subqueries, where each output batch from Unnest contains an
+ * implicit rowId column for each row.
+ */
+public class PartitionLimitRecordBatch extends AbstractSingleRecordBatch<PartitionLimit> {
   // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
 
   private SelectionVector2 outgoingSv;
@@ -45,10 +48,14 @@
   // Start offset of the records
   private int recordStartOffset;
   private int numberOfRecords;
-  private boolean first = true;
   private final List<TransferPair> transfers = Lists.newArrayList();
 
-  public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming)
+  // Partition rowId which is currently being processed; this helps to handle cases when rows for a partition id
+  // flow across 2 batches
+  private int partitionId;
+  private IntVector partitionColumn;
+
+  public PartitionLimitRecordBatch(PartitionLimit popConfig, FragmentContext context, RecordBatch incoming)
       throws OutOfMemoryException {
     super(popConfig, context, incoming);
     outgoingSv = new SelectionVector2(oContext.getAllocator());
@@ -56,42 +63,6 @@
   }
 
   @Override
-  public IterOutcome innerNext() {
-    if (!first && !needMoreRecords(numberOfRecords)) {
-      outgoingSv.setRecordCount(0);
-      incoming.kill(true);
-
-      IterOutcome upStream = next(incoming);
-      if (upStream == IterOutcome.OUT_OF_MEMORY) {
-        return upStream;
-      }
-
-      while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
-        // Clear the memory for the incoming batch
-        for (VectorWrapper<?> wrapper : incoming) {
-          wrapper.getValueVector().clear();
-        }
-        upStream = next(incoming);
-        if (upStream == IterOutcome.OUT_OF_MEMORY) {
-          return upStream;
-        }
-      }
-      // If EMIT that means leaf operator is UNNEST, in this case refresh the limit states and return EMIT.
-      if (upStream == EMIT) {
-        // Clear the memory for the incoming batch
-        for (VectorWrapper<?> wrapper : incoming) {
-          wrapper.getValueVector().clear();
-        }
-        refreshLimitState();
-        return upStream;
-      }
-      // other leaf operator behave as before.
-      return NONE;
-    }
-    return super.innerNext();
-  }
-
-  @Override
   public SelectionVector2 getSelectionVector2() {
     return outgoingSv;
   }
@@ -104,18 +75,26 @@
   @Override
   public void close() {
     outgoingSv.clear();
+    transfers.clear();
     super.close();
   }
 
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
-    container.zeroVectors();
+    container.clear();
     transfers.clear();
 
     for(final VectorWrapper<?> v : incoming) {
       final TransferPair pair = v.getValueVector().makeTransferPair(
           container.addOrGet(v.getField(), callBack));
       transfers.add(pair);
+
+      // Hold the transfer pair's target vector for partitionColumn, since before applying the limit all rows are
+      // transferred from the incoming batch to the outgoing batch
+      String fieldName = v.getField().getName();
+      if (fieldName.equals(popConfig.getPartitionColumn())) {
+        partitionColumn = (IntVector) pair.getTo();
+      }
     }
 
     final BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
@@ -158,28 +137,31 @@
   @Override
   protected IterOutcome doWork() {
-    if (first) {
-      first = false;
-    }
     final int inputRecordCount = incoming.getRecordCount();
     if (inputRecordCount == 0) {
       setOutgoingRecordCount(0);
+      for (VectorWrapper vw : incoming) {
+        vw.clear();
+      }
+      // Release buffer for sv2 (if any)
+      if (incomingSv != null) {
+        incomingSv.clear();
+      }
       return getFinalOutcome(false);
     }
 
-    for(final TransferPair tp : transfers) {
+    for (final TransferPair tp : transfers) {
       tp.transfer();
     }
-
-    // Check if current input record count is less than start offset. If yes then adjust the start offset since we
-    // have to ignore all these records and return empty batch.
-    if (inputRecordCount <= recordStartOffset) {
-      recordStartOffset -= inputRecordCount;
-      setOutgoingRecordCount(0);
-    } else {
-      // Allocate SV2 vectors for the record count size since we transfer all the vectors buffer from input record
-      // batch to output record batch and later an SV2Remover copies the needed records.
-      outgoingSv.allocateNew(inputRecordCount);
-      limit(inputRecordCount);
+
+    // Allocate SV2 vectors for the record count size since we transfer all the vector buffers from the input record
+    // batch to the output record batch and later an SV2Remover copies the needed records.
+    outgoingSv.allocateNew(inputRecordCount);
+    limit(inputRecordCount);
+
+    // Release memory for incoming sv (if any)
+    if (incomingSv != null) {
+      incomingSv.clear();
     }
     return getFinalOutcome(false);
   }
 
@@ -191,62 +173,81 @@
    * @param inputRecordCount - number of records in incoming batch
    */
   private void limit(int inputRecordCount) {
-    int endRecordIndex;
+    boolean outputAllRecords = (numberOfRecords == Integer.MIN_VALUE);
+
+    int svIndex = 0;
+    // If partitionId is not -1, that means it is set to the previous batch's last partitionId
+    partitionId = (partitionId == -1) ? getCurrentRowId(0) : partitionId;
+
+    for (int i = 0; i < inputRecordCount;) {
+      // Get the rowId of the current row
+      int currentRowId = getCurrentRowId(i);
+
+      if (partitionId == currentRowId) {
+        // Check if there is any start offset set for each partition and skip those records
+        if (recordStartOffset > 0) {
+          --recordStartOffset;
+          ++i;
+          continue;
+        }
+
+        // Once the start offset records are skipped, consider rows until numberOfRecords is reached for that
+        // partition
+        if (outputAllRecords) {
+          updateOutputSV2(svIndex++, i);
+        } else if (numberOfRecords > 0) {
+          updateOutputSV2(svIndex++, i);
+          --numberOfRecords;
+        }
+        ++i;
+      } else { // a new row with a different partition id is found
+        refreshConfigParameter();
+        partitionId = currentRowId;
+      }
+    }
+
+    setOutgoingRecordCount(svIndex);
+  }
 
-    if (numberOfRecords == Integer.MIN_VALUE) {
-      endRecordIndex = inputRecordCount;
+  private void updateOutputSV2(int svIndex, int incomingIndex) {
+    if (incomingSv != null) {
+      outgoingSv.setIndex(svIndex, incomingSv.getIndex(incomingIndex));
     } else {
-      endRecordIndex = Math.min(inputRecordCount, recordStartOffset + numberOfRecords);
-      numberOfRecords -= Math.max(0, endRecordIndex - recordStartOffset);
+      outgoingSv.setIndex(svIndex, (char) incomingIndex);
     }
+  }
 
-    int svIndex = 0;
-    for(int i = recordStartOffset; i < endRecordIndex; svIndex++, i++) {
-      if (incomingSv != null) {
-        outgoingSv.setIndex(svIndex, incomingSv.getIndex(i));
-      } else {
-        outgoingSv.setIndex(svIndex, (char) i);
-      }
+  private int getCurrentRowId(int incomingIndex) {
+    if (incomingSv != null) {
+      return partitionColumn.getAccessor().get(incomingSv.getIndex(incomingIndex));
+    } else {
+      return partitionColumn.getAccessor().get(incomingIndex);
     }
-    outgoingSv.setRecordCount(svIndex);
-    // Update the start offset
-    recordStartOffset = 0;
   }
 
   private void setOutgoingRecordCount(int outputCount) {
     outgoingSv.setRecordCount(outputCount);
+    container.setRecordCount(outputCount);
   }
 
   /**
-   * Method which returns if more output records are needed from LIMIT operator. When numberOfRecords is set to
-   * {@link Integer#MIN_VALUE} that means there is no end bound on LIMIT, so get all the records past start offset.
-   * @return - true - more output records is expected.
-   *           false - limit bound is reached and no more record is expected
+   * Resets the states for recordStartOffset and numberOfRecords based on the {@link PartitionLimit} passed to the
+   * operator. It also resets the partitionId since after an EMIT outcome there will be a new partitionId to
+   * consider. This method is called for each EMIT outcome received, no matter if the limit is reached or not.
    */
-  private boolean needMoreRecords(int recordsToRead) {
-    boolean readMore = true;
-
-    Preconditions.checkState(recordsToRead == Integer.MIN_VALUE || recordsToRead >= 0,
-      String.format("Invalid value of numberOfRecords %d inside LimitRecordBatch", recordsToRead));
-
-    // Above check makes sure that either numberOfRecords has no bound or if it has bounds then either we have read
-    // all the records or still left to read some.
-    // Below check just verifies if there is bound on numberOfRecords and we have read all of it.
-    if (recordsToRead == 0) {
-      readMore = false;
-    }
-    return readMore;
+  private void refreshLimitState() {
+    refreshConfigParameter();
+    partitionId = -1;
   }
 
   /**
-   * Reset the states for recordStartOffset and numberOfRecords based on the popConfig passed to the operator.
-   * This method is called for the outcome EMIT no matter if limit is reached or not.
+   * Only resets the recordStartOffset and numberOfRecords based on the {@link PartitionLimit} passed to the
+   * operator. It is explicitly called after the limit for each partitionId is met, or when the partitionId changes
+   * within an EMIT boundary.
    */
-  private void refreshLimitState() {
+  private void refreshConfigParameter() {
    // Make sure startOffset is non-negative
    recordStartOffset = Math.max(0, popConfig.getFirst());
    numberOfRecords = (popConfig.getLast() == null) ?
      Integer.MIN_VALUE : Math.max(0, popConfig.getLast()) - recordStartOffset;
-    first = true;
  }
}
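The refresh methods above complete the per-partition limit algorithm: walk the incoming rows in order, skip the first recordStartOffset rows of each run of equal rowIds, emit at most numberOfRecords rows per run, and reset both counters whenever the rowId changes. The same logic can be read in isolation in the following minimal sketch, which uses plain arrays in place of Drill's value vectors and selection vectors (the class and method names here are illustrative, not part of this patch):

    import java.util.ArrayList;
    import java.util.List;

    public class PartitionLimitSketch {
      // Mirrors PartitionLimitRecordBatch.limit(): rows arrive grouped by rowId, and for each
      // run of equal rowIds we skip `first` rows and then emit at most `last - first` rows.
      static List<Integer> partitionLimit(int[] rowIds, int first, Integer last) {
        final boolean outputAll = (last == null);               // no end bound configured
        int startOffset = Math.max(0, first);                   // per-partition rows to skip
        int remaining = outputAll ? -1 : Math.max(0, last) - startOffset; // per-partition quota
        final List<Integer> selected = new ArrayList<>();       // kept indices, like the outgoing SV2

        int partitionId = rowIds.length > 0 ? rowIds[0] : -1;
        for (int i = 0; i < rowIds.length;) {
          if (rowIds[i] == partitionId) {
            if (startOffset > 0) { --startOffset; ++i; continue; }
            if (outputAll || remaining > 0) {
              selected.add(i);
              if (!outputAll) { --remaining; }
            }
            ++i;
          } else { // new partition: reset offset and quota, do not advance i
            startOffset = Math.max(0, first);
            remaining = outputAll ? -1 : Math.max(0, last) - startOffset;
            partitionId = rowIds[i];
          }
        }
        return selected;
      }

      public static void main(String[] args) {
        // rowIds 1,1,1,2,2 with first=0, last=2 keeps at most 2 rows per rowId run.
        System.out.println(partitionLimit(new int[] {1, 1, 1, 2, 2}, 0, 2));
      }
    }

Running the sketch prints [0, 1, 3, 4]: at most two row indices are kept for each of the two rowId runs, which is the behavior the unit tests below assert through selection vectors.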
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
index cd24640..4eaca2b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
@@ -23,6 +23,7 @@
 import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.store.mock.MockStorePOP;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.schema.SchemaBuilder;
@@ -48,6 +49,9 @@
   // List of incoming containers
   protected final List<VectorContainer> inputContainer = new ArrayList<>(5);
 
+  // List of SV2s
+  protected final List<SelectionVector2> inputContainerSv2 = new ArrayList<>(5);
+
   // List of incoming IterOutcomes
   protected final List<RecordBatch.IterOutcome> inputOutcomes = new ArrayList<>(5);
 
@@ -79,6 +83,7 @@
     nonEmptyInputRowSet.clear();
     inputContainer.clear();
     inputOutcomes.clear();
+    inputContainerSv2.clear();
     outputRecordCount = 0;
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 0c43ab2..ed7af4c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -38,6 +38,7 @@
 
   // These resources are owned by this RecordBatch
   protected VectorContainer container;
+  protected SelectionVector2 sv2;
   private int currentContainerIndex;
   private int currentOutcomeIndex;
   private boolean isDone;
@@ -45,6 +46,7 @@
   // All the below resources are owned by caller
   private final List<VectorContainer> allTestContainers;
+  private List<SelectionVector2> allTestContainersSv2;
   private final List<IterOutcome> allOutcomes;
   private final FragmentContext context;
   protected final OperatorContext oContext;
@@ -62,19 +64,32 @@
     this.currentContainerIndex = 0;
     this.currentOutcomeIndex = 0;
     this.isDone = false;
+    this.allTestContainersSv2 = null;
+    this.sv2 = null;
+  }
+
+  public MockRecordBatch(FragmentContext context, OperatorContext oContext,
+                         List<VectorContainer> testContainers, List<IterOutcome> iterOutcomes,
+                         List<SelectionVector2> testContainersSv2, BatchSchema schema) {
+    this(context, oContext, testContainers, iterOutcomes, schema);
+    allTestContainersSv2 = testContainersSv2;
+    sv2 = (allTestContainersSv2 != null && allTestContainersSv2.size() > 0) ? new SelectionVector2(allocator) : null;
   }
 
   @Override
-  public void close() throws Exception {
+  public void close() {
     container.clear();
     container.setRecordCount(0);
     currentContainerIndex = 0;
     currentOutcomeIndex = 0;
+    if (sv2 != null) {
+      sv2.clear();
+    }
   }
 
   @Override
   public SelectionVector2 getSelectionVector2() {
-    return null;
+    return sv2;
   }
 
   @Override
@@ -94,7 +109,7 @@
   @Override
   public int getRecordCount() {
-    return container.getRecordCount();
+    return (sv2 == null) ? container.getRecordCount() : sv2.getCount();
   }
 
   @Override
@@ -103,6 +118,9 @@
       isDone = true;
       container.clear();
       container.setRecordCount(0);
+      if (sv2 != null) {
+        sv2.clear();
+      }
     }
   }
 
@@ -142,6 +160,18 @@
       }
       container.transferIn(input);
       container.setRecordCount(recordCount);
+
+      // Transfer the sv2 as well
+      final SelectionVector2 inputSv2 =
+          (allTestContainersSv2 != null && allTestContainersSv2.size() > 0)
+              ? allTestContainersSv2.get(currentContainerIndex) : null;
+      if (inputSv2 != null) {
+        sv2.allocateNewSafe(inputSv2.getCount());
+        for (int i = 0; i < inputSv2.getCount(); ++i) {
+          sv2.setIndex(i, inputSv2.getIndex(i));
+        }
+        sv2.setRecordCount(inputSv2.getCount());
+      }
     }
 
     if (currentOutcomeIndex < allOutcomes.size()) {
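The mock batch changes above let tests feed batches that carry a two-byte selection vector (SV2): the container still holds every row, while the SV2 holds the two-byte indices of the rows a downstream operator should actually read. A minimal sketch of that indirection using plain arrays (illustrative only, not Drill's API):

    public class Sv2Sketch {
      public static void main(String[] args) {
        int[] values = {10, 11, 12, 13}; // rows as stored in the container
        char[] sv2 = {1, 3};             // two-byte row indices, hence "SV2"

        // Reads go through the indirection, the same pattern used by
        // PartitionLimitRecordBatch.getCurrentRowId() when incomingSv != null.
        for (char index : sv2) {
          System.out.println(values[index]); // prints 11, then 13
        }
      }
    }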
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PartitionLimit/TestPartitionLimitBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PartitionLimit/TestPartitionLimitBatch.java
new file mode 100644
index 0000000..574ff76
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PartitionLimit/TestPartitionLimitBatch.java
@@ -0,0 +1,1022 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.PartitionLimit;
+
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.exec.physical.config.PartitionLimit;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.limit.PartitionLimitRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.test.rowSet.IndirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+@Category(OperatorTest.class)
+public class TestPartitionLimitBatch extends BaseTestOpBatchEmitOutcome {
+
+  private static String PARTITION_COLUMN;
+
+  // Holds a reference to the actual operator instance created for each test
+  private static PartitionLimitRecordBatch limitBatch;
+
+  // List of expected outcomes populated by each test. Used to verify the actual IterOutcome returned by each
+  // next() call on the operator against the expected outcome
+  private final List<RecordBatch.IterOutcome> expectedOutcomes = new ArrayList<>();
+
+  // List of expected row counts populated by each test. Used to verify the actual output row count against the
+  // expected row count
+  private final List<Integer> expectedRecordCounts = new ArrayList<>();
+
+  // List of expected row sets populated by each test. Used to verify the actual output from the operator against
+  // the expected output
+  private final List<RowSet> expectedRowSets = new ArrayList<>();
+
+  @BeforeClass
+  public static void partitionLimitSetup() {
+    PARTITION_COLUMN = inputSchema.column(0).getName();
+  }
+
+  /**
+   * Cleanup method executed after each test
+   */
+  @After
+  public void afterTestCleanup() {
+    // close limitBatch
+    limitBatch.close();
+
+    // Release memory from expectedRowSets
+    for (RowSet expectedRowSet : expectedRowSets) {
+      expectedRowSet.clear();
+    }
+    expectedOutcomes.clear();
+    expectedRecordCounts.clear();
+    expectedRowSets.clear();
+  }
+
+  /**
+   * Common method used by all the tests for {@link PartitionLimitRecordBatch} below. It creates the
+   * MockRecordBatch and the {@link PartitionLimitRecordBatch} with the containers and outcomes lists populated by
+   * the test. It then verifies the expected outcome and record count populated by the test against each next()
+   * call on the {@link PartitionLimitRecordBatch}. For cases where the expected record count is > 0, it verifies
+   * the actual output returned by {@link PartitionLimitRecordBatch} against the expected output rows.
+   * @param start - Start offset for the {@link PartitionLimit} PopConfig
+   * @param end - End offset for the {@link PartitionLimit} PopConfig
+   */
+  private void testPartitionLimitCommon(Integer start, Integer end) {
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, inputContainerSv2, inputContainer.get(0).getSchema());
+
+    final PartitionLimit limitConf = new PartitionLimit(null, start, end, PARTITION_COLUMN);
+    limitBatch = new PartitionLimitRecordBatch(limitConf, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    int i = 0;
+    int expectedRowSetIndex = 0;
+    while (i < expectedOutcomes.size()) {
+      try {
+        assertTrue(expectedOutcomes.get(i) == limitBatch.next());
+        assertTrue(expectedRecordCounts.get(i++) == limitBatch.getRecordCount());
+
+        if (limitBatch.getRecordCount() > 0) {
+          final RowSet actualRowSet = IndirectRowSet.fromSv2(limitBatch.getContainer(),
+            limitBatch.getSelectionVector2());
+          new RowSetComparison(expectedRowSets.get(expectedRowSetIndex++)).verify(actualRowSet);
+        }
+      } finally {
+        limitBatch.getSelectionVector2().clear();
+        limitBatch.getContainer().zeroVectors();
+      }
+    }
+  }
+
+  /**
+   * Verifies that an empty batch with both OK_NEW_SCHEMA and EMIT outcomes is not ignored by
+   * {@link PartitionLimitRecordBatch} and is passed to the downstream operator.
+   */
+  @Test
+  public void testPartitionLimit_EmptyBatchEmitOutcome() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+
+    testPartitionLimitCommon(0, 1);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} considers all the batches until it sees an EMIT outcome and
+   * returns an output batch with data that meets the {@link PartitionLimitRecordBatch} criteria.
+   */
+  @Test
+  public void testPartitionLimit_NonEmptyBatchEmitOutcome() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(1);
+
+    RowSet expectedBatch = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+    expectedRowSets.add(expectedBatch);
+
+    testPartitionLimitCommon(0, 1);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} operates on batches across an EMIT boundary with fresh
+   * configuration, i.e. it considers partition column data separately for batches across the EMIT boundary.
+   */
+  @Test
+  public void testPartitionLimit_ResetsAfterFirstEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(2, 200, "item200")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(2, 200, "item200")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    expectedRecordCounts.add(1);
+    expectedRecordCounts.add(0);
+    // Since in this input batch there are 2 different partitionIds
+    expectedRecordCounts.add(2);
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(0, 1);
+  }
+
+  /**
+   * Verifies that when the {@link PartitionLimitRecordBatch} number of records is satisfied by the first incoming
+   * batch, the next empty incoming batch with OK outcome is ignored, but the empty EMIT outcome batch is not
+   * ignored. An empty incoming batch with EMIT outcome produces an empty output batch with EMIT outcome.
+   */
+  @Test
+  public void testPartitionLimit_NonEmptyFirst_EmptyOKEmitOutcome() {
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    expectedRecordCounts.add(1);
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+    expectedRowSets.add(expectedRowSet1);
+
+    testPartitionLimitCommon(0, 1);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} refreshes its state after seeing the first EMIT outcome and
+   * treats the data batches following it as a new set of incoming batches, applying the partition limit rule
+   * afresh to those. So for the first set of batches with OK_NEW_SCHEMA and EMIT outcomes, where the total number
+   * of records received is less than the limit, it still produces an output with that many records for each
+   * partition key (in this case 1, even though the limit is 2 records).
+   *
+   * After seeing EMIT, it refreshes its state and operates on the next input batches to again return the limit
+   * number of records per partition id. So for the 3rd batch with 6 records and 3 partition ids and an EMIT
+   * outcome, it produces an output batch with <=2 records for each partition id.
+   */
+  @Test
+  public void testPartitionLimit_AcrossEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(2, 200, "item200")
+      .addRow(3, 300, "item300")
+      .addRow(3, 301, "item301")
+      .build();
+
+    final RowSet expectedRows1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet.SingleRowSet expectedRows2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(2, 200, "item200")
+      .addRow(3, 300, "item300")
+      .addRow(3, 301, "item301")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    expectedRecordCounts.add(expectedRows1.rowCount());
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRows2.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRows1);
+    expectedRowSets.add(expectedRows2);
+
+    testPartitionLimitCommon(0, 2);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} considers the same partition id across batches within an EMIT
+   * boundary when imposing the limit condition.
+   */
+  @Test
+  public void testPartitionLimit_PartitionIdSpanningAcrossBatches() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 200, "item200")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by AbstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(1);
+    expectedRecordCounts.add(1);
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(0, 1);
+  }
+
+  @Test
+  public void testPartitionLimit_PartitionIdSpanningAcrossBatches_WithOffset() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 101, "item101")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by AbstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(2);
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+
+    testPartitionLimitCommon(2, 3);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} works correctly in cases where a partition id spans across
+   * batches and the limit condition is met by picking records from multiple batches for the same partition id.
+   */
+  @Test
+  public void testPartitionLimit_PartitionIdSelectedAcrossBatches() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by AbstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(0, 5);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} works correctly in cases where the start offset is such that
+   * all the records of one partition id are ignored but records in another partition id are selected.
+   */
+  @Test
+  public void testPartitionLimit_IgnoreOnePartitionIdWithOffset() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by AbstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+
+    testPartitionLimitCommon(3, 5);
+  }
+
+  @Test
+  public void testPartitionLimit_LargeOffsetIgnoreAllRecords() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by AbstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+
+    testPartitionLimitCommon(5, 6);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} works correctly when the start and end offsets are the same.
+   * In this case it works as a LIMIT 0 scenario, where it will not output any rows for any partition id across
+   * batches.
+   */
+  @Test
+  public void testPartitionLimit_Limit0() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by AbstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(0);
+
+    testPartitionLimitCommon(0, 0);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} works correctly for cases where no end offset is provided.
+   * This necessarily means selecting all the records in a partition.
+   */
+  @Test
+  public void testPartitionLimit_NoLimit() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by AbstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(0, null);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} takes care of a provided negative start offset correctly.
+   */
+  @Test
+  public void testPartitionLimit_NegativeOffset() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // second OK batch is consumed by AbstractRecordBatch since it's empty
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(-5, 2);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} behaves correctly across EMIT boundaries with single or
+   * multiple batches within each EMIT boundary. It resets its state correctly across each EMIT boundary and then
+   * operates on all the batches within an EMIT boundary at a time.
+   */
+  @Test
+  public void testPartitionLimit_MultipleEmit_SingleMultipleBatch() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    // Second EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1001, "item1001")
+      .addRow(1, 1002, "item1002")
+      .addRow(1, 1003, "item1003")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2001, "item2001")
+      .addRow(2, 2002, "item2002")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3002, "item3002")
+      .addRow(3, 3003, "item3003")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .build();
+
+    // Third EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .addRow(1, 10003, "item10003")
+      .build();
+
+    // First EMIT boundary expected rowsets
+    final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10, "item1")
+      .build();
+
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .build();
+
+    // Second EMIT boundary expected rowsets
+    final RowSet expectedRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1001, "item1001")
+      .addRow(1, 1002, "item1002")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2001, "item2001")
+      .build();
+
+    final RowSet expectedRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3002, "item3002")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .build();
+
+    // Third EMIT boundary expected rowsets
+    final RowSet expectedRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .build();
+
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+    inputContainer.add(nonEmptyInputRowSet5.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(expectedRowSet1.rowCount());
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(expectedRowSet3.rowCount());
+    expectedRecordCounts.add(expectedRowSet4.rowCount());
+    expectedRecordCounts.add(expectedRowSet5.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet1);
+    expectedRowSets.add(expectedRowSet2);
+    expectedRowSets.add(expectedRowSet3);
+    expectedRowSets.add(expectedRowSet4);
+    expectedRowSets.add(expectedRowSet5);
+
+    testPartitionLimitCommon(-5, 2);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} behaves correctly across EMIT boundaries with single or
+   * multiple batches (with sv2) within each EMIT boundary. It resets its state correctly across each EMIT
+   * boundary and then operates on all the batches within an EMIT boundary at a time.
+   */
+  @Test
+  public void testPartitionLimit_MultipleEmit_SingleMultipleBatch_WithSV2() {
+    final RowSet.SingleRowSet emptyWithSv2 = operatorFixture.rowSetBuilder(inputSchema)
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(1, 102, "item102")
+      .addRow(1, 103, "item103")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .withSv2()
+      .build();
+
+    // Second EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1001, "item1001")
+      .addRow(1, 1002, "item1002")
+      .addRow(1, 1003, "item1003")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2001, "item2001")
+      .addRow(2, 2002, "item2002")
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3002, "item3002")
+      .addRow(3, 3003, "item3003")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .withSv2()
+      .build();
+
+    // Third EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .addRow(1, 10003, "item10003")
+      .withSv2()
+      .build();
+
+    // First EMIT boundary expected row sets
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 100, "item100")
+      .addRow(1, 101, "item101")
+      .addRow(2, 200, "item200")
+      .addRow(2, 201, "item201")
+      .build();
+
+    // Second EMIT boundary expected row sets
+    final RowSet expectedRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1001, "item1001")
+      .addRow(1, 1002, "item1002")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2001, "item2001")
+      .build();
+
+    final RowSet expectedRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3002, "item3002")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .build();
+
+    // Third EMIT boundary expected row sets
+    final RowSet expectedRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .build();
+
+    inputContainer.add(emptyWithSv2.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+    inputContainer.add(nonEmptyInputRowSet5.container());
+    inputContainer.add(emptyWithSv2.container());
+
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet3.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet4.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet5.getSv2());
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(expectedRowSet3.rowCount());
+    expectedRecordCounts.add(expectedRowSet4.rowCount());
+    expectedRecordCounts.add(expectedRowSet5.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet2);
+    expectedRowSets.add(expectedRowSet3);
+    expectedRowSets.add(expectedRowSet4);
+    expectedRowSets.add(expectedRowSet5);
+
+    testPartitionLimitCommon(-5, 2);
+  }
+
+  /**
+   * Verifies that {@link PartitionLimitRecordBatch} behaves correctly across EMIT boundaries with single or
+   * multiple batches (with sv2) within each EMIT boundary, where the sv2 filters out some of the incoming rows.
+   * It resets its state correctly across each EMIT boundary and then operates on all the batches within an EMIT
+   * boundary at a time.
+   */
+  @Test
+  public void testPartitionLimit_MultipleEmit_SingleMultipleBatch_WithSV2_FilteredRows() {
+    final RowSet.SingleRowSet emptyWithSv2 = operatorFixture.rowSetBuilder(inputSchema)
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(false, 1, 100, "item100")
+      .addSelection(true, 1, 101, "item101")
+      .addSelection(false, 1, 102, "item102")
+      .addSelection(true, 1, 103, "item103")
+      .addSelection(false, 2, 200, "item200")
+      .addSelection(true, 2, 201, "item201")
+      .addSelection(true, 2, 202, "item202")
+      .withSv2()
+      .build();
+
+    // Second EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(false, 1, 1001, "item1001")
+      .addSelection(true, 1, 1002, "item1002")
+      .addSelection(true, 1, 1003, "item1003")
+      .addSelection(true, 2, 2000, "item2000")
+      .addSelection(false, 2, 2001, "item2001")
+      .addSelection(true, 2, 2002, "item2002")
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(true, 3, 3001, "item3001")
+      .addSelection(false, 3, 3002, "item3002")
+      .addSelection(true, 3, 3003, "item3003")
+      .addSelection(true, 4, 4000, "item4000")
+      .addSelection(true, 4, 4001, "item4001")
+      .withSv2()
+      .build();
+
+    // Third EMIT boundary batches
+    final RowSet.SingleRowSet nonEmptyInputRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addSelection(true, 1, 10001, "item10001")
+      .addSelection(true, 1, 10002, "item10002")
+      .addSelection(false, 1, 10003, "item10003")
+      .withSv2()
+      .build();
+
+    // First EMIT boundary expected row sets
+    final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 101, "item101")
+      .addRow(1, 103, "item103")
+      .addRow(2, 201, "item201")
+      .addRow(2, 202, "item202")
+      .build();
+
+    // Second EMIT boundary expected row sets
+    final RowSet expectedRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 1002, "item1002")
+      .addRow(1, 1003, "item1003")
+      .addRow(2, 2000, "item2000")
+      .addRow(2, 2002, "item2002")
+      .build();
+
+    final RowSet expectedRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 3001, "item3001")
+      .addRow(3, 3003, "item3003")
+      .addRow(4, 4000, "item4000")
+      .addRow(4, 4001, "item4001")
+      .build();
+
+    // Third EMIT boundary expected row sets
+    final RowSet expectedRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 10001, "item10001")
+      .addRow(1, 10002, "item10002")
+      .build();
+
+    inputContainer.add(emptyWithSv2.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+    inputContainer.add(nonEmptyInputRowSet5.container());
+    inputContainer.add(emptyWithSv2.container());
+
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet3.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet4.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet5.getSv2());
+    inputContainerSv2.add(emptyWithSv2.getSv2());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    expectedRecordCounts.add(0);
+    expectedRecordCounts.add(expectedRowSet2.rowCount());
+    expectedRecordCounts.add(expectedRowSet3.rowCount());
+    expectedRecordCounts.add(expectedRowSet4.rowCount());
+    expectedRecordCounts.add(expectedRowSet5.rowCount());
+    expectedRecordCounts.add(0);
+
+    expectedRowSets.add(expectedRowSet2);
+    expectedRowSets.add(expectedRowSet3);
+    expectedRowSets.add(expectedRowSet4);
+    expectedRowSets.add(expectedRowSet5);
+
+    testPartitionLimitCommon(-5, 2);
+  }
+}
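A new case can be added to the suite by following the same populate-then-verify pattern the tests above use: fill the input lists, record the expected outcomes, counts, and row sets, and call testPartitionLimitCommon(). A hypothetical example (the row values are illustrative; the three-column schema and the shared lists come from BaseTestOpBatchEmitOutcome and the class above):

    @Test
    public void testPartitionLimit_SinglePartitionSingleBatch() {
      // One batch with a single partition id; keep at most 1 record per partition.
      final RowSet.SingleRowSet batch = operatorFixture.rowSetBuilder(inputSchema)
        .addRow(1, 100, "item100")
        .addRow(1, 101, "item101")
        .build();

      final RowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
        .addRow(1, 100, "item100")
        .build();

      inputContainer.add(batch.container());
      inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

      expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
      expectedOutcomes.add(RecordBatch.IterOutcome.NONE);

      expectedRecordCounts.add(expectedRowSet.rowCount());
      expectedRecordCounts.add(0);

      expectedRowSets.add(expectedRowSet);

      testPartitionLimitCommon(0, 1);
    }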