DRILL-824: MergingRecordBatch.next() fails to reallocate the outgoing vectors if copyFromSafe returns false.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ef28054b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ef28054b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ef28054b

Branch: refs/heads/master
Commit: ef28054b4f8eb9f9b4fbaa069089e18536394632
Parents: e6a6652
Author: vkorukanti <[email protected]>
Authored: Fri May 23 09:53:38 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Wed May 28 09:13:58 2014 -0700

----------------------------------------------------------------------
 .../impl/mergereceiver/MergingRecordBatch.java  |   4 +-
 .../drill/exec/TestQueriesOnLargeFile.java      | 118 +++++++++++++++++++
 .../complex/writer/TestJsonReaderLargeFile.java | 101 ----------------
 .../largefiles/merging_receiver_large_data.json | 113 ++++++++++++++++++
 4 files changed, 234 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ef28054b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index cc38cbe..e3f466a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -253,13 +253,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       // pop next value from pq and copy to outgoing batch
       Node node = pqueue.peek();
       if (!copyRecordToOutgoingBatch(node)) {
+        logger.debug("Outgoing vectors space is full; breaking");
+        prevBatchWasFull = true;
         break;
       }
       pqueue.poll();
 
       if (isOutgoingFull()) {
         // set a flag so that we reallocate on the next iteration
-        logger.debug("Outgoing vectors are full; breaking");
+        logger.debug("Outgoing vectors record batch size reached; breaking");
         prevBatchWasFull = true;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ef28054b/exec/java-exec/src/test/java/org/apache/drill/exec/TestQueriesOnLargeFile.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestQueriesOnLargeFile.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestQueriesOnLargeFile.java
new file mode 100644
index 0000000..879dc3c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestQueriesOnLargeFile.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestQueriesOnLargeFile extends BaseTestQuery {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestQueriesOnLargeFile.class);
+
+  private static File dataFile = null;
+  private static int NUM_RECORDS = 15000;
+
+  @BeforeClass
+  public static void generateTestData() throws Exception {
+    // Generate a json file with NUM_RECORDS number of records
+    while (true) {
+      dataFile = File.createTempFile("drill-json", ".json");
+      if (dataFile.exists()) {
+        boolean success = dataFile.delete();
+        if (success) {
+          break;
+        }
+      }
+      logger.trace("retry creating tmp file");
+    }
+
+    PrintWriter printWriter = new PrintWriter(dataFile);
+
+    for (int i=1; i<=NUM_RECORDS; i++) {
+      printWriter.println("{");
+      printWriter.println("  \"id\" : " + Math.random() + ",");
+      printWriter.println("  \"summary\" : \"Apache Drill provides low latency 
ad-hoc queries to many different data sources, "+
+          "including nested data. Inspired by Google's Dremel, Drill is 
designed to scale to 10,000 servers and " +
+          "query petabytes of data in seconds.\"");
+      printWriter.println("}");
+    }
+
+    printWriter.close();
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    List<QueryResultBatch> results = testSqlWithResults(
+        String.format("SELECT count(*) FROM dfs.`default`.`%s`", 
dataFile.getPath()));
+
+    RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
+
+    for(QueryResultBatch batch : results) {
+      batchLoader.load(batch.getHeader().getDef(), batch.getData());
+
+      if (batchLoader.getRecordCount() <= 0) {
+        continue;
+      }
+
+      BigIntVector countV = (BigIntVector) 
batchLoader.getValueAccessorById(BigIntVector.class, 0).getValueVector();
+      assertTrue("Total of "+ NUM_RECORDS + " records expected in count", 
countV.getAccessor().get(0) == NUM_RECORDS);
+
+      batchLoader.clear();
+      batch.release();
+    }
+  }
+
+  @Test
+  public void testMergingReceiver() throws Exception {
+    String plan = 
Files.toString(FileUtils.getResourceAsFile("/largefiles/merging_receiver_large_data.json"),
+        Charsets.UTF_8).replace("#{TEST_FILE}", dataFile.getPath());
+    List<QueryResultBatch> results = testPhysicalWithResults(plan);
+
+    int recordsInOutput = 0;
+    for(QueryResultBatch batch : results) {
+      recordsInOutput += batch.getHeader().getDef().getRecordCount();
+      batch.release();
+    }
+
+    assertTrue(String.format("Number of records in output is wrong: 
expected=%d, actual=%s",
+        NUM_RECORDS, recordsInOutput), NUM_RECORDS == recordsInOutput);
+  }
+
+  @AfterClass
+  public static void deleteTestData() throws Exception {
+    if (dataFile != null) {
+      if (dataFile.exists()) {
+        org.apache.commons.io.FileUtils.forceDelete(dataFile);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ef28054b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReaderLargeFile.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReaderLargeFile.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReaderLargeFile.java
deleted file mode 100644
index 643100a..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReaderLargeFile.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.vector.complex.writer;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.drill.BaseTestQuery;
-import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.PrintWriter;
-import java.util.List;
-
-import static org.junit.Assert.assertTrue;
-
-public class TestJsonReaderLargeFile extends BaseTestQuery {
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestJsonReaderLargeFile.class);
-
-  private static File dataFile = null;
-  private static int NUM_RECORDS = 15000;
-
-  @BeforeClass
-  public static void generateTestData() throws Exception {
-    // Generate a json file with NUM_RECORDS number of records
-    while (true) {
-      dataFile = File.createTempFile("drill-json", ".json");
-      if (dataFile.exists()) {
-        boolean success = dataFile.delete();
-        if (success) {
-          break;
-        }
-      }
-      logger.trace("retry creating tmp file");
-    }
-
-    PrintWriter printWriter = new PrintWriter(dataFile);
-    String record = "{\n" +
-        "\"project\" : \"Drill\", \n" +
-        "\"summary\" : \"Apache Drill provides low latency ad-hoc queries to 
many different data sources, " +
-        "including nested data. Inspired by Google's Dremel, Drill is designed 
to scale to 10,000 servers and " +
-        "query petabytes of data in seconds.\"\n" +
-        "}";
-
-    for (int i=1; i<=NUM_RECORDS; i++) {
-      printWriter.println(record);
-    }
-
-    printWriter.close();
-  }
-
-  @Test
-  public void testRead() throws Exception {
-    List<QueryResultBatch> results = testSqlWithResults(
-        String.format("SELECT count(*) FROM dfs.`default`.`%s`", 
dataFile.getPath()));
-
-    RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
-
-    for(QueryResultBatch batch : results) {
-      batchLoader.load(batch.getHeader().getDef(), batch.getData());
-
-      if (batchLoader.getRecordCount() <= 0) {
-        continue;
-      }
-
-      BigIntVector countV = (BigIntVector) 
batchLoader.getValueAccessorById(BigIntVector.class, 0).getValueVector();
-      assertTrue("Total of "+ NUM_RECORDS + " records expected in count", 
countV.getAccessor().get(0) == NUM_RECORDS);
-
-      batchLoader.clear();
-      batch.release();
-    }
-  }
-
-  @AfterClass
-  public static void deleteTestData() throws Exception {
-    if (dataFile != null) {
-      if (dataFile.exists()) {
-        FileUtils.forceDelete(dataFile);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ef28054b/exec/java-exec/src/test/resources/largefiles/merging_receiver_large_data.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/largefiles/merging_receiver_large_data.json b/exec/java-exec/src/test/resources/largefiles/merging_receiver_large_data.json
new file mode 100644
index 0000000..3ce5526
--- /dev/null
+++ b/exec/java-exec/src/test/resources/largefiles/merging_receiver_large_data.json
@@ -0,0 +1,113 @@
+{
+  head:{
+    type:"APACHE_DRILL_PHYSICAL",
+    version:"1",
+    generator:{
+      type:"manual"
+    }
+  },
+  "graph" : [ {
+    "pop" : "fs-scan",
+    "@id" : 1,
+    "files" : [ "#{TEST_FILE}" ],
+    "storage" : {
+      "type" : "file",
+      "connection" : "file:///",
+      "workspaces" : {
+        "root" : {
+          "location" : "/",
+          "writable" : false,
+          "storageformat" : null
+        },
+        "tmp" : {
+          "location" : "/tmp",
+          "writable" : true,
+          "storageformat" : "csv"
+        }
+      },
+      "formats" : {
+        "psv" : {
+          "type" : "text",
+          "extensions" : [ "tbl" ],
+          "delimiter" : "|"
+        },
+        "csv" : {
+          "type" : "text",
+          "extensions" : [ "csv" ],
+          "delimiter" : ","
+        },
+        "tsv" : {
+          "type" : "text",
+          "extensions" : [ "tsv" ],
+          "delimiter" : "\t"
+        },
+        "parquet" : {
+          "type" : "parquet"
+        },
+        "json" : {
+          "type" : "json"
+        }
+      }
+    },
+    "format" : {
+      "type" : "json"
+    },
+    "columns" : [ "`id`", "`summary`" ],
+    "selectionRoot" : "#{TEST_FILE}"
+  }, {
+    "pop" : "project",
+    "@id" : 2,
+    "exprs" : [ {
+      "ref" : "`id`",
+      "expr" : "`id`"
+    }, {
+      "ref" : "`summary`",
+      "expr" : "`summary`"
+    } ],
+    "child" : 1,
+    "initialAllocation" : 1000000,
+    "maxAllocation" : 10000000000
+  }, {
+    "pop" : "hash-to-random-exchange",
+    "@id" : 3,
+    "child" : 2,
+    "expr" : "hash(`id`) ",
+    "initialAllocation" : 1000000,
+    "maxAllocation" : 10000000000
+  }, {
+    "pop" : "external-sort",
+    "@id" : 4,
+    "child" : 3,
+    "orderings" : [ {
+      "order" : "ASC",
+      "expr" : "`id`",
+      "nullDirection" : "UNSPECIFIED"
+    } ],
+    "reverse" : false,
+    "initialAllocation" : 1000000,
+    "maxAllocation" : 10000000000
+  }, {
+    "pop" : "selection-vector-remover",
+    "@id" : 5,
+    "child" : 4,
+    "initialAllocation" : 1000000,
+    "maxAllocation" : 10000000000
+  }, {
+    "pop" : "single-merge-exchange",
+    "@id" : 6,
+    "child" : 5,
+    "orderings" : [ {
+      "order" : "ASC",
+      "expr" : "`id`",
+      "nullDirection" : "UNSPECIFIED"
+    } ],
+    "initialAllocation" : 1000000,
+    "maxAllocation" : 10000000000
+  }, {
+    "pop" : "screen",
+    "@id" : 7,
+    "child" : 6,
+    "initialAllocation" : 1000000,
+    "maxAllocation" : 10000000000
+  } ]
+}
\ No newline at end of file

Reply via email to