DRILL-6327: Update unary operators to handle IterOutcome.EMIT
Note: Adds handling of the EMIT IterOutcome for non-blocking unary operators
(such as Filter, Project, etc.)
closes #1240
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2f275d17
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2f275d17
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2f275d17
Branch: refs/heads/master
Commit: 2f275d1723a9e91acb94a359c4db2770385aac93
Parents: f563f38
Author: Sorabh Hamirwasia <[email protected]>
Authored: Wed Apr 4 17:54:58 2018 -0700
Committer: Vitalii Diravka <[email protected]>
Committed: Sun Apr 29 23:20:55 2018 +0300
----------------------------------------------------------------------
.../physical/impl/filter/FilterRecordBatch.java | 5 +-
.../physical/impl/filter/FilterTemplate2.java | 1 +
.../impl/flatten/FlattenRecordBatch.java | 8 +-
.../physical/impl/join/LateralJoinBatch.java | 2 +-
.../physical/impl/limit/LimitRecordBatch.java | 203 ++++++++++-----
.../impl/project/ProjectRecordBatch.java | 61 +++--
.../impl/svremover/RemovingRecordBatch.java | 7 +-
.../physical/impl/trace/TraceRecordBatch.java | 2 +-
.../IteratorValidatorBatchIterator.java | 2 +
.../exec/record/AbstractSingleRecordBatch.java | 18 ++
.../exec/record/AbstractUnaryRecordBatch.java | 25 +-
.../impl/BaseTestOpBatchEmitOutcome.java | 84 ++++++
.../exec/physical/impl/MockRecordBatch.java | 13 +-
.../impl/filter/TestFilterBatchEmitOutcome.java | 218 ++++++++++++++++
.../impl/limit/TestLimitBatchEmitOutcome.java | 258 +++++++++++++++++++
.../physical/impl/limit/TestLimitOperator.java | 131 ++++++++++
.../impl/project/TestProjectEmitOutcome.java | 220 ++++++++++++++++
17 files changed, 1157 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index f0b832a..ac6d99f 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -81,7 +81,7 @@ public class FilterRecordBatch extends
AbstractSingleRecordBatch<Filter> {
throw new UnsupportedOperationException(e);
}
- return IterOutcome.OK;
+ return getFinalOutcome(false);
}
@Override
@@ -168,6 +168,9 @@ public class FilterRecordBatch extends
AbstractSingleRecordBatch<Filter> {
final ErrorCollector collector = new ErrorCollectorImpl();
final List<TransferPair> transfers = Lists.newArrayList();
final ClassGenerator<Filterer> cg =
CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION2, context.getOptions());
+ // Uncomment below lines to enable saving generated code file for debugging
+ // cg.getCodeGenerator().plainJavaCapable(true);
+ // cg.getCodeGenerator().saveCodeForDebugging(true);
final LogicalExpression expr =
ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector,
context.getFunctionRegistry(), false, unionTypeEnabled);
http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
index 52533bd..6d1f034 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
@@ -61,6 +61,7 @@ public abstract class FilterTemplate2 implements Filterer {
@Override
public void filterBatch(int recordCount) throws SchemaChangeException{
if (recordCount == 0) {
+ outgoingSelectionVector.setRecordCount(0);
return;
}
if (! outgoingSelectionVector.allocateNewSafe(recordCount)) {
http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index bbe9f76..d57246d 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -191,7 +191,8 @@ public class FlattenRecordBatch extends
AbstractSingleRecordBatch<FlattenPOP> {
public IterOutcome innerNext() {
if (hasRemainder) {
handleRemainder();
- return IterOutcome.OK;
+ // Check if we are supposed to return EMIT outcome and have consumed
entire batch
+ return getFinalOutcome(hasRemainder);
}
return super.innerNext();
}
@@ -261,7 +262,10 @@ public class FlattenRecordBatch extends
AbstractSingleRecordBatch<FlattenPOP> {
}
flattenMemoryManager.updateOutgoingStats(outputRecords);
- return IterOutcome.OK;
+
+ // Get the final outcome based on hasRemainder since that will determine
if all the incoming records were
+ // consumed in current output batch or not
+ return getFinalOutcome(hasRemainder);
}
private void handleRemainder() {
http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 295ee78..6425b29 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -718,7 +718,7 @@ public class LateralJoinBatch extends
AbstractBinaryRecordBatch<LateralJoinPOP>
/**
* Simple method to allocate space for all the vectors in the container.
*/
- private void allocateVectors() {;
+ private void allocateVectors() {
for (VectorWrapper w : container) {
RecordBatchSizer.ColumnSize colSize =
batchMemoryManager.getColumnSize(w.getField().getName());
colSize.allocateVector(w.getValueVector(), maxOutputRowCount);
http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index f5443da..5888c34 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.limit;
import java.util.List;
+import com.google.common.base.Preconditions;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
@@ -32,15 +33,18 @@ import
org.apache.drill.exec.record.selection.SelectionVector2;
import com.google.common.collect.Lists;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+
public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
// private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
private SelectionVector2 outgoingSv;
private SelectionVector2 incomingSv;
- private int recordsToSkip;
- private int recordsLeft;
- private final boolean noEndLimit;
- private boolean skipBatch;
+
+ // Start offset of the records
+ private int recordStartOffset;
+ private int numberOfRecords;
private boolean first = true;
private final List<TransferPair> transfers = Lists.newArrayList();
@@ -48,12 +52,55 @@ public class LimitRecordBatch extends
AbstractSingleRecordBatch<Limit> {
throws OutOfMemoryException {
super(popConfig, context, incoming);
outgoingSv = new SelectionVector2(oContext.getAllocator());
- recordsToSkip = popConfig.getFirst();
- noEndLimit = popConfig.getLast() == null;
- if(!noEndLimit) {
- recordsLeft = popConfig.getLast() - recordsToSkip;
- }
- skipBatch = false;
+ refreshLimitState();
+ }
+
+ @Override
+ public IterOutcome innerNext() {
+ if (!first && !needMoreRecords(numberOfRecords)) {
+ outgoingSv.setRecordCount(0);
+ incoming.kill(true);
+
+ IterOutcome upStream = next(incoming);
+ if (upStream == IterOutcome.OUT_OF_MEMORY) {
+ return upStream;
+ }
+
+ while (upStream == IterOutcome.OK || upStream ==
IterOutcome.OK_NEW_SCHEMA) {
+ // Clear the memory for the incoming batch
+ for (VectorWrapper<?> wrapper : incoming) {
+ wrapper.getValueVector().clear();
+ }
+ upStream = next(incoming);
+ if (upStream == IterOutcome.OUT_OF_MEMORY) {
+ return upStream;
+ }
+ }
+ // If EMIT that means leaf operator is UNNEST, in this case refresh
the limit states and return EMIT.
+ if (upStream == EMIT) {
+ refreshLimitState();
+ return upStream;
+ }
+ // other leaf operator behave as before.
+ return NONE;
+ }
+ return super.innerNext();
+ }
+
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ return outgoingSv;
+ }
+
+ @Override
+ public int getRecordCount() {
+ return outgoingSv.getCount();
+ }
+
+ @Override
+ public void close() {
+ outgoingSv.clear();
+ super.close();
}
@Override
@@ -61,10 +108,9 @@ public class LimitRecordBatch extends
AbstractSingleRecordBatch<Limit> {
container.zeroVectors();
transfers.clear();
-
for(final VectorWrapper<?> v : incoming) {
final TransferPair pair = v.getValueVector().makeTransferPair(
- container.addOrGet(v.getField(), callBack));
+ container.addOrGet(v.getField(), callBack));
transfers.add(pair);
}
@@ -88,36 +134,22 @@ public class LimitRecordBatch extends
AbstractSingleRecordBatch<Limit> {
return false;
}
+ /**
+ * Gets the outcome to return from super implementation and then in case of
EMIT outcome it refreshes the state of
+ * operator. Refresh is done to again apply limit on all the future incoming
batches which will be part of next
+ * record boundary.
+ * @param hasRemainder
+ * @return - IterOutcome to send downstream
+ */
@Override
- public IterOutcome innerNext() {
- if(!first && !noEndLimit && recordsLeft <= 0) {
- incoming.kill(true);
-
- IterOutcome upStream = next(incoming);
- if (upStream == IterOutcome.OUT_OF_MEMORY) {
- return upStream;
- }
+ protected IterOutcome getFinalOutcome(boolean hasRemainder) {
+ final IterOutcome outcomeToReturn = super.getFinalOutcome(hasRemainder);
- while (upStream == IterOutcome.OK || upStream ==
IterOutcome.OK_NEW_SCHEMA) {
- // Clear the memory for the incoming batch
- for (VectorWrapper<?> wrapper : incoming) {
- wrapper.getValueVector().clear();
- }
- upStream = next(incoming);
- if (upStream == IterOutcome.OUT_OF_MEMORY) {
- return upStream;
- }
- }
-
- return IterOutcome.NONE;
+ // EMIT outcome means leaf operator is UNNEST, hence refresh the state no
matter limit is reached or not.
+ if (outcomeToReturn == EMIT) {
+ refreshLimitState();
}
-
- return super.innerNext();
- }
-
- @Override
- public SelectionVector2 getSelectionVector2() {
- return outgoingSv;
+ return outcomeToReturn;
}
@Override
@@ -125,40 +157,47 @@ public class LimitRecordBatch extends
AbstractSingleRecordBatch<Limit> {
if (first) {
first = false;
}
- skipBatch = false;
- final int recordCount = incoming.getRecordCount();
- if (recordCount == 0) {
- skipBatch = true;
- return IterOutcome.OK;
+ final int inputRecordCount = incoming.getRecordCount();
+ if (inputRecordCount == 0) {
+ setOutgoingRecordCount(0);
+ return getFinalOutcome(false);
}
+
for(final TransferPair tp : transfers) {
tp.transfer();
}
- if (recordCount <= recordsToSkip) {
- recordsToSkip -= recordCount;
- skipBatch = true;
+ // Check if current input record count is less than start offset. If yes
then adjust the start offset since we
+ // have to ignore all these records and return empty batch.
+ if (inputRecordCount <= recordStartOffset) {
+ recordStartOffset -= inputRecordCount;
+ setOutgoingRecordCount(0);
} else {
- outgoingSv.allocateNew(recordCount);
- limit(recordCount);
+ // Allocate SV2 vectors for the record count size since we transfer all
the vectors buffer from input record
+ // batch to output record batch and later an SV2Remover copies the
needed records.
+ outgoingSv.allocateNew(inputRecordCount);
+ limit(inputRecordCount);
}
-
- return IterOutcome.OK;
+ return getFinalOutcome(false);
}
- private void limit(int recordCount) {
- final int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip));
- recordsToSkip -= offset;
- int fetch;
-
- if(noEndLimit) {
- fetch = recordCount;
+ /**
+ * limit call when incoming batch has number of records more than the start
offset such that it can produce some
+ * output records. After first call of this method recordStartOffset should
be 0 since we have already skipped the
+ * required number of records as part of first incoming record batch.
+ * @param inputRecordCount - number of records in incoming batch
+ */
+ private void limit(int inputRecordCount) {
+ int endRecordIndex;
+
+ if (numberOfRecords == Integer.MIN_VALUE) {
+ endRecordIndex = inputRecordCount;
} else {
- fetch = Math.min(recordCount, offset + recordsLeft);
- recordsLeft -= Math.max(0, fetch - offset);
+ endRecordIndex = Math.min(inputRecordCount, recordStartOffset +
numberOfRecords);
+ numberOfRecords -= Math.max(0, endRecordIndex - recordStartOffset);
}
int svIndex = 0;
- for(int i = offset; i < fetch; svIndex++, i++) {
+ for(int i = recordStartOffset; i < endRecordIndex; svIndex++, i++) {
if (incomingSv != null) {
outgoingSv.setIndex(svIndex, incomingSv.getIndex(i));
} else {
@@ -166,16 +205,44 @@ public class LimitRecordBatch extends
AbstractSingleRecordBatch<Limit> {
}
}
outgoingSv.setRecordCount(svIndex);
+ // Update the start offset
+ recordStartOffset = 0;
}
- @Override
- public int getRecordCount() {
- return skipBatch ? 0 : outgoingSv.getCount();
+ private void setOutgoingRecordCount(int outputCount) {
+ outgoingSv.setRecordCount(outputCount);
}
- @Override
- public void close() {
- outgoingSv.clear();
- super.close();
+ /**
+ * Method which returns if more output records are needed from LIMIT
operator. When numberOfRecords is set to
+ * {@link Integer#MIN_VALUE} that means there is no end bound on LIMIT, so
get all the records past start offset.
+ * @return - true - more output records are expected.
+ * false - limit bound is reached and no more records are expected
+ */
+ private boolean needMoreRecords(int recordsToRead) {
+ boolean readMore = true;
+
+ Preconditions.checkState(recordsToRead == Integer.MIN_VALUE ||
recordsToRead >= 0,
+ String.format("Invalid value of numberOfRecords %d inside
LimitRecordBatch", recordsToRead));
+
+ // Above check makes sure that either numberOfRecords has no bound or if
it has bounds then either we have read
+ // all the records or still left to read some.
+ // Below check just verifies if there is bound on numberOfRecords and we
have read all of it.
+ if (recordsToRead == 0) {
+ readMore = false;
+ }
+ return readMore;
+ }
+
+ /**
+ * Reset the states for recordStartOffset and numberOfRecords based on the
popConfig passed to the operator.
+ * This method is called for the outcome EMIT no matter if limit is reached
or not.
+ */
+ private void refreshLimitState() {
+ // Make sure startOffset is non-negative
+ recordStartOffset = Math.max(0, popConfig.getFirst());
+ numberOfRecords = (popConfig.getLast() == null) ?
+ Integer.MIN_VALUE : Math.max(0, popConfig.getLast()) - recordStartOffset;
+ first = true;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index a96dfe1..eab9007 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -71,6 +71,8 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+
public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
private Projector projector;
@@ -129,7 +131,8 @@ public class ProjectRecordBatch extends
AbstractSingleRecordBatch<Project> {
recordCount = 0;
if (hasRemainder) {
handleRemainder();
- return IterOutcome.OK;
+ // Check if we are supposed to return EMIT outcome and have consumed
entire batch
+ return getFinalOutcome(hasRemainder);
}
return super.innerNext();
}
@@ -151,7 +154,14 @@ public class ProjectRecordBatch extends
AbstractSingleRecordBatch<Project> {
if (complexWriters != null) {
IterOutcome next = null;
while (incomingRecordCount == 0) {
+ if (getLastKnownOutcome() == EMIT) {
+ throw new UnsupportedOperationException("Currently functions
producing complex types as output is not " +
+ "supported in project list for subquery between LATERAL and
UNNEST. Please re-write the query using this " +
+ "function in the projection list of outermost query.");
+ }
+
next = next(incoming);
+ setLastKnownOutcome(next);
if (next == IterOutcome.OUT_OF_MEMORY) {
outOfMemory = true;
return next;
@@ -166,28 +176,34 @@ public class ProjectRecordBatch extends
AbstractSingleRecordBatch<Project> {
// Only need to add the schema for the complex exprs because
others should already have
// been setup during setupNewSchema
for (FieldReference fieldReference : complexFieldReferencesList) {
- MaterializedField field =
MaterializedField.create(fieldReference.getAsNamePart().getName(),
UntypedNullHolder.TYPE);
+ MaterializedField field =
MaterializedField.create(fieldReference.getAsNamePart().getName(),
+ UntypedNullHolder.TYPE);
container.add(new UntypedNullVector(field,
container.getAllocator()));
}
container.buildSchema(SelectionVectorMode.NONE);
wasNone = true;
return IterOutcome.OK_NEW_SCHEMA;
- } else if (next != IterOutcome.OK && next !=
IterOutcome.OK_NEW_SCHEMA) {
+ } else if (next != IterOutcome.OK && next !=
IterOutcome.OK_NEW_SCHEMA && next != EMIT) {
return next;
+ } else if (next == IterOutcome.OK_NEW_SCHEMA) {
+ try {
+ setupNewSchema();
+ } catch (final SchemaChangeException e) {
+ throw new RuntimeException(e);
+ }
}
incomingRecordCount = incoming.getRecordCount();
}
- if (next == IterOutcome.OK_NEW_SCHEMA) {
- try {
- setupNewSchema();
- } catch (final SchemaChangeException e) {
- throw new RuntimeException(e);
- }
- }
}
}
- first = false;
+ if (complexWriters != null && getLastKnownOutcome() == EMIT) {
+ throw new UnsupportedOperationException("Currently functions producing
complex types as output is not " +
+ "supported in project list for subquery between LATERAL and UNNEST.
Please re-write the query using this " +
+ "function in the projection list of outermost query.");
+ }
+
+ first = false;
container.zeroVectors();
if (!doAlloc(incomingRecordCount)) {
@@ -214,7 +230,9 @@ public class ProjectRecordBatch extends
AbstractSingleRecordBatch<Project> {
container.buildSchema(SelectionVectorMode.NONE);
}
- return IterOutcome.OK;
+ // Get the final outcome based on hasRemainder since that will determine
if all the incoming records were
+ // consumed in current output batch or not
+ return getFinalOutcome(hasRemainder);
}
private void handleRemainder() {
@@ -310,11 +328,18 @@ public class ProjectRecordBatch extends
AbstractSingleRecordBatch<Project> {
}
}
this.allocationVectors = Lists.newArrayList();
+
if (complexWriters != null) {
container.clear();
} else {
+ // Not clearing the container here is fine since Project output schema
is not determined solely based on incoming
+ // batch. It is defined by the expressions it has to evaluate.
+ //
+ // If there is a case where only the type of ValueVector already present
in container is changed then addOrGet
+ // method takes care of it by replacing the vectors.
container.zeroVectors();
}
+
final List<NamedExpression> exprs = getExpressionList();
final ErrorCollector collector = new ErrorCollectorImpl();
final List<TransferPair> transfers = Lists.newArrayList();
@@ -357,7 +382,8 @@ public class ProjectRecordBatch extends
AbstractSingleRecordBatch<Project> {
}
final FieldReference ref = new FieldReference(name);
- final ValueVector vvOut =
container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(),
vvIn.getField().getType()), callBack);
+ final ValueVector vvOut =
container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(),
+ vvIn.getField().getType()), callBack);
final TransferPair tp = vvIn.makeTransferPair(vvOut);
transfers.add(tp);
}
@@ -436,8 +462,9 @@ public class ProjectRecordBatch extends
AbstractSingleRecordBatch<Project> {
Preconditions.checkNotNull(incomingBatch);
final FieldReference ref = getRef(namedExpression);
- final ValueVector vvOut =
container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(),
-
vectorRead.getMajorType()), callBack);
+ final ValueVector vvOut =
+
container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(),
+ vectorRead.getMajorType()), callBack);
final TransferPair tp = vvIn.makeTransferPair(vvOut);
transfers.add(tp);
transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
@@ -456,7 +483,10 @@ public class ProjectRecordBatch extends
AbstractSingleRecordBatch<Project> {
cg.addExpr(expr, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
if (complexFieldReferencesList == null) {
complexFieldReferencesList = Lists.newArrayList();
+ } else {
+ complexFieldReferencesList.clear();
}
+
// save the field reference for later for getting schema when input is
empty
complexFieldReferencesList.add(namedExpression.getRef());
} else {
@@ -813,5 +843,4 @@ public class ProjectRecordBatch extends
AbstractSingleRecordBatch<Project> {
wasNone = true;
return IterOutcome.OK_NEW_SCHEMA;
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 08ca029..a4207b0 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -76,11 +76,6 @@ public class RemovingRecordBatch extends
AbstractSingleRecordBatch<SelectionVect
}
@Override
- public IterOutcome innerNext() {
- return super.innerNext();
- }
-
- @Override
protected IterOutcome doWork() {
try {
copier.copyRecords(0, incoming.getRecordCount());
@@ -99,7 +94,7 @@ public class RemovingRecordBatch extends
AbstractSingleRecordBatch<SelectionVect
logger.debug("doWork(): {} records copied out of {}, incoming schema {} ",
container.getRecordCount(), container.getRecordCount(),
incoming.getSchema());
- return IterOutcome.OK;
+ return getFinalOutcome(false);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 61d3214..50cb26b 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -122,7 +122,7 @@ public class TraceRecordBatch extends
AbstractSingleRecordBatch<Trace> {
if (incomingHasSv2) {
sv = wrap.getSv2();
}
- return IterOutcome.OK;
+ return getFinalOutcome(false);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index e75619e..05eb545 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -140,6 +140,7 @@ public class IteratorValidatorBatchIterator implements
CloseableRecordBatch {
case OK:
case OK_NEW_SCHEMA:
case NONE:
+ case EMIT:
return;
default:
throw new IllegalStateException(
@@ -240,6 +241,7 @@ public class IteratorValidatorBatchIterator implements
CloseableRecordBatch {
validateBatch();
break;
case OK:
+ case EMIT:
// OK is allowed as long as OK_NEW_SCHEMA was seen, except if
terminated
// (checked above).
if (validationState != ValidationState.HAVE_SCHEMA) {
http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 1a04b40..c8e2bda 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -21,6 +21,7 @@ import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+
/**
* Implements an AbstractUnaryRecordBatch where the inoming record batch is
known at the time of creation
* @param <T>
@@ -40,4 +41,21 @@ public abstract class AbstractSingleRecordBatch<T extends
PhysicalOperator> exte
return incoming;
}
+ /**
+ * Based on lastKnownOutcome and if there are more records to be output for
current record boundary detected by
+ * EMIT outcome, this method returns EMIT or OK outcome.
+ * @param hasMoreRecordInBoundary
+ * @return - EMIT - If the lastknownOutcome was EMIT and output records
corresponding to all the incoming records in
+ * current record boundary is already produced.
+ * - OK - otherwise
+ */
+ protected IterOutcome getFinalOutcome(boolean hasMoreRecordInBoundary) {
+ final IterOutcome lastOutcome = getLastKnownOutcome();
+ final boolean isLastOutcomeEmit = (IterOutcome.EMIT == lastOutcome);
+ if (isLastOutcomeEmit && !hasMoreRecordInBoundary) {
+ setLastKnownOutcome(IterOutcome.OK);
+ return IterOutcome.EMIT;
+ }
+ return IterOutcome.OK;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
index e941405..ec34344 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.record;
-import com.google.common.base.Preconditions;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
@@ -39,6 +38,7 @@ public abstract class AbstractUnaryRecordBatch<T extends
PhysicalOperator> exten
protected boolean outOfMemory = false;
protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
+ private IterOutcome lastKnownOutcome;
public AbstractUnaryRecordBatch(T popConfig, FragmentContext context) throws
OutOfMemoryException {
super(popConfig, context, false);
@@ -68,9 +68,16 @@ public abstract class AbstractUnaryRecordBatch<T extends
PhysicalOperator> exten
}
} while ((upstream = next(incoming)) == IterOutcome.OK &&
incoming.getRecordCount() == 0);
}
- if ((state == BatchState.FIRST) && upstream == IterOutcome.OK) {
- upstream = IterOutcome.OK_NEW_SCHEMA;
+ if (state == BatchState.FIRST) {
+ if (upstream == IterOutcome.OK) {
+ upstream = IterOutcome.OK_NEW_SCHEMA;
+ } else if (upstream == IterOutcome.EMIT) {
+ throw new IllegalStateException("Received first batch with unexpected
EMIT IterOutcome");
+ }
}
+
+ // update the last outcome seen
+ lastKnownOutcome = upstream;
switch (upstream) {
case NONE:
if (state == BatchState.FIRST) {
@@ -104,6 +111,7 @@ public abstract class AbstractUnaryRecordBatch<T extends
PhysicalOperator> exten
}
// fall through.
case OK:
+ case EMIT:
assert state != BatchState.FIRST : "First batch should be
OK_NEW_SCHEMA";
container.zeroVectors();
IterOutcome out = doWork();
@@ -164,4 +172,15 @@ public abstract class AbstractUnaryRecordBatch<T extends
PhysicalOperator> exten
return IterOutcome.NONE;
}
+ protected IterOutcome getLastKnownOutcome() {
+ return lastKnownOutcome;
+ }
+
+ /**
+ * Sets the outcome received with the input batch currently being processed
+ * @param outcome
+ */
+ protected void setLastKnownOutcome(IterOutcome outcome) {
+ lastKnownOutcome = outcome;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
new file mode 100644
index 0000000..cd24640
--- /dev/null
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.mock.MockStorePOP;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class BaseTestOpBatchEmitOutcome extends PhysicalOpUnitTestBase {
+ //private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(BaseTestOpBatchEmitOutcome.class);
+
+ // input batch schema
+ protected static TupleMetadata inputSchema;
+
+ // default Empty input RowSet
+ protected RowSet.SingleRowSet emptyInputRowSet;
+
+ // default Non-Empty input RowSet
+ protected RowSet.SingleRowSet nonEmptyInputRowSet;
+
+ // List of incoming containers
+ protected final List<VectorContainer> inputContainer = new ArrayList<>(5);
+
+ // List of incoming IterOutcomes
+ protected final List<RecordBatch.IterOutcome> inputOutcomes = new
ArrayList<>(5);
+
+ // output record count
+ protected int outputRecordCount;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ inputSchema = new SchemaBuilder()
+ .add("id_left", TypeProtos.MinorType.INT)
+ .add("cost_left", TypeProtos.MinorType.INT)
+ .add("name_left", TypeProtos.MinorType.VARCHAR)
+ .buildSchema();
+ }
+
+ @Before
+ public void beforeTest() throws Exception {
+ emptyInputRowSet = operatorFixture.rowSetBuilder(inputSchema).build();
+ nonEmptyInputRowSet = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+ final PhysicalOperator mockPopConfig = new MockStorePOP(null);
+ mockOpContext(mockPopConfig, 0, 0);
+ }
+
+ @After
+ public void afterTest() throws Exception {
+ emptyInputRowSet.clear();
+ nonEmptyInputRowSet.clear();
+ inputContainer.clear();
+ inputOutcomes.clear();
+ outputRecordCount = 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 9ed9848..5463974 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -41,6 +41,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
private int currentContainerIndex;
private int currentOutcomeIndex;
private boolean isDone;
+ private boolean limitWithUnnest;
// All the below resources are owned by caller
private final List<VectorContainer> allTestContainers;
@@ -98,9 +99,11 @@ public class MockRecordBatch implements CloseableRecordBatch
{
@Override
public void kill(boolean sendUpstream) {
- isDone = true;
- container.clear();
- container.setRecordCount(0);
+ if (!limitWithUnnest) {
+ isDone = true;
+ container.clear();
+ container.setRecordCount(0);
+ }
}
@Override
@@ -182,4 +185,8 @@ public class MockRecordBatch implements
CloseableRecordBatch {
public boolean isCompleted() {
return isDone;
}
+
+ public void useUnnestKillHandlingForLimit(boolean limitWithUnnest) {
+ this.limitWithUnnest = limitWithUnnest;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestFilterBatchEmitOutcome.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestFilterBatchEmitOutcome.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestFilterBatchEmitOutcome.java
new file mode 100644
index 0000000..21043ef
--- /dev/null
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestFilterBatchEmitOutcome.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.test.rowSet.RowSet;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(OperatorTest.class)
+public class TestFilterBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
+
+ /**
+ * Test to show if an empty batch is accompanied with EMIT outcome then
Filter operator is not ignoring it and
+ * asking for next batch with data. Instead it is just returning the empty
batch along with EMIT outcome right away.
+ *
+ * This test also shows that if first batch accompanied with OK_NEW_SCHEMA
is empty then it is also passed through by
+ * Filter operator rather than ignoring it and waiting for a batch with some
data in it.
+ * @throws Throwable
+ */
+ @Test
+ public void testFilterEmptyBatchEmitOutcome() throws Throwable {
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final MockRecordBatch mockInputBatch = new
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+ final Filter filterConf = new Filter(null, parseExpr("id_left=5"), 1.0f);
+ final FilterRecordBatch filterRecordBatch = new
FilterRecordBatch(filterConf, mockInputBatch,
+ operatorFixture.getFragmentContext());
+
+ assertTrue(filterRecordBatch.next() ==
RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ outputRecordCount += filterRecordBatch.getRecordCount();
+ assertTrue(filterRecordBatch.next() == RecordBatch.IterOutcome.EMIT);
+ outputRecordCount += filterRecordBatch.getRecordCount();
+ assertEquals(0, outputRecordCount);
+ }
+
+ /**
+ * Test to show if a non-empty batch is accompanied with EMIT outcome then
Filter operator produces output for
+ * that batch with data matching filter condition and return the output
using EMIT outcome.
+ * @throws Throwable
+ */
+ @Test
+ public void testFilterNonEmptyBatchEmitOutcome() throws Throwable {
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final MockRecordBatch mockInputBatch = new
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+ final Filter filterConf = new Filter(null, parseExpr("id_left=1"), 1.0f);
+ final FilterRecordBatch filterRecordBatch = new
FilterRecordBatch(filterConf, mockInputBatch,
+ operatorFixture.getFragmentContext());
+
+ assertTrue(filterRecordBatch.next() ==
RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ outputRecordCount += filterRecordBatch.getRecordCount();
+ assertTrue(filterRecordBatch.next() == RecordBatch.IterOutcome.EMIT);
+ outputRecordCount += filterRecordBatch.getRecordCount();
+ assertEquals(1, outputRecordCount);
+ }
+
+ /**
+ * Test to show if a non-empty batch is accompanied with EMIT outcome then
Filter operator produces empty output
+ * batch since filter condition is not satisfied by any data in incoming
batch. This empty output batch is
+ * accompanied with EMIT outcome.
+ * @throws Throwable
+ */
+ @Test
+ public void testFilterNonEmptyBatchEmitOutcome_WithNonMatchingCondition()
throws Throwable {
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final MockRecordBatch mockInputBatch = new
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+ final Filter filterConf = new Filter(null, parseExpr("id_left=2"), 1.0f);
+ final FilterRecordBatch filterRecordBatch = new
FilterRecordBatch(filterConf, mockInputBatch,
+ operatorFixture.getFragmentContext());
+
+ assertTrue(filterRecordBatch.next() ==
RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ outputRecordCount += filterRecordBatch.getRecordCount();
+ assertTrue(filterRecordBatch.next() == RecordBatch.IterOutcome.EMIT);
+ outputRecordCount += filterRecordBatch.getRecordCount();
+ assertEquals(0, outputRecordCount);
+ }
+
+ /**
+ * Test to show that non-empty first batch produces output for that batch
with OK_NEW_SCHEMA and later empty batch
+ * with EMIT outcome is also passed through rather than getting ignored.
+ * @throws Throwable
+ */
+ @Test
+ public void testFilterNonEmptyFirst_EmptyBatchEmitOutcome() throws Throwable
{
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final MockRecordBatch mockInputBatch = new
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+ final Filter filterConf = new Filter(null, parseExpr("id_left=1"), 1.0f);
+ final FilterRecordBatch filterRecordBatch = new
FilterRecordBatch(filterConf, mockInputBatch,
+ operatorFixture.getFragmentContext());
+
+ assertTrue(filterRecordBatch.next() ==
RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ outputRecordCount += filterRecordBatch.getRecordCount();
+ assertTrue(filterRecordBatch.next() == RecordBatch.IterOutcome.EMIT);
+ outputRecordCount += filterRecordBatch.getRecordCount();
+ assertEquals(1, outputRecordCount);
+ }
+
+ /**
+ * Test to show if an empty batch is accompanied with OK outcome then that
batch is ignored by Filter operator and
+ * it doesn't return anything; instead it calls next() to get another batch. If
the subsequent next() call returns empty
+ * batch with EMIT outcome then Filter returns the EMIT outcome correctly
rather than ignoring it because of empty
+ * batch.
+ * @throws Throwable
+ */
+ @Test
+ public void testFilterNonEmptyFirst_EmptyOK_EmptyBatchEmitOutcome() throws
Throwable {
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+ final MockRecordBatch mockInputBatch = new
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+ final Filter filterConf = new Filter(null, parseExpr("id_left=1"), 1.0f);
+ final FilterRecordBatch filterRecordBatch = new
FilterRecordBatch(filterConf, mockInputBatch,
+ operatorFixture.getFragmentContext());
+
+ assertTrue(filterRecordBatch.next() ==
RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ outputRecordCount += filterRecordBatch.getRecordCount();
+ // OK will not be received since it was accompanied with an empty batch
+ assertTrue(filterRecordBatch.next() == RecordBatch.IterOutcome.EMIT);
+ outputRecordCount += filterRecordBatch.getRecordCount();
+ assertTrue(filterRecordBatch.next() == RecordBatch.IterOutcome.NONE);
+ assertEquals(1, outputRecordCount);
+ }
+
+ /**
+ * Test to show empty batch with OK outcome is ignored and later non-empty
batch with OK outcome produces an output
+ * batch. Whereas an empty batch with EMIT outcome is not ignored and an empty
output batch is returned with EMIT
+ * outcome.
+ * @throws Throwable
+ */
+ @Test
+ public void testFilterNonEmptyFirst_NonEmptyOK_EmptyBatchEmitOutcome()
throws Throwable {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final MockRecordBatch mockInputBatch = new
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+ final Filter filterConf = new Filter(null, parseExpr("id_left>=1"), 1.0f);
+ final FilterRecordBatch filterRecordBatch = new
FilterRecordBatch(filterConf, mockInputBatch,
+ operatorFixture.getFragmentContext());
+
+ assertTrue(filterRecordBatch.next() ==
RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ outputRecordCount += filterRecordBatch.getRecordCount();
+ assertTrue(filterRecordBatch.next() == RecordBatch.IterOutcome.OK);
+ outputRecordCount += filterRecordBatch.getRecordCount();
+ assertTrue(filterRecordBatch.next() == RecordBatch.IterOutcome.EMIT);
+ outputRecordCount += filterRecordBatch.getRecordCount();
+ assertEquals(2, outputRecordCount);
+
+ // free up resources
+ nonEmptyInputRowSet2.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
new file mode 100644
index 0000000..4757488
--- /dev/null
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.limit;
+
+import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.test.rowSet.RowSet;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
+
+ /**
+ * Test to show empty batch with both OK_NEW_SCHEMA and EMIT outcome is not
ignored by Limit and is passed through to
+ * the downstream operator.
+ * @throws Throwable
+ */
+ @Test
+ public void testLimitEmptyBatchEmitOutcome() throws Throwable {
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+
+ final MockRecordBatch mockInputBatch = new
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+ // Only set for this Test class
+ mockInputBatch.useUnnestKillHandlingForLimit(true);
+
+ final Limit limitConf = new Limit(null, 0, 1);
+ final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf,
operatorFixture.getFragmentContext(),
+ mockInputBatch);
+
+ assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ outputRecordCount += limitBatch.getRecordCount();
+ assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
+ outputRecordCount += limitBatch.getRecordCount();
+ assertEquals(0, outputRecordCount);
+ }
+
+ /**
+ * Test to validate limit considers all the data until it sees EMIT outcome
and returns an output batch with data that
+ * meets the limit criteria.
+ * @throws Throwable
+ */
+ @Test
+ public void testLimitNonEmptyBatchEmitOutcome() throws Throwable {
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+
+ final MockRecordBatch mockInputBatch = new
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+ // Only set for this Test class
+ mockInputBatch.useUnnestKillHandlingForLimit(true);
+
+ final Limit limitConf = new Limit(null, 0, 1);
+ final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf,
operatorFixture.getFragmentContext(),
+ mockInputBatch);
+
+ assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ outputRecordCount += limitBatch.getRecordCount();
+ assertEquals(0, outputRecordCount);
+ assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
+ outputRecordCount += limitBatch.getRecordCount();
+ assertEquals(1, outputRecordCount);
+ }
+
+ /**
+ * Test to show that once a limit number of records is produced using first
set of batches then on getting a batch
+ * with EMIT outcome, the limit state is again refreshed and applied to next
set of batches with data.
+ * @throws Throwable
+ */
+ @Test
+ public void testLimitResetsAfterFirstEmitOutcome() throws Throwable {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .addRow(3, 30, "item3")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+ final MockRecordBatch mockInputBatch = new
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+ // Only set for this Test class
+ mockInputBatch.useUnnestKillHandlingForLimit(true);
+
+ final Limit limitConf = new Limit(null, 0, 1);
+ final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf,
operatorFixture.getFragmentContext(),
+ mockInputBatch);
+
+ assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertEquals(1, limitBatch.getRecordCount());
+ assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
+
+ // State refresh happens and limit again works on new data batches
+ assertEquals(0, limitBatch.getRecordCount());
+ assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK);
+ assertEquals(1, limitBatch.getRecordCount());
+ assertTrue(limitBatch.next() == RecordBatch.IterOutcome.NONE);
+ }
+
+ /**
+ * Test to show that when the limit number of records is found with first
incoming batch, then next empty incoming
+ * batch with OK outcome is ignored, but the empty EMIT outcome batch is not
ignored. Empty incoming batch with
+ * EMIT outcome produces empty output batch with EMIT outcome.
+ * @throws Throwable
+ */
+ @Test
+ public void testLimitNonEmptyFirst_EmptyOKEmitOutcome() throws Throwable {
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+ final MockRecordBatch mockInputBatch = new
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+ // Only set for this Test class
+ mockInputBatch.useUnnestKillHandlingForLimit(true);
+
+ final Limit limitConf = new Limit(null, 0, 1);
+ final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf,
operatorFixture.getFragmentContext(),
+ mockInputBatch);
+
+ assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertEquals(1, limitBatch.getRecordCount());
+ // OK will not be received since it was accompanied with an empty batch
+ assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(0, limitBatch.getRecordCount());
+ assertTrue(limitBatch.next() == RecordBatch.IterOutcome.NONE);
+ }
+
+ /**
+ * Test to show that limit refreshes its state after seeing first EMIT
outcome and works on data batches following
+ * it as new sets of incoming batches and applies the limit rule afresh on
those. So for first set of batches with
+ * OK_NEW_SCHEMA and EMIT outcome but total number of records received being
less than limit condition, it still
+ * produces an output with that many records (in this case 1 even though
limit number of records is 2).
+ *
+ * After seeing EMIT, it refreshes its state and operates on next input
batches to again return limit number of
+ * records. So for 3rd batch with 2 records but with EMIT outcome it
produces an output batch with 2 records not
+ * with 1 since state is refreshed.
+ * @throws Throwable
+ */
+ @Test
+ public void testMultipleLimitWithEMITOutcome() throws Throwable {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .addRow(3, 30, "item3")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final MockRecordBatch mockInputBatch = new
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+ // Only set for this Test class
+ mockInputBatch.useUnnestKillHandlingForLimit(true);
+
+ final Limit limitConf = new Limit(null, 0, 2);
+ final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf,
operatorFixture.getFragmentContext(),
+ mockInputBatch);
+
+ // first limit evaluation
+ assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertEquals(1, limitBatch.getRecordCount());
+ assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(0, limitBatch.getRecordCount());
+
+ // After seeing EMIT limit will refresh its state and again evaluate
limit on next set of input batches
+ assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(2, limitBatch.getRecordCount());
+
+ // Since limit is hit it will return NONE
+ assertTrue(limitBatch.next() == RecordBatch.IterOutcome.NONE);
+ }
+
+ /**
+ * Test shows that limit operates on multiple input batches until it finds
limit number of records or it sees an
+ * EMIT outcome to refresh its state.
+ * @throws Throwable
+ */
+ @Test
+ public void testLimitNonEmptyFirst_NonEmptyOK_EmptyBatchEmitOutcome() throws
Throwable {
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 =
operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final MockRecordBatch mockInputBatch = new
MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+ // Only set for this Test class
+ mockInputBatch.useUnnestKillHandlingForLimit(true);
+
+ final Limit limitConf = new Limit(null, 0, 2);
+ final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf,
operatorFixture.getFragmentContext(),
+ mockInputBatch);
+
+ assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertEquals(1, limitBatch.getRecordCount());
+ assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK);
+ assertEquals(1, limitBatch.getRecordCount());
+ assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(0, limitBatch.getRecordCount());
+
+ nonEmptyInputRowSet2.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
new file mode 100644
index 0000000..22c0013
--- /dev/null
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.limit;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestLimitOperator extends PhysicalOpUnitTestBase {
+
+ @Test
+ public void testLimitMoreRecords() {
+ Limit limitConf = new Limit(null, 0, 10);
+ List<String> inputJsonBatches = Lists.newArrayList(
+ "[{\"a\": 5, \"b\" : 1 }]",
+ "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
+ opTestBuilder()
+ .physicalOperator(limitConf)
+ .inputDataStreamJson(inputJsonBatches)
+ .baselineColumns("a", "b")
+ .baselineValues(5l, 1l)
+ .baselineValues(5l, 5l)
+ .baselineValues(3l, 8l)
+ .go();
+ }
+
+ @Test
+ public void testLimitLessRecords() {
+ Limit limitConf = new Limit(null, 0, 1);
+ List<String> inputJsonBatches = Lists.newArrayList(
+ "[{\"a\": 5, \"b\" : 1 }]",
+ "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
+ opTestBuilder()
+ .physicalOperator(limitConf)
+ .inputDataStreamJson(inputJsonBatches)
+ .baselineColumns("a", "b")
+ .baselineValues(5l, 1l)
+ .go();
+ }
+
+ @Test
+ public void testLimitWithOffset() {
+ Limit limitConf = new Limit(null, 2, 3);
+ List<String> inputJsonBatches = Lists.newArrayList(
+ "[{\"a\": 5, \"b\" : 1 }]",
+ "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
+ opTestBuilder()
+ .physicalOperator(limitConf)
+ .inputDataStreamJson(inputJsonBatches)
+ .baselineColumns("a", "b")
+ .baselineValues(3l, 8l)
+ .go();
+ }
+
+ @Test
+ public void testLimitWithNoLastRecord() {
+ Limit limitConf = new Limit(null, 1, null);
+ List<String> inputJsonBatches = Lists.newArrayList(
+ "[{\"a\": 5, \"b\" : 1 }]",
+ "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
+ opTestBuilder()
+ .physicalOperator(limitConf)
+ .inputDataStreamJson(inputJsonBatches)
+ .baselineColumns("a", "b")
+ .baselineValues(5l, 5l)
+ .baselineValues(3l, 8l)
+ .go();
+ }
+
+ @Test
+ public void testLimitWithNegativeOffset() {
+ Limit limitConf = new Limit(null, -1, null);
+ List<String> inputJsonBatches = Lists.newArrayList(
+ "[{\"a\": 5, \"b\" : 1 }]",
+ "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
+ opTestBuilder()
+ .physicalOperator(limitConf)
+ .inputDataStreamJson(inputJsonBatches)
+ .baselineColumns("a", "b")
+ .baselineValues(5l, 1l)
+ .baselineValues(5l, 5l)
+ .baselineValues(3l, 8l)
+ .go();
+ }
+
+ @Test
+ public void testLimitWithNegativeFirstLast() {
+ Limit limitConf = new Limit(null, -1, -1);
+ List<String> inputJsonBatches = Lists.newArrayList(
+ "[{\"a\": 5, \"b\" : 1 }]",
+ "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
+ opTestBuilder()
+ .physicalOperator(limitConf)
+ .inputDataStreamJson(inputJsonBatches)
+ .baselineColumns("a", "b")
+ .expectZeroRows()
+ .go();
+ }
+
+ @Test
+ public void testLimitWithOffsetOutOfRange() {
+ Limit limitConf = new Limit(null, 10, 20);
+ List<String> inputJsonBatches = Lists.newArrayList(
+ "[{\"a\": 5, \"b\" : 1 }]",
+ "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
+ opTestBuilder()
+ .physicalOperator(limitConf)
+ .inputDataStreamJson(inputJsonBatches)
+ .baselineColumns("a", "b")
+ .expectZeroRows()
+ .go();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/2f275d17/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
----------------------------------------------------------------------
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
new file mode 100644
index 0000000..b3099d0
--- /dev/null
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.project;
+
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.test.rowSet.RowSet;
+import org.junit.Test;
+
+import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestProjectEmitOutcome extends BaseTestOpBatchEmitOutcome {
+
+ /**
+  * Tests that when empty input batches arrive with OK_NEW_SCHEMA and EMIT
+  * outcomes, Project does not ignore them and instead returns them
+  * downstream with the same outcomes.
+  * @throws Throwable if the operator under test fails unexpectedly
+  */
+ @Test
+ public void testProjectEmptyBatchEmitOutcome() throws Throwable {
+   inputContainer.add(emptyInputRowSet.container());
+   inputContainer.add(emptyInputRowSet.container());
+   inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+   inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+   final MockRecordBatch mockInputBatch = new MockRecordBatch(
+     operatorFixture.getFragmentContext(), opContext,
+     inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+   final Project projectConf = new Project(parseExprs("id_left+5", "id_left"), null);
+   final ProjectRecordBatch projectBatch = new ProjectRecordBatch(
+     projectConf, mockInputBatch, operatorFixture.getFragmentContext());
+
+   // assertEquals reports the actual outcome on failure, unlike assertTrue(a == b).
+   assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, projectBatch.next());
+   outputRecordCount += projectBatch.getRecordCount();
+   assertEquals(RecordBatch.IterOutcome.EMIT, projectBatch.next());
+   outputRecordCount += projectBatch.getRecordCount();
+   // Both batches were empty, so no records may have been produced.
+   assertEquals(0, outputRecordCount);
+ }
+
+ /**
+  * Tests that when a non-empty batch is accompanied by an EMIT outcome,
+  * Project produces output for that batch and returns it with the EMIT
+  * outcome.
+  * @throws Throwable if the operator under test fails unexpectedly
+  */
+ @Test
+ public void testProjectNonEmptyBatchEmitOutcome() throws Throwable {
+   inputContainer.add(emptyInputRowSet.container());
+   inputContainer.add(nonEmptyInputRowSet.container());
+   inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+   inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+   final MockRecordBatch mockInputBatch = new MockRecordBatch(
+     operatorFixture.getFragmentContext(), opContext,
+     inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+   final Project projectConf = new Project(parseExprs("id_left+5", "id_left"), null);
+   final ProjectRecordBatch projectBatch = new ProjectRecordBatch(
+     projectConf, mockInputBatch, operatorFixture.getFragmentContext());
+
+   // assertEquals reports the actual outcome on failure, unlike assertTrue(a == b).
+   assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, projectBatch.next());
+   outputRecordCount += projectBatch.getRecordCount();
+   assertEquals(RecordBatch.IterOutcome.EMIT, projectBatch.next());
+   outputRecordCount += projectBatch.getRecordCount();
+   assertEquals(1, outputRecordCount);
+ }
+
+ /**
+  * Tests that a non-empty first batch produces output with OK_NEW_SCHEMA,
+  * and that a later empty batch with an EMIT outcome is also passed through
+  * rather than being ignored.
+  * @throws Throwable if the operator under test fails unexpectedly
+  */
+ @Test
+ public void testProjectNonEmptyFirst_EmptyBatchEmitOutcome() throws Throwable {
+   inputContainer.add(nonEmptyInputRowSet.container());
+   inputContainer.add(emptyInputRowSet.container());
+   inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+   inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+   final MockRecordBatch mockInputBatch = new MockRecordBatch(
+     operatorFixture.getFragmentContext(), opContext,
+     inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+   final Project projectConf = new Project(parseExprs("id_left+5", "id_left"), null);
+   final ProjectRecordBatch projectBatch = new ProjectRecordBatch(
+     projectConf, mockInputBatch, operatorFixture.getFragmentContext());
+
+   // assertEquals reports the actual outcome on failure, unlike assertTrue(a == b).
+   assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, projectBatch.next());
+   outputRecordCount += projectBatch.getRecordCount();
+   assertEquals(RecordBatch.IterOutcome.EMIT, projectBatch.next());
+   outputRecordCount += projectBatch.getRecordCount();
+   assertEquals(1, outputRecordCount);
+ }
+
+ /**
+  * Tests that an empty batch accompanied by an OK outcome is ignored by
+  * Project: it returns nothing for that batch and instead calls next() to
+  * get another one. If that subsequent next() call returns an empty batch
+  * with an EMIT outcome, Project returns the EMIT outcome correctly rather
+  * than ignoring it because the batch is empty.
+  * @throws Throwable if the operator under test fails unexpectedly
+  */
+ @Test
+ public void testProjectNonEmptyFirst_EmptyOK_EmptyBatchEmitOutcome() throws Throwable {
+   inputContainer.add(nonEmptyInputRowSet.container());
+   inputContainer.add(emptyInputRowSet.container());
+   inputContainer.add(emptyInputRowSet.container());
+   inputContainer.add(emptyInputRowSet.container());
+
+   inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+   inputOutcomes.add(RecordBatch.IterOutcome.OK);
+   inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+   inputOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+   final MockRecordBatch mockInputBatch = new MockRecordBatch(
+     operatorFixture.getFragmentContext(), opContext,
+     inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+   final Project projectConf = new Project(parseExprs("id_left+5", "id_left"), null);
+   final ProjectRecordBatch projectBatch = new ProjectRecordBatch(
+     projectConf, mockInputBatch, operatorFixture.getFragmentContext());
+
+   // assertEquals reports the actual outcome on failure, unlike assertTrue(a == b).
+   assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, projectBatch.next());
+   outputRecordCount += projectBatch.getRecordCount();
+   // OK will not be received since it was accompanied by an empty batch
+   assertEquals(RecordBatch.IterOutcome.EMIT, projectBatch.next());
+   outputRecordCount += projectBatch.getRecordCount();
+   assertEquals(RecordBatch.IterOutcome.NONE, projectBatch.next());
+   assertEquals(1, outputRecordCount);
+ }
+
+ /**
+  * Tests that when the project list contains functions whose output is a
+  * complex type, Project fails if it sees an EMIT outcome. This scenario
+  * can happen when complex functions are used in a subquery between LATERAL
+  * and UNNEST; the guidance in that case is to use those functions in the
+  * project list of the outermost query.
+  *
+  * This test passes a non-empty first batch with OK_NEW_SCHEMA, during
+  * which complex writers are cached for the projected columns; when an
+  * empty batch later arrives with an EMIT outcome the exception is thrown.
+  * @throws Throwable if the operator under test fails unexpectedly
+  */
+ @Test
+ public void testProjectWithComplexWritersAndEmitOutcome_NonEmptyFirstBatch() throws Throwable {
+   final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+     .addRow(2, 20, "{ \"a\" : 1 }")
+     .build();
+
+   inputContainer.add(nonEmptyInputRowSet2.container());
+   inputContainer.add(emptyInputRowSet.container());
+
+   inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+   inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+   final MockRecordBatch mockInputBatch = new MockRecordBatch(
+     operatorFixture.getFragmentContext(), opContext,
+     inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+   final Project projectConf = new Project(
+     parseExprs("convert_fromJSON(name_left)", "name_columns"), null);
+   final ProjectRecordBatch projectBatch = new ProjectRecordBatch(
+     projectConf, mockInputBatch, operatorFixture.getFragmentContext());
+
+   try {
+     assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, projectBatch.next());
+     assertEquals(RecordBatch.IterOutcome.EMIT, projectBatch.next());
+     // Reaching this point means no exception was raised for the EMIT batch.
+     fail("Expected UnsupportedOperationException for EMIT outcome with complex writers");
+   } catch (Exception e) {
+     // exception is expected because of complex writers case
+     assertTrue("Expected UnsupportedOperationException but got: " + e,
+       e instanceof UnsupportedOperationException);
+   }
+ }
+
+ /**
+  * Tests that when the project list contains functions whose output is a
+  * complex type, Project fails if it sees an EMIT outcome. This scenario
+  * can happen when complex functions are used in a subquery between LATERAL
+  * and UNNEST; the guidance in that case is to use those functions in the
+  * project list of the outermost query.
+  *
+  * This test passes an empty first batch with OK_NEW_SCHEMA, at which point
+  * the complex writers are not yet known, so Project calls next() on the
+  * upstream to get a batch with data. The batch that follows (non-empty in
+  * this test) arrives with an EMIT outcome, and the exception is thrown
+  * because that scenario is not supported.
+  * @throws Throwable if the operator under test fails unexpectedly
+  */
+ @Test
+ public void testProjectWithComplexWritersAndEmitOutcome_EmptyFirstBatch() throws Throwable {
+   final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+     .addRow(2, 20, "{ \"a\" : 1 }")
+     .build();
+
+   inputContainer.add(emptyInputRowSet.container());
+   inputContainer.add(nonEmptyInputRowSet2.container());
+
+   inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+   inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+   final MockRecordBatch mockInputBatch = new MockRecordBatch(
+     operatorFixture.getFragmentContext(), opContext,
+     inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+   final Project projectConf = new Project(
+     parseExprs("convert_fromJSON(name_left)", "name_columns"), null);
+   final ProjectRecordBatch projectBatch = new ProjectRecordBatch(
+     projectConf, mockInputBatch, operatorFixture.getFragmentContext());
+
+   try {
+     assertEquals(RecordBatch.IterOutcome.OK_NEW_SCHEMA, projectBatch.next());
+     // Reaching this point means the very first next() call did not throw.
+     fail("Expected UnsupportedOperationException for EMIT outcome with complex writers");
+   } catch (Exception e) {
+     // exception is expected because of complex writers case
+     assertTrue("Expected UnsupportedOperationException but got: " + e,
+       e instanceof UnsupportedOperationException);
+   }
+ }
+}