Adding limit operator and sql rules

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b7d41ebe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b7d41ebe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b7d41ebe

Branch: refs/heads/master
Commit: b7d41ebeeb393fbebaedeb8f84829987a137c68f
Parents: 251022f
Author: Timothy Chen <[email protected]>
Authored: Mon Aug 19 23:29:58 2013 -0700
Committer: Timothy Chen <[email protected]>
Committed: Mon Oct 14 12:06:18 2013 -0700

----------------------------------------------------------------------
 .../apache/drill/common/logical/data/Limit.java |  10 +-
 .../apache/drill/exec/opt/BasicOptimizer.java   |  19 ++--
 .../physical/base/AbstractPhysicalVisitor.java  |  20 ++--
 .../exec/physical/base/PhysicalVisitor.java     |  15 +--
 .../drill/exec/physical/config/Limit.java       |  63 +++++++++++
 .../drill/exec/physical/impl/ImplCreator.java   |  19 ++--
 .../physical/impl/limit/LimitBatchCreator.java  |  17 +++
 .../physical/impl/limit/LimitRecordBatch.java   | 107 +++++++++++++++++++
 .../exec/planner/fragment/StatsCollector.java   |   7 ++
 .../physical/impl/limit/TestSimpleLimit.java    |  51 +++++++++
 .../src/test/resources/limit/test1.json         |  41 +++++++
 .../apache/drill/exec/ref/rops/LimitROP.java    |   8 +-
 .../org/apache/drill/optiq/DrillLimitRel.java   |  43 ++++++++
 .../org/apache/drill/optiq/DrillLimitRule.java  |  40 +++++++
 .../java/org/apache/drill/optiq/DrillOptiq.java |   1 +
 .../org/apache/drill/optiq/DrillSortRel.java    |  14 ++-
 .../org/apache/drill/optiq/DrillSortRule.java   |   5 +
 .../apache/drill/jdbc/test/FullEngineTest.java  |   1 -
 .../org/apache/drill/jdbc/test/JdbcTest.java    |  31 ++++++
 19 files changed, 450 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/common/src/main/java/org/apache/drill/common/logical/data/Limit.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/drill/common/logical/data/Limit.java 
