Address comments and make limit work across batches

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ecb5e15e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ecb5e15e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ecb5e15e

Branch: refs/heads/master
Commit: ecb5e15ed5591bfa3cdd6935eb848cd6f57c4ca0
Parents: b7d41eb
Author: Timothy Chen <[email protected]>
Authored: Wed Sep 18 22:28:58 2013 -0700
Committer: Timothy Chen <[email protected]>
Committed: Mon Oct 14 12:06:18 2013 -0700

----------------------------------------------------------------------
 LICENSE                                         |   2 +-
 .../org/apache/drill/common/JSONOptions.java    |   2 +-
 .../drill/common/logical/data/Constant.java     |   2 +-
 .../apache/drill/common/logical/data/Limit.java |  40 +++--
 .../common/logical/data/LogicalOperator.java    |  31 ++--
 .../logical/data/LogicalOperatorBase.java       |   6 +
 .../physical/impl/limit/LimitBatchCreator.java  |  17 ++
 .../physical/impl/limit/LimitRecordBatch.java   |  81 +++++++--
 .../physical/impl/limit/TestSimpleLimit.java    |  51 +++++-
 .../src/test/resources/limit/test2.json         |  41 +++++
 .../src/test/resources/limit/test3.json         |  40 +++++
 pom.xml                                         |   2 +-
 .../apache/drill/optiq/DrillImplementor.java    |   7 +-
 .../org/apache/drill/optiq/DrillLimitRel.java   |  29 +++-
 .../org/apache/drill/optiq/DrillLimitRule.java  |  21 ++-
 .../org/apache/drill/optiq/DrillSortRule.java   |   2 +-
 .../org/apache/drill/jdbc/test/JdbcAssert.java  |  62 ++++---
 .../org/apache/drill/jdbc/test/JdbcTest.java    | 165 +++++++++++++------
 18 files changed, 476 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecb5e15e/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index 5234110..124026f 100644
