DRILL-1241: Ensure that Limit produces at least 1 batch with the output schema.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/cca9ce18 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/cca9ce18 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/cca9ce18 Branch: refs/heads/master Commit: cca9ce18110bfba7658f2874091f36f2976e47b8 Parents: c2df146 Author: Aman Sinha <asi...@maprtech.com> Authored: Thu Aug 14 16:22:39 2014 -0700 Committer: Aditya Kishore <adi...@maprtech.com> Committed: Mon Aug 18 14:56:52 2014 +0530 ---------------------------------------------------------------------- .../physical/impl/limit/LimitRecordBatch.java | 34 ++++++++++++++++++++ .../org/apache/drill/TestExampleQueries.java | 10 ++++++ 2 files changed, 44 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cca9ce18/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index 12ee406..32eb709 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -23,23 +23,29 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Limit; +import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch; import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.selection.SelectionVector2; import com.google.common.collect.Lists; public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class); + private SelectionVector2 outgoingSv; private SelectionVector2 incomingSv; private int recordsToSkip; private int recordsLeft; private boolean noEndLimit; private boolean skipBatch; + private boolean first = true; + private boolean done = false; List<TransferPair> transfers = Lists.newArrayList(); public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { @@ -82,7 +88,15 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { @Override public IterOutcome innerNext() { + if (done) { + return IterOutcome.NONE; + } + if(!noEndLimit && recordsLeft <= 0) { + if (first) { + return produceEmptyFirstBatch(); + } + incoming.kill(true); IterOutcome upStream = incoming.next(); @@ -96,9 +110,11 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { upStream = incoming.next(); } + first = false; return IterOutcome.NONE; } + first = false; return super.innerNext(); } @@ -127,6 +143,23 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { } } + private IterOutcome produceEmptyFirstBatch() { + incoming.next(); + first = false; + done = true; + // Build the container schema and set the count + for (VectorWrapper<?> v : incoming) { + TransferPair pair = v.getValueVector().getTransferPair(); + container.add(pair.getTo()); + transfers.add(pair); + } + container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE); + container.setRecordCount(0); + + incoming.kill(true); + return IterOutcome.OK_NEW_SCHEMA; + } + private void limitWithNoSV(int recordCount) { int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip)); recordsToSkip -= offset; @@ -178,4 +211,5 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { outgoingSv.clear(); super.cleanup(); } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cca9ce18/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java index 8ed8171..11075f6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java @@ -283,4 +283,14 @@ public class TestExampleQueries extends BaseTestQuery{ test("select cast(r_name as varchar(20)) from cp.`tpch/region.parquet` order by r_name"); } + @Test // tests with LIMIT 0 + public void testLimit0_1() throws Exception { + test("select n_nationkey, n_name from cp.`tpch/nation.parquet` limit 0"); + test("select n_nationkey, n_name from cp.`tpch/nation.parquet` limit 0 offset 5"); + test("select n_nationkey, n_name from cp.`tpch/nation.parquet` order by n_nationkey limit 0"); + test("select * from cp.`tpch/nation.parquet` limit 0"); + test("select n.n_nationkey from cp.`tpch/nation.parquet` n, cp.`tpch/region.parquet` r where n.n_regionkey = r.r_regionkey limit 0"); + test("select n_regionkey, count(*) from cp.`tpch/nation.parquet` group by n_regionkey limit 0"); + } + }