Updates for the Parquet variable-length (varlen) merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c5013525 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c5013525 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c5013525 Branch: refs/heads/master Commit: c501352565d1c8e39e1f6e7e66d9fa3a0fe2bf9e Parents: 556bd96 Author: Jacques Nadeau <[email protected]> Authored: Thu Aug 15 20:38:04 2013 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu Aug 15 20:38:04 2013 -0700 ---------------------------------------------------------------------- .../exec/store/parquet/ParquetGroupScan.java | 34 ++++++----- .../store/parquet/ParquetRecordReaderTest.java | 61 +++++++++++++++----- .../exec/store/parquet/TestFileGenerator.java | 8 ++- .../resources/parquet/parquet_scan_screen.json | 44 ++++++++++++++ .../parquet_scan_screen_read_entry_replace.json | 39 +++++++++++++ .../parquet_scan_union_screen_physical.json | 35 +++++++++++ .../src/test/resources/parquet_scan_screen.json | 44 -------------- .../parquet_scan_screen_read_entry_replace.json | 39 ------------- .../parquet_scan_union_screen_physical.json | 35 ----------- 9 files changed, 186 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index 66c1550..9e48d33 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -18,16 +18,14 @@ package org.apache.drill.exec.store.parquet; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; - -import org.apache.drill.common.JSONOptions; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.exception.SetupException; import org.apache.drill.exec.physical.EndpointAffinity; @@ -38,13 +36,9 @@ import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Size; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; - -import com.google.common.base.Preconditions; - -import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.store.StorageEngineRegistry; import org.apache.drill.exec.store.AffinityCalculator; -import org.apache.drill.exec.store.mock.MockGroupScanPOP; +import org.apache.drill.exec.store.StorageEngineRegistry; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -54,6 +48,12 @@ import parquet.hadoop.metadata.ColumnChunkMetaData; import parquet.hadoop.metadata.ParquetMetadata; import parquet.org.codehaus.jackson.annotate.JsonCreator; +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonIgnore; +import 
com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; + @JsonTypeName("parquet-scan") public class ParquetGroupScan extends AbstractGroupScan { @@ -116,8 +116,10 @@ public class ParquetGroupScan extends AbstractGroupScan { ColumnChunkMetaData columnChunkMetaData; for (ReadEntryWithPath readEntryWithPath : entries){ Path path = new Path(readEntryWithPath.getPath()); - ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), path); +// FileSystem fs = FileSystem.get(this.storageEngine.getHadoopConfig()); +// FileStatus status = fs.getFileStatus(path); +// ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), status); readEntryWithPath.getPath(); int i = 0; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java index 5628f50..1d91455 100644 --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.types.TypeProtos; @@ -46,6 +47,7 @@ import org.apache.drill.exec.server.RemoteServiceSet; import org.apache.drill.exec.store.parquet.TestFileGenerator.FieldInfo; import 
org.apache.drill.exec.vector.BaseDataValueVector; import org.apache.drill.exec.vector.ValueVector; +import org.junit.BeforeClass; import org.junit.Test; import parquet.bytes.BytesInput; @@ -57,6 +59,7 @@ import parquet.hadoop.metadata.ParquetMetadata; import parquet.schema.MessageType; import com.google.common.base.Charsets; +import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.io.Files; import com.google.common.util.concurrent.SettableFuture; @@ -66,23 +69,38 @@ public class ParquetRecordReaderTest { private boolean VERBOSE_DEBUG = false; - - public static void main(String[] args) throws Exception{ - new ParquetRecordReaderTest().testMultipleRowGroupsAndReadsEvent(); - } + static final int numberRowGroups = 20; + static final int recordsPerRowGroup = 300000; + static final String fileName = "/tmp/parquet_test_file_many_types"; - - @Test - public void testMultipleRowGroupsAndReadsEvent() throws Exception { - String planName = "/parquet_scan_screen.json"; - String fileName = "/tmp/parquet_test_file_many_types"; - int numberRowGroups = 20; - int recordsPerRowGroup = 300000; + @BeforeClass + public static void generateFile() throws Exception{ File f = new File(fileName); if(!f.exists()) TestFileGenerator.generateParquetFile(fileName, numberRowGroups, recordsPerRowGroup); - testParquetFullEngineLocal(planName, fileName, 2, numberRowGroups, recordsPerRowGroup); + } + + @Test + public void testMultipleRowGroupsAndReads() throws Exception { + String planName = "/parquet/parquet_scan_screen.json"; + testParquetFullEngineLocalPath(planName, fileName, 2, numberRowGroups, recordsPerRowGroup); + } + + @Test + public void testMultipleRowGroupsAndReads2() throws Exception { + String readEntries; + readEntries = ""; + // number of times to read the file + int i = 3; + for (int j = 0; j < i; j++){ + readEntries += "{path: \""+fileName+"\"}"; + if (j < i - 1) + readEntries += ","; + } + String planText = 
Files.toString(FileUtils.getResourceAsFile("/parquet/parquet_scan_screen_read_entry_replace.json"), Charsets.UTF_8).replaceFirst( "&REPLACED_IN_PARQUET_TEST&", readEntries); + testParquetFullEngineLocalText(planText, fileName, i, numberRowGroups, recordsPerRowGroup); } + private class ParquetResultListener implements UserResultsListener { private SettableFuture<Void> future = SettableFuture.create(); RecordBatchLoader batchLoader; @@ -198,6 +216,8 @@ public class ParquetRecordReaderTest { } + + public void testParquetFullEngineRemote(String plan, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{ DrillConfig config = DrillConfig.create(); @@ -212,8 +232,13 @@ public class ParquetRecordReaderTest { } - // specific tests should call this method, but it is not marked as a test itself intentionally - public void testParquetFullEngineLocal(String plan, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{ + + public void testParquetFullEngineLocalPath(String planFileName, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{ + testParquetFullEngineLocalText(Files.toString(FileUtils.getResourceAsFile(planFileName), Charsets.UTF_8), filename, numberOfTimesRead, numberOfRowGroups, recordsPerRowGroup); + } + + //specific tests should call this method, but it is not marked as a test itself intentionally + public void testParquetFullEngineLocalText(String planText, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); @@ -224,8 +249,11 @@ public class ParquetRecordReaderTest { client.connect(); RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator()); 
ParquetResultListener resultListener = new ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, numberOfTimesRead); - client.runQuery(UserProtos.QueryType.LOGICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener); + Stopwatch watch = new Stopwatch().start(); + client.runQuery(UserProtos.QueryType.LOGICAL, planText, resultListener); resultListener.get(); + System.out.println(String.format("Took %d ms to run query", watch.elapsed(TimeUnit.MILLISECONDS))); + } } @@ -312,7 +340,8 @@ public class ParquetRecordReaderTest { assertEquals(footer.getFile().getName(), keyValueMetaData.get(footer.getFile().getName())); } } - + + private void validateContains(MessageType schema, PageReadStore pages, String[] path, int values, BytesInput bytes) throws IOException { PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path)); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java index 72f9123..1f1f01b 100644 --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java @@ -54,7 +54,7 @@ public class TestFileGenerator { this.parquetType = parquetType; this.name = name; this.bitLength = bitLength; - this.numberOfPages = Math.max(1, (int) Math.ceil(recordsPerRowGroup * bitLength / 8.0 / bytesPerPage)); + this.numberOfPages = Math.max(1, (int) Math.ceil( ((long) recordsPerRowGroup) * bitLength / 8.0 / bytesPerPage)); this.values = values; // 
generator is designed to use 3 values assert values.length == 3; @@ -91,7 +91,7 @@ public class TestFileGenerator { fields.put("bigInt/", new FieldInfo(recordsPerRowGroup, "int64", "bigInt", 64, longVals, TypeProtos.MinorType.BIGINT)); fields.put("f/", new FieldInfo(recordsPerRowGroup, "float", "f", 32, floatVals, TypeProtos.MinorType.FLOAT4)); fields.put("d/", new FieldInfo(recordsPerRowGroup, "double", "d", 64, doubleVals, TypeProtos.MinorType.FLOAT8)); - // fields.put("b/", new FieldInfo("binary", "b", 1, boolVals, TypeProtos.MinorType.BIT)); + fields.put("b/", new FieldInfo(recordsPerRowGroup, "boolean", "b", 1, boolVals, TypeProtos.MinorType.BIT)); fields.put("bin/", new FieldInfo(recordsPerRowGroup, "binary", "bin", -1, binVals, TypeProtos.MinorType.VARBINARY)); fields.put("bin2/", new FieldInfo(recordsPerRowGroup, "binary", "bin2", -1, bin2Vals, TypeProtos.MinorType.VARBINARY)); return fields; @@ -129,6 +129,8 @@ public class TestFileGenerator { int valsWritten; for (int k = 0; k < numberRowGroups; k++) { w.startBlock(1); + currentBooleanByte = 0; + booleanBitCounter.reset(); for (FieldInfo fieldInfo : fields.values()) { @@ -143,7 +145,7 @@ public class TestFileGenerator { ColumnDescriptor c1 = schema.getColumnDescription(path1); w.startColumn(c1, recordsPerRowGroup, codec); - int valsPerPage = (int) Math.ceil(recordsPerRowGroup / (float) ((int) fieldInfo.numberOfPages)); + int valsPerPage = (int) Math.ceil(recordsPerRowGroup / (float) fieldInfo.numberOfPages); byte[] bytes; // for variable length binary fields int bytesNeededToEncodeLength = 4; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen.json new file 
mode 100644 index 0000000..29cab68 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen.json @@ -0,0 +1,44 @@ +{ + head:{ + type:"APACHE_DRILL_LOGICAL", + version:"1", + generator:{ + type:"manual", + info:"na" + } + }, + storage:{ + "parquet" : + { + "type":"parquet", + "dfsName" : "file:///" + } + }, + query:[ + { + @id:"1", + op:"scan", + memo:"initial_scan", + storageengine:"parquet", + selection: [ + { + path: "/tmp/parquet_test_file_many_types" + }, + { + path: "/tmp/parquet_test_file_many_types" + } + ] + }, + { + @id:"2", + input: 1, + op: "store", + memo: "output sink", + target: { + file: "console:///stdout" + } + + } + + ] +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen_read_entry_replace.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen_read_entry_replace.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen_read_entry_replace.json new file mode 100644 index 0000000..af76e01 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen_read_entry_replace.json @@ -0,0 +1,39 @@ +{ + head:{ + type:"APACHE_DRILL_LOGICAL", + version:"1", + generator:{ + type:"manual", + info:"na" + } + }, + storage:{ + "parquet" : + { + "type":"parquet", + "dfsName" : "file:///" + } + }, + query:[ + { + @id:"1", + op:"scan", + memo:"initial_scan", + storageengine:"parquet", + selection: [ + &REPLACED_IN_PARQUET_TEST& + ] + }, + { + @id:"2", + input: 1, + op: "store", + memo: "output sink", + target: { + file: "console:///stdout" + } + + } + + ] +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json 
---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json new file mode 100644 index 0000000..f508d09 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json @@ -0,0 +1,35 @@ +{ + head : { + type : "APACHE_DRILL_PHYSICAL", + version : 1, + generator : { + type : "manual" + } + }, + graph : [ { + pop : "parquet-scan", + @id : 1, + entries : [ + { + path : "/tmp/testParquetFile_many_types_3" + }, + { + path : "/tmp/testParquetFile_many_types_3" + } + ], + storageengine:{ + "type":"parquet", + "dfsName" : "file:///" + } + }, + { + "@id": 2, + "child": 1, + "pop": "union-exchange" + }, + { + pop : "screen", + @id : 3, + child : 2 + } ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json deleted file mode 100644 index 29cab68..0000000 --- a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - head:{ - type:"APACHE_DRILL_LOGICAL", - version:"1", - generator:{ - type:"manual", - info:"na" - } - }, - storage:{ - "parquet" : - { - "type":"parquet", - "dfsName" : "file:///" - } - }, - query:[ - { - @id:"1", - op:"scan", - memo:"initial_scan", - storageengine:"parquet", - selection: [ - { - path: "/tmp/parquet_test_file_many_types" - }, - { - path: "/tmp/parquet_test_file_many_types" - } - ] - }, - { - @id:"2", - input: 1, - op: "store", - memo: "output sink", - target: { 
- file: "console:///stdout" - } - - } - - ] -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json deleted file mode 100644 index af76e01..0000000 --- a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json +++ /dev/null @@ -1,39 +0,0 @@ -{ - head:{ - type:"APACHE_DRILL_LOGICAL", - version:"1", - generator:{ - type:"manual", - info:"na" - } - }, - storage:{ - "parquet" : - { - "type":"parquet", - "dfsName" : "file:///" - } - }, - query:[ - { - @id:"1", - op:"scan", - memo:"initial_scan", - storageengine:"parquet", - selection: [ - &REPLACED_IN_PARQUET_TEST& - ] - }, - { - @id:"2", - input: 1, - op: "store", - memo: "output sink", - target: { - file: "console:///stdout" - } - - } - - ] -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_union_screen_physical.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_union_screen_physical.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_union_screen_physical.json deleted file mode 100644 index 954082c..0000000 --- a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_union_screen_physical.json +++ /dev/null @@ -1,35 +0,0 @@ -{ - head : { - type : "APACHE_DRILL_PHYSICAL", - version : 1, - generator : { - type : "manual" - } - }, - graph : [ { - pop : "parquet-scan", - @id : 1, - entries : [ - { - path : "/tmp/testParquetFile_many_types_3" - }, - { - path : 
"/tmp/testParquetFile_many_types_3" - } - ], - storageengine:{ - "type":"parquet", - "dfsName" : "maprfs:///" - } - }, - { - "@id": 2, - "child": 1, - "pop": "union-exchange" - }, - { - pop : "screen", - @id : 3, - child : 2 - } ] -} \ No newline at end of file