--- a/LICENSE
+++ b/LICENSE
@@ -269,7 +269,7 @@ For
   Netty/Common (http://netty.io/netty-common/) 
io.netty:netty-common:bundle:4.0.7.Final
   Netty/Handler (http://netty.io/netty-handler/) 
io.netty:netty-handler:bundle:4.0.7.Final
   Netty/Transport (http://netty.io/netty-transport/) 
io.netty:netty-transport:bundle:4.0.7.Final
-  optiq (http://github.com/julianhyde/optiq) net.hydromatic:optiq:jar:0.4.10
+  optiq (http://github.com/julianhyde/optiq) net.hydromatic:optiq:jar:0.4.11
   Parquet Column (https://github.com/Parquet/parquet-mr) 
com.twitter:parquet-column:jar:1.0.1
   Parquet Common (https://github.com/Parquet/parquet-mr) 
com.twitter:parquet-common:jar:1.0.1
   Parquet Encodings (https://github.com/Parquet/parquet-mr) 
com.twitter:parquet-encoding:jar:1.0.1

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecb5e15e/common/src/main/java/org/apache/drill/common/JSONOptions.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/JSONOptions.java 
b/common/src/main/java/org/apache/drill/common/JSONOptions.java
index 8157dc6..23de124 100644
--- a/common/src/main/java/org/apache/drill/common/JSONOptions.java
+++ b/common/src/main/java/org/apache/drill/common/JSONOptions.java
@@ -48,7 +48,7 @@ public class JSONOptions {
   private JsonNode root;
   private JsonLocation location;
   
-  private JSONOptions(JsonNode n, JsonLocation location){
+  public JSONOptions(JsonNode n, JsonLocation location){
     this.root = n;
     this.location = location;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecb5e15e/common/src/main/java/org/apache/drill/common/logical/data/Constant.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/drill/common/logical/data/Constant.java 
b/common/src/main/java/org/apache/drill/common/logical/data/Constant.java
index e05ce41..f9e7394 100644
--- a/common/src/main/java/org/apache/drill/common/logical/data/Constant.java
+++ b/common/src/main/java/org/apache/drill/common/logical/data/Constant.java
@@ -28,7 +28,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 
 @JsonTypeName("constant")
-public class Constant extends SourceOperator{
+public class Constant extends SourceOperator {
 
     private final JSONOptions content;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecb5e15e/common/src/main/java/org/apache/drill/common/logical/data/Limit.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/drill/common/logical/data/Limit.java 
b/common/src/main/java/org/apache/drill/common/logical/data/Limit.java
index 6843d39..110204b 100644
--- a/common/src/main/java/org/apache/drill/common/logical/data/Limit.java
+++ b/common/src/main/java/org/apache/drill/common/logical/data/Limit.java
@@ -20,17 +20,19 @@ package org.apache.drill.common.logical.data;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Iterators;
 import org.apache.drill.common.logical.data.visitors.LogicalVisitor;
 
 import java.util.Iterator;
 
 @JsonTypeName("limit")
-public class Limit extends SingleInputOperator{
-  
+public class Limit extends SingleInputOperator {
+
   private final Integer first;
   private final Integer last;
-  
+
   @JsonCreator
   public Limit(@JsonProperty("first") Integer first, @JsonProperty("last") 
Integer last) {
     super();
@@ -46,15 +48,31 @@ public class Limit extends SingleInputOperator{
     return last;
   }
 
-    @Override
-    public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> 
logicalVisitor, X value) throws E {
-        return logicalVisitor.visitLimit(this, value);
-    }
+  @Override
+  public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> 
logicalVisitor, X value) throws E {
+    return logicalVisitor.visitLimit(this, value);
+  }
 
-    @Override
-    public Iterator<LogicalOperator> iterator() {
-        return Iterators.singletonIterator(getInput());
-    }
+  @Override
+  public NodeBuilder nodeBuilder() {
+    return new LimitNodeBuilder();  //To change body of implemented methods 
use File | Settings | File Templates.
+  }
+
+  @Override
+  public Iterator<LogicalOperator> iterator() {
+    return Iterators.singletonIterator(getInput());
+  }
 
+  public static class LimitNodeBuilder implements NodeBuilder<Limit> {
 
+    @Override
+    public ObjectNode convert(ObjectMapper mapper, Limit operator, Integer 
inputId) {
+      ObjectNode limitNode = mapper.createObjectNode();
+      limitNode.put("op", "limit");
+      limitNode.put("input", inputId);
+      limitNode.put("first", operator.first);
+      limitNode.put("last", operator.last);
+      return limitNode;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecb5e15e/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
 
b/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
index 9810e75..531e6a6 100644
--- 
a/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
+++ 
b/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperator.java
@@ -21,26 +21,33 @@ import java.util.Collection;
 import java.util.List;
 
 import com.fasterxml.jackson.annotation.*;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.drill.common.graph.GraphValue;
 import org.apache.drill.common.logical.ValidationError;
 
 import org.apache.drill.common.logical.data.visitors.LogicalVisitor;
 
 @JsonPropertyOrder({"@id", "memo", "input"}) // op will always be first since 
it is wrapped.
-@JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class, 
property="@id")
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property="op")
-public interface LogicalOperator extends GraphValue<LogicalOperator>{
+@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, 
property = "@id")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "op")
+public interface LogicalOperator extends GraphValue<LogicalOperator> {
 
-       public void setupAndValidate(List<LogicalOperator> operators, 
Collection<ValidationError> errors);
+  public void setupAndValidate(List<LogicalOperator> operators, 
Collection<ValidationError> errors);
 
-    /**
-     * Provides capability to build a set of output based on traversing a 
query graph tree.
-     *
-     * @param logicalVisitor
-     * @return
-     */
-    public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> 
logicalVisitor, X value) throws E;
+  /**
+   * Provides capability to build a set of output based on traversing a query 
graph tree.
+   *
+   * @param logicalVisitor
+   * @return
+   */
+  public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> 
logicalVisitor, X value) throws E;
 
-    public void registerAsSubscriber(LogicalOperator operator);
+  public void registerAsSubscriber(LogicalOperator operator);
 
+  NodeBuilder nodeBuilder();
+
+  public interface NodeBuilder<T extends LogicalOperator> {
+    ObjectNode convert(ObjectMapper mapper, T operator, Integer inputId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecb5e15e/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java
 
b/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java
index 29c994f..69a1c3c 100644
--- 
a/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java
+++ 
b/common/src/main/java/org/apache/drill/common/logical/data/LogicalOperatorBase.java
@@ -48,6 +48,12 @@ public abstract class LogicalOperatorBase implements 
LogicalOperator{
        }
 
   @Override
+  public NodeBuilder nodeBuilder() {
+    // FIXME: Implement this on all logical operators
+    throw new UnsupportedOperationException("Not yet implemented.");
+  }
+
+  @Override
   public void accept(GraphVisitor<LogicalOperator> visitor) {
     if(visitor.enter(this)) visitor.leave(this);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecb5e15e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
index b378f9b..ccbf755 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.drill.exec.physical.impl.limit;
 
 import com.google.common.collect.Iterables;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecb5e15e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index ed09663..783980b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.drill.exec.physical.impl.limit;
 
 import com.beust.jcommander.internal.Lists;
@@ -15,10 +32,20 @@ public class LimitRecordBatch extends 
AbstractSingleRecordBatch<Limit> {
 
   private SelectionVector2 outgoingSv;
   private SelectionVector2 incomingSv;
+  private int recordsToSkip;
+  private int recordsLeft;
+  private boolean noEndLimit;
+  private boolean skipBatch;
 
   public LimitRecordBatch(Limit popConfig, FragmentContext context, 
RecordBatch incoming) {
     super(popConfig, context, incoming);
     outgoingSv = new SelectionVector2(context.getAllocator());
+    recordsToSkip = popConfig.getFirst();
+    noEndLimit = popConfig.getLast() == null;
+    if(!noEndLimit) {
+      recordsLeft = popConfig.getLast() - recordsToSkip;
+    }
+    skipBatch = false;
   }
 
   @Override
@@ -47,33 +74,55 @@ public class LimitRecordBatch extends 
AbstractSingleRecordBatch<Limit> {
 
     container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
 
-
     for(TransferPair tp : transfers) {
       tp.transfer();
     }
   }
 
   @Override
+  public IterOutcome next() {
+    if(!noEndLimit && recordsLeft <= 0) {
+      return IterOutcome.NONE;
+    }
+
+    return super.next();
+  }
+
+  @Override
   public SelectionVector2 getSelectionVector2() {
     return outgoingSv;
   }
 
   @Override
   protected void doWork() {
+    skipBatch = false;
     int recordCount = incoming.getRecordCount();
-    outgoingSv.allocateNew(recordCount);
-
-    if(incomingSv != null) {
-      limitWithSV(recordCount);
+    if(recordCount <= recordsToSkip) {
+      recordsToSkip -= recordCount;
+      skipBatch = true;
     } else {
-      limitWithNoSV(recordCount);
+      outgoingSv.allocateNew(recordCount);
+      if(incomingSv != null) {
+       limitWithSV(recordCount);
+      } else {
+       limitWithNoSV(recordCount);
+      }
     }
   }
 
   private void limitWithNoSV(int recordCount) {
+    int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip));
+    recordsToSkip -= offset;
+    int fetch;
+
+    if(noEndLimit) {
+      fetch = recordCount;
+    } else {
+      fetch = Math.min(recordCount, offset + recordsLeft);
+      recordsLeft -= Math.max(0, fetch - offset);
+    }
+
     int svIndex = 0;
-    int offset = Math.max(0, Objects.firstNonNull(popConfig.getFirst(), 0));
-    int fetch = Math.min(recordCount, 
Objects.firstNonNull(popConfig.getLast(), recordCount));
     for(char i = (char) offset; i < fetch; i++) {
       outgoingSv.setIndex(svIndex, i);
       svIndex++;
@@ -82,21 +131,29 @@ public class LimitRecordBatch extends 
AbstractSingleRecordBatch<Limit> {
   }
 
   private void limitWithSV(int recordCount) {
+    int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip));
+    recordsToSkip -= offset;
+    int fetch;
+
+    if(noEndLimit) {
+      fetch = recordCount;
+    } else {
+      fetch = Math.min(recordCount, recordsLeft);
+      recordsLeft -= Math.max(0, fetch - offset);
+    }
+
     int svIndex = 0;
-    int offset = Math.max(0, Objects.firstNonNull(popConfig.getFirst(), 0));
-    int fetch = Math.min(recordCount, 
Objects.firstNonNull(popConfig.getLast(), recordCount));
     for(int i = offset; i < fetch; i++) {
       char index = incomingSv.getIndex(i);
       outgoingSv.setIndex(svIndex, index);
       svIndex++;
     }
-
     outgoingSv.setRecordCount(svIndex);
   }
 
   @Override
   public int getRecordCount() {
-    return outgoingSv.getCount();
+    return skipBatch ? 0 : outgoingSv.getCount();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecb5e15e/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
index c8758fd..ef7638e 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
@@ -1,8 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.drill.exec.physical.impl.limit;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 import com.yammer.metrics.MetricRegistry;
+import junit.framework.Assert;
 import mockit.Injectable;
 import mockit.NonStrictExpectations;
 import org.apache.drill.common.config.DrillConfig;
@@ -25,8 +43,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class TestSimpleLimit {
-
   DrillConfig c = DrillConfig.create();
+
   @Test
   public void testLimit(@Injectable final DrillbitContext bitContext, 
@Injectable UserServer.UserClientConnection connection) throws Throwable{
     new NonStrictExpectations(){{
@@ -34,15 +52,42 @@ public class TestSimpleLimit {
       bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
     }};
 
+    verifyLimitCount(bitContext, connection, "test1.json", 5);
+  }
+
+  @Test
+  public void testLimitNoEnd(@Injectable final DrillbitContext bitContext, 
@Injectable UserServer.UserClientConnection connection) throws Throwable{
+    new NonStrictExpectations(){{
+      bitContext.getMetrics(); result = new MetricRegistry("test");
+      bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+    }};
+
+    verifyLimitCount(bitContext, connection, "test3.json", 95);
+  }
+
+  @Test
+  public void testLimitAcrossBatches(@Injectable final DrillbitContext 
bitContext, @Injectable UserServer.UserClientConnection connection) throws 
Throwable{
+    new NonStrictExpectations(){{
+      bitContext.getMetrics(); result = new MetricRegistry("test");
+      bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+    }};
+
+    verifyLimitCount(bitContext, connection, "test2.json", 69999);
+  }
+
+  private void verifyLimitCount(DrillbitContext bitContext, 
UserServer.UserClientConnection connection, String testPlan, int expectedCount) 
throws Throwable {
     PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), 
CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
-    PhysicalPlan plan = 
reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/limit/test1.json"),
 Charsets.UTF_8));
+    PhysicalPlan plan = 
reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/limit/" + 
testPlan), Charsets.UTF_8));
     FunctionImplementationRegistry registry = new 
FunctionImplementationRegistry(c);
     FragmentContext context = new FragmentContext(bitContext, 
ExecProtos.FragmentHandle.getDefaultInstance(), connection, null, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, 
(FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+    int recordCount = 0;
     while(exec.next()){
-      assertEquals(6, exec.getRecordCount());
+      recordCount += exec.getRecordCount();
     }
 
+    assertEquals(expectedCount, recordCount);
+
     if(context.getFailureCause() != null){
       throw context.getFailureCause();
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecb5e15e/exec/java-exec/src/test/resources/limit/test2.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/limit/test2.json 
b/exec/java-exec/src/test/resources/limit/test2.json
new file mode 100644
index 0000000..5ab1ab5
--- /dev/null
+++ b/exec/java-exec/src/test/resources/limit/test2.json
@@ -0,0 +1,41 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+       graph:[
+        {
+            @id:1,
+            pop:"mock-sub-scan",
+            url: "http://apache.org";,
+            entries:[
+               {records: 100000000, types: [
+                 {name: "blue", type: "INT", mode: "REQUIRED"},
+                 {name: "red", type: "BIGINT", mode: "REQUIRED"},
+                 {name: "green", type: "INT", mode: "REQUIRED"}
+               ]}
+            ]
+        },
+        {
+            @id:2,
+            child: 1,
+            pop:"limit",
+            first:30001,
+            last:100000
+        },
+        {
+          @id:4,
+          child:2,
+          pop: "selection-vector-remover"
+
+        },
+        {
+            @id: 3,
+            child: 4,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecb5e15e/exec/java-exec/src/test/resources/limit/test3.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/limit/test3.json 
b/exec/java-exec/src/test/resources/limit/test3.json
new file mode 100644
index 0000000..26b78bb
--- /dev/null
+++ b/exec/java-exec/src/test/resources/limit/test3.json
@@ -0,0 +1,40 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+       graph:[
+        {
+            @id:1,
+            pop:"mock-sub-scan",
+            url: "http://apache.org";,
+            entries:[
+               {records: 100, types: [
+                 {name: "blue", type: "INT", mode: "REQUIRED"},
+                 {name: "red", type: "BIGINT", mode: "REQUIRED"},
+                 {name: "green", type: "INT", mode: "REQUIRED"}
+               ]}
+            ]
+        },
+        {
+            @id:2,
+            child: 1,
+            pop:"limit",
+            first:5
+        },
+        {
+          @id:4,
+          child:2,
+          pop: "selection-vector-remover"
+
+        },
+        {
+            @id: 3,
+            child: 4,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecb5e15e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ce1f93d..69d280c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -555,7 +555,7 @@
           <dependency>
             <groupId>net.hydromatic</groupId>
             <artifactId>optiq</artifactId>
-            <version>0.4.10</version>
+            <version>0.4.11</version>
             <exclusions>
               <exclusion>
                 <groupId>org.jgrapht</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecb5e15e/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java
----------------------------------------------------------------------
diff --git 
a/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java 
b/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java
index 6efe60b..715e64f 100644
--- a/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java
+++ b/sqlparser/src/main/java/org/apache/drill/optiq/DrillImplementor.java
@@ -20,13 +20,12 @@ package org.apache.drill.optiq;
 import java.io.IOException;
 import java.util.Set;
 
+import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.exec.ref.rse.QueueRSE.QueueOutputInfo;
 import org.apache.drill.jdbc.DrillTable;
 import org.eigenbase.rel.RelNode;
 
-import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
@@ -111,6 +110,10 @@ public class DrillImplementor {
     return add(writeOp);
   }
 
+  public int add(LogicalOperator operator, Integer inputId) {
+    return add(operator.nodeBuilder().convert(mapper, operator, inputId));
+  }
+
   public int add(ObjectNode operator) {
     operatorsNode.add(operator);
     final int id = operatorsNode.size();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecb5e15e/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRel.java
----------------------------------------------------------------------
diff --git a/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRel.java 
b/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRel.java
index 9b4ef78..e05a10a 100644
--- a/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRel.java
+++ b/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRel.java
@@ -1,6 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.drill.optiq;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.drill.common.logical.data.Limit;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.rel.SingleRel;
 import org.eigenbase.relopt.RelOptCluster;
@@ -29,15 +47,14 @@ public class DrillLimitRel extends SingleRel implements 
DrillRel {
   public int implement(DrillImplementor implementor) {
     int inputId = implementor.visitChild(this, 0, getChild());
     final ObjectNode limit = implementor.mapper.createObjectNode();
-    limit.put("op", "limit");
-    limit.put("input", inputId);
-    int offsetVal = offset != null ? Math.max(0, RexLiteral.intValue(offset)) 
: 0;
+
     // First offset to include into results (inclusive). Null implies it is 
starting from offset 0
-    limit.put("first", offsetVal);
+    int first = offset != null ? Math.max(0, RexLiteral.intValue(offset)) : 0;
+
     // Last offset to stop including into results (exclusive), translating 
fetch row counts into an offset.
     // Null value implies including entire remaining result set from first 
offset
-    limit.put("last", fetch != null ? Math.max(0, RexLiteral.intValue(fetch)) 
+ offsetVal : null);
-    return implementor.add(limit);
+    Integer last = fetch != null ? Math.max(0, RexLiteral.intValue(fetch)) + 
first : null;
+    return implementor.add(new Limit(first, last), inputId);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecb5e15e/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRule.java
----------------------------------------------------------------------
diff --git a/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRule.java 
b/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRule.java
index c7c2a4e..6a7d3ba 100644
--- a/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRule.java
+++ b/sqlparser/src/main/java/org/apache/drill/optiq/DrillLimitRule.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.drill.optiq;
 
 import org.eigenbase.rel.RelCollationImpl;
@@ -8,6 +25,9 @@ import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelTraitSet;
 
+/**
+ * This rule converts a SortRel that has either a offset and fetch into a 
Drill Sort and Limit Rel
+ */
 public class DrillLimitRule extends RelOptRule {
   public static DrillLimitRule INSTANCE = new DrillLimitRule();
 
@@ -23,7 +43,6 @@ public class DrillLimitRule extends RelOptRule {
     }
     final RelTraitSet traits = sort.getTraitSet().plus(DrillRel.CONVENTION);
     RelNode input = sort.getChild();
-    //RelNode input = sort.getChild();
     if (!sort.getCollation().getFieldCollations().isEmpty()) {
       input = sort.copy(
           sort.getTraitSet().replace(RelCollationImpl.EMPTY),

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecb5e15e/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRule.java
----------------------------------------------------------------------
diff --git a/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRule.java 
b/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRule.java
index 22b9f5d..871715a 100644
--- a/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRule.java
+++ b/sqlparser/src/main/java/org/apache/drill/optiq/DrillSortRule.java
@@ -36,7 +36,7 @@ public class DrillSortRule extends RelOptRule {
     final SortRel sort = call.rel(0);
 
     if(sort.offset != null || sort.fetch != null) {
-      return; //Sort already handled by DrillLimitRule
+      return;
     }
 
     final RelNode input = call.rel(1);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecb5e15e/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java
----------------------------------------------------------------------
diff --git a/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java 
b/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java
index e78d5c7..651a509 100644
--- a/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java
+++ b/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java
@@ -29,9 +29,18 @@ import java.util.Properties;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
 import net.hydromatic.linq4j.Ord;
 
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.common.util.Hook;
+import org.codehaus.jackson.node.ObjectNode;
 import org.junit.Assert;
 
 import com.google.common.base.Function;
@@ -53,7 +62,7 @@ public class JdbcAssert {
     return new ModelAndSchema(info, false);
   }
 
-  
+
   static String toString(ResultSet resultSet, int expectedRecordCount) throws 
SQLException {
     StringBuilder buf = new StringBuilder();
     int total = 0, n;
@@ -72,7 +81,7 @@ public class JdbcAssert {
     }
     return buf.toString();
   }
-  
+
   static String toString(ResultSet resultSet) throws SQLException {
     StringBuilder buf = new StringBuilder();
     final List<Ord<String>> columns = columnLabels(resultSet);
@@ -86,7 +95,6 @@ public class JdbcAssert {
   }
 
 
-
   static List<String> toStrings(ResultSet resultSet) throws SQLException {
     final List<String> list = new ArrayList<>();
     StringBuilder buf = new StringBuilder();
@@ -117,20 +125,20 @@ public class JdbcAssert {
     public ModelAndSchema(Properties info) {
       this(info, true);
     }
-    
+
     public ModelAndSchema(Properties info, final boolean ref) {
       this.info = info;
       this.connectionFactory = new ConnectionFactory() {
         public Connection createConnection() throws Exception {
           String connect = ref ? "jdbc:drillref:" : "jdbc:drill:";
-          if(ref){
+          if (ref) {
             Class.forName("org.apache.drill.jdbc.RefDriver");
-          }else{
-            Class.forName("org.apache.drill.jdbc.Driver");  
+          } else {
+            Class.forName("org.apache.drill.jdbc.Driver");
           }
-          
-          
-          return DriverManager.getConnection(connect, 
ModelAndSchema.this.info);  
+
+
+          return DriverManager.getConnection(connect, 
ModelAndSchema.this.info);
         }
       };
     }
@@ -161,7 +169,9 @@ public class JdbcAssert {
       this.sql = sql;
     }
 
-    /** Checks that the current SQL statement returns the expected result. */
+    /**
+     * Checks that the current SQL statement returns the expected result.
+     */
     public TestDataConnection returns(String expected) throws Exception {
       Connection connection = null;
       Statement statement = null;
@@ -171,7 +181,7 @@ public class JdbcAssert {
         ResultSet resultSet = statement.executeQuery(sql);
         expected = expected.trim();
         String result = JdbcAssert.toString(resultSet).trim();
-        
+
         Assert.assertTrue(String.format("Generated string:\n%s\ndoes not 
match:\n%s", result, expected), expected.equals(result));
         Assert.assertEquals(expected, result);
         resultSet.close();
@@ -185,7 +195,6 @@ public class JdbcAssert {
         }
       }
     }
-    
 
 
     /**
@@ -211,10 +220,10 @@ public class JdbcAssert {
         }
       }
     }
-    
+
     public TestDataConnection displayResults(int recordCount) throws Exception 
{
       // record count check is done in toString method
-      
+
       Connection connection = null;
       Statement statement = null;
       try {
@@ -232,7 +241,7 @@ public class JdbcAssert {
           connection.close();
         }
       }
-      
+
     }
 
     private SortedSet<String> unsortedList(List<String> strings) {
@@ -242,11 +251,9 @@ public class JdbcAssert {
       }
       return set;
     }
-    
 
-
-    public TestDataConnection planContains(String expected) {
-      final String[] plan0 = { null };
+    public LogicalPlan logicalPlan() {
+      final String[] plan0 = {null};
       Connection connection = null;
       Statement statement = null;
       final Hook.Closeable x = Hook.LOGICAL_PLAN.add(new Function<String, 
Void>() {
@@ -260,11 +267,7 @@ public class JdbcAssert {
         statement = connection.prepareStatement(sql);
         statement.close();
         final String plan = plan0[0].trim();
-        // it's easier to write java strings containing single quotes than
-        // double quotes
-        String expected2 = expected.replace("'", "\"").trim();
-        Assert.assertTrue(String.format("Plan of: \n%s \n does not contain 
expected string of: \n%s",plan, expected2), plan.contains(expected2));
-        return this;
+        return LogicalPlan.parse(DrillConfig.create(), plan);
       } catch (Exception e) {
         throw new RuntimeException(e);
       } finally {
@@ -285,6 +288,15 @@ public class JdbcAssert {
         x.close();
       }
     }
+
+    public <T extends LogicalOperator> T planContains(final Class<T> 
operatorClazz) {
+      return (T) Iterables.find(logicalPlan().getSortedOperators(), new 
Predicate<LogicalOperator>() {
+        @Override
+        public boolean apply(LogicalOperator input) {
+          return input.getClass().equals(operatorClazz);
+        }
+      });
+    }
   }
 
   private static interface ConnectionFactory {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecb5e15e/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
----------------------------------------------------------------------
diff --git a/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java 
b/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
index 80196b2..4a65748 100644
--- a/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
+++ b/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
@@ -21,8 +21,23 @@ import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.Statement;
-
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.beust.jcommander.internal.Lists;
+import com.beust.jcommander.internal.Maps;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.PlanProperties;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.common.logical.data.*;
 import org.apache.drill.exec.ref.ReferenceInterpreter;
+import org.apache.drill.exec.ref.rse.ClasspathRSE;
+import org.apache.drill.exec.ref.rse.QueueRSE;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -149,16 +164,31 @@ public class JdbcTest {
   /** Checks the logical plan. */
   @Test
   public void testProjectPlan() throws Exception {
-    JdbcAssert
+    LogicalPlan plan = JdbcAssert
         .withModel(MODEL, "DONUTS")
         .sql("select _MAP['ppu'] as ppu from donuts")
-        .planContains(
-            
"{'head':{'type':'APACHE_DRILL_LOGICAL','version':'1','generator':{'type':'optiq','info':'na'}},"
-                + 
"'storage':{'donuts-json':{'type':'classpath'},'queue':{'type':'queue'}},"
-                + "'query':["
-                + 
"{'op':'scan','memo':'initial_scan','ref':'_MAP','storageengine':'donuts-json','selection':{'path':'/donuts.json','type':'JSON'},'@id':1},"
-                + 
"{'op':'project','input':1,'projections':[{'expr':'_MAP.ppu','ref':'output.PPU'}],'@id':2},"
-                + 
"{'op':'store','input':2,'storageengine':'queue','memo':'output 
sink','target':{'number':0},'@id':3}]}");
+        .logicalPlan();
+
+    PlanProperties planProperties = plan.getProperties();
+    Assert.assertEquals("optiq", planProperties.generator.type);
+    Assert.assertEquals("na", planProperties.generator.info);
+    Assert.assertEquals(1, planProperties.version);
+    Assert.assertEquals(PlanProperties.PlanType.APACHE_DRILL_LOGICAL, 
planProperties.type);
+    Map<String, StorageEngineConfig> seConfigs = plan.getStorageEngines();
+    StorageEngineConfig config = seConfigs.get("donuts-json");
+    Assert.assertTrue(config != null && config instanceof 
ClasspathRSE.ClasspathRSEConfig);
+    config = seConfigs.get("queue");
+    Assert.assertTrue(config != null && config instanceof 
QueueRSE.QueueRSEConfig);
+    Scan scan = findOnlyOperator(plan, Scan.class);
+    Assert.assertEquals("donuts-json", scan.getStorageEngine());
+    Assert.assertEquals("_MAP", scan.getOutputReference().getPath());
+    Project project = findOnlyOperator(plan, Project.class);
+    Assert.assertEquals(1, project.getSelections().length);
+    Assert.assertEquals(Scan.class, project.getInput().getClass());
+    Store store = findOnlyOperator(plan, Store.class);
+    Assert.assertEquals("queue", store.getStorageEngine());
+    Assert.assertEquals("output sink", store.getMemo());
+    Assert.assertEquals(Project.class, store.getInput().getClass());
   }
 
   /**
@@ -174,21 +204,52 @@ public class JdbcTest {
         .returns("NAME=Raised; XX=null\n" + "NAME=Filled; XX=null\n" + 
"NAME=Apple Fritter; XX=null\n");
   }
 
+  private static <T extends LogicalOperator> Iterable<T> 
findOperator(LogicalPlan plan, final Class<T> operatorClazz) {
+    return (Iterable<T>) Iterables.filter(plan.getSortedOperators(), new 
Predicate<LogicalOperator>() {
+      @Override
+      public boolean apply(LogicalOperator input) {
+        return input.getClass().equals(operatorClazz);
+      }
+    });
+  }
+
+  private static <T extends LogicalOperator> T findOnlyOperator(LogicalPlan 
plan, final Class<T> operatorClazz) {
+    return Iterables.getOnlyElement(findOperator(plan, operatorClazz));
+  }
+
   @Test
   public void testProjectFilterSubqueryPlan() throws Exception {
-    JdbcAssert
+    LogicalPlan plan = JdbcAssert
         .withModel(MODEL, "DONUTS")
         .sql(
             "select d['name'] as name, d['xx'] as xx from (\n" + " select 
_MAP['donuts'] as d from donuts)\n"
                 + "where cast(d['ppu'] as double) > 0.6")
-        .planContains(
-            
"{'head':{'type':'APACHE_DRILL_LOGICAL','version':'1','generator':{'type':'optiq','info':'na'}},'storage':{'donuts-json':{'type':'classpath'},'queue':{'type':'queue'}},"
-                + "'query':["
-                + 
"{'op':'scan','memo':'initial_scan','ref':'_MAP','storageengine':'donuts-json','selection':{'path':'/donuts.json','type':'JSON'},'@id':1},"
-                + "{'op':'filter','input':1,'expr':'(_MAP.donuts.ppu > 
0.6)','@id':2},"
-                + 
"{'op':'project','input':2,'projections':[{'expr':'_MAP.donuts','ref':'output.D'}],'@id':3},"
-                + 
"{'op':'project','input':3,'projections':[{'expr':'D.name','ref':'output.NAME'},{'expr':'D.xx','ref':'output.XX'}],'@id':4},"
-                + 
"{'op':'store','input':4,'storageengine':'queue','memo':'output 
sink','target':{'number':0},'@id':5}]}");
+        .logicalPlan();
+    PlanProperties planProperties = plan.getProperties();
+    Assert.assertEquals("optiq", planProperties.generator.type);
+    Assert.assertEquals("na", planProperties.generator.info);
+    Assert.assertEquals(1, planProperties.version);
+    Assert.assertEquals(PlanProperties.PlanType.APACHE_DRILL_LOGICAL, 
planProperties.type);
+    Map<String, StorageEngineConfig> seConfigs = plan.getStorageEngines();
+    StorageEngineConfig config = seConfigs.get("donuts-json");
+    Assert.assertTrue(config != null && config instanceof 
ClasspathRSE.ClasspathRSEConfig);
+    config = seConfigs.get("queue");
+    Assert.assertTrue(config != null && config instanceof 
QueueRSE.QueueRSEConfig);
+    Scan scan = findOnlyOperator(plan, Scan.class);
+    Assert.assertEquals("donuts-json", scan.getStorageEngine());
+    Assert.assertEquals("_MAP", scan.getOutputReference().getPath());
+    Filter filter = findOnlyOperator(plan, Filter.class);
+    Assert.assertTrue(filter.getInput() instanceof Scan);
+    Project[] projects = Iterables.toArray(findOperator(plan, Project.class), 
Project.class);
+    Assert.assertEquals(2, projects.length);
+    Assert.assertEquals(1, projects[0].getSelections().length);
+    Assert.assertEquals(Filter.class, projects[0].getInput().getClass());
+    Assert.assertEquals(2, projects[1].getSelections().length);
+    Assert.assertEquals(Project.class, projects[1].getInput().getClass());
+    Store store = findOnlyOperator(plan, Store.class);
+    Assert.assertEquals("queue", store.getStorageEngine());
+    Assert.assertEquals("output sink", store.getMemo());
+    Assert.assertEquals(Project.class, store.getInput().getClass());
   }
 
   /** Query that projects one field. (Disabled; uses sugared syntax.) */
@@ -221,56 +282,57 @@ public class JdbcTest {
   public void testDistinct() throws Exception {
     JdbcAssert.withModel(MODEL, "HR").sql("select distinct deptId from emp")
         .returnsUnordered("DEPTID=null", "DEPTID=31", "DEPTID=34", "DEPTID=33")
-        .planContains("\"op\":\"collapsingaggregate\"");
+        .planContains(CollapsingAggregate.class);
   }
 
   @Test
   public void testCountNoGroupBy() throws Exception {
     // 5 out of 6 employees have a not-null deptId
     JdbcAssert.withModel(MODEL, "HR").sql("select count(deptId) as cd, 
count(*) as c from emp").returns("CD=5; C=6\n")
-        .planContains("\"op\":\"collapsingaggregate\"");
+        .planContains(CollapsingAggregate.class);
   }
 
   @Test
   public void testDistinctCountNoGroupBy() throws Exception {
     JdbcAssert.withModel(MODEL, "HR").sql("select count(distinct deptId) as c 
from emp").returns("C=3\n")
-        .planContains("\"op\":\"collapsingaggregate\"");
+        .planContains(CollapsingAggregate.class);
   }
 
   @Test
   public void testDistinctCountGroupByEmpty() throws Exception {
     JdbcAssert.withModel(MODEL, "HR").sql("select count(distinct deptId) as c 
from emp group by ()").returns("C=3\n")
-        .planContains("\"op\":\"collapsingaggregate\"");
+        .planContains(CollapsingAggregate.class);
   }
 
   @Test
   public void testCountNull() throws Exception {
     JdbcAssert.withModel(MODEL, "HR").sql("select count(distinct deptId) as c 
from emp group by ()").returns("C=3\n")
-        .planContains("\"op\":\"collapsingaggregate\"");
+        .planContains(CollapsingAggregate.class);
   }
 
   @Test
   public void testCount() throws Exception {
     JdbcAssert.withModel(MODEL, "HR").sql("select deptId, count(*) as c from 
emp group by deptId")
         .returnsUnordered("DEPTID=31; C=1", "DEPTID=33; C=2", "DEPTID=34; 
C=2", "DEPTID=null; C=1")
-        .planContains("\"op\":\"collapsingaggregate\""); // make sure using 
drill
+        .planContains(CollapsingAggregate.class); // make sure using drill
   }
 
   @Test
   public void testJoin() throws Exception {
-    JdbcAssert
+    Join join = JdbcAssert
         .withModel(MODEL, "HR")
         .sql("select * from emp join dept on emp.deptId = dept.deptId")
         .returnsUnordered("DEPTID=31; LASTNAME=Rafferty; DEPTID0=31; 
NAME=Sales",
             "DEPTID=33; LASTNAME=Jones; DEPTID0=33; NAME=Engineering",
             "DEPTID=33; LASTNAME=Steinberg; DEPTID0=33; NAME=Engineering",
             "DEPTID=34; LASTNAME=Robinson; DEPTID0=34; NAME=Clerical",
-            "DEPTID=34; LASTNAME=Smith; DEPTID0=34; 
NAME=Clerical").planContains("'type':'inner'");
+            "DEPTID=34; LASTNAME=Smith; DEPTID0=34; 
NAME=Clerical").planContains(Join.class);
+    Assert.assertEquals(Join.JoinType.INNER, join.getJointType());
   }
 
   @Test
   public void testLeftJoin() throws Exception {
-    JdbcAssert
+    Join join = JdbcAssert
         .withModel(MODEL, "HR")
         .sql("select * from emp left join dept on emp.deptId = dept.deptId")
         .returnsUnordered("DEPTID=31; LASTNAME=Rafferty; DEPTID0=31; 
NAME=Sales",
@@ -278,7 +340,8 @@ public class JdbcTest {
             "DEPTID=33; LASTNAME=Steinberg; DEPTID0=33; NAME=Engineering",
             "DEPTID=34; LASTNAME=Robinson; DEPTID0=34; NAME=Clerical",
             "DEPTID=34; LASTNAME=Smith; DEPTID0=34; NAME=Clerical",
-            "DEPTID=null; LASTNAME=John; DEPTID0=null; 
NAME=null").planContains("'type':'left'");
+            "DEPTID=null; LASTNAME=John; DEPTID0=null; 
NAME=null").planContains(Join.class);
+    Assert.assertEquals(Join.JoinType.LEFT, join.getJointType());
   }
 
   /**
@@ -286,13 +349,14 @@ public class JdbcTest {
    */
   @Test @Ignore
   public void testRightJoin() throws Exception {
-    JdbcAssert.withModel(MODEL, "HR").sql("select * from emp right join dept 
on emp.deptId = dept.deptId")
-        .returnsUnordered("xx").planContains("'type':'left'");
+    Join join = JdbcAssert.withModel(MODEL, "HR").sql("select * from emp right 
join dept on emp.deptId = dept.deptId")
+        .returnsUnordered("xx").planContains(Join.class);
+    Assert.assertEquals(Join.JoinType.LEFT, join.getJointType());
   }
 
   @Test
   public void testFullJoin() throws Exception {
-    JdbcAssert
+    Join join = JdbcAssert
         .withModel(MODEL, "HR")
         .sql("select * from emp full join dept on emp.deptId = dept.deptId")
         .returnsUnordered("DEPTID=31; LASTNAME=Rafferty; DEPTID0=31; 
NAME=Sales",
@@ -301,7 +365,8 @@ public class JdbcTest {
             "DEPTID=34; LASTNAME=Robinson; DEPTID0=34; NAME=Clerical",
             "DEPTID=34; LASTNAME=Smith; DEPTID0=34; NAME=Clerical",
             "DEPTID=null; LASTNAME=John; DEPTID0=null; NAME=null",
-            "DEPTID=null; LASTNAME=null; DEPTID0=35; 
NAME=Marketing").planContains("'type':'outer'");
+            "DEPTID=null; LASTNAME=null; DEPTID0=35; 
NAME=Marketing").planContains(Join.class);
+    Assert.assertEquals(Join.JoinType.OUTER, join.getJointType());
   }
 
   /**
@@ -310,7 +375,7 @@ public class JdbcTest {
    */
   @Test
   public void testJoinOnSubquery() throws Exception {
-    JdbcAssert
+    Join join = JdbcAssert
         .withModel(MODEL, "HR")
         .sql(
             "select * from (\n" + "select deptId, lastname, 'x' as name from 
emp) as e\n"
@@ -319,7 +384,8 @@ public class JdbcTest {
             "DEPTID=33; LASTNAME=Jones; NAME=x; DEPTID0=33; NAME0=Engineering",
             "DEPTID=33; LASTNAME=Steinberg; NAME=x; DEPTID0=33; 
NAME0=Engineering",
             "DEPTID=34; LASTNAME=Robinson; NAME=x; DEPTID0=34; NAME0=Clerical",
-            "DEPTID=34; LASTNAME=Smith; NAME=x; DEPTID0=34; 
NAME0=Clerical").planContains("'type':'inner'");
+            "DEPTID=34; LASTNAME=Smith; NAME=x; DEPTID0=34; 
NAME0=Clerical").planContains(Join.class);
+    Assert.assertEquals(Join.JoinType.INNER, join.getJointType());
   }
 
   /** Tests that one of the FoodMart tables is present. */
@@ -335,16 +401,18 @@ public class JdbcTest {
 
   @Test
   public void testUnionAll() throws Exception {
-    JdbcAssert.withModel(MODEL, "HR").sql("select deptId from dept\n" + "union 
all\n" + "select deptId from emp")
+    Union union = JdbcAssert.withModel(MODEL, "HR").sql("select deptId from 
dept\n" + "union all\n" + "select deptId from emp")
         .returnsUnordered("DEPTID=31", "DEPTID=33", "DEPTID=34", "DEPTID=35", 
"DEPTID=null")
-        .planContains("'op':'union','distinct':false");
+        .planContains(Union.class);
+    Assert.assertFalse(union.isDistinct());
   }
 
   @Test
   public void testUnion() throws Exception {
-    JdbcAssert.withModel(MODEL, "HR").sql("select deptId from dept\n" + 
"union\n" + "select deptId from emp")
+    Union union = JdbcAssert.withModel(MODEL, "HR").sql("select deptId from 
dept\n" + "union\n" + "select deptId from emp")
         .returnsUnordered("DEPTID=31", "DEPTID=33", "DEPTID=34", "DEPTID=35", 
"DEPTID=null")
-        .planContains("'op':'union','distinct':true");
+        .planContains(Union.class);
+    Assert.assertTrue(union.isDistinct());
   }
 
   @Test
@@ -356,7 +424,7 @@ public class JdbcTest {
         .returns(
             "DEPTID=null; LASTNAME=John\n" + "DEPTID=34; LASTNAME=Robinson\n" 
+ "DEPTID=34; LASTNAME=Smith\n"
                 + "DEPTID=33; LASTNAME=Jones\n" + "DEPTID=33; 
LASTNAME=Steinberg\n" + "DEPTID=31; LASTNAME=Rafferty\n")
-        .planContains("'op':'order'");
+        .planContains(Order.class);
   }
 
   @Test
@@ -368,7 +436,7 @@ public class JdbcTest {
         .returns(
             "DEPTID=34; LASTNAME=Robinson\n" + "DEPTID=34; LASTNAME=Smith\n" + 
"DEPTID=33; LASTNAME=Jones\n"
                 + "DEPTID=33; LASTNAME=Steinberg\n" + "DEPTID=31; 
LASTNAME=Rafferty\n" + "DEPTID=null; LASTNAME=John\n")
-        .planContains("'op':'order'");
+        .planContains(Order.class);
   }
 
   @Test @Ignore
@@ -381,7 +449,7 @@ public class JdbcTest {
         .returns(
             "DEPTID=null; LASTNAME=John\n" + "DEPTID=34; LASTNAME=Robinson\n" 
+ "DEPTID=34; LASTNAME=Smith\n"
                 + "DEPTID=33; LASTNAME=Jones\n" + "DEPTID=33; 
LASTNAME=Steinberg\n" + "DEPTID=31; LASTNAME=Rafferty\n")
-        .planContains("'op':'order'");
+        .planContains(Order.class);
   }
 
   @Test
@@ -397,7 +465,7 @@ public class JdbcTest {
             + "DEPTID=34; LASTNAME=Robinson\n"
             + "DEPTID=34; LASTNAME=Smith\n"
             + "DEPTID=null; LASTNAME=John\n")
-        .planContains("'op':'order'");
+        .planContains(Order.class);
   }
 
   @Test
@@ -405,9 +473,9 @@ public class JdbcTest {
     JdbcAssert
         .withModel(MODEL, "HR")
         .sql("select LASTNAME from emp limit 2")
-        .planContains("\"op\":\"limit\"")
         .returns("LASTNAME=Rafferty\n" +
-            "LASTNAME=Jones");
+            "LASTNAME=Jones")
+        .planContains(Limit.class);
   }
 
   @Test
@@ -415,10 +483,11 @@ public class JdbcTest {
     JdbcAssert
         .withModel(MODEL, "HR")
         .sql("select LASTNAME from emp order by LASTNAME asc offset 3")
-        .planContains("\"op\":\"limit\"")
         .returns("LASTNAME=Robinson\n" +
             "LASTNAME=Smith\n" +
-            "LASTNAME=John");
+            "LASTNAME=John")
+        .planContains(Limit.class);
+
   }
 
   @Test
@@ -426,9 +495,9 @@ public class JdbcTest {
     JdbcAssert
         .withModel(MODEL, "HR")
         .sql("select LASTNAME from emp order by LASTNAME asc offset 3 fetch 
next 2 rows only")
-        .planContains("\"op\":\"limit\"")
         .returns("LASTNAME=Robinson\n" +
-            "LASTNAME=Smith");
+            "LASTNAME=Smith")
+        .planContains(Limit.class);
   }
 }
 

Reply via email to