http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java new file mode 100644 index 0000000..4513aaa --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.parquet.columnreaders; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.util.DecimalUtility; +import org.apache.drill.exec.expr.holders.Decimal28SparseHolder; +import org.apache.drill.exec.expr.holders.Decimal38SparseHolder; +import org.apache.drill.exec.store.ParquetOutputRecordWriter; +import org.apache.drill.exec.vector.DateVector; +import org.apache.drill.exec.vector.Decimal28SparseVector; +import org.apache.drill.exec.vector.Decimal38SparseVector; +import org.apache.drill.exec.vector.ValueVector; +import org.joda.time.DateTimeUtils; +import parquet.column.ColumnDescriptor; +import parquet.format.SchemaElement; +import parquet.hadoop.metadata.ColumnChunkMetaData; + +import java.math.BigDecimal; + +class FixedByteAlignedReader extends ColumnReader { + + protected byte[] bytes; + + + FixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, + boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + + recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount() + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + + readStartInBytes = pageReader.readPosInBytes; + readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits; + readLength = (int) Math.ceil(readLengthInBits / 8.0); + + bytes = pageReader.pageDataByteArray; + // vectorData is assigned by the superclass read loop method + writeData(); + } + + protected void writeData() { + vectorData.writeBytes(bytes, + (int) readStartInBytes, (int) readLength); + } + + public static abstract class ConvertedReader extends FixedByteAlignedReader { + + protected int dataTypeLengthInBytes; 
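+ // Unlike the bulk writeBytes() copy in the enclosing FixedByteAlignedReader, ConvertedReader
+ // subclasses decode one fixed-width value at a time through addNext(), so each value can be
+ // converted (Julian-day dates, sparse decimals) before it is written into the vector.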
+ + ConvertedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, + boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + public void writeData() { + dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0); + for (int i = 0; i < recordsReadInThisIteration; i++) { + addNext((int)readStartInBytes + i * dataTypeLengthInBytes, i + valuesReadInCurrentPass); + } + } + + /** + * Reads from bytes, converts, and writes to buffer + * @param start the index in bytes to start reading from + * @param index the index of the ValueVector + */ + abstract void addNext(int start, int index); + } + + public static class DateReader extends ConvertedReader { + + DateVector dateVector; + + DateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, + boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + dateVector = (DateVector) v; + } + + @Override + void addNext(int start, int index) { + dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay( + NullableFixedByteAlignedReaders.NullableDateReader.readIntLittleEndian(bytes, start) + - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5)); + } + } + + public static class Decimal28Reader extends ConvertedReader { + + Decimal28SparseVector decimal28Vector; + + Decimal28Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, + boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + decimal28Vector = (Decimal28SparseVector) v; + } + + @Override + void addNext(int start, int index) { + int width = Decimal28SparseHolder.WIDTH; + BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale()); + DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(), + schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits); + } + } + + public static class Decimal38Reader extends ConvertedReader { + + Decimal38SparseVector decimal38Vector; + + Decimal38Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, + boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + decimal38Vector = (Decimal38SparseVector) v; + } + + @Override + void addNext(int start, int index) { + int width = Decimal38SparseHolder.WIDTH; + BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale()); + DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getData(), index * width, schemaElement.getScale(), + schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits); + } + } +} \ No newline at end of file
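For reference, the standalone sketch below (not part of this commit; the class name and sample bytes are made up) shows the two building blocks that DateReader.addNext() combines: the little-endian int decode it borrows from NullableFixedByteAlignedReaders.NullableDateReader and Joda-Time's Julian-day conversion. The actual reader additionally subtracts ParquetOutputRecordWriter.JULIAN_DAY_EPOC before storing the result in the DateVector.

    import org.joda.time.DateTimeUtils;

    public class JulianDayDecodeSketch {

      // Same decode used by NullableDateReader.readIntLittleEndian: assemble a
      // 32-bit int from four little-endian bytes in the page data array.
      static int readIntLittleEndian(byte[] in, int offset) {
        int ch4 = in[offset] & 0xff;
        int ch3 = in[offset + 1] & 0xff;
        int ch2 = in[offset + 2] & 0xff;
        int ch1 = in[offset + 3] & 0xff;
        return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
      }

      public static void main(String[] args) {
        // Hypothetical page bytes holding the Julian day number 2440588 (1970-01-01).
        byte[] pageBytes = {(byte) 0x8C, 0x3D, 0x25, 0x00};
        int julianDay = readIntLittleEndian(pageBytes, 0);

        // Joda-Time's Julian days start at noon, so subtracting 0.5 lands on midnight UTC.
        long epochMillis = DateTimeUtils.fromJulianDay(julianDay - 0.5);
        System.out.println(epochMillis); // prints 0, i.e. 1970-01-01T00:00:00Z
      }
    }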
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java new file mode 100644 index 0000000..bbff574 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java @@ -0,0 +1,213 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.drill.exec.store.parquet.columnreaders; + +import org.apache.drill.exec.vector.RepeatedFixedWidthVector; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.vector.ValueVector; +import parquet.column.ColumnDescriptor; +import parquet.format.SchemaElement; +import parquet.hadoop.metadata.ColumnChunkMetaData; + +import java.io.IOException; + +public class FixedWidthRepeatedReader extends VarLengthColumn { + + RepeatedFixedWidthVector castedRepeatedVector; + ColumnReader dataReader; + int dataTypeLengthInBytes; + // we can do a vector copy of the data once we figure out how much we need to copy + // this tracks the number of values to transfer (the dataReader will translate this to a number + // of bytes to transfer and re-use the code from the non-repeated types) + int valuesToRead; + int repeatedGroupsReadInCurrentPass; + int repeatedValuesInCurrentList; + // empty lists are notated by definition levels, to stop reading at the correct time, we must keep + // track of the number of empty lists as well as the length of all of the defined lists together + int definitionLevelsRead; + // parquet currently does not restrict lists reaching across pages for repeated values, this necessitates + // tracking when this happens to stop some of the state updates until we know the full length of the repeated + // value for the current record + boolean notFishedReadingList; + byte[] leftOverBytes; + + FixedWidthRepeatedReader(ParquetRecordReader parentReader, ColumnReader dataReader, int dataTypeLengthInBytes, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector valueVector, SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, valueVector, schemaElement); + 
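+ // Wire the child data reader into this column's state: the repeated reader owns the PageReader
+ // and the repetition/definition level bookkeeping, while dataReader performs the actual
+ // fixed-width value copies out of the shared pages.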
castedRepeatedVector = (RepeatedFixedWidthVector) valueVector; + this.dataTypeLengthInBytes = dataTypeLengthInBytes; + this.dataReader = dataReader; + this.dataReader.pageReader = this.pageReader; + // this is not in the reset method because it needs to be initialized only for the very first page read + // in all other cases if a read ends at a page boundary we will need to keep track of this flag and not + // clear it at the start of the next read loop + notFishedReadingList = false; + } + + public void reset() { + bytesReadInCurrentPass = 0; + valuesReadInCurrentPass = 0; + pageReader.valuesReadyToRead = 0; + dataReader.vectorData = castedRepeatedVector.getMutator().getDataVector().getData(); + dataReader.valuesReadInCurrentPass = 0; + repeatedGroupsReadInCurrentPass = 0; + } + + public int getRecordsReadInCurrentPass() { + return repeatedGroupsReadInCurrentPass; + } + + @Override + protected void readField(long recordsToRead) { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean skipReadyToReadPositionUpdate() { + return false; + } + + public void updateReadyToReadPosition() { + valuesToRead += repeatedValuesInCurrentList; + pageReader.valuesReadyToRead += repeatedValuesInCurrentList; + repeatedGroupsReadInCurrentPass++; + currDictVal = null; + if ( ! notFishedReadingList) + repeatedValuesInCurrentList = -1; + } + + public void updatePosition() { + pageReader.readPosInBytes += dataTypeLengthInBits; + bytesReadInCurrentPass += dataTypeLengthInBits; + valuesReadInCurrentPass++; + } + + public void hitRowGroupEnd() { + pageReader.valuesReadyToRead = 0; + definitionLevelsRead = 0; + } + + public void postPageRead() { + super.postPageRead(); + // this is no longer correct as we figured out that lists can reach across pages + if ( ! 
notFishedReadingList) + repeatedValuesInCurrentList = -1; + definitionLevelsRead = 0; + } + + protected int totalValuesReadAndReadyToReadInPage() { + // we need to prevent the page reader from getting rid of the current page in the case where we have a repeated + // value split across a page boundary + if (notFishedReadingList) { + return definitionLevelsRead - repeatedValuesInCurrentList; + } + return definitionLevelsRead; + } + + protected boolean checkVectorCapacityReached() { + boolean doneReading = super.checkVectorCapacityReached(); + if (doneReading) + return true; + if (valuesReadInCurrentPass + pageReader.valuesReadyToRead + repeatedValuesInCurrentList >= valueVec.getValueCapacity()) + return true; + else + return false; + } + + protected boolean readAndStoreValueSizeInformation() { + boolean readingValsAcrossPageBoundary = false; + int numLeftoverVals = 0; + if (notFishedReadingList) { + numLeftoverVals = repeatedValuesInCurrentList; + readRecords(numLeftoverVals); + readingValsAcrossPageBoundary = true; + notFishedReadingList = false; + pageReader.valuesReadyToRead = 0; + try { + boolean stopReading = readPage(); + if (stopReading) { + // hit the end of a row group + return false; + } + } catch (IOException e) { + throw new RuntimeException("Unexpected error reading parquet repeated column.", e); + } + } + if ( currDefLevel == -1 ) { + currDefLevel = pageReader.definitionLevels.readInteger(); + definitionLevelsRead++; + } + int repLevel; + if ( columnDescriptor.getMaxDefinitionLevel() == currDefLevel){ + if (repeatedValuesInCurrentList == -1 || notFishedReadingList) { + repeatedValuesInCurrentList = 1; + do { + repLevel = pageReader.repetitionLevels.readInteger(); + if (repLevel > 0) { + repeatedValuesInCurrentList++; + currDefLevel = pageReader.definitionLevels.readInteger(); + definitionLevelsRead++; + + // we hit the end of this page, without confirmation that we reached the end of the current record + if (definitionLevelsRead == pageReader.currentPage.getValueCount()) { + // check that we have not hit the end of the row group (in which case we will not find the repetition level indicating + // the end of this record as there is no next page to check, we have read all the values in this repetition so it is okay + // to add it to the read ) + if (totalValuesRead + pageReader.valuesReadyToRead + repeatedValuesInCurrentList != columnChunkMetaData.getValueCount()){ + notFishedReadingList = true; + // if we hit this case, we cut off the current batch at the previous value, these extra values as well + // as those that spill into the next page will be added to the next batch + return true; + } + } + } + } while (repLevel != 0); + } + } + else { + repeatedValuesInCurrentList = 0; + } + int currentValueListLength = repeatedValuesInCurrentList; + if (readingValsAcrossPageBoundary) { + currentValueListLength += numLeftoverVals; + } + // this should not fail + if (!castedRepeatedVector.getMutator().setRepetitionAtIndexSafe(repeatedGroupsReadInCurrentPass, + currentValueListLength)) { + return true; + } + // This field is being referenced in the superclass determineSize method, so we need to set it here + // again going to make this the length in BYTES to avoid repetitive multiplication/division + dataTypeLengthInBits = repeatedValuesInCurrentList * dataTypeLengthInBytes; + return false; + } + + protected void readRecords(int valuesToRead) { + if (valuesToRead == 0) return; + // TODO - validate that this works in all cases, it fixes a bug when reading from multiple pages into + // a 
single vector + dataReader.valuesReadInCurrentPass = 0; + dataReader.readValues(valuesToRead); + valuesReadInCurrentPass += valuesToRead; + castedRepeatedVector.getMutator().setValueCounts(repeatedGroupsReadInCurrentPass, valuesReadInCurrentPass); + } + + @Override + public int capacity() { + return castedRepeatedVector.getMutator().getDataVector().getData().capacity(); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java new file mode 100644 index 0000000..fbf1dee --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.parquet.columnreaders; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.vector.NullableBitVector; +import org.apache.drill.exec.vector.ValueVector; +import parquet.column.ColumnDescriptor; +import parquet.format.SchemaElement; +import parquet.hadoop.metadata.ColumnChunkMetaData; + +/** + * This class is used in conjunction with its superclass to read nullable bit columns in a parquet file. + * It currently is using an inefficient value-by-value approach. + * TODO - make this more efficient by copying runs of values like in NullableFixedByteAlignedReader + * This will also involve incorporating the ideas from the BitReader (the reader for non-nullable bits) + * because page/batch boundaries that do not land on byte boundaries require shifting of all of the values + * in the next batch. 
+ */ +final class NullableBitReader extends ColumnReader { + + NullableBitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, + boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + @Override + public void readField(long recordsToReadInThisPass) { + + recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount() + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + int defLevel; + for (int i = 0; i < recordsReadInThisIteration; i++){ + defLevel = pageReader.definitionLevels.readInteger(); + // if the value is defined + if (defLevel == columnDescriptor.getMaxDefinitionLevel()){ + if (!((NullableBitVector)valueVec).getMutator().setSafe(i + valuesReadInCurrentPass, + pageReader.valueReader.readBoolean() ? 1 : 0 )) { + throw new RuntimeException(); + } + } + // otherwise the value is skipped, because the bit vector indicating nullability is zero filled + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java new file mode 100644 index 0000000..2babc20 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.parquet.columnreaders; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.vector.BaseValueVector; +import org.apache.drill.exec.vector.NullableVectorDefinitionSetter; +import org.apache.drill.exec.vector.ValueVector; +import parquet.column.ColumnDescriptor; +import parquet.format.SchemaElement; +import parquet.hadoop.metadata.ColumnChunkMetaData; + +import java.io.IOException; + +abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<V>{ + + int nullsFound; + // used to skip nulls found + int rightBitShift; + // used when copying less than a byte worth of data at a time, to indicate the number of used bits in the current byte + int bitsUsed; + BaseValueVector castedBaseVector; + NullableVectorDefinitionSetter castedVectorMutator; + + NullableColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, + boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + castedBaseVector = (BaseValueVector) v; + castedVectorMutator = (NullableVectorDefinitionSetter) v.getMutator(); + } + + public void processPages(long recordsToReadInThisPass) throws IOException { + readStartInBytes = 0; + readLength = 0; + readLengthInBits = 0; + recordsReadInThisIteration = 0; + vectorData = castedBaseVector.getData(); + + do { + // if no page has been read, or all of the records have been read out of a page, read the next one + if (pageReader.currentPage == null + || pageReader.valuesRead == pageReader.currentPage.getValueCount()) { + if (!pageReader.next()) { + break; + } + } + + // values need to be spaced out where nulls appear in the column + // leaving blank space for nulls allows for random access to values + // to optimize copying data out of the buffered disk stream, runs of defined values + // are located and copied together, rather than copying individual values + + long runStart = pageReader.readPosInBytes; + int runLength; + int currentDefinitionLevel; + int currentValueIndexInVector = (int) recordsReadInThisIteration; + boolean lastValueWasNull; + int definitionLevelsRead; + // loop to find the longest run of defined values available, can be preceded by several nulls + while (true){ + definitionLevelsRead = 0; + lastValueWasNull = true; + nullsFound = 0; + runLength = 0; + if (currentValueIndexInVector == recordsToReadInThisPass + || currentValueIndexInVector >= valueVec.getValueCapacity()) { + break; + } + while(currentValueIndexInVector < recordsToReadInThisPass + && currentValueIndexInVector < valueVec.getValueCapacity() + && pageReader.valuesRead + definitionLevelsRead < pageReader.currentPage.getValueCount()){ + currentDefinitionLevel = pageReader.definitionLevels.readInteger(); + definitionLevelsRead++; + if ( currentDefinitionLevel < columnDescriptor.getMaxDefinitionLevel()){ + // a run of non-null values was found, break out of this loop to do a read in the outer loop + nullsFound++; + if ( ! 
lastValueWasNull ){ + currentValueIndexInVector++; + break; + } + lastValueWasNull = true; + } + else{ + if (lastValueWasNull){ + runStart = pageReader.readPosInBytes; + runLength = 0; + lastValueWasNull = false; + } + runLength++; + castedVectorMutator.setIndexDefined(currentValueIndexInVector); + } + currentValueIndexInVector++; + } + pageReader.readPosInBytes = runStart; + recordsReadInThisIteration = runLength; + + readField( runLength); + int writerIndex = ((BaseValueVector) valueVec).getData().writerIndex(); + if ( dataTypeLengthInBits > 8 || (dataTypeLengthInBits < 8 && totalValuesRead + runLength % 8 == 0)){ + castedBaseVector.getData().setIndex(0, writerIndex + (int) Math.ceil( nullsFound * dataTypeLengthInBits / 8.0)); + } + else if (dataTypeLengthInBits < 8){ + rightBitShift += dataTypeLengthInBits * nullsFound; + } + recordsReadInThisIteration += nullsFound; + valuesReadInCurrentPass += recordsReadInThisIteration; + totalValuesRead += recordsReadInThisIteration; + pageReader.valuesRead += recordsReadInThisIteration; + if ( (readStartInBytes + readLength >= pageReader.byteLength && bitsUsed == 0) + || pageReader.valuesRead == pageReader.currentPage.getValueCount()) { + if (!pageReader.next()) { + break; + } + } else { + pageReader.readPosInBytes = readStartInBytes + readLength; + } + } + } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.currentPage != null); + valueVec.getMutator().setValueCount( + valuesReadInCurrentPass); + } + + protected abstract void readField(long recordsToRead); +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java new file mode 100644 index 0000000..c1575de --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.parquet.columnreaders; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.util.DecimalUtility; +import org.apache.drill.exec.vector.NullableBigIntVector; +import org.apache.drill.exec.vector.NullableFloat4Vector; +import org.apache.drill.exec.vector.NullableFloat8Vector; +import org.apache.drill.exec.vector.NullableIntVector; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.expr.holders.NullableDecimal28SparseHolder; +import org.apache.drill.exec.expr.holders.NullableDecimal38SparseHolder; +import org.apache.drill.exec.store.ParquetOutputRecordWriter; +import org.apache.drill.exec.vector.NullableDateVector; +import org.apache.drill.exec.vector.NullableDecimal28SparseVector; +import org.apache.drill.exec.vector.NullableDecimal38SparseVector; + +import org.joda.time.DateTimeUtils; +import parquet.column.ColumnDescriptor; +import parquet.format.SchemaElement; +import parquet.hadoop.metadata.ColumnChunkMetaData; + +import java.math.BigDecimal; + +public class NullableFixedByteAlignedReaders { + + static class NullableFixedByteAlignedReader extends NullableColumnReader { + protected byte[] bytes; + + NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + this.recordsReadInThisIteration = recordsToReadInThisPass; + + // set up metadata + this.readStartInBytes = pageReader.readPosInBytes; + this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits; + this.readLength = (int) Math.ceil(readLengthInBits / 8.0); + this.bytes = pageReader.pageDataByteArray; + + // fill in data. 
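+ // (a single bulk copy of the current run of defined values; NullableColumnReader.processPages
+ // locates each run of non-null values and positions readPosInBytes before calling readField,
+ // then leaves blank space in the vector for the nulls that follow)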
+ vectorData.writeBytes(bytes, (int) readStartInBytes, (int) readLength); + } + } + + static class NullableDictionaryIntReader extends NullableColumnReader<NullableIntVector> { + + private byte[] bytes; + + NullableDictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableIntVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + if (usingDictionary) { + for (int i = 0; i < recordsToReadInThisPass; i++){ + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); + } + } + } + } + + static class NullableDictionaryBigIntReader extends NullableColumnReader<NullableBigIntVector> { + + private byte[] bytes; + + NullableDictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableBigIntVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + for (int i = 0; i < recordsToReadInThisPass; i++){ + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); + } + } + } + + static class NullableDictionaryFloat4Reader extends NullableColumnReader<NullableFloat4Vector> { + + private byte[] bytes; + + NullableDictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat4Vector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + for (int i = 0; i < recordsToReadInThisPass; i++){ + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat()); + } + } + } + + static class NullableDictionaryFloat8Reader extends NullableColumnReader<NullableFloat8Vector> { + + private byte[] bytes; + + NullableDictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat8Vector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + for (int i = 0; i < recordsToReadInThisPass; i++){ + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble()); + } + } + } + + static abstract class NullableConvertedReader extends NullableFixedByteAlignedReader { + + protected int dataTypeLengthInBytes; + + NullableConvertedReader(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + @Override + protected void readField(long recordsToReadInThisPass) { + + this.recordsReadInThisIteration = recordsToReadInThisPass; + + // set up metadata + this.readStartInBytes = pageReader.readPosInBytes; + this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits; + this.readLength = (int) Math.ceil(readLengthInBits / 8.0); + this.bytes = pageReader.pageDataByteArray; + + dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0); + for (int i = 0; i < recordsReadInThisIteration; i++) { + addNext((int) readStartInBytes + i * dataTypeLengthInBytes, i + valuesReadInCurrentPass); + } + } + + abstract void addNext(int start, int index); + } + + public static class NullableDateReader extends NullableConvertedReader { + + NullableDateVector dateVector; + + NullableDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, + boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + dateVector = (NullableDateVector) v; + } + + @Override + void addNext(int start, int index) { + dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(readIntLittleEndian(bytes, start) - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5)); + } + + // copied out of parquet library, didn't want to deal with the uneeded throws statement they had declared + public static int readIntLittleEndian(byte[] in, int offset) { + int ch4 = in[offset] & 0xff; + int ch3 = in[offset + 1] & 0xff; + int ch2 = in[offset + 2] & 0xff; + int ch1 = in[offset + 3] & 0xff; + return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); + } + + } + + public static class NullableDecimal28Reader extends NullableConvertedReader { + + NullableDecimal28SparseVector decimal28Vector; + + NullableDecimal28Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, + boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + decimal28Vector = (NullableDecimal28SparseVector) v; + } + + @Override + void addNext(int start, int index) { + int width = NullableDecimal28SparseHolder.WIDTH; + BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale()); + DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(), + schemaElement.getPrecision(), NullableDecimal28SparseHolder.nDecimalDigits); + } + } + + public static class NullableDecimal38Reader extends NullableConvertedReader { + + NullableDecimal38SparseVector decimal38Vector; + + NullableDecimal38Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, + boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + decimal38Vector = 
(NullableDecimal38SparseVector) v; + } + + @Override + void addNext(int start, int index) { + int width = NullableDecimal38SparseHolder.WIDTH; + BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale()); + DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getData(), index * width, schemaElement.getScale(), + schemaElement.getPrecision(), NullableDecimal38SparseHolder.nDecimalDigits); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java new file mode 100644 index 0000000..2be9a37 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java @@ -0,0 +1,130 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ ******************************************************************************/ +package org.apache.drill.exec.store.parquet.columnreaders; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.vector.ValueVector; +import parquet.bytes.BytesUtils; +import parquet.column.ColumnDescriptor; +import parquet.format.SchemaElement; +import parquet.hadoop.metadata.ColumnChunkMetaData; + +import java.io.IOException; + +public abstract class NullableVarLengthValuesColumn<V extends ValueVector> extends VarLengthValuesColumn<V> { + + int nullsRead; + boolean currentValNull = false; + + NullableVarLengthValuesColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + public abstract boolean setSafe(int index, byte[] value, int start, int length); + + public abstract int capacity(); + + public void reset() { + bytesReadInCurrentPass = 0; + valuesReadInCurrentPass = 0; + nullsRead = 0; + pageReader.valuesReadyToRead = 0; + } + + protected void postPageRead() { + currLengthDeterminingDictVal = null; + pageReader.valuesReadyToRead = 0; + } + + protected boolean readAndStoreValueSizeInformation() throws IOException { + // we need to read all of the lengths to determine if this value will fit in the current vector, + // as we can only read each definition level once, we have to store the last one as we will need it + // at the start of the next read if we decide after reading all of the varlength values in this record + // that it will not fit in this batch + currentValNull = false; + if ( currDefLevel == -1 ) { + currDefLevel = pageReader.definitionLevels.readInteger(); + } + if ( columnDescriptor.getMaxDefinitionLevel() > currDefLevel){ + nullsRead++; + // set length of zero, each index in the vector defaults to null so no need to set the nullability + variableWidthVector.getMutator().setValueLengthSafe( + valuesReadInCurrentPass + pageReader.valuesReadyToRead, 0); + currentValNull = true; + return false;// field is null, no length to add to data vector + } + + if (usingDictionary) { + if (currLengthDeterminingDictVal == null) { + currLengthDeterminingDictVal = pageReader.dictionaryLengthDeterminingReader.readBytes(); + } + currDictValToWrite = currLengthDeterminingDictVal; + // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division + dataTypeLengthInBits = currLengthDeterminingDictVal.length(); + } + else { + // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division + dataTypeLengthInBits = BytesUtils.readIntLittleEndian(pageReader.pageDataByteArray, + (int) pageReader.readyToReadPosInBytes); + } + // I think this also needs to happen if it is null for the random access + if (! variableWidthVector.getMutator().setValueLengthSafe((int) valuesReadInCurrentPass + pageReader.valuesReadyToRead, dataTypeLengthInBits)) { + return true; + } + boolean success = setSafe(valuesReadInCurrentPass + pageReader.valuesReadyToRead, pageReader.pageDataByteArray, + (int) pageReader.readyToReadPosInBytes + 4, dataTypeLengthInBits); + assert success; + return false; + } + + public void updateReadyToReadPosition() { + if (! 
currentValNull){ + pageReader.readyToReadPosInBytes += dataTypeLengthInBits + 4; + } + pageReader.valuesReadyToRead++; + currLengthDeterminingDictVal = null; + } + + public void updatePosition() { + if (! currentValNull){ + pageReader.readPosInBytes += dataTypeLengthInBits + 4; + bytesReadInCurrentPass += dataTypeLengthInBits; + } + currentValNull = false; + valuesReadInCurrentPass++; + } + + @Override + protected void readField(long recordsToRead) { + if (usingDictionary) { + currDictValToWrite = pageReader.dictionaryValueReader.readBytes(); + // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division + } + dataTypeLengthInBits = variableWidthVector.getAccessor().getValueLength(valuesReadInCurrentPass); + currentValNull = variableWidthVector.getAccessor().getObject(valuesReadInCurrentPass) == null; + // again, I am re-purposing the unused field here, it is a length n BYTES, not bits + if (! currentValNull){ + boolean success = setSafe(valuesReadInCurrentPass, pageReader.pageDataByteArray, + (int) pageReader.readPosInBytes + 4, dataTypeLengthInBits); + assert success; + } + updatePosition(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java new file mode 100644 index 0000000..1d300bb --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.parquet.columnreaders; + +import java.io.IOException; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.store.parquet.ColumnDataReader; +import org.apache.drill.exec.store.parquet.ParquetFormatPlugin; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import parquet.bytes.BytesInput; +import parquet.column.Dictionary; +import parquet.column.ValuesType; +import parquet.column.page.DictionaryPage; +import parquet.column.page.Page; +import parquet.column.values.ValuesReader; +import parquet.column.values.dictionary.DictionaryValuesReader; +import parquet.format.PageHeader; +import parquet.format.PageType; +import parquet.format.Util; +import parquet.hadoop.metadata.ColumnChunkMetaData; +import parquet.schema.PrimitiveType; + +// class to keep track of the read position of variable length columns +final class PageReader { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PageReader.class); + + private final ColumnReader parentColumnReader; + private final ColumnDataReader dataReader; + // store references to the pages that have been uncompressed, but not copied to ValueVectors yet + Page currentPage; + // buffer to store bytes of current page + byte[] pageDataByteArray; + + // for variable length data we need to keep track of our current position in the page data + // as the values and lengths are intermixed, making random access to the length data impossible + long readyToReadPosInBytes; + // read position in the current page, stored in the ByteBuf in ParquetRecordReader called bufferWithAllData + long readPosInBytes; + // bit shift needed for the next page if the last one did not line up with a byte boundary + int bitShift; + // storage space for extra bits at the end of a page if they did not line up with a byte boundary + // prevents the need to keep the entire last page, as these pageDataByteArray need to be added to the next batch + //byte extraBits; + + // used for columns where the number of values that will fit in a vector is unknown + // currently used for variable length + // TODO - reuse this when compressed vectors are added, where fixed length values will take up a + // variable amount of space + // For example: if nulls are stored without extra space left in the data vector + // (this is currently simplifying random access to the data during processing, but increases the size of the vectors) + int valuesReadyToRead; + + // the number of values read out of the last page + int valuesRead; + int byteLength; + //int rowGroupIndex; + ValuesReader definitionLevels; + ValuesReader repetitionLevels; + ValuesReader valueReader; + ValuesReader dictionaryLengthDeterminingReader; + ValuesReader dictionaryValueReader; + Dictionary dictionary; + PageHeader pageHeader = null; + + PageReader(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException{ + this.parentColumnReader = parentStatus; + + long totalByteLength = columnChunkMetaData.getTotalUncompressedSize(); + long start = columnChunkMetaData.getFirstDataPageOffset(); + try { + FSDataInputStream f = fs.open(path); + this.dataReader = new ColumnDataReader(f, start, totalByteLength); + if (columnChunkMetaData.getDictionaryPageOffset() > 0) { + f.seek(columnChunkMetaData.getDictionaryPageOffset()); + PageHeader pageHeader = Util.readPageHeader(f); + assert pageHeader.type == 
PageType.DICTIONARY_PAGE; + BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer() + .decompress( // + dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), // + pageHeader.getUncompressed_page_size(), // + parentColumnReader.columnChunkMetaData.getCodec()); + DictionaryPage page = new DictionaryPage( + bytesIn, + pageHeader.uncompressed_page_size, + pageHeader.dictionary_page_header.num_values, + parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name()) + ); + this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page); + } + } catch (IOException e) { + throw new ExecutionSetupException("Error opening or reading metatdata for parquet file at location: " + path.getName(), e); + } + + } + + + /** + * Grab the next page. + * + * @return - if another page was present + * @throws java.io.IOException + */ + public boolean next() throws IOException { + + currentPage = null; + valuesRead = 0; + valuesReadyToRead = 0; + + // TODO - the metatdata for total size appears to be incorrect for impala generated files, need to find cause + // and submit a bug report + if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) { + return false; + } + + // next, we need to decompress the bytes + // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one + // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary + do { + pageHeader = dataReader.readPageHeader(); + if (pageHeader.getType() == PageType.DICTIONARY_PAGE) { + BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer() + .decompress( // + dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), // + pageHeader.getUncompressed_page_size(), // + parentColumnReader.columnChunkMetaData.getCodec()); + DictionaryPage page = new DictionaryPage( + bytesIn, + pageHeader.uncompressed_page_size, + pageHeader.dictionary_page_header.num_values, + parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name()) + ); + this.dictionary = page.getEncoding().initDictionary(parentColumnReader.columnDescriptor, page); + } + } while (pageHeader.getType() == PageType.DICTIONARY_PAGE); + + BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer() + .decompress( // + dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), // + pageHeader.getUncompressed_page_size(), // + parentColumnReader.columnChunkMetaData.getCodec()); + currentPage = new Page( + bytesIn, + pageHeader.data_page_header.num_values, + pageHeader.uncompressed_page_size, + ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding), + ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding), + ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding) + ); + + byteLength = pageHeader.uncompressed_page_size; + + if (currentPage == null) { + return false; + } + + pageDataByteArray = currentPage.getBytes().toByteArray(); + + readPosInBytes = 0; + if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) { + repetitionLevels = currentPage.getRlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL); + repetitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes); + // we know that 
the first value will be a 0, at the end of each list of repeated values we will hit another 0 indicating + // a new record, although we don't know the length until we hit it (and this is a one way stream of integers) so we + // read the first zero here to simplify the reading processes, and start reading the first value the same as all + // of the rest. Effectively we are 'reading' the non-existent value in front of the first allowing direct access to + // the first list of repetition levels + readPosInBytes = repetitionLevels.getNextOffset(); + repetitionLevels.readInteger(); + } + if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){ + parentColumnReader.currDefLevel = -1; + if (!currentPage.getValueEncoding().usesDictionary()) { + parentColumnReader.usingDictionary = false; + definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL); + definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes); + readPosInBytes = definitionLevels.getNextOffset(); + if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) { + valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES); + valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes); + } + } else { + parentColumnReader.usingDictionary = true; + definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL); + definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes); + readPosInBytes = definitionLevels.getNextOffset(); + // initialize two of the dictionary readers, one is for determining the lengths of each value, the second is for + // actually copying the values out into the vectors + dictionaryLengthDeterminingReader = new DictionaryValuesReader(dictionary); + dictionaryLengthDeterminingReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes); + dictionaryValueReader = new DictionaryValuesReader(dictionary); + dictionaryValueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes); + this.parentColumnReader.usingDictionary = true; + } + } + // readPosInBytes is used for actually reading the values after we determine how many will fit in the vector + // readyToReadPosInBytes serves a similar purpose for the vector types where we must count up the values that will + // fit one record at a time, such as for variable length data. 
Both operations must start in the same location after the + // definition and repetition level data which is stored alongside the page data itself + readyToReadPosInBytes = readPosInBytes; + return true; + } + + public void clear(){ + this.dataReader.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java new file mode 100644 index 0000000..ad849b4 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java @@ -0,0 +1,53 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ ******************************************************************************/ +package org.apache.drill.exec.store.parquet.columnreaders; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.vector.BigIntVector; +import org.apache.drill.exec.vector.ValueVector; +import parquet.column.ColumnDescriptor; +import parquet.format.SchemaElement; +import parquet.hadoop.metadata.ColumnChunkMetaData; +import parquet.schema.PrimitiveType; + +public class ParquetFixedWidthDictionaryReader extends ColumnReader{ + + ParquetFixedWidthDictionaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + @Override + public void readField(long recordsToReadInThisPass) { + + recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount() + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + int defLevel; + for (int i = 0; i < recordsReadInThisIteration; i++){ + defLevel = pageReader.definitionLevels.readInteger(); + // if the value is defined + if (defLevel == columnDescriptor.getMaxDefinitionLevel()){ + if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64) + ((BigIntVector)valueVec).getMutator().set(i + valuesReadInCurrentPass, + pageReader.valueReader.readLong() ); + } + // otherwise the value is skipped, because the bit vector indicating nullability is zero filled + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java new file mode 100644 index 0000000..2228787 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -0,0 +1,361 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5b73c214/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
new file mode 100644
index 0000000..2228787
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
+import parquet.format.FileMetaData;
+import parquet.format.SchemaElement;
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.hadoop.CodecFactoryExposer;
+import parquet.hadoop.ParquetFileWriter;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.PrimitiveType;
+
+public class ParquetRecordReader implements RecordReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
+
+  // this value has been inflated to read in multiple value vectors at once, and then break them up into smaller vectors
+  private static final int NUMBER_OF_VECTORS = 1;
+  private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256kb
+  private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // the same 256kb batch length, expressed in bits
+  private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024;
+
+  // TODO - should probably find a smarter way to set this, currently 1 megabyte
+  private static final int VAR_LEN_FIELD_LENGTH = 1024 * 1024 * 1;
+  public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 1;
+  private static final String SEPERATOR = System.getProperty("file.separator");
+
+  // used for clearing the last n bits of a byte
+  public static final byte[] endBitMasks = {-2, -4, -8, -16, -32, -64, -128};
+  // used for clearing the first n bits of a byte
+  public static final byte[] startBitMasks = {127, 63, 31, 15, 7, 3, 1};
+
+  private int bitWidthAllFixedFields;
+  private boolean allFieldsFixedLength;
+  private int recordsPerBatch;
+  private long totalRecords;
+  private long rowGroupOffset;
+
+  private List<ColumnReader> columnStatuses;
+  private FileSystem fileSystem;
+  private long batchSize;
+  Path hadoopPath;
+  private VarLenBinaryReader varLengthReader;
+  private ParquetMetadata footer;
+  private List<SchemaPath> columns;
+  private final CodecFactoryExposer codecFactoryExposer;
+  int rowGroupIndex;
+
+  public ParquetRecordReader(FragmentContext fragmentContext, //
+                             String path, //
+                             int rowGroupIndex, //
+                             FileSystem fs, //
+                             CodecFactoryExposer codecFactoryExposer, //
+                             ParquetMetadata footer, //
+                             List<SchemaPath> columns) throws ExecutionSetupException {
+    this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactoryExposer, footer,
+        columns);
+  }
+
+  public ParquetRecordReader(FragmentContext fragmentContext, long batchSize,
+                             String path, int rowGroupIndex, FileSystem fs,
+                             CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer,
+                             List<SchemaPath> columns) throws ExecutionSetupException {
+    hadoopPath = new Path(path);
+    fileSystem = fs;
+    this.codecFactoryExposer = codecFactoryExposer;
+    this.rowGroupIndex = rowGroupIndex;
+    this.batchSize = batchSize;
+    this.footer = footer;
+    this.columns = columns;
+  }
+
+  public CodecFactoryExposer getCodecFactoryExposer() {
+    return codecFactoryExposer;
+  }
+
+  public Path getHadoopPath() {
+    return hadoopPath;
+  }
+
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
+  public int getRowGroupIndex() {
+    return rowGroupIndex;
+  }
+
+  public int getBitWidthAllFixedFields() {
+    return bitWidthAllFixedFields;
+  }
+
+  public long getBatchSize() {
+    return batchSize;
+  }
+
+  /**
+   * @param type a fixed length type from the parquet library enum
+   * @return the length of the type in bits
+   */
+  public static int getTypeLengthInBits(PrimitiveType.PrimitiveTypeName type) {
+    switch (type) {
+      case INT64: return 64;
+      case INT32: return 32;
+      case BOOLEAN: return 1;
+      case FLOAT: return 32;
+      case DOUBLE: return 64;
+      case INT96: return 96;
+      // binary and fixed length byte array
+      default:
+        throw new IllegalStateException("Length cannot be determined for type " + type);
+    }
+  }
+
+  private boolean fieldSelected(MaterializedField field){
+    // TODO - not sure if this is how we want to represent this
+    // for now it makes the existing tests pass, simply selecting
+    // all available data if no columns are provided
+    if (this.columns != null){
+      for (SchemaPath expr : this.columns){
+        if ( field.matches(expr)){
+          return true;
+        }
+      }
+      return false;
+    }
+    return true;
+  }
+
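
fieldSelected above is the reader's column projection check: a null column list selects every column in the file, while a non-null list admits only fields matching one of the requested schema paths (if nothing matches, the check further down in setup() fails with "No columns requested were found in the file"). A simplified stand-alone analogue of that decision, using plain strings and a hypothetical column list in place of SchemaPath and MaterializedField:

    import java.util.Arrays;
    import java.util.List;

    public class ProjectionSketch {
      // Simplified analogue of ParquetRecordReader.fieldSelected: strings stand in
      // for SchemaPath expressions and MaterializedField names.
      static boolean fieldSelected(List<String> requestedColumns, String fieldName) {
        if (requestedColumns != null) {
          for (String expr : requestedColumns) {
            if (fieldName.equalsIgnoreCase(expr)) {
              return true;
            }
          }
          return false;   // a list was given and nothing matched
        }
        return true;      // no list given: select all available columns
      }

      public static void main(String[] args) {
        List<String> requested = Arrays.asList("user_id", "amount");   // hypothetical projection
        System.out.println(fieldSelected(requested, "user_id"));       // true
        System.out.println(fieldSelected(requested, "comment"));       // false -> column is skipped
        System.out.println(fieldSelected(null, "comment"));            // true  -> no projection given
      }
    }
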
+  @Override
+  public void setup(OutputMutator output) throws ExecutionSetupException {
+
+    columnStatuses = new ArrayList<>();
+    totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount();
+    List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
+    allFieldsFixedLength = true;
+    ColumnDescriptor column;
+    ColumnChunkMetaData columnChunkMetaData;
+    int columnsToScan = 0;
+
+    MaterializedField field;
+    ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();
+    FileMetaData fileMetaData;
+
+    // TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below
+    // store a map from column name to converted types if they are non-null
+    HashMap<String, SchemaElement> schemaElements = new HashMap<>();
+    fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer);
+    for (SchemaElement se : fileMetaData.getSchema()) {
+      schemaElements.put(se.getName(), se);
+    }
+
+    // loop to add up the length of the fixed width columns and build the schema
+    for (int i = 0; i < columns.size(); ++i) {
+      column = columns.get(i);
+      logger.debug("name: " + fileMetaData.getSchema().get(i).name);
+      SchemaElement se = schemaElements.get(column.getPath()[0]);
+      MajorType mt = ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(), getDataMode(column), se);
+      field = MaterializedField.create(toFieldName(column.getPath()), mt);
+      if ( ! fieldSelected(field)){
+        continue;
+      }
+      columnsToScan++;
+      // sum the lengths of all of the fixed length fields
+      if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
+        if (column.getMaxRepetitionLevel() > 0) {
+          allFieldsFixedLength = false;
+        }
+        // There is no support for the fixed binary type yet in parquet, leaving a task here as a reminder
+        // TODO - implement this when the feature is added upstream
+        if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY){
+          bitWidthAllFixedFields += se.getType_length() * 8;
+        } else {
+          bitWidthAllFixedFields += getTypeLengthInBits(column.getType());
+        }
+      } else {
+        allFieldsFixedLength = false;
+      }
+    }
+    rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
+
+    // none of the columns in the parquet file matched the requested columns from the query
+    if (columnsToScan == 0){
+      throw new ExecutionSetupException("Error reading from parquet file. No columns requested were found in the file.");
+    }
+    if (allFieldsFixedLength) {
+      recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
+          footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 65535);
+    }
+    else {
+      recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH;
+    }
+
+    try {
+      ValueVector v;
+      ConvertedType convertedType;
+      SchemaElement schemaElement;
+      ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>();
+      // initialize all of the column read status objects
+      boolean fieldFixedLength = false;
+      for (int i = 0; i < columns.size(); ++i) {
+        column = columns.get(i);
+        columnChunkMetaData = footer.getBlocks().get(rowGroupIndex).getColumns().get(i);
+        schemaElement = schemaElements.get(column.getPath()[0]);
+        convertedType = schemaElement.getConverted_type();
+        MajorType type = ParquetToDrillTypeConverter.toMajorType(column.getType(), schemaElement.getType_length(), getDataMode(column), schemaElement);
+        field = MaterializedField.create(toFieldName(column.getPath()), type);
+        // the field was not requested to be read
+        if ( ! fieldSelected(field)) continue;
+
+        fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
+        v = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+        if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
+          if (column.getMaxRepetitionLevel() > 0) {
+            ColumnReader dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, column, columnChunkMetaData, recordsPerBatch,
+                ((RepeatedFixedWidthVector) v).getMutator().getDataVector(), schemaElement);
+            varLengthColumns.add(new FixedWidthRepeatedReader(this, dataReader,
+                getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, v, schemaElement));
+          }
+          else {
+            columnStatuses.add(ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v,
+                schemaElement));
+          }
+        } else {
+          // create a reader and add it to the appropriate list
+          varLengthColumns.add(ColumnReaderFactory.getReader(this, -1, column, columnChunkMetaData, false, v, schemaElement));
+        }
+      }
+      varLengthReader = new VarLenBinaryReader(this, varLengthColumns);
+    } catch (SchemaChangeException e) {
+      throw new ExecutionSetupException(e);
+    } catch (Exception e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  private SchemaPath toFieldName(String[] paths) {
+    return SchemaPath.getCompoundPath(paths);
+  }
+
+  private TypeProtos.DataMode getDataMode(ColumnDescriptor column) {
+    if (column.getMaxRepetitionLevel() > 0 ) {
+      return DataMode.REPEATED;
+    } else if (column.getMaxDefinitionLevel() == 0) {
+      return TypeProtos.DataMode.REQUIRED;
+    } else {
+      return TypeProtos.DataMode.OPTIONAL;
+    }
+  }
+
+  private void resetBatch() {
+    for (ColumnReader column : columnStatuses) {
+      column.valuesReadInCurrentPass = 0;
+    }
+    for (VarLengthColumn r : varLengthReader.columns){
+      r.valuesReadInCurrentPass = 0;
+    }
+  }
+
+  public void readAllFixedFields(long recordsToRead) throws IOException {
+
+    for (ColumnReader crs : columnStatuses){
+      crs.processPages(recordsToRead);
+    }
+  }
+
+  @Override
+  public int next() {
+    resetBatch();
+    long recordsToRead = 0;
+    try {
+      ColumnReader firstColumnStatus;
+      if (columnStatuses.size() > 0){
+        firstColumnStatus = columnStatuses.iterator().next();
+      }
+      else{
+        if (varLengthReader.columns.size() > 0){
+          firstColumnStatus = varLengthReader.columns.iterator().next();
+        }
+        else{
+          firstColumnStatus = null;
+        }
+      }
+      // TODO - replace this with new functionality of returning batches even if no columns are selected
+      // the query 'select 5 from parquetfile' should return the number of records that the parquet file contains
+      // we don't need to read any of the data, we just need to fill batches with a record count and a useless vector with
+      // the right number of values
+      if (firstColumnStatus == null) throw new DrillRuntimeException("Unexpected error reading parquet file, not reading any columns");
+
+      if (allFieldsFixedLength) {
+        recordsToRead = Math.min(recordsPerBatch, firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead);
+      } else {
+        recordsToRead = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH;
+
+      }
+
+      if (allFieldsFixedLength) {
+        readAllFixedFields(recordsToRead);
+      } else { // variable length columns
+        long fixedRecordsToRead = varLengthReader.readFields(recordsToRead, firstColumnStatus);
+        readAllFixedFields(fixedRecordsToRead);
+      }
+
+      return firstColumnStatus.getRecordsReadInCurrentPass();
+    } catch (IOException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  @Override
+  public void cleanup() {
+    for (ColumnReader column : columnStatuses) {
+      column.clear();
+    }
+    columnStatuses.clear();
+
+    for (VarLengthColumn r : varLengthReader.columns){
+      r.clear();
+    }
+    varLengthReader.columns.clear();
+  }
+}
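
For the all-fixed-width case handled in setup() above, recordsPerBatch is the batch budget in bits divided by the combined bit width of one record, capped by the column chunk's value count and by 65535. A small self-contained sketch of that arithmetic, with hypothetical column widths and row-group size:

    public class BatchSizeSketch {
      public static void main(String[] args) {
        // Assumed values for illustration only; the real reader derives these from the footer.
        long batchSizeInBits = 256 * 1024 * 8;        // DEFAULT_BATCH_LENGTH_IN_BITS: 256kb expressed in bits
        int bitWidthAllFixedFields = 32 + 64 + 64;    // e.g. one INT32, one INT64 and one DOUBLE column
        long valueCountInRowGroup = 1_000_000;        // value count of the first column chunk

        long recordsPerBatch = Math.min(Math.min(batchSizeInBits / bitWidthAllFixedFields,
            valueCountInRowGroup), 65535);

        // 2,097,152 bits / 160 bits per record = 13,107 records per batch
        System.out.println("records per batch: " + recordsPerBatch);
      }
    }
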