b/common/src/main/java/org/apache/drill/common/logical/data/Limit.java
index 93eb182..6843d39 100644
--- a/common/src/main/java/org/apache/drill/common/logical/data/Limit.java
+++ b/common/src/main/java/org/apache/drill/common/logical/data/Limit.java
@@ -28,21 +28,21 @@ import java.util.Iterator;
 @JsonTypeName("limit")
 public class Limit extends SingleInputOperator{
   
-  private final int first; 
-  private final int last;
+  private final Integer first;
+  private final Integer last;
   
   @JsonCreator
-  public Limit(@JsonProperty("first") int first, @JsonProperty("last") int 
last) {
+  public Limit(@JsonProperty("first") Integer first, @JsonProperty("last") 
Integer last) {
     super();
     this.first = first;
     this.last = last;
   }
 
-  public int getFirst() {
+  public Integer getFirst() {
     return first;
   }
 
-  public int getLast() {
+  public Integer getLast() {
     return last;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 5a1fd6e..c09af88 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -32,19 +32,9 @@ import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.common.logical.StorageEngineConfig;
-import org.apache.drill.common.logical.data.CollapsingAggregate;
-import org.apache.drill.common.logical.data.Filter;
-import org.apache.drill.common.logical.data.Join;
-import org.apache.drill.common.logical.data.JoinCondition;
-import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.common.logical.data.*;
 import org.apache.drill.common.logical.data.Order.Direction;
 import org.apache.drill.common.logical.data.Order.Ordering;
-import org.apache.drill.common.logical.data.Project;
-import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.common.logical.data.Segment;
-import org.apache.drill.common.logical.data.SinkOperator;
-import org.apache.drill.common.logical.data.Store;
 import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -57,6 +47,7 @@ import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.config.Limit;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.store.StorageEngine;
 
@@ -139,7 +130,11 @@ public class BasicOptimizer extends Optimizer{
       return new SelectionVectorRemover(new Sort(input, ods, false));
     }
 
-
+    @Override
+    public PhysicalOperator 
visitLimit(org.apache.drill.common.logical.data.Limit limit, Object value) 
throws OptimizerException {
+      PhysicalOperator input = limit.getInput().accept(this, value);
+      return new SelectionVectorRemover(new Limit(input, limit.getFirst(), 
limit.getLast()));
+    }
 
     @Override
     public PhysicalOperator visitCollapsingAggregate(CollapsingAggregate agg, 
Object value)

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index 286144b..6c087a1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -17,19 +17,7 @@
  */
 package org.apache.drill.exec.physical.base;
 
-import org.apache.drill.exec.physical.config.Filter;
-import org.apache.drill.exec.physical.config.HashPartitionSender;
-import org.apache.drill.exec.physical.config.HashToRandomExchange;
-import org.apache.drill.exec.physical.config.MergeJoinPOP;
-import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.physical.config.RandomReceiver;
-import org.apache.drill.exec.physical.config.RangeSender;
-import org.apache.drill.exec.physical.config.Screen;
-import org.apache.drill.exec.physical.config.SingleSender;
-import org.apache.drill.exec.physical.config.Sort;
-import org.apache.drill.exec.physical.config.StreamingAggregate;
-import org.apache.drill.exec.physical.config.Union;
-import org.apache.drill.exec.physical.config.UnionExchange;
+import org.apache.drill.exec.physical.config.*;
 
 public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> 
implements PhysicalVisitor<T, X, E> {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AbstractPhysicalVisitor.class);
@@ -58,7 +46,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E 
extends Throwable> impleme
   public T visitSort(Sort sort, X value) throws E{
     return visitOp(sort, value);
   }
-  
+
+  @Override
+  public T visitLimit(Limit limit, X value) throws E {
+    return visitOp(limit, value);
+  }
 
   @Override
   public T visitStreamingAggregate(StreamingAggregate agg, X value) throws E {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index a36b65a..aef9d78 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -17,19 +17,7 @@
  */
 package org.apache.drill.exec.physical.base;
 
-import org.apache.drill.exec.physical.config.Filter;
-import org.apache.drill.exec.physical.config.HashPartitionSender;
-import org.apache.drill.exec.physical.config.HashToRandomExchange;
-import org.apache.drill.exec.physical.config.MergeJoinPOP;
-import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.physical.config.RandomReceiver;
-import org.apache.drill.exec.physical.config.RangeSender;
-import org.apache.drill.exec.physical.config.Screen;
-import org.apache.drill.exec.physical.config.SingleSender;
-import org.apache.drill.exec.physical.config.Sort;
-import org.apache.drill.exec.physical.config.StreamingAggregate;
-import org.apache.drill.exec.physical.config.Union;
-import org.apache.drill.exec.physical.config.UnionExchange;
+import org.apache.drill.exec.physical.config.*;
 
 /**
  * Visitor class designed to traversal of a operator tree.  Basis for a number 
of operator manipulations including fragmentation and materialization.
@@ -50,6 +38,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends 
Throwable> {
   public RETURN visitUnion(Union union, EXTRA value) throws EXCEP;
   public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
   public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
+  public RETURN visitLimit(Limit limit, EXTRA value) throws EXCEP;
   public RETURN visitMergeJoin(MergeJoinPOP join, EXTRA value) throws EXCEP;
   public RETURN visitSender(Sender sender, EXTRA value) throws EXCEP;
   public RETURN visitReceiver(Receiver receiver, EXTRA value) throws EXCEP;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java
new file mode 100644
index 0000000..b926e3e
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java
@@ -0,0 +1,63 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+
+@JsonTypeName("limit")
+public class Limit extends AbstractSingle {
+  private final Integer first;
+  private final Integer last;
+
+  @JsonCreator
+  public Limit(@JsonProperty("child") PhysicalOperator child, 
@JsonProperty("first") Integer first, @JsonProperty("last") Integer last) {
+    super(child);
+    this.first = first;
+    this.last = last;
+  }
+
+  public Integer getFirst() {
+    return first;
+  }
+
+  public Integer getLast() {
+    return last;
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new Limit(child, first, last);
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return new OperatorCost(0, 0, 0, 0.25f);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> 
physicalVisitor, X value) throws E {
+    return physicalVisitor.visitLimit(this, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 94acc0e..35ef8ac 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -26,21 +26,12 @@ import 
org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.physical.config.Filter;
-import org.apache.drill.exec.physical.config.HashPartitionSender;
-import org.apache.drill.exec.physical.config.MergeJoinPOP;
-import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.physical.config.RandomReceiver;
-import org.apache.drill.exec.physical.config.Screen;
-import org.apache.drill.exec.physical.config.SelectionVectorRemover;
-import org.apache.drill.exec.physical.config.SingleSender;
-import org.apache.drill.exec.physical.config.Sort;
-import org.apache.drill.exec.physical.config.StreamingAggregate;
-import org.apache.drill.exec.physical.config.Union;
+import org.apache.drill.exec.physical.config.*;
 import org.apache.drill.exec.physical.impl.aggregate.AggBatchCreator;
 import org.apache.drill.exec.physical.config.Union;
 import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
 import org.apache.drill.exec.physical.impl.join.MergeJoinCreator;
+import org.apache.drill.exec.physical.impl.limit.LimitBatchCreator;
 import 
org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderCreator;
 import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
 import org.apache.drill.exec.physical.impl.sort.SortBatchCreator;
@@ -72,6 +63,7 @@ public class ImplCreator extends 
AbstractPhysicalVisitor<RecordBatch, FragmentCo
   private SingleSenderCreator ssc = new SingleSenderCreator();
   private ProjectBatchCreator pbc = new ProjectBatchCreator();
   private FilterBatchCreator fbc = new FilterBatchCreator();
+  private LimitBatchCreator lbc = new LimitBatchCreator();
   private UnionBatchCreator unionbc = new UnionBatchCreator();
   private SVRemoverCreator svc = new SVRemoverCreator();
   private SortBatchCreator sbc = new SortBatchCreator();
@@ -122,6 +114,11 @@ public class ImplCreator extends 
AbstractPhysicalVisitor<RecordBatch, FragmentCo
   }
 
   @Override
+  public RecordBatch visitLimit(Limit limit, FragmentContext context) throws 
ExecutionSetupException {
+    return lbc.getBatch(context, limit, getChildren(limit, context));
+  }
+
+  @Override
   public RecordBatch visitMergeJoin(MergeJoinPOP op, FragmentContext context) 
throws ExecutionSetupException {
     return mjc.getBatch(context, op, getChildren(op, context));
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
new file mode 100644
index 0000000..b378f9b
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
@@ -0,0 +1,17 @@
+package org.apache.drill.exec.physical.impl.limit;
+
+import com.google.common.collect.Iterables;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+public class LimitBatchCreator implements BatchCreator<Limit> {
+  @Override
+  public RecordBatch getBatch(FragmentContext context, Limit config, 
List<RecordBatch> children) throws ExecutionSetupException {
+    return new LimitRecordBatch(config, context, 
Iterables.getOnlyElement(children));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
new file mode 100644
index 0000000..ed09663
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -0,0 +1,107 @@
+package org.apache.drill.exec.physical.impl.limit;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Objects;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.ValueVector;
+
+import java.util.List;
+
+public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
+
+  private SelectionVector2 outgoingSv;
+  private SelectionVector2 incomingSv;
+
+  public LimitRecordBatch(Limit popConfig, FragmentContext context, 
RecordBatch incoming) {
+    super(popConfig, context, incoming);
+    outgoingSv = new SelectionVector2(context.getAllocator());
+  }
+
+  @Override
+  protected void setupNewSchema() throws SchemaChangeException {
+    container.clear();
+
+    List<TransferPair> transfers = Lists.newArrayList();
+
+    for(VectorWrapper<?> v : incoming){
+      TransferPair pair = v.getValueVector().getTransferPair();
+      container.add(pair.getTo());
+      transfers.add(pair);
+    }
+
+    BatchSchema.SelectionVectorMode svMode = 
incoming.getSchema().getSelectionVectorMode();
+
+    switch(svMode){
+      case NONE:
+        break;
+      case TWO_BYTE:
+        this.incomingSv = incoming.getSelectionVector2();
+        break;
+      default:
+        throw new UnsupportedOperationException();
+    }
+
+    container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
+
+
+    for(TransferPair tp : transfers) {
+      tp.transfer();
+    }
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    return outgoingSv;
+  }
+
+  @Override
+  protected void doWork() {
+    int recordCount = incoming.getRecordCount();
+    outgoingSv.allocateNew(recordCount);
+
+    if(incomingSv != null) {
+      limitWithSV(recordCount);
+    } else {
+      limitWithNoSV(recordCount);
+    }
+  }
+
+  private void limitWithNoSV(int recordCount) {
+    int svIndex = 0;
+    int offset = Math.max(0, Objects.firstNonNull(popConfig.getFirst(), 0));
+    int fetch = Math.min(recordCount, 
Objects.firstNonNull(popConfig.getLast(), recordCount));
+    for(char i = (char) offset; i < fetch; i++) {
+      outgoingSv.setIndex(svIndex, i);
+      svIndex++;
+    }
+    outgoingSv.setRecordCount(svIndex);
+  }
+
+  private void limitWithSV(int recordCount) {
+    int svIndex = 0;
+    int offset = Math.max(0, Objects.firstNonNull(popConfig.getFirst(), 0));
+    int fetch = Math.min(recordCount, 
Objects.firstNonNull(popConfig.getLast(), recordCount));
+    for(int i = offset; i < fetch; i++) {
+      char index = incomingSv.getIndex(i);
+      outgoingSv.setIndex(svIndex, index);
+      svIndex++;
+    }
+
+    outgoingSv.setRecordCount(svIndex);
+  }
+
+  @Override
+  public int getRecordCount() {
+    return outgoingSv.getCount();
+  }
+
+  @Override
+  protected void cleanup(){
+    super.cleanup();
+    outgoingSv.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
index 0368d0c..ca933c6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.fragment;
 
 import org.apache.drill.exec.physical.base.*;
 import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.config.Limit;
 import org.apache.drill.exec.planner.AbstractOpWrapperVisitor;
 import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
 
@@ -92,6 +93,12 @@ public class StatsCollector {
     }
 
     @Override
+    public Void visitLimit(Limit limit, Wrapper value) throws RuntimeException 
{
+      // TODO: Implement this
+      return visitOp(limit, value);
+    }
+
+    @Override
     public Void visitOp(PhysicalOperator op, Wrapper wrapper) {
       if(op instanceof HasAffinity){
         wrapper.addEndpointAffinity(((HasAffinity)op).getOperatorAffinity());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
new file mode 100644
index 0000000..c8758fd
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
@@ -0,0 +1,51 @@
+package org.apache.drill.exec.physical.impl.limit;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.yammer.metrics.MetricRegistry;
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestSimpleLimit {
+
+  DrillConfig c = DrillConfig.create();
+  @Test
+  public void testLimit(@Injectable final DrillbitContext bitContext, 
@Injectable UserServer.UserClientConnection connection) throws Throwable{
+    new NonStrictExpectations(){{
+      bitContext.getMetrics(); result = new MetricRegistry("test");
+      bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+    }};
+
+    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), 
CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    PhysicalPlan plan = 
reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/limit/test1.json"),
 Charsets.UTF_8));
+    FunctionImplementationRegistry registry = new 
FunctionImplementationRegistry(c);
+    FragmentContext context = new FragmentContext(bitContext, 
ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
+    SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, 
(FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+    while(exec.next()){
+      assertEquals(6, exec.getRecordCount());
+    }
+
+    if(context.getFailureCause() != null){
+      throw context.getFailureCause();
+    }
+    assertTrue(!context.isFailed());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/java-exec/src/test/resources/limit/test1.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/limit/test1.json 
b/exec/java-exec/src/test/resources/limit/test1.json
new file mode 100644
index 0000000..79d6748
--- /dev/null
+++ b/exec/java-exec/src/test/resources/limit/test1.json
@@ -0,0 +1,41 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+       graph:[
+        {
+            @id:1,
+            pop:"mock-sub-scan",
+            url: "http://apache.org";,
+            entries:[
+               {records: 100, types: [
+                 {name: "blue", type: "INT", mode: "REQUIRED"},
+                 {name: "red", type: "BIGINT", mode: "REQUIRED"},
+                 {name: "green", type: "INT", mode: "REQUIRED"}
+               ]}
+            ]
+        },
+        {
+            @id:2,
+            child: 1,
+            pop:"limit",
+            first:5,
+            last:10
+        },
+        {
+          @id:4,
+          child:2,
+          pop: "selection-vector-remover"
+
+        },
+        {
+            @id: 3,
+            child: 4,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/LimitROP.java
----------------------------------------------------------------------
diff --git 
a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/LimitROP.java 
b/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/LimitROP.java
index 00baf81..d26bcd9 100644
--- a/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/LimitROP.java
+++ b/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/LimitROP.java
@@ -25,8 +25,8 @@ import org.apache.drill.exec.ref.eval.EvaluatorFactory;
 public class LimitROP extends SingleInputROPBase<Limit>{
 
   private LimitIterator iter;
-  private int first;
-  private int last;
+  private Integer first;
+  private Integer last;
 
   public LimitROP(Limit config) {
     super(config);
@@ -62,10 +62,10 @@ public class LimitROP extends SingleInputROPBase<Limit>{
       while(true){
         r = incoming.next();
         currentIndex++;
-        if (currentIndex > first && currentIndex <= last)
+        if (currentIndex > first && (last == null || currentIndex <= last))
             return r;
 
-        if (currentIndex > last)
+        if (last != null && currentIndex > last)
             return NextOutcome.NONE_LEFT;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRel.java
----------------------------------------------------------------------
diff --git a/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRel.java 
b/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRel.java
new file mode 100644
index 0000000..9b4ef78
--- /dev/null
+++ b/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRel.java
@@ -0,0 +1,43 @@
+package org.apache.drill.optiq;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.SingleRel;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.rex.RexLiteral;
+import org.eigenbase.rex.RexNode;
+
+import java.util.List;
+
+public class DrillLimitRel extends SingleRel implements DrillRel {
+  private RexNode offset;
+  private RexNode fetch;
+
+  public DrillLimitRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode 
child, RexNode offset, RexNode fetch) {
+    super(cluster, traitSet, child);
+    this.offset = offset;
+    this.fetch = fetch;
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new DrillLimitRel(getCluster(), traitSet, sole(inputs), offset, 
fetch);
+  }
+
+  @Override
+  public int implement(DrillImplementor implementor) {
+    int inputId = implementor.visitChild(this, 0, getChild());
+    final ObjectNode limit = implementor.mapper.createObjectNode();
+    limit.put("op", "limit");
+    limit.put("input", inputId);
+    int offsetVal = offset != null ? Math.max(0, RexLiteral.intValue(offset)) 
: 0;
+    // First offset to include into results (inclusive). Null implies it is 
starting from offset 0
+    limit.put("first", offsetVal);
+    // Last offset to stop including into results (exclusive), translating 
fetch row counts into an offset.
+    // Null value implies including entire remaining result set from first 
offset
+    limit.put("last", fetch != null ? Math.max(0, RexLiteral.intValue(fetch)) 
+ offsetVal : null);
+    return implementor.add(limit);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRule.java
----------------------------------------------------------------------
diff --git a/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRule.java 
b/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRule.java
new file mode 100644
index 0000000..c7c2a4e
--- /dev/null
+++ b/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRule.java
@@ -0,0 +1,40 @@
+package org.apache.drill.optiq;
+
+import org.eigenbase.rel.RelCollationImpl;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.SortRel;
+import org.eigenbase.relopt.Convention;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTraitSet;
+
+public class DrillLimitRule extends RelOptRule {
+  public static DrillLimitRule INSTANCE = new DrillLimitRule();
+
+  private DrillLimitRule() {
+    super(RelOptRule.some(SortRel.class, Convention.NONE, 
RelOptRule.any(RelNode.class)), "DrillLimitRule");
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final SortRel sort = call.rel(0);
+    if (sort.offset == null && sort.fetch == null) {
+      return;
+    }
+    final RelTraitSet traits = sort.getTraitSet().plus(DrillRel.CONVENTION);
+    RelNode input = sort.getChild();
+    //RelNode input = sort.getChild();
+    if (!sort.getCollation().getFieldCollations().isEmpty()) {
+      input = sort.copy(
+          sort.getTraitSet().replace(RelCollationImpl.EMPTY),
+          input,
+          RelCollationImpl.EMPTY,
+          null,
+          null);
+    }
+    RelNode x = convert(
+        input,
+        input.getTraitSet().replace(DrillRel.CONVENTION));
+    call.transformTo(new DrillLimitRel(sort.getCluster(), traits, x, 
sort.offset, sort.fetch));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/sqlparser/src/main/java/org/apache/drill/optiq/DrillOptiq.java
----------------------------------------------------------------------
diff --git a/sqlparser/src/main/java/org/apache/drill/optiq/DrillOptiq.java 
b/sqlparser/src/main/java/org/apache/drill/optiq/DrillOptiq.java
index b01aa7d..1245cfe 100644
--- a/sqlparser/src/main/java/org/apache/drill/optiq/DrillOptiq.java
+++ b/sqlparser/src/main/java/org/apache/drill/optiq/DrillOptiq.java
@@ -56,6 +56,7 @@ public class DrillOptiq {
 
     // Enable when https://issues.apache.org/jira/browse/DRILL-57 fixed
     if (false) planner.addRule(DrillValuesRule.INSTANCE);
+    planner.addRule(DrillLimitRule.INSTANCE);
     planner.addRule(DrillSortRule.INSTANCE);
     planner.addRule(DrillJoinRule.INSTANCE);
     planner.addRule(DrillUnionRule.INSTANCE);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRel.java
----------------------------------------------------------------------
diff --git a/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRel.java 
b/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRel.java
index b2e9b50..929d381 100644
--- a/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRel.java
+++ b/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRel.java
@@ -30,19 +30,27 @@ import org.eigenbase.relopt.RelTraitSet;
 
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.eigenbase.rex.RexLiteral;
+import org.eigenbase.rex.RexNode;
 
 /**
  * Sort implemented in Drill.
  */
 public class DrillSortRel extends SortRel implements DrillRel {
+
   /** Creates a DrillSortRel. */
   public DrillSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode 
input, RelCollation collation) {
     super(cluster, traits, input, collation);
   }
 
+  /** Creates a DrillSortRel with offset and fetch. */
+  public DrillSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode 
input, RelCollation collation, RexNode offset, RexNode fetch) {
+    super(cluster, traits, input, collation, offset, fetch);
+  }
+
   @Override
-  public DrillSortRel copy(RelTraitSet traitSet, RelNode input, RelCollation 
collation) {
-    return new DrillSortRel(getCluster(), traitSet, input, collation);
+  public DrillSortRel copy(RelTraitSet traitSet, RelNode input, RelCollation 
collation, RexNode offset, RexNode fetch) {
+    return new DrillSortRel(getCluster(), traitSet, input, collation, offset, 
fetch);
   }
 
   @Override
@@ -75,6 +83,8 @@ public class DrillSortRel extends SortRel implements DrillRel 
{
     return implementor.add(order);
   }
 
+
+
   private static String toDrill(RelFieldCollation collation) {
     switch (collation.getDirection()) {
     case Ascending:

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRule.java
----------------------------------------------------------------------
diff --git a/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRule.java 
b/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRule.java
index d5eac2e..22b9f5d 100644
--- a/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRule.java
+++ b/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRule.java
@@ -34,6 +34,11 @@ public class DrillSortRule extends RelOptRule {
   @Override
   public void onMatch(RelOptRuleCall call) {
     final SortRel sort = call.rel(0);
+
+    if(sort.offset != null || sort.fetch != null) {
+      return; //Sort already handled by DrillLimitRule
+    }
+
     final RelNode input = call.rel(1);
     final RelTraitSet traits = sort.getTraitSet().plus(DrillRel.CONVENTION);
     final RelTraitSet inputTraits = 
input.getTraitSet().plus(DrillRel.CONVENTION);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
----------------------------------------------------------------------
diff --git 
a/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java 
b/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
index 45d1ff3..f271bad 100644
--- a/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
+++ b/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
@@ -48,5 +48,4 @@ public class FullEngineTest {
     // .sql("select cast(_MAP['red'] as bigint) + 1 as red_inc from donuts ")
         .sql("select * from \"department.json\" ").displayResults(50);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b7d41ebe/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
----------------------------------------------------------------------
diff --git a/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java 
b/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
index 36b024c..80196b2 100644
--- a/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
+++ b/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
@@ -399,6 +399,37 @@ public class JdbcTest {
             + "DEPTID=null; LASTNAME=John\n")
         .planContains("'op':'order'");
   }
+
+  @Test
+  public void testLimit() throws Exception {
+    JdbcAssert
+        .withModel(MODEL, "HR")
+        .sql("select LASTNAME from emp limit 2")
+        .planContains("\"op\":\"limit\"")
+        .returns("LASTNAME=Rafferty\n" +
+            "LASTNAME=Jones");
+  }
+
+  @Test
+  public void testOrderByWithOffset() throws Exception {
+    JdbcAssert
+        .withModel(MODEL, "HR")
+        .sql("select LASTNAME from emp order by LASTNAME asc offset 3")
+        .planContains("\"op\":\"limit\"")
+        .returns("LASTNAME=Robinson\n" +
+            "LASTNAME=Smith\n" +
+            "LASTNAME=John");
+  }
+
+  @Test
+  public void testOrderByWithOffsetAndFetch() throws Exception {
+    JdbcAssert
+        .withModel(MODEL, "HR")
+        .sql("select LASTNAME from emp order by LASTNAME asc offset 3 fetch 
next 2 rows only")
+        .planContains("\"op\":\"limit\"")
+        .returns("LASTNAME=Robinson\n" +
+            "LASTNAME=Smith");
+  }
 }
 
 // End JdbcTest.java

Reply via email to