DRILL-1241: Ensure that Limit produces at least 1 batch with the output schema.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/cca9ce18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/cca9ce18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/cca9ce18

Branch: refs/heads/master
Commit: cca9ce18110bfba7658f2874091f36f2976e47b8
Parents: c2df146
Author: Aman Sinha <asi...@maprtech.com>
Authored: Thu Aug 14 16:22:39 2014 -0700
Committer: Aditya Kishore <adi...@maprtech.com>
Committed: Mon Aug 18 14:56:52 2014 +0530

----------------------------------------------------------------------
 .../physical/impl/limit/LimitRecordBatch.java   | 34 ++++++++++++++++++++
 .../org/apache/drill/TestExampleQueries.java    | 10 ++++++
 2 files changed, 44 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cca9ce18/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 12ee406..32eb709 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -23,23 +23,29 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 
 import com.google.common.collect.Lists;
 
 public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
 
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
+  
   private SelectionVector2 outgoingSv;
   private SelectionVector2 incomingSv;
   private int recordsToSkip;
   private int recordsLeft;
   private boolean noEndLimit;
   private boolean skipBatch;
+  private boolean first = true;
+  private boolean done = false;
   List<TransferPair> transfers = Lists.newArrayList();
 
   public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
@@ -82,7 +88,15 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
 
   @Override
   public IterOutcome innerNext() {
+    if (done) {
+      return IterOutcome.NONE;
+    }
+
     if(!noEndLimit && recordsLeft <= 0) {
+      if (first) {
+        return produceEmptyFirstBatch();
+      }
+      
       incoming.kill(true);
 
       IterOutcome upStream = incoming.next();
@@ -96,9 +110,11 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
         upStream = incoming.next();
       }
 
+      first = false;
       return IterOutcome.NONE;
     }
 
+    first = false;
     return super.innerNext();
   }
 
@@ -127,6 +143,23 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
     }
   }
 
+  private IterOutcome produceEmptyFirstBatch() {
+    incoming.next();
+    first = false;
+    done = true;
+    // Build the container schema and set the count
+    for (VectorWrapper<?> v : incoming) {
+      TransferPair pair = v.getValueVector().getTransferPair();
+      container.add(pair.getTo());
+      transfers.add(pair);
+    }
+    container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
+    container.setRecordCount(0);
+
+    incoming.kill(true);
+    return IterOutcome.OK_NEW_SCHEMA;
+  }
+  
   private void limitWithNoSV(int recordCount) {
     int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip));
     recordsToSkip -= offset;
@@ -178,4 +211,5 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
     outgoingSv.clear();
     super.cleanup();
   }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cca9ce18/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 8ed8171..11075f6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -283,4 +283,14 @@ public class TestExampleQueries extends BaseTestQuery{
     test("select cast(r_name as varchar(20)) from cp.`tpch/region.parquet` order by r_name");
   }
 
+  @Test  // tests with LIMIT 0
+  public void testLimit0_1() throws Exception {
+    test("select n_nationkey, n_name from cp.`tpch/nation.parquet` limit 0");
+    test("select n_nationkey, n_name from cp.`tpch/nation.parquet` limit 0 offset 5");
+    test("select n_nationkey, n_name from cp.`tpch/nation.parquet` order by n_nationkey limit 0");
+    test("select * from cp.`tpch/nation.parquet` limit 0");
+    test("select n.n_nationkey from cp.`tpch/nation.parquet` n, cp.`tpch/region.parquet` r where n.n_regionkey = r.r_regionkey limit 0");
+    test("select n_regionkey, count(*) from cp.`tpch/nation.parquet` group by n_regionkey limit 0");
+  }
+  
 }

Reply via email to