[
https://issues.apache.org/jira/browse/DRILL-6652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567450#comment-16567450
]
ASF GitHub Bot commented on DRILL-6652:
---------------------------------------
sohami closed pull request #1407: DRILL-6652: PartitionLimit changes for Lateral and Unnest
URL: https://github.com/apache/drill/pull/1407
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc
b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
index 282f581c5b6..739804844bf 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
@@ -750,7 +750,7 @@ void protobuf_AddDesc_UserBitShared_2eproto() {
"TATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020"
"\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022"
"\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005"
- "\022\032\n\026CANCELLATION_REQUESTED\020\006*\271\010\n\020CoreOpe"
+ "\022\032\n\026CANCELLATION_REQUESTED\020\006*\316\010\n\020CoreOpe"
"ratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAS"
"T_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE"
"\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HAS"
@@ -777,11 +777,12 @@ void protobuf_AddDesc_UserBitShared_2eproto() {
"_SCAN\020.\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n\013KUDU_WRIT"
"ER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WRI"
"TER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_S"
- "UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205*g\n\nSasl"
- "Status\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001"
- "\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003"
- "\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache.drill.ex"
- "ec.protoB\rUserBitSharedH\001", 5385);
+ "UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PART"
+ "ITION_LIMIT\0206*g\n\nSaslStatus\022\020\n\014SASL_UNKN"
+ "OWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRES"
+ "S\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B."
+ "\n\033org.apache.drill.exec.protoB\rUserBitSh"
+ "aredH\001", 5406);
::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
"UserBitShared.proto", &protobuf_RegisterTypes);
UserCredentials::default_instance_ = new UserCredentials();
@@ -956,6 +957,7 @@ bool CoreOperatorType_IsValid(int value) {
case 51:
case 52:
case 53:
+ case 54:
return true;
default:
return false;
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h
b/contrib/native/client/src/protobuf/UserBitShared.pb.h
index 134dc2b500c..4599abb23aa 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.h
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h
@@ -257,11 +257,12 @@ enum CoreOperatorType {
JSON_WRITER = 50,
HTPPD_LOG_SUB_SCAN = 51,
IMAGE_SUB_SCAN = 52,
- SEQUENCE_SUB_SCAN = 53
+ SEQUENCE_SUB_SCAN = 53,
+ PARTITION_LIMIT = 54
};
bool CoreOperatorType_IsValid(int value);
const CoreOperatorType CoreOperatorType_MIN = SINGLE_SENDER;
-const CoreOperatorType CoreOperatorType_MAX = SEQUENCE_SUB_SCAN;
+const CoreOperatorType CoreOperatorType_MAX = PARTITION_LIMIT;
const int CoreOperatorType_ARRAYSIZE = CoreOperatorType_MAX + 1;
const ::google::protobuf::EnumDescriptor* CoreOperatorType_descriptor();
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
new file mode 100644
index 00000000000..29f8bb2fe3f
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+
+@JsonTypeName("partition-limit")
+public class PartitionLimit extends Limit {
+ private final String partitionColumn;
+
+ @JsonCreator
+ public PartitionLimit(@JsonProperty("child") PhysicalOperator child,
@JsonProperty("first") Integer first,
+ @JsonProperty("last") Integer last,
@JsonProperty("partitionColumn") String partitionColumn) {
+ super(child, first, last);
+ this.partitionColumn = partitionColumn;
+ }
+
+ public String getPartitionColumn() {
+ return partitionColumn;
+ }
+
+ @Override
+ protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+ return new PartitionLimit(child, getFirst(), getLast(),
getPartitionColumn());
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E>
physicalVisitor, X value) throws E {
+ return physicalVisitor.visitLimit(this, value);
+ }
+
+ @Override
+ public SelectionVectorMode getSVMode() {
+ return SelectionVectorMode.TWO_BYTE;
+ }
+
+ @Override
+ public int getOperatorType() {
+ return CoreOperatorType.PARTITION_LIMIT_VALUE;
+ }
+}
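For reference, a minimal construction sketch (not part of the patch) of this operator config, mirroring the way TestPartitionLimitBatch further below instantiates it; the column name here is only a placeholder for whatever implicit rowId field the planner injects:

  // Sketch only: keep at most one row per partition id, skipping none (first = 0, last = 1).
  // The child operator is null exactly as in the unit tests; a real plan supplies it.
  String partitionColumn = "rowId";  // hypothetical column name
  PartitionLimit limitConf = new PartitionLimit(null, 0, 1, partitionColumn);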
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index d28fd47a110..06f0fdbee0d 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -71,6 +71,10 @@ public IterOutcome innerNext() {
for (VectorWrapper<?> wrapper : incoming) {
wrapper.getValueVector().clear();
}
+ // clear memory for incoming sv (if any)
+ if (incomingSv != null) {
+ incomingSv.clear();
+ }
upStream = next(incoming);
if (upStream == IterOutcome.OUT_OF_MEMORY) {
return upStream;
@@ -82,6 +86,12 @@ public IterOutcome innerNext() {
for (VectorWrapper<?> wrapper : incoming) {
wrapper.getValueVector().clear();
}
+
+ // clear memory for incoming sv (if any)
+ if (incomingSv != null) {
+ incomingSv.clear();
+ }
+
refreshLimitState();
return upStream;
}
@@ -109,7 +119,7 @@ public void close() {
@Override
protected boolean setupNewSchema() throws SchemaChangeException {
- container.zeroVectors();
+ container.clear();
transfers.clear();
for(final VectorWrapper<?> v : incoming) {
@@ -181,6 +191,12 @@ protected IterOutcome doWork() {
outgoingSv.allocateNew(inputRecordCount);
limit(inputRecordCount);
}
+
+ // clear memory for incoming sv (if any)
+ if (incomingSv != null) {
+ incomingSv.clear();
+ }
+
return getFinalOutcome(false);
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitBatchCreator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitBatchCreator.java
new file mode 100644
index 00000000000..9c7ebd20e43
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitBatchCreator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.limit;
+
+import com.google.common.collect.Iterables;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.config.PartitionLimit;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+public class PartitionLimitBatchCreator implements
BatchCreator<PartitionLimit> {
+ @Override
+ public PartitionLimitRecordBatch getBatch(ExecutorFragmentContext context,
PartitionLimit config,
+ List<RecordBatch> children)
+ throws ExecutionSetupException {
+ return new PartitionLimitRecordBatch(config, context,
Iterables.getOnlyElement(children));
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
new file mode 100644
index 00000000000..04099802f7b
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.limit;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.PartitionLimit;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.IntVector;
+
+import java.util.List;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+
+/**
+ * Helps to apply a limit within each partition of a record batch. Currently only an integer
+ * partition column is supported. This is mainly used for the Lateral/Unnest subquery, where each
+ * output batch from Unnest contains an implicit rowId column for each row.
+ */
+public class PartitionLimitRecordBatch extends AbstractSingleRecordBatch<PartitionLimit> {
+  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionLimitRecordBatch.class);
+
+ private SelectionVector2 outgoingSv;
+ private SelectionVector2 incomingSv;
+
+ // Start offset of the records
+ private int recordStartOffset;
+ private int numberOfRecords;
+ private final List<TransferPair> transfers = Lists.newArrayList();
+
+  // Partition rowId currently being processed; this helps handle cases where rows for a
+  // partition id flow across two batches
+ private int partitionId;
+ private IntVector partitionColumn;
+
+ public PartitionLimitRecordBatch(PartitionLimit popConfig, FragmentContext
context, RecordBatch incoming)
+ throws OutOfMemoryException {
+ super(popConfig, context, incoming);
+ outgoingSv = new SelectionVector2(oContext.getAllocator());
+ refreshLimitState();
+ }
+
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ return outgoingSv;
+ }
+
+ @Override
+ public int getRecordCount() {
+ return outgoingSv.getCount();
+ }
+
+ @Override
+ public void close() {
+ outgoingSv.clear();
+ transfers.clear();
+ super.close();
+ }
+
+ @Override
+ protected boolean setupNewSchema() throws SchemaChangeException {
+ container.clear();
+ transfers.clear();
+
+ for(final VectorWrapper<?> v : incoming) {
+ final TransferPair pair = v.getValueVector().makeTransferPair(
+ container.addOrGet(v.getField(), callBack));
+ transfers.add(pair);
+
+      // Hold the transfer pair's target vector for partitionColumn, since before applying the
+      // limit all rows are transferred from the incoming to the outgoing batch
+ String fieldName = v.getField().getName();
+ if (fieldName.equals(popConfig.getPartitionColumn())) {
+ partitionColumn = (IntVector) pair.getTo();
+ }
+ }
+
+ final BatchSchema.SelectionVectorMode svMode =
incoming.getSchema().getSelectionVectorMode();
+
+ switch(svMode) {
+ case NONE:
+ break;
+ case TWO_BYTE:
+ this.incomingSv = incoming.getSelectionVector2();
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ if (container.isSchemaChanged()) {
+ container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+   * Gets the outcome to return from the super implementation and, in case of an EMIT outcome,
+   * refreshes the state of the operator. The refresh re-applies the limit on all future incoming
+   * batches that are part of the next record boundary.
+ * @param hasRemainder
+ * @return - IterOutcome to send downstream
+ */
+ @Override
+ protected IterOutcome getFinalOutcome(boolean hasRemainder) {
+ final IterOutcome outcomeToReturn = super.getFinalOutcome(hasRemainder);
+
+    // EMIT outcome means the leaf operator is UNNEST, hence refresh the state whether or not the
+    // limit is reached.
+ if (outcomeToReturn == EMIT) {
+ refreshLimitState();
+ }
+ return outcomeToReturn;
+ }
+
+ @Override
+ protected IterOutcome doWork() {
+ final int inputRecordCount = incoming.getRecordCount();
+ if (inputRecordCount == 0) {
+ setOutgoingRecordCount(0);
+ for (VectorWrapper vw : incoming) {
+ vw.clear();
+ }
+ // Release buffer for sv2 (if any)
+ if (incomingSv != null) {
+ incomingSv.clear();
+ }
+ return getFinalOutcome(false);
+ }
+
+ for (final TransferPair tp : transfers) {
+ tp.transfer();
+ }
+
+    // Allocate the SV2 for the full record count, since we transfer all the vector buffers from
+    // the input batch to the output batch and later an SV2Remover copies only the needed records.
+ outgoingSv.allocateNew(inputRecordCount);
+ limit(inputRecordCount);
+
+ // Release memory for incoming sv (if any)
+ if (incomingSv != null) {
+ incomingSv.clear();
+ }
+ return getFinalOutcome(false);
+ }
+
+ /**
+   * Applies the limit when the incoming batch has more records than the start offset, so that it
+   * can produce some output records. After the first call to this method recordStartOffset should
+   * be 0, since the required number of records was already skipped in the first incoming batch.
+ * @param inputRecordCount - number of records in incoming batch
+ */
+ private void limit(int inputRecordCount) {
+ boolean outputAllRecords = (numberOfRecords == Integer.MIN_VALUE);
+
+ int svIndex = 0;
+    // If partitionId is not -1, it was carried over as the last partitionId of the previous batch
+ partitionId = (partitionId == -1) ? getCurrentRowId(0) : partitionId;
+
+ for (int i=0; i < inputRecordCount;) {
+ // Get rowId from current right row
+ int currentRowId = getCurrentRowId(i);
+
+ if (partitionId == currentRowId) {
+        // Check if there is any start offset set for each partition and skip those records
+ if (recordStartOffset > 0) {
+ --recordStartOffset;
+ ++i;
+ continue;
+ }
+
+        // Once the start offset records are skipped, consider rows until numberOfRecords is
+        // reached for that partition
+ if (outputAllRecords) {
+ updateOutputSV2(svIndex++, i);
+ } else if (numberOfRecords > 0) {
+ updateOutputSV2(svIndex++, i);
+ --numberOfRecords;
+ }
+ ++i;
+ } else { // now a new row with different partition id is found
+ refreshConfigParameter();
+ partitionId = currentRowId;
+ }
+ }
+
+ setOutgoingRecordCount(svIndex);
+ }
+
+ private void updateOutputSV2(int svIndex, int incomingIndex) {
+ if (incomingSv != null) {
+ outgoingSv.setIndex(svIndex, incomingSv.getIndex(incomingIndex));
+ } else {
+ outgoingSv.setIndex(svIndex, (char) incomingIndex);
+ }
+ }
+
+ private int getCurrentRowId(int incomingIndex) {
+ if (incomingSv != null) {
+ return
partitionColumn.getAccessor().get(incomingSv.getIndex(incomingIndex));
+ } else {
+ return partitionColumn.getAccessor().get(incomingIndex);
+ }
+ }
+
+ private void setOutgoingRecordCount(int outputCount) {
+ outgoingSv.setRecordCount(outputCount);
+ container.setRecordCount(outputCount);
+ }
+
+ /**
+   * Resets the state of recordStartOffset and numberOfRecords based on the {@link PartitionLimit}
+   * passed to the operator. It also resets the partitionId, since after an EMIT outcome there will
+   * be a new partitionId to consider. This method is called for each EMIT outcome received,
+   * whether or not the limit is reached.
+ */
+ private void refreshLimitState() {
+ refreshConfigParameter();
+ partitionId = -1;
+ }
+
+ /**
+   * Only resets recordStartOffset and numberOfRecords based on the {@link PartitionLimit} passed
+   * to the operator. It is explicitly called after the limit for a partitionId is met or the
+   * partitionId changes within an EMIT boundary.
+ */
+ private void refreshConfigParameter() {
+ // Make sure startOffset is non-negative
+ recordStartOffset = Math.max(0, popConfig.getFirst());
+ numberOfRecords = (popConfig.getLast() == null) ?
+ Integer.MIN_VALUE : Math.max(0, popConfig.getLast()) - recordStartOffset;
+ }
+}
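To make the selection logic above concrete, here is a small self-contained sketch (plain Java, no Drill dependencies, illustration only) of the per-partition offset/limit bookkeeping performed by limit() and refreshConfigParameter(). With first = 1 and last = 3 the operator skips one row per partition id and then keeps numberOfRecords = last - first = 2 rows for it:

  import java.util.ArrayList;
  import java.util.List;

  public class PartitionLimitSketch {
    public static void main(String[] args) {
      int[] partitionIds = {1, 1, 1, 1, 2, 2, 3};  // stands in for the incoming rowId column
      int first = 1;                               // per-partition start offset
      Integer last = 3;                            // per-partition end offset (null => no limit)

      int startOffset = Math.max(0, first);
      int quotaPerPartition = (last == null) ? Integer.MIN_VALUE : Math.max(0, last) - startOffset;

      List<Integer> selected = new ArrayList<>();  // plays the role of the outgoing SV2
      int currentPartition = partitionIds[0];
      int skip = startOffset;
      int quota = quotaPerPartition;

      for (int i = 0; i < partitionIds.length; ) {
        if (partitionIds[i] == currentPartition) {
          if (skip > 0) { --skip; ++i; continue; }        // skip the per-partition offset rows
          if (quota == Integer.MIN_VALUE || quota > 0) {  // MIN_VALUE means "no limit"
            selected.add(i);
            if (quota != Integer.MIN_VALUE) { --quota; }
          }
          ++i;
        } else {
          // New partition id: reset offset and quota, as refreshConfigParameter() does.
          currentPartition = partitionIds[i];
          skip = startOffset;
          quota = quotaPerPartition;
        }
      }
      System.out.println(selected);  // prints [1, 2, 5]
    }
  }

Running it prints [1, 2, 5]: for partition id 1 the row at index 0 is skipped and indices 1 and 2 are kept, for partition id 2 index 4 is skipped and index 5 is kept, and partition id 3 contributes only its offset row, so nothing is kept from it.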
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 2e2f405a3fa..e89144db59d 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -45,7 +45,6 @@
import java.util.List;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
import static
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
// TODO - handle the case where a user tries to unnest a scalar, should just
return the column as is
@@ -63,12 +62,6 @@
// to keep processing it. Kill may be called by a limit in a subquery that
// requires us to stop processing the current row, but not stop processing
// the data.
- // In some cases we need to return a predetermined state from a call to
next. These are:
- // 1) Kill is called due to an error occurring in the processing of the
query. IterOutcome should be NONE
- // 2) Kill is called by LIMIT to stop processing of the current row (This
occurs when the LIMIT is part of a subquery
- // between UNNEST and LATERAL. Iteroutcome should be EMIT
- // 3) Kill is called by LIMIT downstream from LATERAL. IterOutcome should be
NONE
- private IterOutcome nextState = OK;
private int remainderIndex = 0;
private int recordCount;
private MaterializedField unnestFieldMetadata;
@@ -159,24 +152,21 @@ public int getRecordCount() {
}
protected void killIncoming(boolean sendUpstream) {
- // Kill may be received from an operator downstream of the corresponding
lateral, or from
- // a limit that is in a subqueruy between unnest and lateral. In the
latter case, unnest has to handle the limit.
- // In the former case, Lateral will handle most of the kill handling.
-
+ //
+    // In some cases we need to return a predetermined state from a call to next. These are:
+    // 1) Kill is called due to an error occurring in the processing of the query. IterOutcome
+    //    should be NONE.
+    // 2) Kill is called by LIMIT downstream from LATERAL. IterOutcome should be NONE.
+    // With PartitionLimitBatch sitting between Lateral and the Unnest subquery, kill won't be
+    // triggered by it, hence no special handling is needed for that case.
+ //
Preconditions.checkNotNull(lateral);
// Do not call kill on incoming. Lateral Join has the responsibility for
killing incoming
- if (context.getExecutorState().isFailed() || lateral.getLeftOutcome() ==
IterOutcome.STOP) {
- logger.debug("Kill received. Stopping all processing");
- nextState = IterOutcome.NONE ;
- } else {
- // if we have already processed the record, then kill from a limit has
no meaning.
- // if, however, we have values remaining to be emitted, and limit has
been reached,
- // we abandon the remainder and send an empty batch with EMIT.
- logger.debug("Kill received from subquery. Stopping processing of
current input row.");
- if(hasRemainder) {
- nextState = IterOutcome.EMIT;
- }
- }
+ Preconditions.checkState(context.getExecutorState().isFailed() ||
+      lateral.getLeftOutcome() == IterOutcome.STOP, "Kill received by unnest with unexpected state. " +
+ "Neither the LateralOutcome is STOP nor executor state is failed");
+ logger.debug("Kill received. Stopping all processing");
+ state = BatchState.DONE;
+ recordCount = 0;
hasRemainder = false; // whatever the case, we need to stop processing the
current row.
}
@@ -190,11 +180,6 @@ public IterOutcome innerNext() {
return IterOutcome.NONE;
}
- if (nextState == IterOutcome.NONE || nextState == IterOutcome.EMIT) {
- recordCount = 0;
- return nextState;
- }
-
if (hasNewSchema) {
memoryManager.update();
hasNewSchema = false;
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
index 5d3c6c68653..057cfaed2c4 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
@@ -17,9 +17,12 @@
*/
package org.apache.drill.exec.planner.physical;
+import org.apache.calcite.rel.RelWriter;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.config.PartitionLimit;
import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.calcite.rel.RelNode;
@@ -33,6 +36,7 @@
import java.util.List;
public class LimitPrel extends DrillLimitRelBase implements Prel {
+ private boolean isPartitioned = false;
public LimitPrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child,
RexNode offset, RexNode fetch) {
super(cluster, traitSet, child, offset, fetch);
@@ -42,9 +46,14 @@ public LimitPrel(RelOptCluster cluster, RelTraitSet
traitSet, RelNode child, Rex
super(cluster, traitSet, child, offset, fetch, pushDown);
}
+ public LimitPrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child,
RexNode offset, RexNode fetch, boolean pushDown, boolean isPartitioned) {
+ super(cluster, traitSet, child, offset, fetch, pushDown);
+ this.isPartitioned = isPartitioned;
+ }
+
@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new LimitPrel(getCluster(), traitSet, sole(inputs), offset, fetch,
isPushDown());
+ return new LimitPrel(getCluster(), traitSet, sole(inputs), offset, fetch,
isPushDown(), isPartitioned);
}
@Override
@@ -60,7 +69,12 @@ public PhysicalOperator
getPhysicalOperator(PhysicalPlanCreator creator) throws
// Null value implies including entire remaining result set from first
offset
Integer last = fetch != null ? Math.max(0, RexLiteral.intValue(fetch)) +
first : null;
- Limit limit = new Limit(childPOP, first, last);
+ Limit limit;
+ if (isPartitioned) {
+ limit = new PartitionLimit(childPOP, first, last,
DrillRelOptUtil.IMPLICIT_COLUMN);
+ } else {
+ limit = new Limit(childPOP, first, last);
+ }
return creator.addMetadata(this, limit);
}
@@ -74,6 +88,13 @@ public PhysicalOperator
getPhysicalOperator(PhysicalPlanCreator creator) throws
return logicalVisitor.visitPrel(this, value);
}
+ @Override
+ public RelWriter explainTerms(RelWriter pw) {
+ super.explainTerms(pw);
+ pw.itemIf("partitioned", isPartitioned, isPartitioned);
+ return pw;
+ }
+
@Override
public SelectionVectorMode[] getSupportedEncodings() {
return SelectionVectorMode.NONE_AND_TWO;
@@ -91,6 +112,6 @@ public boolean needsFinalColumnReordering() {
@Override
public Prel addImplicitRowIDCol(List<RelNode> children) {
- return (Prel) this.copy(this.traitSet, children);
+ return new LimitPrel(this.getCluster(), this.traitSet, children.get(0),
getOffset(), getFetch(), isPushDown(), true);
}
}
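As a worked example of the offset/fetch arithmetic in getPhysicalOperator() above (values chosen purely for illustration, and assuming first has already been derived from the OFFSET literal): with first = 2 and FETCH 3, last becomes 2 + 3 = 5, and the new isPartitioned flag then decides which physical operator is emitted:

  // Illustration of the translation above, assuming first = 2 (OFFSET 2) and FETCH 3.
  int first = 2;                           // derived from the offset literal (assumed)
  Integer last = Math.max(0, 3) + first;   // fetch literal + first -> 5
  // isPartitioned ? new PartitionLimit(childPOP, first, last, DrillRelOptUtil.IMPLICIT_COLUMN)
  //               : new Limit(childPOP, first, last)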
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
index cd246407837..4eaca2bf063 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
@@ -23,6 +23,7 @@
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.store.mock.MockStorePOP;
import org.apache.drill.test.rowSet.RowSet;
import org.apache.drill.test.rowSet.schema.SchemaBuilder;
@@ -48,6 +49,9 @@
// List of incoming containers
protected final List<VectorContainer> inputContainer = new ArrayList<>(5);
+ // List of SV2's
+ protected final List<SelectionVector2> inputContainerSv2 = new
ArrayList<>(5);
+
// List of incoming IterOutcomes
protected final List<RecordBatch.IterOutcome> inputOutcomes = new
ArrayList<>(5);
@@ -79,6 +83,7 @@ public void afterTest() throws Exception {
nonEmptyInputRowSet.clear();
inputContainer.clear();
inputOutcomes.clear();
+ inputContainerSv2.clear();
outputRecordCount = 0;
}
}
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 0c43ab290f1..ed7af4cbee8 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -38,6 +38,7 @@
// These resources are owned by this RecordBatch
protected VectorContainer container;
+ protected SelectionVector2 sv2;
private int currentContainerIndex;
private int currentOutcomeIndex;
private boolean isDone;
@@ -45,6 +46,7 @@
// All the below resources are owned by caller
private final List<VectorContainer> allTestContainers;
+ private List<SelectionVector2> allTestContainersSv2;
private final List<IterOutcome> allOutcomes;
private final FragmentContext context;
protected final OperatorContext oContext;
@@ -62,19 +64,32 @@ public MockRecordBatch(FragmentContext context,
OperatorContext oContext,
this.currentContainerIndex = 0;
this.currentOutcomeIndex = 0;
this.isDone = false;
+ this.allTestContainersSv2 = null;
+ this.sv2 = null;
+ }
+
+ public MockRecordBatch(FragmentContext context, OperatorContext oContext,
+ List<VectorContainer> testContainers,
List<IterOutcome> iterOutcomes,
+ List<SelectionVector2> testContainersSv2, BatchSchema
schema) {
+ this(context, oContext, testContainers, iterOutcomes, schema);
+ allTestContainersSv2 = testContainersSv2;
+ sv2 = (allTestContainersSv2 != null && allTestContainersSv2.size() > 0) ?
new SelectionVector2(allocator) : null;
}
@Override
- public void close() throws Exception {
+ public void close() {
container.clear();
container.setRecordCount(0);
currentContainerIndex = 0;
currentOutcomeIndex = 0;
+ if (sv2 != null) {
+ sv2.clear();
+ }
}
@Override
public SelectionVector2 getSelectionVector2() {
- return null;
+ return sv2;
}
@Override
@@ -94,7 +109,7 @@ public BatchSchema getSchema() {
@Override
public int getRecordCount() {
- return container.getRecordCount();
+ return (sv2 == null) ? container.getRecordCount() : sv2.getCount();
}
@Override
@@ -103,6 +118,9 @@ public void kill(boolean sendUpstream) {
isDone = true;
container.clear();
container.setRecordCount(0);
+ if (sv2 != null) {
+ sv2.clear();
+ }
}
}
@@ -142,6 +160,18 @@ public IterOutcome next() {
}
container.transferIn(input);
container.setRecordCount(recordCount);
+
+ // Transfer the sv2 as well
+ final SelectionVector2 inputSv2 =
+ (allTestContainersSv2 != null && allTestContainersSv2.size() > 0)
+ ? allTestContainersSv2.get(currentContainerIndex) : null;
+ if (inputSv2 != null) {
+ sv2.allocateNewSafe(inputSv2.getCount());
+ for (int i=0; i<inputSv2.getCount(); ++i) {
+ sv2.setIndex(i, inputSv2.getIndex(i));
+ }
+ sv2.setRecordCount(inputSv2.getCount());
+ }
}
if (currentOutcomeIndex < allOutcomes.size()) {
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PartitionLimit/TestPartitionLimitBatch.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PartitionLimit/TestPartitionLimitBatch.java
new file mode 100644
index 00000000000..574ff768dad
--- /dev/null
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PartitionLimit/TestPartitionLimitBatch.java
@@ -0,0 +1,1022 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.PartitionLimit;
+
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.exec.physical.config.PartitionLimit;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.limit.PartitionLimitRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.test.rowSet.IndirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+@Category(OperatorTest.class)
+public class TestPartitionLimitBatch extends BaseTestOpBatchEmitOutcome {
+
+ private static String PARTITION_COLUMN;
+
+  // Holds a reference to the actual operator instance created for each test
+ private static PartitionLimitRecordBatch limitBatch;
+
+  // List of expected outcomes populated by each test. Used to verify the actual IterOutcome
+  // returned by each next() call on the operator against the expected outcome
+ private final List<RecordBatch.IterOutcome> expectedOutcomes = new
ArrayList<>();
+
+  // List of expected row counts populated by each test. Used to verify the actual output row
+  // count against the expected row count
+ private final List<Integer> expectedRecordCounts = new ArrayList<>();
+
+  // List of expected row sets populated by each test. Used to verify the actual output from the
+  // operator against the expected output
+ private final List<RowSet> expectedRowSets = new ArrayList<>();
+
+ @BeforeClass
+ public static void partitionLimitSetup() {
+ PARTITION_COLUMN = inputSchema.column(0).getName();
+ }
+
+ /**
+   * Cleanup method executed after each test
+ */
+ @After
+ public void afterTestCleanup() {
+ // close limitBatch
+ limitBatch.close();
+
+ // Release memory from expectedRowSets
+ for (RowSet expectedRowSet : expectedRowSets) {
+ expectedRowSet.clear();
+ }
+ expectedOutcomes.clear();
+ expectedRecordCounts.clear();
+ expectedRowSets.clear();
+ }
+
+ /**
+   * Common method used by all the tests for {@link PartitionLimitRecordBatch} below. It creates
+   * the MockRecordBatch and {@link PartitionLimitRecordBatch} with the containers and outcomes
+   * list populated in the test. It also verifies the expected outcomes list and record count
+   * populated by each test against each next() call to {@link PartitionLimitRecordBatch}. For
+   * cases where the expected record count is > 0 it also verifies the actual output returned by
+   * {@link PartitionLimitRecordBatch} against the expected output rows.
+ * @param start - Start offset for {@link PartitionLimit} PopConfig
+ * @param end - End offset for {@link PartitionLimit} PopConfig
+ */
+ private void testPartitionLimitCommon(Integer start, Integer end) {
+ final MockRecordBatch mockInputBatch = new
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, inputContainerSv2,
inputContainer.get(0).getSchema());
+
+ final PartitionLimit limitConf = new PartitionLimit(null, start, end,
PARTITION_COLUMN);
+ limitBatch = new PartitionLimitRecordBatch(limitConf,
operatorFixture.getFragmentContext(), mockInputBatch);
+
+ int i=0;
+ int expectedRowSetIndex = 0;
+ while (i < expectedOutcomes.size()) {
+ try {
+ assertTrue(expectedOutcomes.get(i) == limitBatch.next());
+ assertTrue(expectedRecordCounts.get(i++) ==
limitBatch.getRecordCount());
+
+ if (limitBatch.getRecordCount() > 0) {
+ final RowSet actualRowSet =
IndirectRowSet.fromSv2(limitBatch.getContainer(),
+ limitBatch.getSelectionVector2());
+ new
RowSetComparison(expectedRowSets.get(expectedRowSetIndex++)).verify(actualRowSet);
+ }
+ } finally {
+ limitBatch.getSelectionVector2().clear();
+ limitBatch.getContainer().zeroVectors();
+ }
+ }
+ }
+
+ /**
+   * Verifies that an empty batch with both OK_NEW_SCHEMA and EMIT outcomes is not ignored by
+   * {@link PartitionLimitRecordBatch} and is passed to the downstream operator.
+ */
+ @Test
+ public void testPartitionLimit_EmptyBatchEmitOutcome() {
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(0);
+
+ testPartitionLimitCommon(0, 1);
+ }
+
+ /**
+   * Verifies that {@link PartitionLimitRecordBatch} considers all the batches until it sees an
+   * EMIT outcome and returns an output batch with data that meets the limit criteria.
+ */
+ @Test
+ public void testPartitionLimit_NonEmptyBatchEmitOutcome() {
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(1);
+
+ RowSet expectedBatch = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+ expectedRowSets.add(expectedBatch);
+
+ testPartitionLimitCommon(0, 1);
+ }
+
+ /**
+   * Verifies that {@link PartitionLimitRecordBatch} operates on batches across an EMIT boundary
+   * with fresh configuration. That is, it considers partition column data separately for batches
+   * across the EMIT boundary.
+ */
+ @Test
+ public void testPartitionLimit_ResetsAfterFirstEmitOutcome() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(2, 200, "item200")
+ .build();
+
+ final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+ final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(2, 200, "item200")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+ expectedRecordCounts.add(1);
+ expectedRecordCounts.add(0);
+    // Since in this input batch there are 2 different partitionIds
+ expectedRecordCounts.add(2);
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet1);
+ expectedRowSets.add(expectedRowSet2);
+
+ testPartitionLimitCommon(0, 1);
+ }
+
+ /**
+   * Verifies that when the {@link PartitionLimitRecordBatch} limit number of records is found in
+   * the first incoming batch, the next empty incoming batch with OK outcome is ignored, but the
+   * empty EMIT outcome batch is not ignored. An empty incoming batch with EMIT outcome produces
+   * an empty output batch with EMIT outcome.
+ */
+ @Test
+ public void testPartitionLimit_NonEmptyFirst_EmptyOKEmitOutcome() {
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+ expectedRecordCounts.add(1);
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(0);
+
+ final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+ expectedRowSets.add(expectedRowSet1);
+
+ testPartitionLimitCommon(0, 1);
+ }
+
+ /**
+   * Verifies that {@link PartitionLimitRecordBatch} refreshes its state after seeing the first
+   * EMIT outcome, treats the data batches following it as a new set of incoming batches, and
+   * applies the partition limit rule afresh on those. So for the first set of batches with
+   * OK_NEW_SCHEMA and EMIT outcomes, where the total number of records received is less than the
+   * limit, it still produces an output with that many records for each partition key (in this
+   * case 1, even though the limit number of records is 2).
+   *
+   * After seeing EMIT, it refreshes its state and operates on the next input batches to again
+   * return the limit number of records per partition id. So for the 3rd batch, with 6 records and
+   * 3 partition ids and with EMIT outcome, it produces an output batch with <= 2 records for each
+   * partition id.
+ */
+ @Test
+ public void testPartitionLimit_AcrossEmitOutcome() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(2, 200, "item200")
+ .addRow(3, 300, "item300")
+ .addRow(3, 301, "item301")
+ .build();
+
+ final RowSet expectedRows1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+
+ final RowSet.SingleRowSet expectedRows2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(2, 200, "item200")
+ .addRow(3, 300, "item300")
+ .addRow(3, 301, "item301")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+ expectedRecordCounts.add(expectedRows1.rowCount());
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(expectedRows2.rowCount());
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRows1);
+ expectedRowSets.add(expectedRows2);
+
+ testPartitionLimitCommon(0, 2);
+ }
+
+ /**
+   * Verifies that {@link PartitionLimitRecordBatch} considers the same partition id across
+   * batches, but within an EMIT boundary, when imposing the limit condition.
+ */
+ @Test
+ public void testPartitionLimit_PartitionIdSpanningAcrossBatches() {
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+
+ final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 200, "item200")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ // second OK batch is consumed by abstractRecordBatch since it's empty
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(1);
+ expectedRecordCounts.add(1);
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet1);
+ expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(0, 1);
+ }
+
+ @Test
+ public void testPartitionLimit_PartitionIdSpanningAcrossBatches_WithOffset()
{
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 101, "item101")
+ .addRow(2, 202, "item202")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ // second OK batch is consumed by abstractRecordBatch since it's empty
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(2);
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet1);
+
+    testPartitionLimitCommon(2, 3);
+ }
+
+ /**
+   * Verifies that {@link PartitionLimitRecordBatch} works correctly in cases where a partition id
+   * spans across batches and the limit condition is met by picking records from multiple batches
+   * for the same partition id.
+ */
+ @Test
+ public void testPartitionLimit_PartitionIdSelectedAcrossBatches() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+
+ final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ // second OK batch is consumed by abstractRecordBatch since it's empty
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(expectedRowSet1.rowCount());
+ expectedRecordCounts.add(expectedRowSet2.rowCount());
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet1);
+ expectedRowSets.add(expectedRowSet2);
+
+    testPartitionLimitCommon(0, 5);
+ }
+
+ /**
+   * Verifies that {@link PartitionLimitRecordBatch} works correctly in cases where the start
+   * offset is such that all the records of one partition id are ignored but records of another
+   * partition id are selected.
+ */
+ @Test
+ public void testPartitionLimit_IgnoreOnePartitionIdWithOffset() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ // second OK batch is consumed by abstractRecordBatch since it's empty
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(expectedRowSet1.rowCount());
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet1);
+
+ testPartitionLimitCommon(3, 5);
+ }
+
+ @Test
+ public void testPartitionLimit_LargeOffsetIgnoreAllRecords() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ // second OK batch is consumed by abstractRecordBatch since it's empty
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(0);
+
+ testPartitionLimitCommon(5, 6);
+ }
+
+ /**
+   * Verifies that {@link PartitionLimitRecordBatch} works correctly when the start and end
+   * offsets are the same. In this case it behaves as a LIMIT 0 scenario and does not output any
+   * rows for any partition id across batches.
+ */
+ @Test
+ public void testPartitionLimit_Limit0() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ // second OK batch is consumed by abstractRecordBatch since it's empty
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(0);
+
+ testPartitionLimitCommon(0, 0);
+ }
+
+ /**
+   * Verifies that {@link PartitionLimitRecordBatch} works correctly for cases where no end offset
+   * is specified. This necessarily means selecting all the records in a partition.
+ */
+ @Test
+ public void testPartitionLimit_NoLimit() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+
+ final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ // second OK batch is consumed by abstractRecordBatch since it's empty
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(expectedRowSet1.rowCount());
+ expectedRecordCounts.add(expectedRowSet2.rowCount());
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet1);
+ expectedRowSets.add(expectedRowSet2);
+
+ testPartitionLimitCommon(0, null);
+ }
+
+ /**
+   * Verifies that {@link PartitionLimitRecordBatch} handles a negative start offset correctly
+ */
+ @Test
+ public void testPartitionLimit_NegativeOffset() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+
+ final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ // second OK batch is consumed by abstractRecordBatch since it's empty
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(expectedRowSet1.rowCount());
+ expectedRecordCounts.add(expectedRowSet2.rowCount());
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet1);
+ expectedRowSets.add(expectedRowSet2);
+
+ testPartitionLimitCommon(-5, 2);
+ }
+
+ /**
+   * Verifies that {@link PartitionLimitRecordBatch} behaves correctly across EMIT boundaries with
+   * single or multiple batches within each EMIT boundary. It resets its state correctly across
+   * each EMIT boundary and then operates on all the batches within that boundary at a time.
+ */
+ @Test
+ public void testPartitionLimit_MultipleEmit_SingleMultipleBatch() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ // Second EMIT boundary batches
+ final RowSet.SingleRowSet nonEmptyInputRowSet3 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 1001, "item1001")
+ .addRow(1, 1002, "item1002")
+ .addRow(1, 1003, "item1003")
+ .addRow(2, 2000, "item2000")
+ .addRow(2, 2001, "item2001")
+ .addRow(2, 2002, "item2002")
+ .build();
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet4 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(3, 3001, "item3001")
+ .addRow(3, 3002, "item3002")
+ .addRow(3, 3003, "item3003")
+ .addRow(4, 4000, "item4000")
+ .addRow(4, 4001, "item4001")
+ .build();
+
+ // Third EMIT boundary batches
+ final RowSet.SingleRowSet nonEmptyInputRowSet5 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10001, "item10001")
+ .addRow(1, 10002, "item10002")
+ .addRow(1, 10003, "item10003")
+ .build();
+
+ // First EMIT boundary expected rowsets
+ final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+
+ final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .build();
+
+ // Second EMIT boundary expected rowsets
+ final RowSet expectedRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 1001, "item1001")
+ .addRow(1, 1002, "item1002")
+ .addRow(2, 2000, "item2000")
+ .addRow(2, 2001, "item2001")
+ .build();
+
+ final RowSet expectedRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(3, 3001, "item3001")
+ .addRow(3, 3002, "item3002")
+ .addRow(4, 4000, "item4000")
+ .addRow(4, 4001, "item4001")
+ .build();
+
+ // Third EMIT boundary expected rowsets
+ final RowSet expectedRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10001, "item10001")
+ .addRow(1, 10002, "item10002")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(nonEmptyInputRowSet3.container());
+ inputContainer.add(nonEmptyInputRowSet4.container());
+ inputContainer.add(nonEmptyInputRowSet5.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(expectedRowSet1.rowCount());
+ expectedRecordCounts.add(expectedRowSet2.rowCount());
+ expectedRecordCounts.add(expectedRowSet3.rowCount());
+ expectedRecordCounts.add(expectedRowSet4.rowCount());
+ expectedRecordCounts.add(expectedRowSet5.rowCount());
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet1);
+ expectedRowSets.add(expectedRowSet2);
+ expectedRowSets.add(expectedRowSet3);
+ expectedRowSets.add(expectedRowSet4);
+ expectedRowSets.add(expectedRowSet5);
+
+ testPartitionLimitCommon(-5, 2);
+ }
+
+ /**
+ * Verifies {@link PartitionLimitRecordBatch} behaves correctly across EMIT boundaries with single or multiple
+ * batches (with sv2) within each EMIT boundary. It resets its state correctly at each EMIT boundary and then
+ * operates on all the batches within that boundary together.
+ */
+ @Test
+ public void testPartitionLimit_MultipleEmit_SingleMultipleBatch_WithSV2() {
+ final RowSet.SingleRowSet emptyWithSv2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .withSv2()
+ .build();
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .withSv2()
+ .build();
+
+ // Second EMIT boundary batches
+ final RowSet.SingleRowSet nonEmptyInputRowSet3 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 1001, "item1001")
+ .addRow(1, 1002, "item1002")
+ .addRow(1, 1003, "item1003")
+ .addRow(2, 2000, "item2000")
+ .addRow(2, 2001, "item2001")
+ .addRow(2, 2002, "item2002")
+ .withSv2()
+ .build();
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet4 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(3, 3001, "item3001")
+ .addRow(3, 3002, "item3002")
+ .addRow(3, 3003, "item3003")
+ .addRow(4, 4000, "item4000")
+ .addRow(4, 4001, "item4001")
+ .withSv2()
+ .build();
+
+ // Third EMIT boundary batches
+ final RowSet.SingleRowSet nonEmptyInputRowSet5 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10001, "item10001")
+ .addRow(1, 10002, "item10002")
+ .addRow(1, 10003, "item10003")
+ .withSv2()
+ .build();
+
+ // First EMIT boundary expected row sets
+ final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .build();
+
+ // Second EMIT boundary expected row sets
+ final RowSet expectedRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 1001, "item1001")
+ .addRow(1, 1002, "item1002")
+ .addRow(2, 2000, "item2000")
+ .addRow(2, 2001, "item2001")
+ .build();
+
+ final RowSet expectedRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(3, 3001, "item3001")
+ .addRow(3, 3002, "item3002")
+ .addRow(4, 4000, "item4000")
+ .addRow(4, 4001, "item4001")
+ .build();
+
+ // Third EMIT boundary expected row sets
+ final RowSet expectedRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10001, "item10001")
+ .addRow(1, 10002, "item10002")
+ .build();
+
+ inputContainer.add(emptyWithSv2.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(nonEmptyInputRowSet3.container());
+ inputContainer.add(nonEmptyInputRowSet4.container());
+ inputContainer.add(nonEmptyInputRowSet5.container());
+ inputContainer.add(emptyWithSv2.container());
+
+ inputContainerSv2.add(emptyWithSv2.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet3.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet4.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet5.getSv2());
+ inputContainerSv2.add(emptyWithSv2.getSv2());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(expectedRowSet2.rowCount());
+ expectedRecordCounts.add(expectedRowSet3.rowCount());
+ expectedRecordCounts.add(expectedRowSet4.rowCount());
+ expectedRecordCounts.add(expectedRowSet5.rowCount());
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet2);
+ expectedRowSets.add(expectedRowSet3);
+ expectedRowSets.add(expectedRowSet4);
+ expectedRowSets.add(expectedRowSet5);
+
+ testPartitionLimitCommon(-5, 2);
+ }
+
+ /**
+ * Verifies {@link PartitionLimitRecordBatch} behaves correctly across EMIT boundaries with single or multiple
+ * batches (with sv2 that filters out some rows) within each EMIT boundary. It resets its state correctly at each
+ * EMIT boundary and then operates on all the batches within that boundary together.
+ */
+ @Test
+ public void testPartitionLimit_MultipleEmit_SingleMultipleBatch_WithSV2_FilteredRows() {
+ final RowSet.SingleRowSet emptyWithSv2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .withSv2()
+ .build();
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addSelection(false, 1, 100, "item100")
+ .addSelection(true, 1, 101, "item101")
+ .addSelection(false, 1, 102, "item102")
+ .addSelection(true, 1, 103, "item103")
+ .addSelection(false, 2, 200, "item200")
+ .addSelection(true, 2, 201, "item201")
+ .addSelection(true, 2, 202, "item202")
+ .withSv2()
+ .build();
+
+ // Second EMIT boundary batches
+ final RowSet.SingleRowSet nonEmptyInputRowSet3 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addSelection(false, 1, 1001, "item1001")
+ .addSelection(true, 1, 1002, "item1002")
+ .addSelection(true, 1, 1003, "item1003")
+ .addSelection(true, 2, 2000, "item2000")
+ .addSelection(false, 2, 2001, "item2001")
+ .addSelection(true, 2, 2002, "item2002")
+ .withSv2()
+ .build();
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet4 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addSelection(true, 3, 3001, "item3001")
+ .addSelection(false, 3, 3002, "item3002")
+ .addSelection(true, 3, 3003, "item3003")
+ .addSelection(true, 4, 4000, "item4000")
+ .addSelection(true, 4, 4001, "item4001")
+ .withSv2()
+ .build();
+
+ // Third EMIT boundary batches
+ final RowSet.SingleRowSet nonEmptyInputRowSet5 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addSelection(true, 1, 10001, "item10001")
+ .addSelection(true, 1, 10002, "item10002")
+ .addSelection(false, 1, 10003, "item10003")
+ .withSv2()
+ .build();
+
+ // First EMIT boundary expected row sets
+ final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 101, "item101")
+ .addRow(1, 103, "item103")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ // Second EMIT boundary expected row sets
+ final RowSet expectedRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 1002, "item1002")
+ .addRow(1, 1003, "item1003")
+ .addRow(2, 2000, "item2000")
+ .addRow(2, 2002, "item2002")
+ .build();
+
+ final RowSet expectedRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(3, 3001, "item3001")
+ .addRow(3, 3003, "item3003")
+ .addRow(4, 4000, "item4000")
+ .addRow(4, 4001, "item4001")
+ .build();
+
+ // Third EMIT boundary expected row sets
+ final RowSet expectedRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10001, "item10001")
+ .addRow(1, 10002, "item10002")
+ .build();
+
+ inputContainer.add(emptyWithSv2.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(nonEmptyInputRowSet3.container());
+ inputContainer.add(nonEmptyInputRowSet4.container());
+ inputContainer.add(nonEmptyInputRowSet5.container());
+ inputContainer.add(emptyWithSv2.container());
+
+ inputContainerSv2.add(emptyWithSv2.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet3.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet4.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet5.getSv2());
+ inputContainerSv2.add(emptyWithSv2.getSv2());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(expectedRowSet2.rowCount());
+ expectedRecordCounts.add(expectedRowSet3.rowCount());
+ expectedRecordCounts.add(expectedRowSet4.rowCount());
+ expectedRecordCounts.add(expectedRowSet5.rowCount());
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet2);
+ expectedRowSets.add(expectedRowSet3);
+ expectedRowSets.add(expectedRowSet4);
+ expectedRowSets.add(expectedRowSet5);
+
+ testPartitionLimitCommon(-5, 2);
+ }
+}
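Taken together, the new unit tests above exercise what appear to be the PartitionLimit semantics: a negative
start offset behaves like 0, and within one EMIT boundary at most the requested number of rows is kept per
distinct value of the partition column (for example, testPartitionLimitCommon(-5, 2) keeps (1, 10) and
(1, 100) for partition 1, and (2, 200) and (2, 201) for partition 2). A minimal standalone sketch of those
assumed semantics follows; it is not Drill code, and the class and method names are made up for illustration:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Standalone sketch of the per-partition limit semantics the tests above appear to assume.
    public class PartitionLimitSketch {
      // Each int[] is {partitionKey, ...payload}; start may be negative.
      static List<int[]> applyWithinEmitBoundary(List<int[]> rows, int start, int limit) {
        int from = Math.max(0, start);                 // a negative start (e.g. -5) behaves like 0
        Map<Integer, Integer> seenPerPartition = new HashMap<>();
        List<int[]> kept = new ArrayList<>();
        for (int[] row : rows) {
          // 0-based position of this row within its partition
          int index = seenPerPartition.merge(row[0], 1, Integer::sum) - 1;
          if (index >= from && index < from + limit) {
            kept.add(row);
          }
        }
        // the per-partition counters would be reset when the next EMIT boundary starts
        return kept;
      }
    }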
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index e5775bbc4ea..cc9c14a2084 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -25,7 +25,6 @@
import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.TestBuilder;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -82,30 +81,34 @@ public void testLateral_WithFilterAndLimitInSubQuery()
throws Exception {
}
@Test
- @Ignore ("DRILL-6635")
public void testLateral_WithTopNInSubQuery() throws Exception {
+ runAndLog("alter session set `planner.enable_topn`=false");
+
String Sql = "SELECT customer.c_name, orders.o_id, orders.o_amount " +
"FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " +
"(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM
UNNEST(customer.orders) t(ord) ORDER BY " +
"o_amount DESC LIMIT 1) orders";
- testBuilder()
- .sqlQuery(Sql)
- .unOrdered()
- .baselineColumns("c_name", "o_id", "o_amount")
- .baselineValues("customer1", 3.0, 294.5)
- .baselineValues("customer2", 10.0, 724.5)
- .baselineValues("customer3", 23.0, 772.2)
- .baselineValues("customer4", 32.0, 1030.1)
- .go();
+ try {
+ testBuilder()
+ .sqlQuery(Sql)
+ .unOrdered()
+ .baselineColumns("c_name", "o_id", "o_amount")
+ .baselineValues("customer1", 3.0, 294.5)
+ .baselineValues("customer2", 10.0, 724.5)
+ .baselineValues("customer3", 23.0, 772.2)
+ .baselineValues("customer4", 32.0, 1030.1)
+ .go();
+ } finally {
+ runAndLog("alter session set `planner.enable_topn`=true");
+ }
}
/**
- * Test which disables the TopN operator from planner settings before
running query using SORT and LIMIT in
+ * Test which disables the TopN operator from planner settings before running query using SORT and LIMIT in
* subquery. The same query as in above test is executed and same result is
expected.
*/
@Test
- @Ignore ("DRILL-6635")
public void testLateral_WithSortAndLimitInSubQuery() throws Exception {
runAndLog("alter session set `planner.enable_topn`=false");
@@ -174,7 +177,6 @@ public void testMultiUnnestAtSameLevel() throws Exception {
}
@Test
- @Ignore ("DRILL-6638")
public void testUnnestWithItem() throws Exception {
String sql = "select u.item from\n" +
"cp.`lateraljoin/nested-customer.parquet` c," +
@@ -208,7 +210,6 @@ public void testUnnestWithFunctionCall() throws Exception {
}
@Test
- @Ignore ("DRILL-6638")
public void testUnnestWithMap() throws Exception {
String sql = "select u.item from\n" +
"cp.`lateraljoin/nested-customer.parquet` c," +
@@ -227,7 +228,6 @@ public void testUnnestWithMap() throws Exception {
}
@Test
- @Ignore ("DRILL-6638")
public void testMultiUnnestWithMap() throws Exception {
String sql = "select u.item from\n" +
"cp.`lateraljoin/nested-customer.parquet` c," +
@@ -291,8 +291,9 @@ public void
testMultipleBatchesLateral_WithLimitInSubQuery() throws Exception {
}
@Test
- @Ignore ("DRILL-6635")
public void testMultipleBatchesLateral_WithTopNInSubQuery() throws Exception
{
+ runAndLog("alter session set `planner.enable_topn`=false");
+
String sql = "SELECT customer.c_name, orders.o_orderkey,
orders.o_totalprice " +
"FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
"(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as
o_totalprice FROM UNNEST(customer.c_orders) t(ord)" +
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
index 3f52351f980..c7105f9c492 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
@@ -131,7 +131,9 @@ public IterOutcome next() {
// Pretend that an operator somewhere between lateral and unnest
// wants to terminate processing of the record.
if(unnestLimit > 0 && unnestCount >= unnestLimit) {
- unnest.kill(true);
+ // break here rather than sending kill to unnest, since with the PartitionLimit operator a kill will
+ // never be sent to unnest from the subquery
+ break;
}
}
return currentOutcome;
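The MockLateralJoinBatch hunk above replaces the explicit unnest.kill(true) with a plain break once the unnest
limit is reached, because with PartitionLimit inside the subquery a kill is no longer expected to reach unnest.
A rough, self-contained sketch of that loop shape, with made-up names, just to show the pattern:

    import java.util.List;

    // Simplified model of "stop consuming instead of killing upstream" (names are illustrative).
    public class UnnestLimitLoopSketch {
      static int consume(List<Integer> batchRowCounts, int unnestLimit) {
        int unnestCount = 0;
        for (int rowCount : batchRowCounts) {
          unnestCount += rowCount;
          if (unnestLimit > 0 && unnestCount >= unnestLimit) {
            break;   // stop pulling batches; no kill is propagated to the unnest operator
          }
        }
        return unnestCount;
      }
    }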
diff --git
a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index a8574f5aa6a..77bf211b662 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -581,6 +581,10 @@ private FragmentState(int index, int value) {
* <code>SEQUENCE_SUB_SCAN = 53;</code>
*/
SEQUENCE_SUB_SCAN(53, 53),
+ /**
+ * <code>PARTITION_LIMIT = 54;</code>
+ */
+ PARTITION_LIMIT(54, 54),
;
/**
@@ -799,6 +803,10 @@ private FragmentState(int index, int value) {
* <code>SEQUENCE_SUB_SCAN = 53;</code>
*/
public static final int SEQUENCE_SUB_SCAN_VALUE = 53;
+ /**
+ * <code>PARTITION_LIMIT = 54;</code>
+ */
+ public static final int PARTITION_LIMIT_VALUE = 54;
public final int getNumber() { return value; }
@@ -859,6 +867,7 @@ public static CoreOperatorType valueOf(int value) {
case 51: return HTPPD_LOG_SUB_SCAN;
case 52: return IMAGE_SUB_SCAN;
case 53: return SEQUENCE_SUB_SCAN;
+ case 54: return PARTITION_LIMIT;
default: return null;
}
}
@@ -24395,7 +24404,7 @@ public Builder clearStatus() {
"TATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020" +
"\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022"
+
"\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005"
+
- "\022\032\n\026CANCELLATION_REQUESTED\020\006*\271\010\n\020CoreOpe" +
+ "\022\032\n\026CANCELLATION_REQUESTED\020\006*\316\010\n\020CoreOpe" +
"ratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAS" +
"T_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE"
+
"\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HAS"
+
@@ -24422,11 +24431,12 @@ public Builder clearStatus() {
"_SCAN\020.\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n\013KUDU_WRIT" +
"ER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WRI" +
"TER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_S",
- "UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205*g\n\nSasl" +
- "Status\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001"
+
-
"\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003" +
- "\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache.drill.ex" +
- "ec.protoB\rUserBitSharedH\001"
+ "UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PART" +
+ "ITION_LIMIT\0206*g\n\nSaslStatus\022\020\n\014SASL_UNKN" +
+ "OWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRES"
+
+
"S\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B."
+
+ "\n\033org.apache.drill.exec.protoB\rUserBitSh" +
+ "aredH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner
assigner =
new
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
diff --git
a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index 53af5719015..38ac50e2d29 100644
---
a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++
b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -75,7 +75,8 @@
JSON_WRITER(50),
HTPPD_LOG_SUB_SCAN(51),
IMAGE_SUB_SCAN(52),
- SEQUENCE_SUB_SCAN(53);
+ SEQUENCE_SUB_SCAN(53),
+ PARTITION_LIMIT(54);
public final int number;
@@ -147,6 +148,7 @@ public static CoreOperatorType valueOf(int number)
case 51: return HTPPD_LOG_SUB_SCAN;
case 52: return IMAGE_SUB_SCAN;
case 53: return SEQUENCE_SUB_SCAN;
+ case 54: return PARTITION_LIMIT;
default: return null;
}
}
diff --git a/protocol/src/main/protobuf/UserBitShared.proto
b/protocol/src/main/protobuf/UserBitShared.proto
index 4c4960ec2e2..65ebe0b7076 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -342,6 +342,7 @@ enum CoreOperatorType {
HTPPD_LOG_SUB_SCAN = 51;
IMAGE_SUB_SCAN = 52;
SEQUENCE_SUB_SCAN = 53;
+ PARTITION_LIMIT = 54;
}
/* Registry that contains list of jars, each jar contains its name and list of
function signatures.
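The remaining protobuf changes simply register the new operator type. Relying only on what the generated code
in this diff shows (the PARTITION_LIMIT = 54 enum constant, PARTITION_LIMIT_VALUE, valueOf(int) and
getNumber()), a small sanity check could look like the following; the wrapper class name is illustrative:

    import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;

    // Illustrative check of the operator type value added by this PR.
    public class PartitionLimitEnumCheck {
      public static void main(String[] args) {
        System.out.println(CoreOperatorType.PARTITION_LIMIT.getNumber());  // 54
        System.out.println(CoreOperatorType.valueOf(54));                  // PARTITION_LIMIT
        System.out.println(CoreOperatorType.PARTITION_LIMIT_VALUE);        // 54
      }
    }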
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> PartitionLimit changes for Lateral and Unnest
> ---------------------------------------------
>
> Key: DRILL-6652
> URL: https://issues.apache.org/jira/browse/DRILL-6652
> Project: Apache Drill
> Issue Type: Task
> Components: Execution - Relational Operators, Query Planning &
> Optimization
> Affects Versions: 1.14.0
> Reporter: Sorabh Hamirwasia
> Assignee: Sorabh Hamirwasia
> Priority: Major
> Labels: ready-to-commit
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)