http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java deleted file mode 100644 index 75a52c5..0000000 --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java +++ /dev/null @@ -1,594 +0,0 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ******************************************************************************/ -package org.apache.drill.exec.store; - -import com.beust.jcommander.internal.Lists; -import com.google.common.base.Charsets; -import com.google.common.io.Files; - -import com.google.common.util.concurrent.SettableFuture; -import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.common.types.TypeProtos; -import org.apache.drill.common.util.FileUtils; -import org.apache.drill.exec.client.DrillClient; -import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.physical.impl.OutputMutator; - -import org.apache.drill.exec.proto.UserProtos; -import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.record.RecordBatchLoader; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.rpc.RpcException; -import org.apache.drill.exec.rpc.user.QueryResultBatch; -import org.apache.drill.exec.rpc.user.UserResultsListener; -import org.apache.drill.exec.server.BootStrapContext; -import org.apache.drill.exec.server.Drillbit; -import org.apache.drill.exec.server.RemoteServiceSet; - -import org.apache.drill.exec.store.parquet.ParquetStorageEngine; -import org.apache.drill.exec.vector.BaseDataValueVector; -import org.apache.drill.exec.vector.ValueVector; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.junit.Test; -import parquet.bytes.BytesInput; -import parquet.column.ColumnDescriptor; -import parquet.column.page.Page; -import parquet.column.page.PageReadStore; -import parquet.column.page.PageReader; -import parquet.hadoop.Footer; - -import parquet.hadoop.ParquetFileWriter; -import parquet.hadoop.metadata.CompressionCodecName; -import parquet.hadoop.metadata.ParquetMetadata; -import parquet.schema.MessageType; -import parquet.schema.MessageTypeParser; - -import java.io.File; -import java.io.IOException; -import 
java.text.SimpleDateFormat; -import java.util.*; - -import static org.junit.Assert.*; -import static org.junit.Assert.assertEquals; -import static parquet.column.Encoding.PLAIN; - -public class ParquetRecordReaderTest { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StorageEngineRegistry.class); - - private boolean VERBOSE_DEBUG = false; - - @Test - public void testMultipleRowGroupsAndReads() throws Exception { - testParquetFullEngine(true, "/parquet_scan_screen.json", "/tmp/testParquetFile_many_types_3", 2, numberRowGroups, recordsPerRowGroup); - } - - @Test - public void testMultipleRowGroupsAndReadsEvent() throws Exception { - testParquetFullEngineEventBased(true, "/parquet_scan_screen.json", "/tmp/testParquetFile_many_types_3", 2, numberRowGroups, recordsPerRowGroup); - } - - int numberRowGroups = 20; - static int recordsPerRowGroup = 3000000; - - // 10 mb per page - static int bytesPerPage = 1024 * 1024 * 10; - // { 00000001, 00000010, 00000100, 00001000, 00010000, ... } - byte[] bitFields = {1, 2, 4, 8, 16, 32, 64, -128}; - static final byte allBitsTrue = -1; - static final byte allBitsFalse = 0; - static final byte[] varLen1 = {50, 51, 52, 53, 54, 55, 56}; - static final byte[] varLen2 = {15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1}; - static final byte[] varLen3 = {100, 99, 98}; - - private static class FieldInfo { - - String parquetType; - String name; - int bitLength; - int numberOfPages; - Object[] values; - TypeProtos.MinorType type; - - FieldInfo(String parquetType, String name, int bitLength, Object[] values, TypeProtos.MinorType type){ - this.parquetType = parquetType; - this.name = name; - this.bitLength = bitLength; - this.numberOfPages = Math.max(1, (int) Math.ceil(recordsPerRowGroup * bitLength / 8.0 / bytesPerPage)); - this.values = values; - // generator is designed to use 3 values - assert values.length == 3; - this.type = type; - } - } - - - private static HashMap<String, FieldInfo> fields = new HashMap<>(); - static { - Object[] intVals = {-200, 100, Integer.MAX_VALUE }; - Object[] longVals = { -5000l, 5000l, Long.MAX_VALUE}; - Object[] floatVals = { 1.74f, Float.MAX_VALUE, Float.MIN_VALUE}; - Object[] doubleVals = {100.45d, Double.MAX_VALUE, Double.MIN_VALUE,}; - Object[] boolVals = {false, false, true}; - Object[] binVals = { varLen1, varLen2, varLen3}; - Object[] bin2Vals = { varLen3, varLen2, varLen1}; - fields.put("integer/", new FieldInfo("int32", "integer", 32, intVals, TypeProtos.MinorType.INT)); - fields.put("bigInt/", new FieldInfo("int64", "bigInt", 64, longVals, TypeProtos.MinorType.BIGINT)); - fields.put("f/", new FieldInfo("float", "f", 32, floatVals, TypeProtos.MinorType.FLOAT4)); - fields.put("d/", new FieldInfo("double", "d", 64, doubleVals, TypeProtos.MinorType.FLOAT8)); -// fields.put("b/", new FieldInfo("binary", "b", 1, boolVals, TypeProtos.MinorType.BIT)); - fields.put("bin/", new FieldInfo("binary", "bin", -1, binVals, TypeProtos.MinorType.VARBINARY)); - fields.put("bin2/", new FieldInfo("binary", "bin2", -1, bin2Vals, TypeProtos.MinorType.VARBINARY)); - } - - - private String getResource(String resourceName) { - return "resource:" + resourceName; - } - - public void generateParquetFile(String filename, int numberRowGroups, int recordsPerRowGroup) throws Exception { - - int currentBooleanByte = 0; - WrapAroundCounter booleanBitCounter = new WrapAroundCounter(7); - - Configuration configuration = new Configuration(); - configuration.set(ParquetStorageEngine.HADOOP_DEFAULT_NAME, "file:///"); - //"message m { 
required int32 integer; required int64 integer64; required boolean b; required float f; required double d;}" - - FileSystem fs = FileSystem.get(configuration); - Path path = new Path(filename); - if (fs.exists(path)) fs.delete(path, false); - - - String messageSchema = "message m {"; - for (FieldInfo fieldInfo : fields.values()) { - messageSchema += " required " + fieldInfo.parquetType + " " + fieldInfo.name + ";"; - } - // remove the last semicolon, java really needs a join method for strings... - // TODO - nvm apparently it requires a semicolon after every field decl, might want to file a bug - //messageSchema = messageSchema.substring(schemaType, messageSchema.length() - 1); - messageSchema += "}"; - - MessageType schema = MessageTypeParser.parseMessageType(messageSchema); - - CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED; - ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); - w.start(); - HashMap<String, Integer> columnValuesWritten = new HashMap(); - int valsWritten; - for (int k = 0; k < numberRowGroups; k++){ - w.startBlock(1); - - for (FieldInfo fieldInfo : fields.values()) { - - if ( ! columnValuesWritten.containsKey(fieldInfo.name)){ - columnValuesWritten.put((String) fieldInfo.name, 0); - valsWritten = 0; - } else { - valsWritten = columnValuesWritten.get(fieldInfo.name); - } - - String[] path1 = {(String) fieldInfo.name}; - ColumnDescriptor c1 = schema.getColumnDescription(path1); - - w.startColumn(c1, recordsPerRowGroup, codec); - int valsPerPage = (int) Math.ceil(recordsPerRowGroup / (float) ((int) fieldInfo.numberOfPages)); - byte[] bytes; - // for variable length binary fields - int bytesNeededToEncodeLength = 4; - if ((int) fieldInfo.bitLength > 0) { - bytes = new byte[(int) Math.ceil(valsPerPage * (int) fieldInfo.bitLength / 8.0)]; - } else { - // the twelve at the end is to account for storing a 4 byte length with each value - int totalValLength = ((byte[]) fieldInfo.values[0]).length + ((byte[]) fieldInfo.values[1]).length + ((byte[]) fieldInfo.values[2]).length + 3 * bytesNeededToEncodeLength; - // used for the case where there is a number of values in this row group that is not divisible by 3 - int leftOverBytes = 0; - if ( valsPerPage % 3 > 0 ) leftOverBytes += ((byte[])fieldInfo.values[1]).length + 4; - if ( valsPerPage % 3 > 1 ) leftOverBytes += ((byte[])fieldInfo.values[2]).length + 4; - bytes = new byte[valsPerPage / 3 * totalValLength + leftOverBytes]; - } - int bytesPerPage = (int) (valsPerPage * ((int) fieldInfo.bitLength / 8.0)); - int bytesWritten = 0; - for (int z = 0; z < (int) fieldInfo.numberOfPages; z++, bytesWritten = 0) { - for (int i = 0; i < valsPerPage; i++) { - //System.out.print(i + ", " + (i % 25 == 0 ? "\n gen " + fieldInfo.name + ": " : "")); - if (fieldInfo.values[0] instanceof Boolean) { - - bytes[currentBooleanByte] |= bitFields[booleanBitCounter.val] & ((boolean) fieldInfo.values[valsWritten % 3] - ? 
allBitsTrue : allBitsFalse); - booleanBitCounter.increment(); - if (booleanBitCounter.val == 0) { - currentBooleanByte++; - } - valsWritten++; - if (currentBooleanByte > bytesPerPage) break; - } else { - if (fieldInfo.values[valsWritten % 3] instanceof byte[]){ - System.arraycopy(ByteArrayUtil.toByta(((byte[])fieldInfo.values[valsWritten % 3]).length), - 0, bytes, bytesWritten, bytesNeededToEncodeLength); - System.arraycopy(fieldInfo.values[valsWritten % 3], - 0, bytes, bytesWritten + bytesNeededToEncodeLength, ((byte[])fieldInfo.values[valsWritten % 3]).length); - bytesWritten += ((byte[])fieldInfo.values[valsWritten % 3]).length + bytesNeededToEncodeLength; - } - else{ - System.arraycopy( ByteArrayUtil.toByta(fieldInfo.values[valsWritten % 3]), - 0, bytes, i * ((int) fieldInfo.bitLength / 8), (int) fieldInfo.bitLength / 8); - } - valsWritten++; - } - - } - w.writeDataPage((int)(recordsPerRowGroup / (int) fieldInfo.numberOfPages), bytes.length, BytesInput.from(bytes), PLAIN, PLAIN, PLAIN); - currentBooleanByte = 0; - } - w.endColumn(); - columnValuesWritten.remove((String) fieldInfo.name); - columnValuesWritten.put((String) fieldInfo.name, valsWritten); - } - - w.endBlock(); - } - w.end(new HashMap<String, String>()); - logger.debug("Finished generating parquet file."); - } - - private class ParquetResultListener implements UserResultsListener { - private Vector<QueryResultBatch> results = new Vector<QueryResultBatch>(); - private SettableFuture<List<QueryResultBatch>> future = SettableFuture.create(); - int count = 0; - RecordBatchLoader batchLoader; - byte[] bytes; - - int batchCounter = 1; - int columnValCounter = 0; - int i = 0; - FieldInfo currentField; - HashMap<String, Integer> valuesChecked = new HashMap(); - - ParquetResultListener(RecordBatchLoader batchLoader){ - this.batchLoader = batchLoader; - } - - @Override - public void submissionFailed(RpcException ex) { - logger.debug("Submission failed.", ex); - future.setException(ex); - } - - @Override - public void resultArrived(QueryResultBatch result) { - logger.debug("result arrived in test batch listener."); - int columnValCounter = 0; - int i = 0; - FieldInfo currentField; - count += result.getHeader().getRowCount(); - boolean schemaChanged = false; - try { - schemaChanged = batchLoader.load(result.getHeader().getDef(), result.getData()); - } catch (SchemaChangeException e) { - e.printStackTrace(); - } - - int recordCount = 0; - // print headers. - if (schemaChanged) { - } // do not believe any change is needed for when the schema changes, with the current mock scan use case - - for (VectorWrapper vw : batchLoader) { - ValueVector vv = vw.getValueVector(); - currentField = fields.get(vv.getField().getName()); - if (VERBOSE_DEBUG){ - System.out.println("\n" + (String) currentField.name); - } - if ( ! valuesChecked.containsKey(vv.getField().getName())){ - valuesChecked.put(vv.getField().getName(), 0); - columnValCounter = 0; - } else { - columnValCounter = valuesChecked.get(vv.getField().getName()); - } - for (int j = 0; j < ((BaseDataValueVector)vv).getAccessor().getValueCount(); j++) { - if (VERBOSE_DEBUG){ - System.out.print(vv.getAccessor().getObject(j) + ", " + (j % 25 == 0 ? 
"\n batch:" + batchCounter + " v:" + j + " - " : "")); - } - assertField(vv, j, (TypeProtos.MinorType) currentField.type, - currentField.values[columnValCounter % 3], (String) currentField.name + "/"); - columnValCounter++; - } - if (VERBOSE_DEBUG){ - System.out.println("\n" + ((BaseDataValueVector)vv).getAccessor().getValueCount()); - } - valuesChecked.remove(vv.getField().getName()); - valuesChecked.put(vv.getField().getName(), columnValCounter); - } - - if (VERBOSE_DEBUG){ - for (i = 0; i < batchLoader.getRecordCount(); i++) { - recordCount++; - if (i % 50 == 0){ - System.out.println(); - for (VectorWrapper vw : batchLoader) { - ValueVector v = vw.getValueVector(); - System.out.print(pad(v.getField().getName(), 20) + " "); - - } - System.out.println(); - System.out.println(); - } - - for (VectorWrapper vw : batchLoader) { - ValueVector v = vw.getValueVector(); - System.out.print(pad(v.getAccessor().getObject(i).toString(), 20) + " "); - } - System.out.println( - - ); - } - } - batchCounter++; - if(result.getHeader().getIsLastChunk()){ - future.set(results); - } - } - - public List<QueryResultBatch> getResults() throws RpcException{ - try{ - return future.get(); - }catch(Throwable t){ - throw RpcException.mapException(t); - } - } - } - - // specific tests should call this method, but it is not marked as a test itself intentionally - public void testParquetFullEngineEventBased(boolean generateNew, String plan, String filename, int numberOfTimesRead /* specified in json plan */, int numberRowGroups, int recordsPerRowGroup) throws Exception{ - RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); - - if (generateNew) generateParquetFile(filename, numberRowGroups, recordsPerRowGroup); - - DrillConfig config = DrillConfig.create(); - - try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient client = new DrillClient(config, serviceSet.getCoordinator());){ - bit1.run(); - client.connect(); - List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.LOGICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8)); - int count = 0; - RecordBatchLoader batchLoader = new RecordBatchLoader(bit1.getContext().getAllocator()); - ParquetResultListener resultListener = new ParquetResultListener(batchLoader); - client.runQuery(UserProtos.QueryType.LOGICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener); - } - } - - - // specific tests should call this method, but it is not marked as a test itself intentionally - public void testParquetFullEngine(boolean generateNew, String plan, String filename, int numberOfTimesRead /* specified in json plan */, int numberRowGroups, int recordsPerRowGroup) throws Exception{ - RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); - - if (generateNew) generateParquetFile(filename, numberRowGroups, recordsPerRowGroup); - - DrillConfig config = DrillConfig.create(); - - try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient client = new DrillClient(config, serviceSet.getCoordinator())) { - long A = System.nanoTime(); - bit1.run(); - long B = System.nanoTime(); - client.connect(); - long C = System.nanoTime(); - System.out.println( new SimpleDateFormat("mm:ss S").format(new Date()) + " :Start query"); - List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.LOGICAL, Files.toString(FileUtils.getResourceAsFile("/parquet_scan_screen.json"), Charsets.UTF_8)); -// List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL, 
Files.toString(FileUtils.getResourceAsFile("/parquet_scan_union_screen_physical.json"), Charsets.UTF_8)); - long D = System.nanoTime(); - System.out.println(String.format("Took %f s to start drillbit", (float)(B-A) / 1E9)); - System.out.println(String.format("Took %f s to connect", (float)(C-B) / 1E9)); - System.out.println(String.format("Took %f s to run query", (float)(D-C) / 1E9)); - //List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/parquet_scan_union_screen_physical.json"), Charsets.UTF_8)); - int count = 0; -// RecordBatchLoader batchLoader = new RecordBatchLoader(new BootStrapContext(config).getAllocator()); - RecordBatchLoader batchLoader = new RecordBatchLoader(bit1.getContext().getAllocator()); - byte[] bytes; - - int batchCounter = 1; - int columnValCounter = 0; - int i = 0; - FieldInfo currentField; - HashMap<String, Integer> valuesChecked = new HashMap(); - for(QueryResultBatch b : results){ - count += b.getHeader().getRowCount(); - boolean schemaChanged = batchLoader.load(b.getHeader().getDef(), b.getData()); - - int recordCount = 0; - // print headers. - if (schemaChanged) { - } // do not believe any change is needed for when the schema changes, with the current mock scan use case - - for (VectorWrapper vw : batchLoader) { - ValueVector vv = vw.getValueVector(); - currentField = fields.get(vv.getField().getName()); - if (VERBOSE_DEBUG){ - System.out.println("\n" + (String) currentField.name); - } - if ( ! valuesChecked.containsKey(vv.getField().getName())){ - valuesChecked.put(vv.getField().getName(), 0); - columnValCounter = 0; - } else { - columnValCounter = valuesChecked.get(vv.getField().getName()); - } - for (int j = 0; j < ((BaseDataValueVector)vv).getAccessor().getValueCount(); j++) { - if (VERBOSE_DEBUG){ - System.out.print(vv.getAccessor().getObject(j) + ", " + (j % 25 == 0 ? 
"\n batch:" + batchCounter + " v:" + j + " - " : "")); - } - assertField(vv, j, (TypeProtos.MinorType) currentField.type, - currentField.values[columnValCounter % 3], (String) currentField.name + "/"); - columnValCounter++; - } - if (VERBOSE_DEBUG){ - System.out.println("\n" + ((BaseDataValueVector)vv).getAccessor().getValueCount()); - } - valuesChecked.remove(vv.getField().getName()); - valuesChecked.put(vv.getField().getName(), columnValCounter); - } - - if (VERBOSE_DEBUG){ - for (i = 1; i < batchLoader.getRecordCount(); i++) { - recordCount++; - if (i % 50 == 0){ - System.out.println(); - for (VectorWrapper vw : batchLoader) { - ValueVector v = vw.getValueVector(); - System.out.print(pad(v.getField().getName(), 20) + " "); - - } - System.out.println(); - System.out.println(); - } - - for (VectorWrapper vw : batchLoader) { - ValueVector v = vw.getValueVector(); - System.out.print(pad(v.getAccessor().getObject(i).toString(), 20) + " "); - } - System.out.println( - - ); - } - } - batchCounter++; - } - for (String s : valuesChecked.keySet()) { - assertEquals("Record count incorrect for column: " + s, recordsPerRowGroup * numberRowGroups * numberOfTimesRead, (long) valuesChecked.get(s)); - } - assert valuesChecked.keySet().size() > 0; - } - } - - public String pad(String value, int length) { - return pad(value, length, " "); - } - - public String pad(String value, int length, String with) { - StringBuilder result = new StringBuilder(length); - result.append(value); - - while (result.length() < length) { - result.insert(0, with); - } - - return result.toString(); - } - - class MockOutputMutator implements OutputMutator { - List<MaterializedField> removedFields = Lists.newArrayList(); - List<ValueVector> addFields = Lists.newArrayList(); - - @Override - public void removeField(MaterializedField field) throws SchemaChangeException { - removedFields.add(field); - } - - @Override - public void addField(ValueVector vector) throws SchemaChangeException { - addFields.add(vector); - } - - @Override - public void removeAllFields() { - addFields.clear(); - } - - @Override - public void setNewSchema() throws SchemaChangeException { - } - - List<MaterializedField> getRemovedFields() { - return removedFields; - } - - List<ValueVector> getAddFields() { - return addFields; - } - } - - private <T> void assertField(ValueVector valueVector, int index, TypeProtos.MinorType expectedMinorType, Object value, String name) { - assertField(valueVector, index, expectedMinorType, value, name, 0); - } - - private <T> void assertField(ValueVector valueVector, int index, TypeProtos.MinorType expectedMinorType, T value, String name, int parentFieldId) { -// UserBitShared.FieldMetadata metadata = valueVector.getMetadata(); -// SchemaDefProtos.FieldDef def = metadata.getDef(); -// assertEquals(expectedMinorType, def.getMajorType().getMinorType()); -// assertEquals(name, def.getNameList().get(0).getName()); -// assertEquals(parentFieldId, def.getParentId()); - - if (expectedMinorType == TypeProtos.MinorType.MAP) { - return; - } - - T val = (T) valueVector.getAccessor().getObject(index); - if (val instanceof byte[]) { - assertTrue(Arrays.equals((byte[]) value, (byte[]) val)); - } else { - assertEquals(value, val); - } - } - - private class WrapAroundCounter { - - int maxVal; - int val; - - public WrapAroundCounter(int maxVal) { - this.maxVal = maxVal; - } - - public int increment() { - val++; - if (val > maxVal) { - val = 0; - } - return val; - } - - public void reset() { - val = 0; - } - - } - - private void 
validateFooters(final List<Footer> metadata) { - logger.debug(metadata.toString()); - assertEquals(3, metadata.size()); - for (Footer footer : metadata) { - final File file = new File(footer.getFile().toUri()); - assertTrue(file.getName(), file.getName().startsWith("part")); - assertTrue(file.getPath(), file.exists()); - final ParquetMetadata parquetMetadata = footer.getParquetMetadata(); - assertEquals(2, parquetMetadata.getBlocks().size()); - final Map<String, String> keyValueMetaData = parquetMetadata.getFileMetaData().getKeyValueMetaData(); - assertEquals("bar", keyValueMetaData.get("foo")); - assertEquals(footer.getFile().getName(), keyValueMetaData.get(footer.getFile().getName())); - } - } - - private void validateContains(MessageType schema, PageReadStore pages, String[] path, int values, BytesInput bytes) - throws IOException { - PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path)); - Page page = pageReader.readPage(); - assertEquals(values, page.getValueCount()); - assertArrayEquals(bytes.toByteArray(), page.getBytes().toByteArray()); - } - -}
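
The deleted test above generated its Parquet input by hand; that logic reappears in TestFileGenerator below. For the variable-length binary columns (bin/, bin2/) each value is written as a 4-byte length prefix followed by the raw bytes, cycling through three sample values. A minimal, self-contained sketch of that page layout, assuming little-endian lengths as in Parquet's PLAIN encoding (the original writes the prefix with its own ByteArrayUtil.toByta helper and sizes the buffer with a groups-of-three-plus-leftover formula; the sketch simply sums the values):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class VarLenPageSketch {

  // Lay out valsPerPage variable-length values the way the generator does:
  // a 4-byte length prefix, then the raw bytes, cycling through the three
  // sample values. Little-endian lengths assumed here, per Parquet's PLAIN
  // encoding; the original delegates the prefix to ByteArrayUtil.toByta.
  static byte[] buildPage(byte[][] samples, int valsPerPage) {
    // Size the buffer exactly; the generator instead computes full groups of
    // three plus a leftover term when valsPerPage % 3 != 0.
    int size = 0;
    for (int i = 0; i < valsPerPage; i++) {
      size += 4 + samples[i % 3].length;
    }
    ByteBuffer page = ByteBuffer.allocate(size).order(ByteOrder.LITTLE_ENDIAN);
    for (int i = 0; i < valsPerPage; i++) {
      byte[] val = samples[i % 3];
      page.putInt(val.length); // 4-byte length prefix
      page.put(val);           // value bytes
    }
    return page.array();
  }

  public static void main(String[] args) {
    byte[] varLen1 = {50, 51, 52, 53, 54, 55, 56};
    byte[] varLen2 = {15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1};
    byte[] varLen3 = {100, 99, 98};
    // 7 values: lengths 7,15,3,7,15,3,7 plus 7 four-byte prefixes -> 85 bytes
    System.out.println(buildPage(new byte[][]{varLen1, varLen2, varLen3}, 7).length);
  }
}

Fixed-width columns take the simpler branch in the generator: the page buffer is sized at valsPerPage * bitLength / 8 bytes and each value is copied to offset i * bitLength / 8.
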
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java new file mode 100644 index 0000000..0e31cdd --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java @@ -0,0 +1,347 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.drill.exec.store.parquet; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Vector; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.util.FileUtils; +import org.apache.drill.exec.client.DrillClient; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.proto.UserProtos; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.user.QueryResultBatch; +import org.apache.drill.exec.rpc.user.UserResultsListener; +import org.apache.drill.exec.server.Drillbit; +import org.apache.drill.exec.server.RemoteServiceSet; +import org.apache.drill.exec.store.parquet.TestFileGenerator.FieldInfo; +import org.apache.drill.exec.vector.BaseDataValueVector; +import org.apache.drill.exec.vector.ValueVector; +import org.junit.Test; + +import parquet.bytes.BytesInput; +import parquet.column.page.Page; +import parquet.column.page.PageReadStore; +import parquet.column.page.PageReader; +import parquet.hadoop.Footer; +import parquet.hadoop.metadata.ParquetMetadata; +import parquet.schema.MessageType; + +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; +import com.google.common.io.Files; +import com.google.common.util.concurrent.SettableFuture; + +public class ParquetRecordReaderTest { + static final org.slf4j.Logger 
logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReaderTest.class); + + private boolean VERBOSE_DEBUG = false; + + + public static void main(String[] args) throws Exception{ + new ParquetRecordReaderTest().testMultipleRowGroupsAndReadsEvent(); + } + + @Test + public void testMultipleRowGroupsAndReadsEvent() throws Exception { + String planName = "/parquet_scan_screen.json"; + String fileName = "/tmp/testParquetFile_many_types_3"; + int numberRowGroups = 20; + int recordsPerRowGroup = 300000; + //TestFileGenerator.generateParquetFile(fileName, numberRowGroups, recordsPerRowGroup); + testParquetFullEngineLocal(planName, fileName, 2, numberRowGroups, recordsPerRowGroup); + } + + private class ParquetResultListener implements UserResultsListener { + private Vector<QueryResultBatch> results = new Vector<QueryResultBatch>(); + private SettableFuture<Void> future = SettableFuture.create(); + int count = 0; + RecordBatchLoader batchLoader; + byte[] bytes; + + int numberRowGroups; + int numberOfTimesRead; + int batchCounter = 1; + int columnValCounter = 0; + int i = 0; + private FieldInfo currentField; + private final HashMap<String, Long> valuesChecked = new HashMap<>(); + private final int recordsPerRowGroup; + private final Map<String, FieldInfo> fields; + private final long totalRecords; + + ParquetResultListener(int recordsPerRowGroup, RecordBatchLoader batchLoader, int numberRowGroups, int numberOfTimesRead){ + this.batchLoader = batchLoader; + this.fields = TestFileGenerator.getFieldMap(recordsPerRowGroup); + this.recordsPerRowGroup = recordsPerRowGroup; + this.numberRowGroups = numberRowGroups; + this.numberOfTimesRead = numberOfTimesRead; + this.totalRecords = recordsPerRowGroup * numberRowGroups * numberOfTimesRead; + } + + @Override + public void submissionFailed(RpcException ex) { + logger.debug("Submission failed.", ex); + future.setException(ex); + } + + @Override + public void resultArrived(QueryResultBatch result) { + long columnValCounter = 0; + int i = 0; + FieldInfo currentField; + count += result.getHeader().getRowCount(); + boolean schemaChanged = false; + try { + schemaChanged = batchLoader.load(result.getHeader().getDef(), result.getData()); + } catch (SchemaChangeException e) { + logger.error("Failure while loading batch", e); + } + + int recordCount = 0; + // print headers. + if (schemaChanged) { + } // do not believe any change is needed for when the schema changes, with the current mock scan use case + + for (VectorWrapper vw : batchLoader) { + ValueVector vv = vw.getValueVector(); + currentField = fields.get(vv.getField().getName()); + if (VERBOSE_DEBUG){ + System.out.println("\n" + (String) currentField.name); + } + if ( ! valuesChecked.containsKey(vv.getField().getName())){ + valuesChecked.put(vv.getField().getName(), (long) 0); + columnValCounter = 0; + } else { + columnValCounter = valuesChecked.get(vv.getField().getName()); + } + for (int j = 0; j < ((BaseDataValueVector)vv).getAccessor().getValueCount(); j++) { + if (VERBOSE_DEBUG){ + System.out.print(vv.getAccessor().getObject(j) + ", " + (j % 25 == 0 ? 
"\n batch:" + batchCounter + " v:" + j + " - " : "")); + } + assertField(vv, j, (TypeProtos.MinorType) currentField.type, + currentField.values[(int) (columnValCounter % 3)], (String) currentField.name + "/"); + columnValCounter++; + } + if (VERBOSE_DEBUG){ + System.out.println("\n" + ((BaseDataValueVector)vv).getAccessor().getValueCount()); + } + valuesChecked.remove(vv.getField().getName()); + valuesChecked.put(vv.getField().getName(), columnValCounter); + } + + + if (VERBOSE_DEBUG){ + for (i = 0; i < batchLoader.getRecordCount(); i++) { + recordCount++; + if (i % 50 == 0){ + System.out.println(); + for (VectorWrapper<?> vw : batchLoader) { + ValueVector v = vw.getValueVector(); + System.out.print(pad(v.getField().getName(), 20) + " "); + } + System.out.println(); + System.out.println(); + } + + for (VectorWrapper<?> vw : batchLoader) { + ValueVector v = vw.getValueVector(); + System.out.print(pad(v.getAccessor().getObject(i).toString(), 20) + " "); + } + System.out.println( + + ); + } + } + + for(VectorWrapper<?> vw : batchLoader){ + vw.release(); + } + result.release(); + + batchCounter++; + if(result.getHeader().getIsLastChunk()){ + for (String s : valuesChecked.keySet()) { + assertEquals("Record count incorrect for column: " + s, totalRecords, (long) valuesChecked.get(s)); + } + + assert valuesChecked.keySet().size() > 0; + future.set(null); + } + } + + public void get() throws RpcException{ + try{ + future.get(); + return; + }catch(Throwable t){ + throw RpcException.mapException(t); + } + } + } + + + public void testParquetFullEngineRemote(String plan, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{ + + DrillConfig config = DrillConfig.create(); + + try(DrillClient client = new DrillClient(config);){ + client.connect(); + RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator()); + ParquetResultListener resultListener = new ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, numberOfTimesRead); + client.runQuery(UserProtos.QueryType.LOGICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener); + resultListener.get(); + } + + } + + // specific tests should call this method, but it is not marked as a test itself intentionally + public void testParquetFullEngineLocal(String plan, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{ + + RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + + DrillConfig config = DrillConfig.create(); + + try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient client = new DrillClient(config, serviceSet.getCoordinator());){ + bit1.run(); + client.connect(); + RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator()); + ParquetResultListener resultListener = new ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, numberOfTimesRead); + client.runQuery(UserProtos.QueryType.LOGICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener); + resultListener.get(); + } + + } + + + + public String pad(String value, int length) { + return pad(value, length, " "); + } + + public String pad(String value, int length, String with) { + StringBuilder result = new StringBuilder(length); + result.append(value); + + while (result.length() < length) { + result.insert(0, with); + } + + return result.toString(); + } + + class MockOutputMutator 
implements OutputMutator { + List<MaterializedField> removedFields = Lists.newArrayList(); + List<ValueVector> addFields = Lists.newArrayList(); + + @Override + public void removeField(MaterializedField field) throws SchemaChangeException { + removedFields.add(field); + } + + @Override + public void addField(ValueVector vector) throws SchemaChangeException { + addFields.add(vector); + } + + @Override + public void removeAllFields() { + addFields.clear(); + } + + @Override + public void setNewSchema() throws SchemaChangeException { + } + + List<MaterializedField> getRemovedFields() { + return removedFields; + } + + List<ValueVector> getAddFields() { + return addFields; + } + } + + private <T> void assertField(ValueVector valueVector, int index, TypeProtos.MinorType expectedMinorType, Object value, String name) { + assertField(valueVector, index, expectedMinorType, value, name, 0); + } + + @SuppressWarnings("unchecked") + private <T> void assertField(ValueVector valueVector, int index, TypeProtos.MinorType expectedMinorType, T value, String name, int parentFieldId) { +// UserBitShared.FieldMetadata metadata = valueVector.getMetadata(); +// SchemaDefProtos.FieldDef def = metadata.getDef(); +// assertEquals(expectedMinorType, def.getMajorType().getMinorType()); +// assertEquals(name, def.getNameList().get(0).getName()); +// assertEquals(parentFieldId, def.getParentId()); + + if (expectedMinorType == TypeProtos.MinorType.MAP) { + return; + } + + T val = (T) valueVector.getAccessor().getObject(index); + if (val instanceof byte[]) { + assertTrue(Arrays.equals((byte[]) value, (byte[]) val)); + } else { + assertEquals(value, val); + } + } + + private void validateFooters(final List<Footer> metadata) { + logger.debug(metadata.toString()); + assertEquals(3, metadata.size()); + for (Footer footer : metadata) { + final File file = new File(footer.getFile().toUri()); + assertTrue(file.getName(), file.getName().startsWith("part")); + assertTrue(file.getPath(), file.exists()); + final ParquetMetadata parquetMetadata = footer.getParquetMetadata(); + assertEquals(2, parquetMetadata.getBlocks().size()); + final Map<String, String> keyValueMetaData = parquetMetadata.getFileMetaData().getKeyValueMetaData(); + assertEquals("bar", keyValueMetaData.get("foo")); + assertEquals(footer.getFile().getName(), keyValueMetaData.get(footer.getFile().getName())); + } + } + + private void validateContains(MessageType schema, PageReadStore pages, String[] path, int values, BytesInput bytes) + throws IOException { + PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path)); + Page page = pageReader.readPage(); + assertEquals(values, page.getValueCount()); + assertArrayEquals(bytes.toByteArray(), page.getBytes().toByteArray()); + } + + private String getResource(String resourceName) { + return "resource:" + resourceName; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java new file mode 100644 index 0000000..72f9123 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java @@ -0,0 +1,210 @@ +package 
org.apache.drill.exec.store.parquet; + +import static parquet.column.Encoding.PLAIN; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.store.ByteArrayUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import parquet.bytes.BytesInput; +import parquet.column.ColumnDescriptor; +import parquet.hadoop.ParquetFileWriter; +import parquet.hadoop.metadata.CompressionCodecName; +import parquet.schema.MessageType; +import parquet.schema.MessageTypeParser; + +public class TestFileGenerator { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestFileGenerator.class); + + + + // 10 mb per page + static int bytesPerPage = 1024 * 1024 * 1; + // { 00000001, 00000010, 00000100, 00001000, 00010000, ... } + static byte[] bitFields = { 1, 2, 4, 8, 16, 32, 64, -128 }; + static final byte allBitsTrue = -1; + static final byte allBitsFalse = 0; + static final byte[] varLen1 = { 50, 51, 52, 53, 54, 55, 56 }; + static final byte[] varLen2 = { 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1 }; + static final byte[] varLen3 = { 100, 99, 98 }; + + static final Object[] intVals = { -200, 100, Integer.MAX_VALUE }; + static final Object[] longVals = { -5000l, 5000l, Long.MAX_VALUE }; + static final Object[] floatVals = { 1.74f, Float.MAX_VALUE, Float.MIN_VALUE }; + static final Object[] doubleVals = { 100.45d, Double.MAX_VALUE, Double.MIN_VALUE, }; + static final Object[] boolVals = { false, false, true }; + static final Object[] binVals = { varLen1, varLen2, varLen3 }; + static final Object[] bin2Vals = { varLen3, varLen2, varLen1 }; + + static class FieldInfo { + + String parquetType; + String name; + int bitLength; + int numberOfPages; + Object[] values; + TypeProtos.MinorType type; + + FieldInfo(int recordsPerRowGroup, String parquetType, String name, int bitLength, Object[] values, TypeProtos.MinorType type) { + this.parquetType = parquetType; + this.name = name; + this.bitLength = bitLength; + this.numberOfPages = Math.max(1, (int) Math.ceil(recordsPerRowGroup * bitLength / 8.0 / bytesPerPage)); + this.values = values; + // generator is designed to use 3 values + assert values.length == 3; + this.type = type; + } + } + + private static class WrapAroundCounter { + + int maxVal; + int val; + + public WrapAroundCounter(int maxVal) { + this.maxVal = maxVal; + } + + public int increment() { + val++; + if (val > maxVal) { + val = 0; + } + return val; + } + + public void reset() { + val = 0; + } + + } + + public static HashMap<String, FieldInfo> getFieldMap(int recordsPerRowGroup) { + HashMap<String, FieldInfo> fields = new HashMap<>(); + fields.put("integer/", new FieldInfo(recordsPerRowGroup, "int32", "integer", 32, intVals, TypeProtos.MinorType.INT)); + fields.put("bigInt/", new FieldInfo(recordsPerRowGroup, "int64", "bigInt", 64, longVals, TypeProtos.MinorType.BIGINT)); + fields.put("f/", new FieldInfo(recordsPerRowGroup, "float", "f", 32, floatVals, TypeProtos.MinorType.FLOAT4)); + fields.put("d/", new FieldInfo(recordsPerRowGroup, "double", "d", 64, doubleVals, TypeProtos.MinorType.FLOAT8)); + // fields.put("b/", new FieldInfo("binary", "b", 1, boolVals, TypeProtos.MinorType.BIT)); + fields.put("bin/", new FieldInfo(recordsPerRowGroup, "binary", "bin", -1, binVals, TypeProtos.MinorType.VARBINARY)); + fields.put("bin2/", new FieldInfo(recordsPerRowGroup, "binary", "bin2", -1, bin2Vals, TypeProtos.MinorType.VARBINARY)); + return 
fields; + } + + public static void generateParquetFile(String filename, int numberRowGroups, int recordsPerRowGroup) throws Exception { + final Map<String, FieldInfo> fields = getFieldMap(recordsPerRowGroup); + + int currentBooleanByte = 0; + WrapAroundCounter booleanBitCounter = new WrapAroundCounter(7); + Configuration configuration = new Configuration(); + configuration.set(ParquetStorageEngine.HADOOP_DEFAULT_NAME, "file:///"); + // "message m { required int32 integer; required int64 integer64; required boolean b; required float f; required double d;}" + + FileSystem fs = FileSystem.get(configuration); + Path path = new Path(filename); + if (fs.exists(path)) + fs.delete(path, false); + + String messageSchema = "message m {"; + for (FieldInfo fieldInfo : fields.values()) { + messageSchema += " required " + fieldInfo.parquetType + " " + fieldInfo.name + ";"; + } + // remove the last semicolon, java really needs a join method for strings... + // TODO - nvm apparently it requires a semicolon after every field decl, might want to file a bug + // messageSchema = messageSchema.substring(schemaType, messageSchema.length() - 1); + messageSchema += "}"; + + MessageType schema = MessageTypeParser.parseMessageType(messageSchema); + + CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED; + ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); + w.start(); + HashMap<String, Integer> columnValuesWritten = new HashMap(); + int valsWritten; + for (int k = 0; k < numberRowGroups; k++) { + w.startBlock(1); + + for (FieldInfo fieldInfo : fields.values()) { + + if (!columnValuesWritten.containsKey(fieldInfo.name)) { + columnValuesWritten.put((String) fieldInfo.name, 0); + valsWritten = 0; + } else { + valsWritten = columnValuesWritten.get(fieldInfo.name); + } + + String[] path1 = { (String) fieldInfo.name }; + ColumnDescriptor c1 = schema.getColumnDescription(path1); + + w.startColumn(c1, recordsPerRowGroup, codec); + int valsPerPage = (int) Math.ceil(recordsPerRowGroup / (float) ((int) fieldInfo.numberOfPages)); + byte[] bytes; + // for variable length binary fields + int bytesNeededToEncodeLength = 4; + if ((int) fieldInfo.bitLength > 0) { + bytes = new byte[(int) Math.ceil(valsPerPage * (int) fieldInfo.bitLength / 8.0)]; + } else { + // the twelve at the end is to account for storing a 4 byte length with each value + int totalValLength = ((byte[]) fieldInfo.values[0]).length + ((byte[]) fieldInfo.values[1]).length + + ((byte[]) fieldInfo.values[2]).length + 3 * bytesNeededToEncodeLength; + // used for the case where there is a number of values in this row group that is not divisible by 3 + int leftOverBytes = 0; + if (valsPerPage % 3 > 0) + leftOverBytes += ((byte[]) fieldInfo.values[1]).length + 4; + if (valsPerPage % 3 > 1) + leftOverBytes += ((byte[]) fieldInfo.values[2]).length + 4; + bytes = new byte[valsPerPage / 3 * totalValLength + leftOverBytes]; + } + int bytesPerPage = (int) (valsPerPage * ((int) fieldInfo.bitLength / 8.0)); + int bytesWritten = 0; + for (int z = 0; z < (int) fieldInfo.numberOfPages; z++, bytesWritten = 0) { + for (int i = 0; i < valsPerPage; i++) { + // System.out.print(i + ", " + (i % 25 == 0 ? "\n gen " + fieldInfo.name + ": " : "")); + if (fieldInfo.values[0] instanceof Boolean) { + + bytes[currentBooleanByte] |= bitFields[booleanBitCounter.val] + & ((boolean) fieldInfo.values[valsWritten % 3] ? 
allBitsTrue : allBitsFalse); + booleanBitCounter.increment(); + if (booleanBitCounter.val == 0) { + currentBooleanByte++; + } + valsWritten++; + if (currentBooleanByte > bytesPerPage) + break; + } else { + if (fieldInfo.values[valsWritten % 3] instanceof byte[]) { + System.arraycopy(ByteArrayUtil.toByta(((byte[]) fieldInfo.values[valsWritten % 3]).length), 0, bytes, + bytesWritten, bytesNeededToEncodeLength); + System.arraycopy(fieldInfo.values[valsWritten % 3], 0, bytes, bytesWritten + bytesNeededToEncodeLength, + ((byte[]) fieldInfo.values[valsWritten % 3]).length); + bytesWritten += ((byte[]) fieldInfo.values[valsWritten % 3]).length + bytesNeededToEncodeLength; + } else { + System.arraycopy(ByteArrayUtil.toByta(fieldInfo.values[valsWritten % 3]), 0, bytes, i + * ((int) fieldInfo.bitLength / 8), (int) fieldInfo.bitLength / 8); + } + valsWritten++; + } + + } + w.writeDataPage((int) (recordsPerRowGroup / (int) fieldInfo.numberOfPages), bytes.length, + BytesInput.from(bytes), PLAIN, PLAIN, PLAIN); + currentBooleanByte = 0; + } + w.endColumn(); + columnValuesWritten.remove((String) fieldInfo.name); + columnValuesWritten.put((String) fieldInfo.name, valsWritten); + } + + w.endBlock(); + } + w.end(new HashMap<String, String>()); + logger.debug("Finished generating parquet file."); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/test/resources/scan_screen_logical.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/scan_screen_logical.json b/sandbox/prototype/exec/java-exec/src/test/resources/scan_screen_logical.json index 90ba2c1..4f44f9e 100644 --- a/sandbox/prototype/exec/java-exec/src/test/resources/scan_screen_logical.json +++ b/sandbox/prototype/exec/java-exec/src/test/resources/scan_screen_logical.json @@ -7,12 +7,18 @@ info:"na" } }, + "storage" : { + "mock" : { + "type" : "mock" + } +}, + query:[ { @id:"1", op:"scan", memo:"initial_scan", - storageengine:"local-logs", + storageengine:"mock", selection: [ { records : 100, http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/test/sh/logback.xml ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/sh/logback.xml b/sandbox/prototype/exec/java-exec/src/test/sh/logback.xml new file mode 100644 index 0000000..6f2928a --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/test/sh/logback.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<configuration> + <appender name="SOCKET" + class="de.huxhorn.lilith.logback.appender.ClassicMultiplexSocketAppender"> + <Compressing>true</Compressing> + <ReconnectionDelay>10000</ReconnectionDelay> + <IncludeCallerData>true</IncludeCallerData> + <RemoteHosts>localhost</RemoteHosts> + </appender> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + </pattern> + </encoder> + </appender> + + <logger name="org.apache.drill" additivity="false"> + <level value="info" /> + <appender-ref ref="STDOUT" /> +<!-- <appender-ref ref="STDOUT" /> --> + </logger> + + <logger name="io.netty" additivity="false"> + <level value="info" /> + <appender-ref ref="SOCKET" /> +<!-- <appender-ref ref="STDOUT" /> --> + </logger> + + <root> + <level value="error" /> + <appender-ref ref="STDOUT" /> + </root> + +</configuration> 
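
Between the old and new versions of the test, result handling changes from collecting a List<QueryResultBatch> up front to validating each batch inside an asynchronous UserResultsListener and blocking on a Guava SettableFuture until the last chunk arrives (see ParquetResultListener above). A sketch of that pattern using only JDK types; Batch and its fields are hypothetical stand-ins for Drill's QueryResultBatch and its result header:

import java.util.concurrent.CompletableFuture;

public class BlockingResultsListenerSketch {

  // Hypothetical stand-in for QueryResultBatch: a row count plus the
  // is-last-chunk flag carried in the query result header.
  static class Batch {
    final long rowCount;
    final boolean lastChunk;
    Batch(long rowCount, boolean lastChunk) {
      this.rowCount = rowCount;
      this.lastChunk = lastChunk;
    }
  }

  private final CompletableFuture<Void> future = new CompletableFuture<>();
  private long totalRows = 0;

  // Called from the RPC thread on failure, like submissionFailed(RpcException).
  public void submissionFailed(Exception ex) {
    future.completeExceptionally(ex); // the waiting test thread sees the error
  }

  // Called from the RPC thread once per batch, like resultArrived(QueryResultBatch).
  public void resultArrived(Batch batch) {
    totalRows += batch.rowCount;      // per-batch assertions happen here
    if (batch.lastChunk) {
      future.complete(null);          // final assertions, then unblock get()
    }
  }

  // The test thread blocks here instead of iterating a pre-collected list.
  public long get() throws Exception {
    future.get();
    return totalRows;
  }

  public static void main(String[] args) throws Exception {
    BlockingResultsListenerSketch listener = new BlockingResultsListenerSketch();
    listener.resultArrived(new Batch(100, false));
    listener.resultArrived(new Batch(50, true));
    System.out.println(listener.get()); // 150
  }
}
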
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/test/sh/runbit
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/sh/runbit b/sandbox/prototype/exec/java-exec/src/test/sh/runbit
index 31d0729..8a54006 100755
--- a/sandbox/prototype/exec/java-exec/src/test/sh/runbit
+++ b/sandbox/prototype/exec/java-exec/src/test/sh/runbit
@@ -4,6 +4,6 @@
 PROJECT_ROOT=../../../
 mvn dependency:build-classpath -f=$PROJECT_ROOT/pom.xml -Dmdep.outputFile=target/sh/cp.txt
 CP=`cat $PROJECT_ROOT/target/sh/cp.txt`
-CP=$CP:$PROJECT_ROOT/target/classes:$PROJECT_ROOT/target/test-classes
-java -XX:MaxDirectMemorySize=8192M -cp $CP org.apache.drill.exec.server.Drillbit
+CP=./:$CP:$PROJECT_ROOT/target/classes:$PROJECT_ROOT/target/test-classes
+java -Dlogback.configurationFile=logback.xml -XX:MaxDirectMemorySize=8192M -cp $CP org.apache.drill.exec.server.Drillbit

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/pom.xml b/sandbox/prototype/pom.xml
index 52b2250..382c5ff 100644
--- a/sandbox/prototype/pom.xml
+++ b/sandbox/prototype/pom.xml
@@ -56,6 +56,15 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>**/logging.properties</exclude>
+            <exclude>**/logback-test.xml</exclude>
+            <exclude>**/logback.out.xml</exclude>
+            <exclude>**/logback.xml</exclude>
+          </excludes>
+        </configuration>
+
        <executions>
          <execution>
            <goals>
@@ -64,6 +73,7 @@
         </execution>
       </executions>
     </plugin>
+
     <plugin>
       <groupId>org.apache.maven.plugins</groupId>
       <artifactId>maven-resources-plugin</artifactId>
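
The runbit change prepends ./ to the classpath and passes -Dlogback.configurationFile=logback.xml, so the logback.xml sitting next to the script takes precedence, while the new maven-jar-plugin excludes keep logback configs out of the packaged jars so they cannot shadow it. A small check of which configuration a JVM launched this way would see (logback.configurationFile is logback's standard override property; the resource lookup is plain JDK):

public class WhichLogbackConfig {
  public static void main(String[] args) {
    // Explicit override set on the command line,
    // e.g. -Dlogback.configurationFile=logback.xml
    System.out.println("logback.configurationFile = "
        + System.getProperty("logback.configurationFile"));
    // First logback.xml visible on the classpath; with CP=./:$CP this should
    // resolve to the file in the working directory, not one inside a jar.
    System.out.println("classpath logback.xml     = "
        + ClassLoader.getSystemResource("logback.xml"));
  }
}
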