DRILL-313: Fix Limit operator transferring buffers only on new schema (move the transfer from setupNewSchema() to doWork())
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/6c0389f3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/6c0389f3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/6c0389f3 Branch: refs/heads/master Commit: 6c0389f394a789beb74103309f5bed13ddeccf95 Parents: b91f2e8 Author: Steven Phillips <[email protected]> Authored: Sun Dec 1 19:48:57 2013 -0800 Committer: Jacques Nadeau <[email protected]> Committed: Sun Dec 1 19:48:57 2013 -0800 ---------------------------------------------------------------------- .../physical/impl/limit/LimitRecordBatch.java | 8 ++-- .../drill/exec/fn/impl/GeneratorFunctions.java | 22 ++++++++- .../physical/impl/limit/TestSimpleLimit.java | 35 ++++++++++++++ .../src/test/resources/limit/test4.json | 49 ++++++++++++++++++++ 4 files changed, 109 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index 8390997..712af9f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -36,6 +36,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { private int recordsLeft; private boolean noEndLimit; private boolean skipBatch; + List<TransferPair> transfers = Lists.newArrayList(); public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming) { super(popConfig, 
context, incoming); @@ -52,7 +53,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { protected void setupNewSchema() throws SchemaChangeException { container.clear(); - List<TransferPair> transfers = Lists.newArrayList(); for(VectorWrapper<?> v : incoming){ TransferPair pair = v.getValueVector().getTransferPair(); @@ -74,9 +74,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE); - for(TransferPair tp : transfers) { - tp.transfer(); - } } @Override @@ -96,6 +93,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { @Override protected void doWork() { + for(TransferPair tp : transfers) { + tp.transfer(); + } skipBatch = false; int recordCount = incoming.getRecordCount(); if(recordCount <= recordsToSkip) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java index d12633e..b79ccd0 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java @@ -26,6 +26,7 @@ import org.apache.drill.exec.expr.annotations.FunctionTemplate; import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope; import org.apache.drill.exec.expr.annotations.Output; import org.apache.drill.exec.expr.annotations.Param; +import org.apache.drill.exec.expr.annotations.Workspace; import org.apache.drill.exec.expr.holders.*; import org.apache.drill.exec.record.RecordBatch; @@ -38,17 +39,36 @@ public class GeneratorFunctions { OutputTypeDeterminer.FIXED_BIGINT, "randomBigInt"); public 
static final FunctionDefinition RANDOM_FLOAT8 = FunctionDefinition.simple("randomFloat8", new ArgumentValidators.NumericTypeAllowed(1,2, true), OutputTypeDeterminer.FIXED_FLOAT8, "randomFloat8"); + public static final FunctionDefinition INCREASING_BIGINT = FunctionDefinition.simple("increasingBigInt", new ArgumentValidators.NumericTypeAllowed(1, true), + OutputTypeDeterminer.FIXED_BIGINT, "increasingBigInt"); public static class Provider implements CallProvider { @Override public FunctionDefinition[] getFunctionDefintions() { return new FunctionDefinition[] { RANDOM_BIG_INT, - RANDOM_FLOAT8 }; + RANDOM_FLOAT8, + INCREASING_BIGINT }; } } + @FunctionTemplate(name = "increasingBigInt", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL) + public static class IncreasingBigInt implements DrillSimpleFunc { + + @Param BigIntHolder start; + @Workspace long current; + @Output BigIntHolder out; + + public void setup(RecordBatch incoming) { + current = 0; + } + + public void eval() { + out.value = start.value + current++; + } + } + @FunctionTemplate(name = "randomBigInt", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL) public static class RandomBigIntGauss implements DrillSimpleFunc { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java index b254fc0..1ee9ceb 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java @@ -38,8 +38,11 @@ import org.apache.drill.exec.physical.impl.SimpleRootExec; import 
org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.proto.ExecProtos; +import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.rpc.user.UserServer; import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.vector.BigIntVector; +import org.apache.drill.exec.vector.ValueVector; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -79,6 +82,13 @@ public class TestSimpleLimit { }}; verifyLimitCount(bitContext, connection, "test2.json", 69999); + long start = 30000; + long end = 100000; + long expectedSum = (end - start) * (end + start - 1) / 2; //Formula for sum of series + + verifySum(bitContext, connection, "test4.json", 70000, expectedSum); + + } private void verifyLimitCount(DrillbitContext bitContext, UserServer.UserClientConnection connection, String testPlan, int expectedCount) throws Throwable { @@ -99,4 +109,29 @@ public class TestSimpleLimit { } assertTrue(!context.isFailed()); } + + private void verifySum(DrillbitContext bitContext, UserServer.UserClientConnection connection, String testPlan, int expectedCount, long expectedSum) throws Throwable { + PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance()); + PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/limit/" + testPlan), Charsets.UTF_8)); + FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c); + FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry); + SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next())); + int recordCount = 0; + long sum = 0; + while(exec.next()){ + recordCount += exec.getRecordCount(); + BigIntVector v = (BigIntVector) exec.iterator().next(); + 
for (int i = 0; i < v.getAccessor().getValueCount(); i++) { + sum += v.getAccessor().get(i); + } + } + + assertEquals(expectedCount, recordCount); + assertEquals(expectedSum, sum); + + if(context.getFailureCause() != null){ + throw context.getFailureCause(); + } + assertTrue(!context.isFailed()); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c0389f3/exec/java-exec/src/test/resources/limit/test4.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/limit/test4.json b/exec/java-exec/src/test/resources/limit/test4.json new file mode 100644 index 0000000..b7793b1 --- /dev/null +++ b/exec/java-exec/src/test/resources/limit/test4.json @@ -0,0 +1,49 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + { + @id:1, + pop:"mock-sub-scan", + url: "http://apache.org", + entries:[ + {records: 100000000, types: [ + {name: "blue", type: "INT", mode: "REQUIRED"}, + {name: "red", type: "BIGINT", mode: "REQUIRED"}, + {name: "green", type: "INT", mode: "REQUIRED"} + ]} + ] + }, + { + @id:2, + child: 1, + pop:"project", + exprs: [ + { ref: "col1", expr:"increasingBigInt(0)"} + ] + }, + { + @id:3, + child: 2, + pop:"limit", + first:30000, + last:100000 + }, + { + @id:4, + child:3, + pop: "selection-vector-remover" + + }, + { + @id: 5, + child: 4, + pop: "screen" + } + ] +} \ No newline at end of file
