http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63a90fe9/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java
new file mode 100644
index 0000000..cf790ac
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java
@@ -0,0 +1,657 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.store.json.JsonSchemaProvider;
+import org.apache.drill.exec.store.parquet.ParquetStorageEngine;
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import parquet.bytes.BytesInput;
+import parquet.column.ColumnDescriptor;
+import parquet.hadoop.ParquetFileWriter;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+import java.util.*;
+
+import static org.junit.Assert.*;
+import static parquet.column.Encoding.PLAIN;
+
+public class ParquetRecordReaderTest {
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReaderTest.class);
+
+  private static final boolean VERBOSE_DEBUG = false;
+
+  // { 00000001, 00000010, 00000100, 00001000, 00010000, ... }
+  byte[] bitFields = {1, 2, 4, 8, 16, 32, 64, -128};
+  byte allBitsTrue = -1;
+  byte allBitsFalse = 0;
+  int DEFAULT_BYTES_PER_PAGE = 1024 * 1024 * 1;
+  static Object[] intVals = {-200, 100, Integer.MAX_VALUE};
+  static Object[] longVals = {-5000l, 5000l, Long.MAX_VALUE};
+  static Object[] floatVals = {1.74f, Float.MAX_VALUE, Float.MIN_VALUE};
+  static Object[] doubleVals = {100.45d, Double.MAX_VALUE, Double.MIN_VALUE};
+  static Object[] boolVals = {false, false, true};
+  static byte[] varLen1 = {50, 51, 52, 53, 54, 55, 56, 57, 58, 59};
+  static byte[] varLen2 = {15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1};
+  static byte[] varLen3 = {100, 99, 98};
+  static Object[] binVals = {varLen3, varLen2, varLen3};
+  static Object[] bin2Vals = {varLen3, varLen2, varLen1};
+
+  private void populateFieldInfoMap(ParquetTestProperties props){
+    props.fields.put("integer", new FieldInfo("int32", "integer", 32, intVals, TypeProtos.MinorType.INT, props));
+    props.fields.put("bigInt", new FieldInfo("int64", "bigInt", 64, longVals, TypeProtos.MinorType.BIGINT, props));
+    props.fields.put("f", new FieldInfo("float", "f", 32, floatVals, TypeProtos.MinorType.FLOAT4, props));
+    props.fields.put("d", new FieldInfo("double", "d", 64, doubleVals, TypeProtos.MinorType.FLOAT8, props));
+    props.fields.put("b", new FieldInfo("boolean", "b", 1, boolVals, TypeProtos.MinorType.BIT, props));
+    props.fields.put("bin", new FieldInfo("binary", "bin", -1, binVals, TypeProtos.MinorType.VARBINARY, props));
+    props.fields.put("bin2", new FieldInfo("binary", "bin2", -1, bin2Vals, TypeProtos.MinorType.VARBINARY, props));
+  }
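+
+  // Each FieldInfo carries exactly three sample values (asserted in its constructor). The file generator
+  // and the result verifier both cycle through them with an index mod 3, so row r of a column is expected
+  // to hold values[r % 3] regardless of how rows are split across pages and row groups, e.g.
+  //   assertField(vv, r, TypeProtos.MinorType.INT, intVals[r % 3], "integer/");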
+
+  @Test
+  public void testMultipleRowGroups() throws Exception {
+    HashMap<String, FieldInfo> fields = new HashMap<>();
+    ParquetTestProperties props = new ParquetTestProperties(3, 3000, DEFAULT_BYTES_PER_PAGE, fields);
+    populateFieldInfoMap(props);
+    testParquetFullEngine(true, "/parquet_scan_screen.json", "/tmp/test.parquet", 1, props);
+  }
+
+  // TODO - This test is marked @Ignore to avoid breaking the build; it requires a binary file that was
+  // generated using Pig. We still need a good place to keep files like this; for now it is attached to the JIRA.
+  @Ignore
+  @Test
+  public void testNullableColumns() throws Exception {
+    HashMap<String, FieldInfo> fields = new HashMap<>();
+    ParquetTestProperties props = new ParquetTestProperties(1, 3000000, DEFAULT_BYTES_PER_PAGE, fields);
+    Object[] boolVals = {true, null, null};
+    props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals, TypeProtos.MinorType.BIT, props));
+    testParquetFullEngine(false, "/parquet_nullable.json", "/tmp/nullable.parquet", 1, props);
+  }
+
+  @Ignore
+  @Test
+  public void testNullableColumnsVarLen() throws Exception {
+    HashMap<String, FieldInfo> fields = new HashMap<>();
+    ParquetTestProperties props = new ParquetTestProperties(1, 3000000, DEFAULT_BYTES_PER_PAGE, fields);
+    byte[] val = {'b'};
+//    Object[] varLenVals = {val, null, null};
+//    Object[] varLenVals = {null, null, null};
+    Object[] varLenVals = {val, val, val};
+    props.fields.put("a", new FieldInfo("boolean", "a", 1, varLenVals, TypeProtos.MinorType.BIT, props));
+    testParquetFullEngine(false, "/parquet_nullable_varlen.json", "/tmp/nullable.parquet", 1, props);
+  }
+
+  @Test
+  public void testMultipleRowGroupsAndReads() throws Exception {
+    HashMap<String, FieldInfo> fields = new HashMap<>();
+    ParquetTestProperties props = new ParquetTestProperties(4, 3000, DEFAULT_BYTES_PER_PAGE, fields);
+    populateFieldInfoMap(props);
+    // build a selection that reads the same file several times
+    String readEntries = "";
+    int numberOfTimesToRead = 3;
+    for (int j = 0; j < numberOfTimesToRead; j++){
+      readEntries += "{path: \"/tmp/test.parquet\"}";
+      if (j < numberOfTimesToRead - 1)
+        readEntries += ",";
+    }
+    testParquetFullEngineEventBased(true, "/parquet_scan_screen_read_entry_replace.json", readEntries,
+        "/tmp/test.parquet", numberOfTimesToRead, props);
+  }
+
+  @Test
+  public void testMultipleRowGroupsEvent() throws Exception {
+    HashMap<String, FieldInfo> fields = new HashMap<>();
+    ParquetTestProperties props = new ParquetTestProperties(4, 3000, DEFAULT_BYTES_PER_PAGE, fields);
+    populateFieldInfoMap(props);
+    testParquetFullEngineEventBased(true, "/parquet_scan_screen.json", "/tmp/test.parquet", 1, props);
+  }
+
+  private class ParquetTestProperties{
+    int numberRowGroups;
+    int recordsPerRowGroup;
+    int bytesPerPage = 1024 * 1024 * 1;
+    HashMap<String, FieldInfo> fields = new HashMap<>();
+
+    public ParquetTestProperties(int numberRowGroups, int recordsPerRowGroup, int bytesPerPage,
+                                 HashMap<String, FieldInfo> fields){
+      this.numberRowGroups = numberRowGroups;
+      this.recordsPerRowGroup = recordsPerRowGroup;
+      this.bytesPerPage = bytesPerPage;
+      this.fields = fields;
+    }
+  }
+
+  private static class FieldInfo {
+
+    String parquetType;
+    String name;
+    int bitLength;
+    int numberOfPages;
+    Object[] values;
+    TypeProtos.MinorType type;
+
+    FieldInfo(String parquetType, String name, int bitLength, Object[] values, TypeProtos.MinorType type, ParquetTestProperties props){
+      this.parquetType = parquetType;
+      this.name = name;
+      this.bitLength = bitLength;
+      this.numberOfPages = Math.max(1, (int) Math.ceil(((long) props.recordsPerRowGroup) * bitLength / 8.0 / props.bytesPerPage));
+      this.values = values;
+      // the generator is designed to cycle through exactly 3 values
+      assert values.length == 3;
+      this.type = type;
+    }
+  }
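+
+  // For example, with the defaults above a 32-bit column needs 3000 records * 32 bits / 8 = 12,000 bytes,
+  // which fits comfortably in one 1 MB page, so numberOfPages is 1. Variable length columns pass
+  // bitLength = -1 and fall through to the Math.max floor of a single page.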
"file:///"); + //"message m { required int32 integer; required int64 integer64; required boolean b; required float f; required double d;}" + + FileSystem fs = FileSystem.get(configuration); + Path path = new Path(filename); + if (fs.exists(path)) fs.delete(path, false); + + + String messageSchema = "message m {"; + for (FieldInfo fieldInfo : props.fields.values()) { + messageSchema += " required " + fieldInfo.parquetType + " " + fieldInfo.name + ";"; + } + // remove the last semicolon, java really needs a join method for strings... + // TODO - nvm apparently it requires a semicolon after every field decl, might want to file a bug + //messageSchema = messageSchema.substring(schemaType, messageSchema.length() - 1); + messageSchema += "}"; + + MessageType schema = MessageTypeParser.parseMessageType(messageSchema); + + CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED; + ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); + w.start(); + HashMap<String, Integer> columnValuesWritten = new HashMap(); + int valsWritten; + for (int k = 0; k < props.numberRowGroups; k++){ + w.startBlock(1); + currentBooleanByte = 0; + booleanBitCounter.reset(); + + for (FieldInfo fieldInfo : props.fields.values()) { + + if ( ! columnValuesWritten.containsKey(fieldInfo.name)){ + columnValuesWritten.put((String) fieldInfo.name, 0); + valsWritten = 0; + } else { + valsWritten = columnValuesWritten.get(fieldInfo.name); + } + + String[] path1 = {(String) fieldInfo.name}; + ColumnDescriptor c1 = schema.getColumnDescription(path1); + + w.startColumn(c1, props.recordsPerRowGroup, codec); + int valsPerPage = (int) Math.ceil(props.recordsPerRowGroup / (float) fieldInfo.numberOfPages); + byte[] bytes; + // for variable length binary fields + int bytesNeededToEncodeLength = 4; + if ((int) fieldInfo.bitLength > 0) { + bytes = new byte[(int) Math.ceil(valsPerPage * (int) fieldInfo.bitLength / 8.0)]; + } else { + // the twelve at the end is to account for storing a 4 byte length with each value + int totalValLength = ((byte[]) fieldInfo.values[0]).length + ((byte[]) fieldInfo.values[1]).length + ((byte[]) fieldInfo.values[2]).length + 3 * bytesNeededToEncodeLength; + // used for the case where there is a number of values in this row group that is not divisible by 3 + int leftOverBytes = 0; + if ( valsPerPage % 3 > 0 ) leftOverBytes += ((byte[])fieldInfo.values[1]).length + bytesNeededToEncodeLength; + if ( valsPerPage % 3 > 1 ) leftOverBytes += ((byte[])fieldInfo.values[2]).length + bytesNeededToEncodeLength; + bytes = new byte[valsPerPage / 3 * totalValLength + leftOverBytes]; + } + int bytesPerPage = (int) (valsPerPage * ((int) fieldInfo.bitLength / 8.0)); + int bytesWritten = 0; + for (int z = 0; z < (int) fieldInfo.numberOfPages; z++, bytesWritten = 0) { + for (int i = 0; i < valsPerPage; i++) { + //System.out.print(i + ", " + (i % 25 == 0 ? "\n gen " + fieldInfo.name + ": " : "")); + if (fieldInfo.values[0] instanceof Boolean) { + + bytes[currentBooleanByte] |= bitFields[booleanBitCounter.val] & ((boolean) fieldInfo.values[valsWritten % 3] + ? 
+        int bytesPerPage = (int) (valsPerPage * (fieldInfo.bitLength / 8.0));
+        int bytesWritten = 0;
+        for (int z = 0; z < fieldInfo.numberOfPages; z++, bytesWritten = 0) {
+          for (int i = 0; i < valsPerPage; i++) {
+            if (fieldInfo.values[0] instanceof Boolean) {
+              bytes[currentBooleanByte] |= bitFields[booleanBitCounter.val] & ((boolean) fieldInfo.values[valsWritten % 3]
+                  ? allBitsTrue : allBitsFalse);
+              booleanBitCounter.increment();
+              if (booleanBitCounter.val == 0) {
+                currentBooleanByte++;
+              }
+              valsWritten++;
+              if (currentBooleanByte > bytesPerPage) break;
+            } else {
+              if (fieldInfo.values[valsWritten % 3] instanceof byte[]){
+                // write the 4-byte length prefix, then the value itself
+                System.arraycopy(ByteArrayUtil.toByta(((byte[]) fieldInfo.values[valsWritten % 3]).length),
+                    0, bytes, bytesWritten, bytesNeededToEncodeLength);
+                System.arraycopy(fieldInfo.values[valsWritten % 3],
+                    0, bytes, bytesWritten + bytesNeededToEncodeLength, ((byte[]) fieldInfo.values[valsWritten % 3]).length);
+                bytesWritten += ((byte[]) fieldInfo.values[valsWritten % 3]).length + bytesNeededToEncodeLength;
+              }
+              else{
+                System.arraycopy(ByteArrayUtil.toByta(fieldInfo.values[valsWritten % 3]),
+                    0, bytes, i * (fieldInfo.bitLength / 8), fieldInfo.bitLength / 8);
+              }
+              valsWritten++;
+            }
+          }
+          w.writeDataPage(props.recordsPerRowGroup / fieldInfo.numberOfPages, bytes.length, BytesInput.from(bytes), PLAIN, PLAIN, PLAIN);
+          currentBooleanByte = 0;
+        }
+        w.endColumn();
+        columnValuesWritten.put(fieldInfo.name, valsWritten);
+      }
+
+      w.endBlock();
+    }
+    w.end(new HashMap<String, String>());
+    logger.debug("Finished generating parquet file.");
+  }
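+
+  // Asynchronous verification: the listener below is handed to DrillClient.runQuery and checks each
+  // QueryResultBatch as it arrives, completing a SettableFuture when the last chunk has been seen so
+  // that getResults() can block the test thread until verification finishes.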
"\n batch:" + batchCounter + " v:" + j + " - " : "")); + } + assertField(vv, j, (TypeProtos.MinorType) currentField.type, + currentField.values[columnValCounter % 3], (String) currentField.name + "/"); + columnValCounter++; + } + if (VERBOSE_DEBUG){ + System.out.println("\n" + ((BaseDataValueVector)vv).getAccessor().getValueCount()); + } + valuesChecked.remove(vv.getField().getName()); + valuesChecked.put(vv.getField().getName(), columnValCounter); + } + + if (VERBOSE_DEBUG){ + for (i = 0; i < batchLoader.getRecordCount(); i++) { + recordCount++; + if (i % 50 == 0){ + System.out.println(); + for (VectorWrapper vw : batchLoader) { + ValueVector v = vw.getValueVector(); + System.out.print(pad(v.getField().getName(), 20) + " "); + + } + System.out.println(); + System.out.println(); + } + + for (VectorWrapper vw : batchLoader) { + ValueVector v = vw.getValueVector(); + System.out.print(pad(v.getAccessor().getObject(i).toString(), 20) + " "); + } + System.out.println( + + ); + } + } + batchCounter++; + if(result.getHeader().getIsLastChunk()){ + future.set(null); + } + } + + public void getResults() throws RpcException{ + try{ + future.get(); + }catch(Throwable t){ + throw RpcException.mapException(t); + } + } + } + + // specific tests should call this method, but it is not marked as a test itself intentionally + public void testParquetFullEngineEventBased(boolean generateNew, String plan, String filename, int numberOfTimesRead /* specified in json plan */, ParquetTestProperties props) throws Exception{ + testParquetFullEngine(generateNew, plan, null, filename, numberOfTimesRead, props); + } + + // specific tests should call this method, but it is not marked as a test itself intentionally + public void testParquetFullEngineEventBased(boolean generateNew, String plan, String readEntries, String filename, + int numberOfTimesRead /* specified in json plan */, ParquetTestProperties props) throws Exception{ + RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + + if (generateNew) generateParquetFile(filename, props); + + DrillConfig config = DrillConfig.create(); + + try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient client = new DrillClient(config, serviceSet.getCoordinator());){ + bit1.run(); + client.connect(); + RecordBatchLoader batchLoader = new RecordBatchLoader(bit1.getContext().getAllocator()); + ParquetResultListener resultListener = new ParquetResultListener(batchLoader, props); + long C = System.nanoTime(); + if (readEntries != null){ + client.runQuery(UserProtos.QueryType.LOGICAL, (Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8).replaceFirst( "&REPLACED_IN_PARQUET_TEST&", readEntries)), resultListener); + } + else{ + client.runQuery(UserProtos.QueryType.LOGICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener); + } + resultListener.getResults(); + long D = System.nanoTime(); + System.out.println(String.format("Took %f s to run query", (float)(D-C) / 1E9)); + } + } + + // specific tests should call this method, but it is not marked as a test itself intentionally + public void testParquetFullEngine(boolean generateNew, String plan, String filename, int numberOfTimesRead /* specified in json plan */, ParquetTestProperties props) throws Exception{ + testParquetFullEngine(generateNew, plan, null, filename, numberOfTimesRead, props); + } + + // specific tests should call this method, but it is not marked as a test itself intentionally + public void testParquetFullEngine(boolean generateNew, String plan, String 
+
+  // specific tests should call this method, but it is not marked as a test itself intentionally
+  public void testParquetFullEngine(boolean generateNew, String plan, String filename,
+                                    int numberOfTimesRead /* specified in json plan */, ParquetTestProperties props) throws Exception{
+    testParquetFullEngine(generateNew, plan, null, filename, numberOfTimesRead, props);
+  }
+
+  // specific tests should call this method, but it is not marked as a test itself intentionally
+  public void testParquetFullEngine(boolean generateNew, String plan, String readEntries, String filename,
+                                    int numberOfTimesRead /* specified in json plan */, ParquetTestProperties props) throws Exception{
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    if (generateNew) generateParquetFile(filename, props);
+
+    DrillConfig config = DrillConfig.create();
+
+    try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient client = new DrillClient(config, serviceSet.getCoordinator())) {
+      long startTime = System.nanoTime();
+      bit1.run();
+      long bitStarted = System.nanoTime();
+      client.connect();
+      long connected = System.nanoTime();
+      List<QueryResultBatch> results;
+      // insert a variable number of reads
+      if (readEntries != null){
+        results = client.runQuery(UserProtos.QueryType.LOGICAL, (Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8).replaceFirst("&REPLACED_IN_PARQUET_TEST&", readEntries)));
+      }
+      else{
+        results = client.runQuery(UserProtos.QueryType.LOGICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8));
+      }
+      // alternative: run the physical plan variant instead
+      // List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/parquet_scan_union_screen_physical.json"), Charsets.UTF_8));
+      long queryFinished = System.nanoTime();
+      System.out.println(String.format("Took %f s to start drillbit", (float) (bitStarted - startTime) / 1E9));
+      System.out.println(String.format("Took %f s to connect", (float) (connected - bitStarted) / 1E9));
+      System.out.println(String.format("Took %f s to run query", (float) (queryFinished - connected) / 1E9));
+      int count = 0;
+      RecordBatchLoader batchLoader = new RecordBatchLoader(bit1.getContext().getAllocator());
+
+      int batchCounter = 1;
+      int columnValCounter = 0;
+      int i = 0;
+      FieldInfo currentField;
+      HashMap<String, Integer> valuesChecked = new HashMap<>();
+      for(QueryResultBatch b : results){
+
+        count += b.getHeader().getRowCount();
+        boolean schemaChanged = batchLoader.load(b.getHeader().getDef(), b.getData());
+
+        int recordCount = 0;
+        // no action is believed necessary on a schema change with the current mock scan use case
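+
+        // Validation mirrors ParquetResultListener.resultArrived: for each value vector, look up the
+        // expected FieldInfo by column name and compare every position against the cycled expected
+        // values; valuesChecked carries each column's running count across batches.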
"\n batch:" + batchCounter + " v:" + j + " - " : "")); + } + assertField(vv, j, currentField.type, + currentField.values[columnValCounter % 3], currentField.name + "/"); + columnValCounter++; + } + if (VERBOSE_DEBUG){ + System.out.println("\n" + vv.getAccessor().getValueCount()); + } + valuesChecked.remove(vv.getField().getName()); + valuesChecked.put(vv.getField().getName(), columnValCounter); + } + + if (VERBOSE_DEBUG){ + for (i = 0; i < batchLoader.getRecordCount(); i++) { + recordCount++; + if (i % 50 == 0){ + System.out.println(); + for (VectorWrapper vw : batchLoader) { + ValueVector v = vw.getValueVector(); + System.out.print(pad(v.getField().getName(), 20) + " "); + + } + System.out.println(); + System.out.println(); + } + + for (VectorWrapper vw : batchLoader) { + ValueVector v = vw.getValueVector(); + System.out.print(pad(v.getAccessor().getObject(i) + "", 20) + " "); + } + System.out.println( + + ); + } + } + batchCounter++; + } + for (String s : valuesChecked.keySet()) { + assertEquals("Record count incorrect for column: " + s, props.recordsPerRowGroup * props.numberRowGroups * numberOfTimesRead, (long) valuesChecked.get(s)); + } + assert valuesChecked.keySet().size() > 0; + } + } + + public String pad(String value, int length) { + return pad(value, length, " "); + } + + public String pad(String value, int length, String with) { + StringBuilder result = new StringBuilder(length); + result.append(value); + + while (result.length() < length) { + result.insert(0, with); + } + + return result.toString(); + } + + class MockOutputMutator implements OutputMutator { + List<MaterializedField> removedFields = Lists.newArrayList(); + List<ValueVector> addFields = Lists.newArrayList(); + + @Override + public void removeField(MaterializedField field) throws SchemaChangeException { + removedFields.add(field); + } + + @Override + public void addField(ValueVector vector) throws SchemaChangeException { + addFields.add(vector); + } + + @Override + public void removeAllFields() { + addFields.clear(); + } + + @Override + public void setNewSchema() throws SchemaChangeException { + } + + List<MaterializedField> getRemovedFields() { + return removedFields; + } + + List<ValueVector> getAddFields() { + return addFields; + } + } + + private <T> void assertField(ValueVector valueVector, int index, TypeProtos.MinorType expectedMinorType, Object value, String name) { + assertField(valueVector, index, expectedMinorType, value, name, 0); + } + + private <T> void assertField(ValueVector valueVector, int index, TypeProtos.MinorType expectedMinorType, T value, String name, int parentFieldId) { +// UserBitShared.FieldMetadata metadata = valueVector.getMetadata(); +// SchemaDefProtos.FieldDef def = metadata.getDef(); +// assertEquals(expectedMinorType, def.getMajorType().getMinorType()); +// assertEquals(name, def.getNameList().get(0).getName()); +// assertEquals(parentFieldId, def.getParentId()); + + if (expectedMinorType == TypeProtos.MinorType.MAP) { + return; + } + + T val = (T) valueVector.getAccessor().getObject(index); + if (val instanceof String){ + assertEquals(value, val); + } + else if (val instanceof byte[]) { + assertTrue(Arrays.equals((byte[]) value, (byte[]) val)); + } else { + assertEquals(value, val); + } + } + + private class WrapAroundCounter { + + int maxVal; + int val; + + public WrapAroundCounter(int maxVal) { + this.maxVal = maxVal; + } + + public int increment() { + val++; + if (val > maxVal) { + val = 0; + } + return val; + } + + public void reset() { + val = 0; + } + + } +}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63a90fe9/sandbox/prototype/exec/java-exec/src/test/resources/parquet_nullable.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_nullable.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_nullable.json
new file mode 100644
index 0000000..5822bdb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_nullable.json
@@ -0,0 +1,41 @@
+{
+  head:{
+    type:"APACHE_DRILL_LOGICAL",
+    version:"1",
+    generator:{
+      type:"manual",
+      info:"na"
+    }
+  },
+  storage:{
+    "parquet" :
+      {
+        "type":"parquet",
+        "dfsName" : "file:///"
+      }
+  },
+  query:[
+    {
+      @id:"1",
+      op:"scan",
+      memo:"initial_scan",
+      storageengine:"parquet",
+      selection: [
+        {
+          path: "/tmp/nullable_test.parquet"
+        }
+      ]
+    },
+    {
+      @id:"2",
+      input: 1,
+      op: "store",
+      memo: "output sink",
+      target: {
+        file: "console:///stdout"
+      }
+    }
+  ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63a90fe9/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json
new file mode 100644
index 0000000..f18c738
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json
@@ -0,0 +1,41 @@
+{
+  head:{
+    type:"APACHE_DRILL_LOGICAL",
+    version:"1",
+    generator:{
+      type:"manual",
+      info:"na"
+    }
+  },
+  storage:{
+    "parquet" :
+      {
+        "type":"parquet",
+        "dfsName" : "file:///"
+      }
+  },
+  query:[
+    {
+      @id:"1",
+      op:"scan",
+      memo:"initial_scan",
+      storageengine:"parquet",
+      selection: [
+        {
+          path: "/tmp/test.parquet"
+        }
+      ]
+    },
+    {
+      @id:"2",
+      input: 1,
+      op: "store",
+      memo: "output sink",
+      target: {
+        file: "console:///stdout"
+      }
+    }
+  ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63a90fe9/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json
new file mode 100644
index 0000000..af76e01
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json
@@ -0,0 +1,39 @@
+{
+  head:{
+    type:"APACHE_DRILL_LOGICAL",
+    version:"1",
+    generator:{
+      type:"manual",
+      info:"na"
+    }
+  },
+  storage:{
+    "parquet" :
+      {
+        "type":"parquet",
+        "dfsName" : "file:///"
+      }
+  },
+  query:[
+    {
+      @id:"1",
+      op:"scan",
+      memo:"initial_scan",
+      storageengine:"parquet",
+      selection: [
+        &REPLACED_IN_PARQUET_TEST&
+      ]
+    },
+    {
+      @id:"2",
+      input: 1,
+      op: "store",
+      memo: "output sink",
+      target: {
+        file: "console:///stdout"
+      }
+    }
+  ]
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63a90fe9/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_union_screen_physical.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_union_screen_physical.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_union_screen_physical.json
new file mode 100644
index 0000000..954082c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_union_screen_physical.json
@@ -0,0 +1,35 @@
+{
+  head : {
+    type : "APACHE_DRILL_PHYSICAL",
+    version : 1,
+    generator : {
+      type : "manual"
+    }
+  },
+  graph : [ {
+    pop : "parquet-scan",
+    @id : 1,
+    entries : [
+      {
+        path : "/tmp/testParquetFile_many_types_3"
+      },
+      {
+        path : "/tmp/testParquetFile_many_types_3"
+      }
+    ],
+    storageengine:{
+      "type":"parquet",
+      "dfsName" : "maprfs:///"
+    }
+  },
+  {
+    "@id": 2,
+    "child": 1,
+    "pop": "union-exchange"
+  },
+  {
+    pop : "screen",
+    @id : 3,
+    child : 2
+  } ]
+}
\ No newline at end of file
