Adding limit operator and SQL rules
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b7d41ebe Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b7d41ebe Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b7d41ebe Branch: refs/heads/master Commit: b7d41ebeeb393fbebaedeb8f84829987a137c68f Parents: 251022f Author: Timothy Chen <[email protected]> Authored: Mon Aug 19 23:29:58 2013 -0700 Committer: Timothy Chen <[email protected]> Committed: Mon Oct 14 12:06:18 2013 -0700 ---------------------------------------------------------------------- .../apache/drill/common/logical/data/Limit.java | 10 +- .../apache/drill/exec/opt/BasicOptimizer.java | 19 ++-- .../physical/base/AbstractPhysicalVisitor.java | 20 ++-- .../exec/physical/base/PhysicalVisitor.java | 15 +-- .../drill/exec/physical/config/Limit.java | 63 +++++++++++ .../drill/exec/physical/impl/ImplCreator.java | 19 ++-- .../physical/impl/limit/LimitBatchCreator.java | 17 +++ .../physical/impl/limit/LimitRecordBatch.java | 107 +++++++++++++++++++ .../exec/planner/fragment/StatsCollector.java | 7 ++ .../physical/impl/limit/TestSimpleLimit.java | 51 +++++++++ .../src/test/resources/limit/test1.json | 41 +++++++ .../apache/drill/exec/ref/rops/LimitROP.java | 8 +- .../org/apache/drill/optiq/DrillLimitRel.java | 43 ++++++++ .../org/apache/drill/optiq/DrillLimitRule.java | 40 +++++++ .../java/org/apache/drill/optiq/DrillOptiq.java | 1 + .../org/apache/drill/optiq/DrillSortRel.java | 14 ++- .../org/apache/drill/optiq/DrillSortRule.java | 5 + .../apache/drill/jdbc/test/FullEngineTest.java | 1 - .../org/apache/drill/jdbc/test/JdbcTest.java | 31 ++++++ 19 files changed, 450 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/common/src/main/java/org/apache/drill/common/logical/data/Limit.java 
---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/logical/data/Limit.java b/common/src/main/java/org/apache/drill/common/logical/data/Limit.java index 93eb182..6843d39 100644 --- a/common/src/main/java/org/apache/drill/common/logical/data/Limit.java +++ b/common/src/main/java/org/apache/drill/common/logical/data/Limit.java @@ -28,21 +28,21 @@ import java.util.Iterator; @JsonTypeName("limit") public class Limit extends SingleInputOperator{ - private final int first; - private final int last; + private final Integer first; + private final Integer last; @JsonCreator - public Limit(@JsonProperty("first") int first, @JsonProperty("last") int last) { + public Limit(@JsonProperty("first") Integer first, @JsonProperty("last") Integer last) { super(); this.first = first; this.last = last; } - public int getFirst() { + public Integer getFirst() { return first; } - public int getLast() { + public Integer getLast() { return last; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java index 5a1fd6e..c09af88 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java @@ -32,19 +32,9 @@ import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.LogicalPlan; import org.apache.drill.common.logical.StorageEngineConfig; -import org.apache.drill.common.logical.data.CollapsingAggregate; -import org.apache.drill.common.logical.data.Filter; -import 
org.apache.drill.common.logical.data.Join; -import org.apache.drill.common.logical.data.JoinCondition; -import org.apache.drill.common.logical.data.NamedExpression; -import org.apache.drill.common.logical.data.Order; +import org.apache.drill.common.logical.data.*; import org.apache.drill.common.logical.data.Order.Direction; import org.apache.drill.common.logical.data.Order.Ordering; -import org.apache.drill.common.logical.data.Project; -import org.apache.drill.common.logical.data.Scan; -import org.apache.drill.common.logical.data.Segment; -import org.apache.drill.common.logical.data.SinkOperator; -import org.apache.drill.common.logical.data.Store; import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.DataMode; @@ -57,6 +47,7 @@ import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.config.SelectionVectorRemover; import org.apache.drill.exec.physical.config.Sort; +import org.apache.drill.exec.physical.config.Limit; import org.apache.drill.exec.physical.config.StreamingAggregate; import org.apache.drill.exec.store.StorageEngine; @@ -139,7 +130,11 @@ public class BasicOptimizer extends Optimizer{ return new SelectionVectorRemover(new Sort(input, ods, false)); } - + @Override + public PhysicalOperator visitLimit(org.apache.drill.common.logical.data.Limit limit, Object value) throws OptimizerException { + PhysicalOperator input = limit.getInput().accept(this, value); + return new SelectionVectorRemover(new Limit(input, limit.getFirst(), limit.getLast())); + } @Override public PhysicalOperator visitCollapsingAggregate(CollapsingAggregate agg, Object value) http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index 286144b..6c087a1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -17,19 +17,7 @@ */ package org.apache.drill.exec.physical.base; -import org.apache.drill.exec.physical.config.Filter; -import org.apache.drill.exec.physical.config.HashPartitionSender; -import org.apache.drill.exec.physical.config.HashToRandomExchange; -import org.apache.drill.exec.physical.config.MergeJoinPOP; -import org.apache.drill.exec.physical.config.Project; -import org.apache.drill.exec.physical.config.RandomReceiver; -import org.apache.drill.exec.physical.config.RangeSender; -import org.apache.drill.exec.physical.config.Screen; -import org.apache.drill.exec.physical.config.SingleSender; -import org.apache.drill.exec.physical.config.Sort; -import org.apache.drill.exec.physical.config.StreamingAggregate; -import org.apache.drill.exec.physical.config.Union; -import org.apache.drill.exec.physical.config.UnionExchange; +import org.apache.drill.exec.physical.config.*; public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> implements PhysicalVisitor<T, X, E> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractPhysicalVisitor.class); @@ -58,7 +46,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme public T visitSort(Sort sort, X value) throws E{ return visitOp(sort, value); } - + + @Override + public T visitLimit(Limit limit, X value) throws E { + return visitOp(limit, value); + } @Override public T visitStreamingAggregate(StreamingAggregate agg, X value) throws E { 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java index a36b65a..aef9d78 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java @@ -17,19 +17,7 @@ */ package org.apache.drill.exec.physical.base; -import org.apache.drill.exec.physical.config.Filter; -import org.apache.drill.exec.physical.config.HashPartitionSender; -import org.apache.drill.exec.physical.config.HashToRandomExchange; -import org.apache.drill.exec.physical.config.MergeJoinPOP; -import org.apache.drill.exec.physical.config.Project; -import org.apache.drill.exec.physical.config.RandomReceiver; -import org.apache.drill.exec.physical.config.RangeSender; -import org.apache.drill.exec.physical.config.Screen; -import org.apache.drill.exec.physical.config.SingleSender; -import org.apache.drill.exec.physical.config.Sort; -import org.apache.drill.exec.physical.config.StreamingAggregate; -import org.apache.drill.exec.physical.config.Union; -import org.apache.drill.exec.physical.config.UnionExchange; +import org.apache.drill.exec.physical.config.*; /** * Visitor class designed to traversal of a operator tree. Basis for a number of operator manipulations including fragmentation and materialization. 
@@ -50,6 +38,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitUnion(Union union, EXTRA value) throws EXCEP; public RETURN visitProject(Project project, EXTRA value) throws EXCEP; public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP; + public RETURN visitLimit(Limit limit, EXTRA value) throws EXCEP; public RETURN visitMergeJoin(MergeJoinPOP join, EXTRA value) throws EXCEP; public RETURN visitSender(Sender sender, EXTRA value) throws EXCEP; public RETURN visitReceiver(Receiver receiver, EXTRA value) throws EXCEP; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java new file mode 100644 index 0000000..b926e3e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java @@ -0,0 +1,63 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.drill.exec.physical.config; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.exec.physical.OperatorCost; +import org.apache.drill.exec.physical.base.AbstractSingle; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; + +@JsonTypeName("limit") +public class Limit extends AbstractSingle { + private final Integer first; + private final Integer last; + + @JsonCreator + public Limit(@JsonProperty("child") PhysicalOperator child, @JsonProperty("first") Integer first, @JsonProperty("last") Integer last) { + super(child); + this.first = first; + this.last = last; + } + + public Integer getFirst() { + return first; + } + + public Integer getLast() { + return last; + } + + @Override + protected PhysicalOperator getNewWithChild(PhysicalOperator child) { + return new Limit(child, first, last); + } + + @Override + public OperatorCost getCost() { + return new OperatorCost(0, 0, 0, 0.25f); + } + + @Override + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { + return physicalVisitor.visitLimit(this, value); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java index 94acc0e..35ef8ac 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java @@ -26,21 +26,12 @@ import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.SubScan; -import org.apache.drill.exec.physical.config.Filter; -import org.apache.drill.exec.physical.config.HashPartitionSender; -import org.apache.drill.exec.physical.config.MergeJoinPOP; -import org.apache.drill.exec.physical.config.Project; -import org.apache.drill.exec.physical.config.RandomReceiver; -import org.apache.drill.exec.physical.config.Screen; -import org.apache.drill.exec.physical.config.SelectionVectorRemover; -import org.apache.drill.exec.physical.config.SingleSender; -import org.apache.drill.exec.physical.config.Sort; -import org.apache.drill.exec.physical.config.StreamingAggregate; -import org.apache.drill.exec.physical.config.Union; +import org.apache.drill.exec.physical.config.*; import org.apache.drill.exec.physical.impl.aggregate.AggBatchCreator; import org.apache.drill.exec.physical.config.Union; import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator; import org.apache.drill.exec.physical.impl.join.MergeJoinCreator; +import org.apache.drill.exec.physical.impl.limit.LimitBatchCreator; import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderCreator; import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator; import org.apache.drill.exec.physical.impl.sort.SortBatchCreator; @@ -72,6 +63,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo private SingleSenderCreator ssc = new SingleSenderCreator(); private ProjectBatchCreator pbc = new ProjectBatchCreator(); private FilterBatchCreator fbc = new FilterBatchCreator(); + private LimitBatchCreator lbc = new 
LimitBatchCreator(); private UnionBatchCreator unionbc = new UnionBatchCreator(); private SVRemoverCreator svc = new SVRemoverCreator(); private SortBatchCreator sbc = new SortBatchCreator(); @@ -122,6 +114,11 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo } @Override + public RecordBatch visitLimit(Limit limit, FragmentContext context) throws ExecutionSetupException { + return lbc.getBatch(context, limit, getChildren(limit, context)); + } + + @Override public RecordBatch visitMergeJoin(MergeJoinPOP op, FragmentContext context) throws ExecutionSetupException { return mjc.getBatch(context, op, getChildren(op, context)); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java new file mode 100644 index 0000000..b378f9b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java @@ -0,0 +1,17 @@ +package org.apache.drill.exec.physical.impl.limit; + +import com.google.common.collect.Iterables; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.Limit; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.record.RecordBatch; + +import java.util.List; + +public class LimitBatchCreator implements BatchCreator<Limit> { + @Override + public RecordBatch getBatch(FragmentContext context, Limit config, List<RecordBatch> children) throws ExecutionSetupException { + return new LimitRecordBatch(config, context, Iterables.getOnlyElement(children)); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java new file mode 100644 index 0000000..ed09663 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -0,0 +1,107 @@ +package org.apache.drill.exec.physical.impl.limit; + +import com.beust.jcommander.internal.Lists; +import com.google.common.base.Objects; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.Limit; +import org.apache.drill.exec.record.*; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.vector.ValueVector; + +import java.util.List; + +public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { + + private SelectionVector2 outgoingSv; + private SelectionVector2 incomingSv; + + public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming) { + super(popConfig, context, incoming); + outgoingSv = new SelectionVector2(context.getAllocator()); + } + + @Override + protected void setupNewSchema() throws SchemaChangeException { + container.clear(); + + List<TransferPair> transfers = Lists.newArrayList(); + + for(VectorWrapper<?> v : incoming){ + TransferPair pair = v.getValueVector().getTransferPair(); + container.add(pair.getTo()); + transfers.add(pair); + } + + BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode(); + + switch(svMode){ + case NONE: + break; + case TWO_BYTE: + this.incomingSv = incoming.getSelectionVector2(); + break; + default: + throw new 
UnsupportedOperationException(); + } + + container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE); + + + for(TransferPair tp : transfers) { + tp.transfer(); + } + } + + @Override + public SelectionVector2 getSelectionVector2() { + return outgoingSv; + } + + @Override + protected void doWork() { + int recordCount = incoming.getRecordCount(); + outgoingSv.allocateNew(recordCount); + + if(incomingSv != null) { + limitWithSV(recordCount); + } else { + limitWithNoSV(recordCount); + } + } + + private void limitWithNoSV(int recordCount) { + int svIndex = 0; + int offset = Math.max(0, Objects.firstNonNull(popConfig.getFirst(), 0)); + int fetch = Math.min(recordCount, Objects.firstNonNull(popConfig.getLast(), recordCount)); + for(char i = (char) offset; i < fetch; i++) { + outgoingSv.setIndex(svIndex, i); + svIndex++; + } + outgoingSv.setRecordCount(svIndex); + } + + private void limitWithSV(int recordCount) { + int svIndex = 0; + int offset = Math.max(0, Objects.firstNonNull(popConfig.getFirst(), 0)); + int fetch = Math.min(recordCount, Objects.firstNonNull(popConfig.getLast(), recordCount)); + for(int i = offset; i < fetch; i++) { + char index = incomingSv.getIndex(i); + outgoingSv.setIndex(svIndex, index); + svIndex++; + } + + outgoingSv.setRecordCount(svIndex); + } + + @Override + public int getRecordCount() { + return outgoingSv.getCount(); + } + + @Override + protected void cleanup(){ + super.cleanup(); + outgoingSv.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java index 0368d0c..ca933c6 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.fragment; import org.apache.drill.exec.physical.base.*; import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.config.Limit; import org.apache.drill.exec.planner.AbstractOpWrapperVisitor; import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair; @@ -92,6 +93,12 @@ public class StatsCollector { } @Override + public Void visitLimit(Limit limit, Wrapper value) throws RuntimeException { + // TODO: Implement this + return visitOp(limit, value); + } + + @Override public Void visitOp(PhysicalOperator op, Wrapper wrapper) { if(op instanceof HasAffinity){ wrapper.addEndpointAffinity(((HasAffinity)op).getOperatorAffinity()); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java new file mode 100644 index 0000000..c8758fd --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java @@ -0,0 +1,51 @@ +package org.apache.drill.exec.physical.impl.limit; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import com.yammer.metrics.MetricRegistry; +import mockit.Injectable; +import mockit.NonStrictExpectations; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.util.FileUtils; +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; +import org.apache.drill.exec.memory.BufferAllocator; +import 
org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.physical.base.FragmentRoot; +import org.apache.drill.exec.physical.impl.ImplCreator; +import org.apache.drill.exec.physical.impl.SimpleRootExec; +import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.ExecProtos; +import org.apache.drill.exec.rpc.user.UserServer; +import org.apache.drill.exec.server.DrillbitContext; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestSimpleLimit { + + DrillConfig c = DrillConfig.create(); + @Test + public void testLimit(@Injectable final DrillbitContext bitContext, @Injectable UserServer.UserClientConnection connection) throws Throwable{ + new NonStrictExpectations(){{ + bitContext.getMetrics(); result = new MetricRegistry("test"); + bitContext.getAllocator(); result = BufferAllocator.getAllocator(c); + }}; + + PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance()); + PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/limit/test1.json"), Charsets.UTF_8)); + FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c); + FragmentContext context = new FragmentContext(bitContext, ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry); + SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next())); + while(exec.next()){ + assertEquals(6, exec.getRecordCount()); + } + + if(context.getFailureCause() != null){ + throw context.getFailureCause(); + } + assertTrue(!context.isFailed()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/test/resources/limit/test1.json 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/limit/test1.json b/exec/java-exec/src/test/resources/limit/test1.json new file mode 100644 index 0000000..79d6748 --- /dev/null +++ b/exec/java-exec/src/test/resources/limit/test1.json @@ -0,0 +1,41 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + { + @id:1, + pop:"mock-sub-scan", + url: "http://apache.org", + entries:[ + {records: 100, types: [ + {name: "blue", type: "INT", mode: "REQUIRED"}, + {name: "red", type: "BIGINT", mode: "REQUIRED"}, + {name: "green", type: "INT", mode: "REQUIRED"} + ]} + ] + }, + { + @id:2, + child: 1, + pop:"limit", + first:5, + last:10 + }, + { + @id:4, + child:2, + pop: "selection-vector-remover" + + }, + { + @id: 3, + child: 4, + pop: "screen" + } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/LimitROP.java ---------------------------------------------------------------------- diff --git a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/LimitROP.java b/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/LimitROP.java index 00baf81..d26bcd9 100644 --- a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/LimitROP.java +++ b/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/LimitROP.java @@ -25,8 +25,8 @@ import org.apache.drill.exec.ref.eval.EvaluatorFactory; public class LimitROP extends SingleInputROPBase<Limit>{ private LimitIterator iter; - private int first; - private int last; + private Integer first; + private Integer last; public LimitROP(Limit config) { super(config); @@ -62,10 +62,10 @@ public class LimitROP extends SingleInputROPBase<Limit>{ while(true){ r = incoming.next(); currentIndex++; - if (currentIndex > first && currentIndex <= last) + if (currentIndex > first && (last == null || currentIndex <= last)) return r; - 
if (currentIndex > last) + if (last != null && currentIndex > last) return NextOutcome.NONE_LEFT; } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRel.java ---------------------------------------------------------------------- diff --git a/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRel.java b/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRel.java new file mode 100644 index 0000000..9b4ef78 --- /dev/null +++ b/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRel.java @@ -0,0 +1,43 @@ +package org.apache.drill.optiq; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.SingleRel; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.rex.RexLiteral; +import org.eigenbase.rex.RexNode; + +import java.util.List; + +public class DrillLimitRel extends SingleRel implements DrillRel { + private RexNode offset; + private RexNode fetch; + + public DrillLimitRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch) { + super(cluster, traitSet, child); + this.offset = offset; + this.fetch = fetch; + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new DrillLimitRel(getCluster(), traitSet, sole(inputs), offset, fetch); + } + + @Override + public int implement(DrillImplementor implementor) { + int inputId = implementor.visitChild(this, 0, getChild()); + final ObjectNode limit = implementor.mapper.createObjectNode(); + limit.put("op", "limit"); + limit.put("input", inputId); + int offsetVal = offset != null ? Math.max(0, RexLiteral.intValue(offset)) : 0; + // First offset to include into results (inclusive). 
Null implies it is starting from offset 0 + limit.put("first", offsetVal); + // Last offset to stop including into results (exclusive), translating fetch row counts into an offset. + // Null value implies including entire remaining result set from first offset + limit.put("last", fetch != null ? Math.max(0, RexLiteral.intValue(fetch)) + offsetVal : null); + return implementor.add(limit); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRule.java ---------------------------------------------------------------------- diff --git a/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRule.java b/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRule.java new file mode 100644 index 0000000..c7c2a4e --- /dev/null +++ b/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRule.java @@ -0,0 +1,40 @@ +package org.apache.drill.optiq; + +import org.eigenbase.rel.RelCollationImpl; +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.SortRel; +import org.eigenbase.relopt.Convention; +import org.eigenbase.relopt.RelOptRule; +import org.eigenbase.relopt.RelOptRuleCall; +import org.eigenbase.relopt.RelTraitSet; + +public class DrillLimitRule extends RelOptRule { + public static DrillLimitRule INSTANCE = new DrillLimitRule(); + + private DrillLimitRule() { + super(RelOptRule.some(SortRel.class, Convention.NONE, RelOptRule.any(RelNode.class)), "DrillLimitRule"); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final SortRel sort = call.rel(0); + if (sort.offset == null && sort.fetch == null) { + return; + } + final RelTraitSet traits = sort.getTraitSet().plus(DrillRel.CONVENTION); + RelNode input = sort.getChild(); + //RelNode input = sort.getChild(); + if (!sort.getCollation().getFieldCollations().isEmpty()) { + input = sort.copy( + sort.getTraitSet().replace(RelCollationImpl.EMPTY), + input, + RelCollationImpl.EMPTY, + null, + null); + } + RelNode x = 
convert( + input, + input.getTraitSet().replace(DrillRel.CONVENTION)); + call.transformTo(new DrillLimitRel(sort.getCluster(), traits, x, sort.offset, sort.fetch)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/sqlparser/src/main/java/org/apache/drill/optiq/DrillOptiq.java ---------------------------------------------------------------------- diff --git a/sqlparser/src/main/java/org/apache/drill/optiq/DrillOptiq.java b/sqlparser/src/main/java/org/apache/drill/optiq/DrillOptiq.java index b01aa7d..1245cfe 100644 --- a/sqlparser/src/main/java/org/apache/drill/optiq/DrillOptiq.java +++ b/sqlparser/src/main/java/org/apache/drill/optiq/DrillOptiq.java @@ -56,6 +56,7 @@ public class DrillOptiq { // Enable when https://issues.apache.org/jira/browse/DRILL-57 fixed if (false) planner.addRule(DrillValuesRule.INSTANCE); + planner.addRule(DrillLimitRule.INSTANCE); planner.addRule(DrillSortRule.INSTANCE); planner.addRule(DrillJoinRule.INSTANCE); planner.addRule(DrillUnionRule.INSTANCE); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRel.java ---------------------------------------------------------------------- diff --git a/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRel.java b/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRel.java index b2e9b50..929d381 100644 --- a/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRel.java +++ b/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRel.java @@ -30,19 +30,27 @@ import org.eigenbase.relopt.RelTraitSet; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import org.eigenbase.rex.RexLiteral; +import org.eigenbase.rex.RexNode; /** * Sort implemented in Drill. */ public class DrillSortRel extends SortRel implements DrillRel { + /** Creates a DrillSortRel. 
*/ public DrillSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation) { super(cluster, traits, input, collation); } + /** Creates a DrillSortRel with offset and fetch. */ + public DrillSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation, RexNode offset, RexNode fetch) { + super(cluster, traits, input, collation, offset, fetch); + } + @Override - public DrillSortRel copy(RelTraitSet traitSet, RelNode input, RelCollation collation) { - return new DrillSortRel(getCluster(), traitSet, input, collation); + public DrillSortRel copy(RelTraitSet traitSet, RelNode input, RelCollation collation, RexNode offset, RexNode fetch) { + return new DrillSortRel(getCluster(), traitSet, input, collation, offset, fetch); } @Override @@ -75,6 +83,8 @@ public class DrillSortRel extends SortRel implements DrillRel { return implementor.add(order); } + + private static String toDrill(RelFieldCollation collation) { switch (collation.getDirection()) { case Ascending: http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRule.java ---------------------------------------------------------------------- diff --git a/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRule.java b/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRule.java index d5eac2e..22b9f5d 100644 --- a/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRule.java +++ b/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRule.java @@ -34,6 +34,11 @@ public class DrillSortRule extends RelOptRule { @Override public void onMatch(RelOptRuleCall call) { final SortRel sort = call.rel(0); + + if(sort.offset != null || sort.fetch != null) { + return; //Sort already handled by DrillLimitRule + } + final RelNode input = call.rel(1); final RelTraitSet traits = sort.getTraitSet().plus(DrillRel.CONVENTION); final RelTraitSet inputTraits = 
input.getTraitSet().plus(DrillRel.CONVENTION); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java ---------------------------------------------------------------------- diff --git a/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java b/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java index 45d1ff3..f271bad 100644 --- a/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java +++ b/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java @@ -48,5 +48,4 @@ public class FullEngineTest { // .sql("select cast(_MAP['red'] as bigint) + 1 as red_inc from donuts ") .sql("select * from \"department.json\" ").displayResults(50); } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java ---------------------------------------------------------------------- diff --git a/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java b/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java index 36b024c..80196b2 100644 --- a/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java +++ b/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java @@ -399,6 +399,37 @@ public class JdbcTest { + "DEPTID=null; LASTNAME=John\n") .planContains("'op':'order'"); } + + @Test + public void testLimit() throws Exception { + JdbcAssert + .withModel(MODEL, "HR") + .sql("select LASTNAME from emp limit 2") + .planContains("\"op\":\"limit\"") + .returns("LASTNAME=Rafferty\n" + + "LASTNAME=Jones"); + } + + @Test + public void testOrderByWithOffset() throws Exception { + JdbcAssert + .withModel(MODEL, "HR") + .sql("select LASTNAME from emp order by LASTNAME asc offset 3") + .planContains("\"op\":\"limit\"") + .returns("LASTNAME=Robinson\n" + + "LASTNAME=Smith\n" + + "LASTNAME=John"); + } + + @Test + public void 
testOrderByWithOffsetAndFetch() throws Exception { + JdbcAssert + .withModel(MODEL, "HR") + .sql("select LASTNAME from emp order by LASTNAME asc offset 3 fetch next 2 rows only") + .planContains("\"op\":\"limit\"") + .returns("LASTNAME=Robinson\n" + + "LASTNAME=Smith"); + } } // End JdbcTest.java
