Updated Branches: refs/heads/master 47985bad0 -> c50135256
Updates to Parquet for VariableLengthVectors and Bit fields Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/556bd963 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/556bd963 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/556bd963 Branch: refs/heads/master Commit: 556bd963aff5e01ab041db3529a7ae4f847f9bd0 Parents: 47985ba Author: Jason Altekruse <[email protected]> Authored: Tue Aug 13 18:22:49 2013 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu Aug 15 18:54:14 2013 -0700 ---------------------------------------------------------------------- .../templates/VariableLengthVectors.java | 3 +- .../drill/exec/store/parquet/BitReader.java | 3 +- .../drill/exec/store/parquet/ColumnReader.java | 5 +-- .../exec/store/parquet/PageReadStatus.java | 2 +- .../exec/store/parquet/ParquetRecordReader.java | 6 +-- .../drill/exec/vector/BaseDataValueVector.java | 4 +- .../org/apache/drill/exec/vector/BitVector.java | 13 ++++++- .../parquet_scan_screen_read_entry_replace.json | 39 ++++++++++++++++++++ sandbox/prototype/pom.xml | 2 +- 9 files changed, 60 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/556bd963/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java index 3be6dc2..e3c59fc 100644 --- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java +++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java @@ -14,7 +14,6 @@ import java.io.Closeable; import java.nio.ByteBuffer; import java.util.Random; -import org.apache.commons.lang3.ArrayUtils; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.SchemaDefProtos; import org.apache.drill.exec.proto.UserBitShared.FieldMetadata; @@ -75,7 +74,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V @Override public FieldMetadata getMetadata() { - int len = (valueCount+1) * ${type.width} + getVarByteLength(); + int len = (valueCount + 1) * ${type.width} + getVarByteLength(); return FieldMetadata.newBuilder() .setDef(getField().getDef()) .setValueCount(valueCount) http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/556bd963/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java index c85d4aa..8727341 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java @@ -52,11 +52,10 @@ public final class BitReader extends ColumnReader { vectorData = ((BaseDataValueVector) valueVecHolder.getValueVector()).getData(); nextByte = bytes[(int) Math.max(0, Math.ceil(pageReadStatus.valuesRead / 8.0) - 1)]; readLengthInBits = recordsReadInThisIteration + pageReadStatus.bitShift; - //recordsReadInThisIteration -= (8 - pageReadStatus.bitShift); int i = 0; // read individual bytes with appropriate shifting - for (; i <= (int) readLength; i++) { + for (; i < (int) readLength; i++) { currentByte = nextByte; currentByte = (byte) (currentByte >>> pageReadStatus.bitShift); // mask the bits about to be added from the next byte http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/556bd963/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java index 99b65e6..c62613a 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java @@ -39,8 +39,6 @@ public abstract class ColumnReader { long readPositionInBuffer; - int compressedSize; - // quick reference to see if the field is fixed length (as this requires an instanceof) boolean isFixedLength; // counter for the total number of values read from one or more pages @@ -89,7 +87,6 @@ public abstract class ColumnReader { // if no page has been read, or all of the records have been read out of a page, read the next one if (pageReadStatus.currentPage == null || pageReadStatus.valuesRead == pageReadStatus.currentPage.getValueCount()) { - totalValuesRead += pageReadStatus.valuesRead; if (!pageReadStatus.next()) { break; } @@ -107,7 +104,7 @@ public abstract class ColumnReader { } } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReadStatus.currentPage != null); - ((BaseDataValueVector) valueVecHolder.getValueVector()).getMutator().setValueCount( + valueVecHolder.getValueVector().getMutator().setValueCount( valuesReadInCurrentPass); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/556bd963/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java index 0378960..c5c0d87 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java @@ -80,7 +80,7 @@ public final class PageReadStatus { // because it is needed, but there might be a problem with it ByteBufInputStream f = new ByteBufInputStream(parentColumnReader.parentReader.getBufferWithAllData().slice( (int) parentColumnReader.readPositionInBuffer, - Math.min(50, parentColumnReader.parentReader.getBufferWithAllData().capacity() - (int) parentColumnReader.readPositionInBuffer))); + Math.min(200, parentColumnReader.parentReader.getBufferWithAllData().capacity() - (int) parentColumnReader.readPositionInBuffer))); int before = f.available(); PageHeader pageHeader = readPageHeader(f); int length = before - f.available(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/556bd963/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java index 2d36a08..4e46034 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java @@ -203,8 +203,6 @@ public class ParquetRecordReader implements RecordReader { @Override public void setup(OutputMutator output) throws ExecutionSetupException { - long tA = System.nanoTime(), tB; - System.out.println( new SimpleDateFormat("mm:ss S").format(new Date()) + " :Start of ParquetRecordReader.setup"); output.removeAllFields(); try { @@ -232,6 +230,9 @@ public class ParquetRecordReader implements RecordReader { else{ start = rowGroupOffset; } + // TODO - the methods for get total size and get total uncompressed size seem to have the opposite results of + // what they should + // I found the bug in the mainline and made a issue for it, hopefully it will be fixed soon for (ColumnReader crs : columnStatuses){ totalByteLength += crs.columnChunkMetaData.getTotalSize(); } @@ -256,7 +257,6 @@ public class ParquetRecordReader implements RecordReader { } catch (IOException e) { throw new ExecutionSetupException("Error opening or reading metatdata for parquet file at location: " + hadoopPath.getName()); } - System.out.println( "Total time in method: " + ((float) (System.nanoTime() - tA) / 1e9)); } private static String toFieldName(String[] paths) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/556bd963/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java index 54a6cb8..f41dcd2 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java @@ -44,9 +44,7 @@ public abstract class BaseDataValueVector extends BaseValueVector{ } @Override - public FieldMetadata getMetadata() { - return null; - } + public abstract FieldMetadata getMetadata(); public ByteBuf getData(){ return data; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/556bd963/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java index 910b80e..c868dff 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java @@ -5,6 +5,7 @@ import io.netty.buffer.ByteBuf; import java.util.Random; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.FieldMetadata; import org.apache.drill.exec.record.DeadBuf; import org.apache.drill.exec.record.MaterializedField; @@ -27,8 +28,17 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe super(field, allocator); } + @Override + public FieldMetadata getMetadata() { + return FieldMetadata.newBuilder() + .setDef(getField().getDef()) + .setValueCount(valueCount) + .setBufferLength( (int) Math.ceil(valueCount / 8.0)) + .build(); + } + private int getSizeFromCount(int valueCount) { - return (int) Math.ceil((float)valueCount / 8); + return (int) Math.ceil((float)valueCount / 8.0); } /** @@ -42,6 +52,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe valueCapacity = valueCount; int valueSize = getSizeFromCount(valueCount); data = allocator.buffer(valueSize); + this.data.retain(); for (int i = 0; i < valueSize; i++) { data.setByte(i, 0); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/556bd963/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json new file mode 100644 index 0000000..af76e01 --- /dev/null +++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json @@ -0,0 +1,39 @@ +{ + head:{ + type:"APACHE_DRILL_LOGICAL", + version:"1", + generator:{ + type:"manual", + info:"na" + } + }, + storage:{ + "parquet" : + { + "type":"parquet", + "dfsName" : "file:///" + } + }, + query:[ + { + @id:"1", + op:"scan", + memo:"initial_scan", + storageengine:"parquet", + selection: [ + &REPLACED_IN_PARQUET_TEST& + ] + }, + { + @id:"2", + input: 1, + op: "store", + memo: "output sink", + target: { + file: "console:///stdout" + } + + } + + ] +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/556bd963/sandbox/prototype/pom.xml ---------------------------------------------------------------------- diff --git a/sandbox/prototype/pom.xml b/sandbox/prototype/pom.xml index 382c5ff..76bdf24 100644 --- a/sandbox/prototype/pom.xml +++ b/sandbox/prototype/pom.xml @@ -131,7 +131,7 @@ <artifactId>maven-surefire-plugin</artifactId> <version>2.15</version> <configuration> - <argLine>-XX:MaxDirectMemorySize=4096M</argLine> + <argLine>-XX:MaxDirectMemorySize=4096M </argLine> </configuration> </plugin> <!--This plugin's configuration is used to store Eclipse m2e settings
