This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new f056ea7 DRILL-7948: Unable to query file with required fixed_len_byte_array decimal columns (#2254)
f056ea7 is described below
commit f056ea7962c6d5c3c6898f6bf338bd3ecbf68094
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Wed Jun 16 20:31:07 2021 +0300
DRILL-7948: Unable to query file with required fixed_len_byte_array decimal columns (#2254)
---
.../parquet/columnreaders/AsyncPageReader.java | 24 ++-
.../parquet/columnreaders/ColumnReaderFactory.java | 225 +++++++++++----------
.../columnreaders/FixedByteAlignedReader.java | 32 +--
.../NullableFixedByteAlignedReaders.java | 5 +-
.../store/parquet/columnreaders/PageReader.java | 97 ++++++++-
.../ParquetFixedWidthDictionaryReaders.java | 34 +++-
.../parquet/ParquetSimpleTestFileGenerator.java | 30 ++-
.../store/parquet/TestParquetLogicalTypes.java | 39 +++-
.../SingleRow_RequiredFixedLength_Decimal.parquet | Bin 0 -> 916 bytes
.../resources/parquet/interval_dictionary.parquet | Bin 0 -> 396 bytes
.../parquet/nullable_interval_dictionary.parquet | Bin 0 -> 402 bytes
11 files changed, 312 insertions(+), 174 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index 8b5c926..ffb10da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -441,12 +441,24 @@ class AsyncPageReader extends PageReader {
bytesRead = compressedSize;
synchronized (parent) {
-      if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
-        readStatus.setIsDictionaryPage(true);
-        valuesRead += pageHeader.getDictionary_page_header().getNum_values();
-      } else {
-        valuesRead += pageHeader.getData_page_header().getNum_values();
-        parent.totalPageValuesRead += valuesRead;
+      PageType type = pageHeader.getType() == null ? PageType.DATA_PAGE : pageHeader.getType();
+      switch (type) {
+        case DICTIONARY_PAGE:
+          readStatus.setIsDictionaryPage(true);
+          valuesRead += pageHeader.getDictionary_page_header().getNum_values();
+          break;
+        case DATA_PAGE_V2:
+          valuesRead += pageHeader.getData_page_header_v2().getNum_values();
+          parent.totalPageValuesRead += valuesRead;
+          break;
+        case DATA_PAGE:
+          valuesRead += pageHeader.getData_page_header().getNum_values();
+          parent.totalPageValuesRead += valuesRead;
+          break;
+        default:
+          throw UserException.unsupportedError()
+            .message("Page type is not supported yet: " + type)
+            .build(logger);
       }
long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
readStatus.setPageHeader(pageHeader);
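A minimal standalone sketch of the dispatch this hunk introduces, runnable against parquet-format alone (the CountValues wrapper class and IllegalStateException are illustrative, not part of the commit):

    import org.apache.parquet.format.PageHeader;
    import org.apache.parquet.format.PageType;

    class CountValues {
      // Mirrors the switch above: a missing (null) page type is treated as a
      // v1 data page, and dictionary/v1/v2 pages each expose num_values on a
      // different sub-header of the thrift PageHeader.
      static long countValues(PageHeader pageHeader) {
        PageType type = pageHeader.getType() == null ? PageType.DATA_PAGE : pageHeader.getType();
        switch (type) {
          case DICTIONARY_PAGE:
            return pageHeader.getDictionary_page_header().getNum_values();
          case DATA_PAGE_V2:
            return pageHeader.getData_page_header_v2().getNum_values();
          case DATA_PAGE:
            return pageHeader.getData_page_header().getNum_values();
          default:
            throw new IllegalStateException("Page type is not supported yet: " + type);
        }
      }
    }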
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index e6bd9c3..9db336c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -54,7 +54,6 @@ import org.apache.parquet.column.Encoding;
import org.apache.parquet.format.ConvertedType;
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.schema.PrimitiveType;
public class ColumnReaderFactory {
@@ -74,108 +73,7 @@ public class ColumnReaderFactory {
     // if the column is required, or repeated (in which case we just want to use this to generate our appropriate
     // ColumnReader for actually transferring data into the data vector inside of our repeated vector
     if (descriptor.getMaxDefinitionLevel() == 0 || descriptor.getMaxRepetitionLevel() > 0) {
-      if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
-        return new BitReader(recordReader, descriptor, columnChunkMetaData,
-          fixedLength, (BitVector) v, schemaElement);
-      } else if (!columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY) && (
-          columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
-          || columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT96)) {
-        if (convertedType == null) {
-          return new FixedByteAlignedReader.FixedBinaryReader(recordReader, descriptor,
-            columnChunkMetaData, (VariableWidthVector) v, schemaElement);
-        }
-        switch (convertedType) {
-          case DECIMAL:
-            return new FixedByteAlignedReader.VarDecimalReader(recordReader, descriptor,
-              columnChunkMetaData, fixedLength, (VarDecimalVector) v, schemaElement);
-          case INTERVAL:
-            return new FixedByteAlignedReader.IntervalReader(recordReader, descriptor,
-              columnChunkMetaData, fixedLength, (IntervalVector) v, schemaElement);
-          default:
-            return new FixedByteAlignedReader.FixedBinaryReader(recordReader, descriptor,
-              columnChunkMetaData, (VariableWidthVector) v, schemaElement);
-        }
-      } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
-        switch(recordReader.getDateCorruptionStatus()) {
-          case META_SHOWS_CORRUPTION:
-            return new FixedByteAlignedReader.CorruptDateReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
-          case META_SHOWS_NO_CORRUPTION:
-            return new FixedByteAlignedReader.DateReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
-          case META_UNCLEAR_TEST_VALUES:
-            return new FixedByteAlignedReader.CorruptionDetectingDateReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
-          default:
-            throw new ExecutionSetupException(
-              String.format("Issue setting up parquet reader for date type, " +
-                "unrecognized date corruption status %s. See DRILL-4203 for more info.",
-                recordReader.getDateCorruptionStatus()));
-        }
-      } else {
-        if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
-          switch (columnChunkMetaData.getType()) {
-            case INT32:
-              if (convertedType == null) {
-                return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
-              }
-              switch (convertedType) {
-                case DECIMAL:
-                  return new ParquetFixedWidthDictionaryReaders.DictionaryVarDecimalReader(recordReader,
-                    descriptor, columnChunkMetaData, fixedLength, (VarDecimalVector) v, schemaElement);
-                case TIME_MILLIS:
-                  return new ParquetFixedWidthDictionaryReaders.DictionaryTimeReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (TimeVector) v, schemaElement);
-                case INT_8:
-                case INT_16:
-                case INT_32:
-                  return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
-                case UINT_8:
-                case UINT_16:
-                case UINT_32:
-                  return new ParquetFixedWidthDictionaryReaders.DictionaryUInt4Reader(recordReader, descriptor, columnChunkMetaData, fixedLength, (UInt4Vector) v, schemaElement);
-                default:
-                  throw new ExecutionSetupException("Unsupported dictionary converted type " + convertedType + " for primitive type INT32");
-              }
-            case INT64:
-              if (convertedType == null) {
-                return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
-              }
-              switch (convertedType) {
-                // DRILL-6670: handle TIMESTAMP_MICROS as INT64 with no logical type
-                case INT_64:
-                case TIMESTAMP_MICROS:
-                  return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
-                case UINT_64:
-                  return new ParquetFixedWidthDictionaryReaders.DictionaryUInt8Reader(recordReader, descriptor, columnChunkMetaData, fixedLength, (UInt8Vector) v, schemaElement);
-                case DECIMAL:
-                  return new ParquetFixedWidthDictionaryReaders.DictionaryVarDecimalReader(recordReader,
-                    descriptor, columnChunkMetaData, fixedLength, (VarDecimalVector) v, schemaElement);
-                case TIMESTAMP_MILLIS:
-                  return new ParquetFixedWidthDictionaryReaders.DictionaryTimeStampReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement);
-                default:
-                  throw new ExecutionSetupException("Unsupported dictionary converted type " + convertedType + " for primitive type INT64");
-              }
-            case FLOAT:
-              return new ParquetFixedWidthDictionaryReaders.DictionaryFloat4Reader(recordReader, descriptor, columnChunkMetaData, fixedLength, (Float4Vector) v, schemaElement);
-            case DOUBLE:
-              return new ParquetFixedWidthDictionaryReaders.DictionaryFloat8Reader(recordReader, descriptor, columnChunkMetaData, fixedLength, (Float8Vector) v, schemaElement);
-            case FIXED_LEN_BYTE_ARRAY:
-              return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
-            case INT96:
-              if (recordReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) {
-                return new ParquetFixedWidthDictionaryReaders.DictionaryBinaryAsTimeStampReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement);
-              } else {
-                return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
-              }
-            default:
-              throw new ExecutionSetupException("Unsupported dictionary column type " + descriptor.getType().name() );
-          }
-
-        } else if (convertedType == ConvertedType.DECIMAL) {
-          return new FixedByteAlignedReader.VarDecimalReader(recordReader,
-            descriptor, columnChunkMetaData, fixedLength, (VarDecimalVector) v, schemaElement);
-        } else {
-          return new FixedByteAlignedReader<>(recordReader, descriptor, columnChunkMetaData,
-            fixedLength, v, schemaElement);
-        }
-      }
+      return getColumnReader(recordReader, fixedLength, descriptor, columnChunkMetaData, v, schemaElement, convertedType);
} else { // if the column is nullable
       return getNullableColumnReader(recordReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
@@ -222,6 +120,114 @@ public class ColumnReaderFactory {
}
}
+  private static ColumnReader<? extends ValueVector> getColumnReader(ParquetRecordReader recordReader,
+      boolean fixedLength, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, ValueVector v,
+      SchemaElement schemaElement, ConvertedType convertedType) throws ExecutionSetupException {
+    switch (columnChunkMetaData.getPrimitiveType().getPrimitiveTypeName()) {
+      case BOOLEAN:
+        return new BitReader(recordReader, descriptor, columnChunkMetaData,
+          fixedLength, (BitVector) v, schemaElement);
+      case INT32:
+        if (convertedType == null) {
+          return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, descriptor,
+            columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
+        }
+        switch (convertedType) {
+          case DATE:
+            switch(recordReader.getDateCorruptionStatus()) {
+              case META_SHOWS_CORRUPTION:
+                return new FixedByteAlignedReader.CorruptDateReader(recordReader, descriptor,
+                  columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
+              case META_SHOWS_NO_CORRUPTION:
+                return new FixedByteAlignedReader.DateReader(recordReader, descriptor, columnChunkMetaData,
+                  fixedLength, (DateVector) v, schemaElement);
+              case META_UNCLEAR_TEST_VALUES:
+                return new FixedByteAlignedReader.CorruptionDetectingDateReader(recordReader, descriptor,
+                  columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
+              default:
+                throw new ExecutionSetupException(
+                  String.format("Issue setting up parquet reader for date type, " +
+                    "unrecognized date corruption status %s. See DRILL-4203 for more info.",
+                    recordReader.getDateCorruptionStatus()));
+            }
+          case DECIMAL:
+            return new ParquetFixedWidthDictionaryReaders.DictionaryVarDecimalReader(recordReader,
+              descriptor, columnChunkMetaData, fixedLength, (VarDecimalVector) v, schemaElement);
+          case TIME_MILLIS:
+            return new ParquetFixedWidthDictionaryReaders.DictionaryTimeReader(recordReader, descriptor,
+              columnChunkMetaData, fixedLength, (TimeVector) v, schemaElement);
+          case INT_8:
+          case INT_16:
+          case INT_32:
+            return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, descriptor,
+              columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
+          case UINT_8:
+          case UINT_16:
+          case UINT_32:
+            return new ParquetFixedWidthDictionaryReaders.DictionaryUInt4Reader(recordReader, descriptor,
+              columnChunkMetaData, fixedLength, (UInt4Vector) v, schemaElement);
+          default:
+            throw new ExecutionSetupException("Unsupported dictionary converted type " + convertedType + " for primitive type INT32");
+        }
+      case INT64:
+        if (convertedType == null) {
+          return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, descriptor,
+            columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
+        }
+        switch (convertedType) {
+          // DRILL-6670: handle TIMESTAMP_MICROS as INT64 with no logical type
+          case INT_64:
+          case TIMESTAMP_MICROS:
+            return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, descriptor,
+              columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
+          case UINT_64:
+            return new ParquetFixedWidthDictionaryReaders.DictionaryUInt8Reader(recordReader, descriptor,
+              columnChunkMetaData, fixedLength, (UInt8Vector) v, schemaElement);
+          case DECIMAL:
+            return new ParquetFixedWidthDictionaryReaders.DictionaryVarDecimalReader(recordReader,
+              descriptor, columnChunkMetaData, fixedLength, (VarDecimalVector) v, schemaElement);
+          case TIMESTAMP_MILLIS:
+            return new ParquetFixedWidthDictionaryReaders.DictionaryTimeStampReader(recordReader, descriptor,
+              columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement);
+          default:
+            throw new ExecutionSetupException("Unsupported dictionary converted type " + convertedType + " for primitive type INT64");
+        }
+      case FLOAT:
+        return new ParquetFixedWidthDictionaryReaders.DictionaryFloat4Reader(recordReader, descriptor,
+          columnChunkMetaData, fixedLength, (Float4Vector) v, schemaElement);
+      case DOUBLE:
+        return new ParquetFixedWidthDictionaryReaders.DictionaryFloat8Reader(recordReader, descriptor,
+          columnChunkMetaData, fixedLength, (Float8Vector) v, schemaElement);
+      case FIXED_LEN_BYTE_ARRAY:
+        if (convertedType != null) {
+          switch (convertedType) {
+            case DECIMAL:
+              return new ParquetFixedWidthDictionaryReaders.DictionaryVarDecimalReader(recordReader,
+                descriptor, columnChunkMetaData, fixedLength, (VarDecimalVector) v, schemaElement);
+            case INTERVAL:
+              return new FixedByteAlignedReader.IntervalReader(recordReader, descriptor,
+                columnChunkMetaData, fixedLength, (IntervalVector) v, schemaElement);
+          }
+        }
+        if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+          return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader,
+            descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
+        }
+        return new FixedByteAlignedReader.FixedBinaryReader(recordReader, descriptor,
+          columnChunkMetaData, (VariableWidthVector) v, schemaElement);
+      case INT96:
+        if (recordReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) {
+          return new ParquetFixedWidthDictionaryReaders.DictionaryBinaryAsTimeStampReader(recordReader,
+            descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement);
+        } else {
+          return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader,
+            descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
+        }
+      default:
+        throw new ExecutionSetupException("Unsupported dictionary column type " + descriptor.getPrimitiveType().getPrimitiveTypeName().name());
+    }
+  }
+
   public static ColumnReader<?> getNullableColumnReader(ParquetRecordReader parentReader,
       ColumnDescriptor columnDescriptor,
       ColumnChunkMetaData columnChunkMetaData,
@@ -315,15 +321,14 @@ public class ColumnReaderFactory {
         case INTERVAL:
           return new NullableFixedByteAlignedReaders.NullableIntervalReader(parentReader, columnDescriptor,
             columnChunkMetaData, fixedLength, (NullableIntervalVector) valueVec, schemaElement);
-        default:
-          if (!columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
-            return new NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader<>(parentReader,columnDescriptor,
-              columnChunkMetaData, fixedLength, valueVec, schemaElement);
-          }
       }
     }
-    return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, columnDescriptor,
-      columnChunkMetaData, fixedLength, (NullableVarBinaryVector) valueVec, schemaElement);
+    if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+      return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, columnDescriptor,
+        columnChunkMetaData, fixedLength, (NullableVarBinaryVector) valueVec, schemaElement);
+    }
+    return new NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader<>(parentReader,columnDescriptor,
+      columnChunkMetaData, fixedLength, valueVec, schemaElement);
       default:
         throw new ExecutionSetupException("Unsupported nullable column type " + columnDescriptor.getPrimitiveType().getPrimitiveTypeName().name());
}
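The net effect of the refactoring above, reduced to the FIXED_LEN_BYTE_ARRAY branch that DRILL-7948 is about: the converted type now wins over the encoding when choosing a reader. A hedged sketch (the FlbaReaderChoice class is illustrative and returns reader names as strings for brevity; only ConvertedType is the real parquet-format enum):

    import org.apache.parquet.format.ConvertedType;

    class FlbaReaderChoice {
      // DECIMAL and INTERVAL columns get their converted-type readers whether
      // or not the chunk is dictionary-encoded; undecorated binary falls back
      // to the dictionary-vs-plain split, as in getColumnReader above.
      static String pick(ConvertedType convertedType, boolean hasPlainDictionary) {
        if (convertedType == ConvertedType.DECIMAL) {
          return "DictionaryVarDecimalReader";
        }
        if (convertedType == ConvertedType.INTERVAL) {
          return "IntervalReader";
        }
        return hasPlainDictionary ? "DictionaryFixedBinaryReader" : "FixedBinaryReader";
      }
    }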
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
index 82711c2..7c04322 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
@@ -17,14 +17,11 @@
*/
package org.apache.drill.exec.store.parquet.columnreaders;
-import org.apache.drill.shaded.guava.com.google.common.primitives.Ints;
-import org.apache.drill.shaded.guava.com.google.common.primitives.Longs;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.vector.DateVector;
import org.apache.drill.exec.vector.IntervalVector;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarDecimalVector;
import org.apache.drill.exec.vector.VariableWidthVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.format.SchemaElement;
@@ -195,30 +192,6 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
}
-  public static class VarDecimalReader extends ConvertedReader<VarDecimalVector> {
-
-    VarDecimalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-        boolean fixedLength, VarDecimalVector v, SchemaElement schemaElement) throws ExecutionSetupException {
-      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-    }
-
-    @Override
-    void addNext(int start, int index) {
-      switch (columnChunkMetaData.getType()) {
-        case INT32:
-          valueVec.getMutator().setSafe(index, Ints.toByteArray(bytebuf.getInt(start)), 0, dataTypeLengthInBytes);
-          break;
-        case INT64:
-          valueVec.getMutator().setSafe(index, Longs.toByteArray(bytebuf.getLong(start)), 0, dataTypeLengthInBytes);
-          break;
-        case FIXED_LEN_BYTE_ARRAY:
-        case BINARY:
-          valueVec.getMutator().setSafe(index, start, start + dataTypeLengthInBytes, bytebuf);
-          break;
-      }
-    }
-  }
-
   public static class IntervalReader extends ConvertedReader<IntervalVector> {
     IntervalReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
         boolean fixedLength, IntervalVector v, SchemaElement schemaElement) throws ExecutionSetupException {
@@ -229,12 +202,13 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
void addNext(int start, int index) {
if (usingDictionary) {
byte[] input = pageReader.dictionaryValueReader.readBytes().getBytes();
- valueVec.getMutator().setSafe(index * 12,
+ valueVec.getMutator().setSafe(index,
ParquetReaderUtility.getIntFromLEBytes(input, 0),
ParquetReaderUtility.getIntFromLEBytes(input, 4),
ParquetReaderUtility.getIntFromLEBytes(input, 8));
+ } else {
+        valueVec.getMutator().setSafe(index, bytebuf.getInt(start), bytebuf.getInt(start + 4), bytebuf.getInt(start + 8));
       }
-      valueVec.getMutator().setSafe(index, bytebuf.getInt(start), bytebuf.getInt(start + 4), bytebuf.getInt(start + 8));
}
}
}
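For context on the IntervalReader fix above (index * 12 was a bug: setSafe takes a value index, not a byte offset), the 12-byte Parquet INTERVAL value is three little-endian unsigned 32-bit ints. A small sketch of that decoding, with an illustrative class name:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    class IntervalLayout {
      // Decodes a 12-byte Parquet INTERVAL value into months, days and millis,
      // the same three ints the reader passes to the Drill interval vector.
      static int[] decode(byte[] twelveBytes) {
        ByteBuffer buf = ByteBuffer.wrap(twelveBytes).order(ByteOrder.LITTLE_ENDIAN);
        return new int[] { buf.getInt(0), buf.getInt(4), buf.getInt(8) };
      }
    }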
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index 389962f..14d665c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -504,12 +504,13 @@ public class NullableFixedByteAlignedReaders {
void addNext(int start, int index) {
if (usingDictionary) {
byte[] input = pageReader.dictionaryValueReader.readBytes().getBytes();
- valueVec.getMutator().setSafe(index * 12, 1,
+ valueVec.getMutator().setSafe(index, 1,
ParquetReaderUtility.getIntFromLEBytes(input, 0),
ParquetReaderUtility.getIntFromLEBytes(input, 4),
ParquetReaderUtility.getIntFromLEBytes(input, 8));
+ } else {
+        valueVec.getMutator().set(index, 1, bytebuf.getInt(start), bytebuf.getInt(start + 4), bytebuf.getInt(start + 8));
       }
-      valueVec.getMutator().set(index, 1, bytebuf.getInt(start), bytebuf.getInt(start + 4), bytebuf.getInt(start + 8));
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index b50f6b3..79c272a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.parquet.columnreaders;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import io.netty.buffer.ByteBufUtil;
@@ -39,6 +40,8 @@ import org.apache.parquet.column.ValuesType;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.dictionary.DictionaryValuesReader;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.PageType;
import org.apache.parquet.format.Util;
@@ -313,11 +316,12 @@ class PageReader {
}
timer.start();
-    currentPageCount = pageHeader.data_page_header.num_values;
+    PageHeaderInfoProvider pageHeaderInfoProvider = pageHeaderInfoProviderBuilder(pageHeader);
+    currentPageCount = pageHeaderInfoProvider.getNumValues();
-    final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding);
-    final Encoding dlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.definition_level_encoding);
-    final Encoding valueEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.encoding);
+    final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeaderInfoProvider.getRepetitionLevelEncoding());
+    final Encoding dlEncoding = METADATA_CONVERTER.getEncoding(pageHeaderInfoProvider.getDefinitionLevelEncoding());
+    final Encoding valueEncoding = METADATA_CONVERTER.getEncoding(pageHeaderInfoProvider.getEncoding());
byteLength = pageHeader.uncompressed_page_size;
@@ -453,8 +457,9 @@ class PageReader {
     Preconditions.checkState(parentColumnReader.columnDescriptor.getMaxDefinitionLevel() == 1);
     Preconditions.checkState(currentPageCount > 0);
-    final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding);
-    final Encoding dlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.definition_level_encoding);
+    PageHeaderInfoProvider pageHeaderInfoProvider = pageHeaderInfoProviderBuilder(pageHeader);
+    final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeaderInfoProvider.getRepetitionLevelEncoding());
+    final Encoding dlEncoding = METADATA_CONVERTER.getEncoding(pageHeaderInfoProvider.getDefinitionLevelEncoding());
     final ByteBufferInputStream in = ByteBufferInputStream.wrap(pageData.nioBuffer(0, pageData.capacity()));
@@ -475,4 +480,84 @@ class PageReader {
definitionLevels.skip();
}
}
+
+ /**
+   * Common interface for wrappers of {@link DataPageHeader} and {@link DataPageHeaderV2} classes.
+ */
+ private interface PageHeaderInfoProvider {
+ int getNumValues();
+
+ org.apache.parquet.format.Encoding getEncoding();
+
+ org.apache.parquet.format.Encoding getDefinitionLevelEncoding();
+
+ org.apache.parquet.format.Encoding getRepetitionLevelEncoding();
+ }
+
+  private static class DataPageHeaderV1InfoProvider implements PageHeaderInfoProvider {
+ private final DataPageHeader dataPageHeader;
+
+ private DataPageHeaderV1InfoProvider(DataPageHeader dataPageHeader) {
+ this.dataPageHeader = dataPageHeader;
+ }
+
+ @Override
+ public int getNumValues() {
+ return dataPageHeader.getNum_values();
+ }
+
+ @Override
+ public org.apache.parquet.format.Encoding getEncoding() {
+ return dataPageHeader.getEncoding();
+ }
+
+ @Override
+ public org.apache.parquet.format.Encoding getDefinitionLevelEncoding() {
+ return dataPageHeader.getDefinition_level_encoding();
+ }
+
+ @Override
+ public org.apache.parquet.format.Encoding getRepetitionLevelEncoding() {
+ return dataPageHeader.getRepetition_level_encoding();
+ }
+ }
+
+  private static class DataPageHeaderV2InfoProvider implements PageHeaderInfoProvider {
+ private final DataPageHeaderV2 dataPageHeader;
+
+ private DataPageHeaderV2InfoProvider(DataPageHeaderV2 dataPageHeader) {
+ this.dataPageHeader = dataPageHeader;
+ }
+
+ @Override
+ public int getNumValues() {
+ return dataPageHeader.getNum_values();
+ }
+
+ @Override
+ public org.apache.parquet.format.Encoding getEncoding() {
+ return dataPageHeader.getEncoding();
+ }
+
+ @Override
+ public org.apache.parquet.format.Encoding getDefinitionLevelEncoding() {
+ return org.apache.parquet.format.Encoding.PLAIN;
+ }
+
+ @Override
+ public org.apache.parquet.format.Encoding getRepetitionLevelEncoding() {
+ return org.apache.parquet.format.Encoding.PLAIN;
+ }
+ }
+
+  private static PageHeaderInfoProvider pageHeaderInfoProviderBuilder(PageHeader pageHeader) {
+    switch (pageHeader.getType()) {
+      case DATA_PAGE:
+        return new DataPageHeaderV1InfoProvider(pageHeader.getData_page_header());
+      case DATA_PAGE_V2:
+        return new DataPageHeaderV2InfoProvider(pageHeader.getData_page_header_v2());
+      default:
+        throw new DrillRuntimeException("Unsupported page header type:" + pageHeader.getType());
+ }
+ }
}
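To illustrate what the provider hierarchy above normalizes, both data page header variants can be built from parquet-format thrift types. This construction is an illustrative sketch only; the constructor argument order is assumed to follow the thrift field order (num_values first, then the encodings for v1; num_values/num_nulls/num_rows, value encoding and level byte lengths for v2):

    import org.apache.parquet.format.DataPageHeader;
    import org.apache.parquet.format.DataPageHeaderV2;
    import org.apache.parquet.format.Encoding;
    import org.apache.parquet.format.PageHeader;
    import org.apache.parquet.format.PageType;

    class PageHeaderSamples {
      // A v1 header carries its level encodings explicitly ...
      static PageHeader v1() {
        PageHeader header = new PageHeader(PageType.DATA_PAGE, 100, 100);
        header.setData_page_header(
            new DataPageHeader(50, Encoding.PLAIN, Encoding.RLE, Encoding.RLE));
        return header;
      }

      // ... while a v2 header only carries the value encoding plus the byte
      // lengths of its level runs, which is why the V2 provider above has to
      // synthesize level encodings for the common interface.
      static PageHeader v2() {
        PageHeader header = new PageHeader(PageType.DATA_PAGE_V2, 100, 100);
        header.setData_page_header_v2(
            new DataPageHeaderV2(50, 0, 50, Encoding.PLAIN, 4, 4));
        return header;
      }
    }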
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
index 9e019ee..4c25b1a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
@@ -240,11 +240,12 @@ public class ParquetFixedWidthDictionaryReaders {
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
+ int dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
recordsReadInThisIteration =
Math.min(pageReader.currentPageCount - pageReader.valuesRead,
recordsToReadInThisPass - valuesReadInCurrentPass);
- switch (columnDescriptor.getType()) {
+ switch (columnDescriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT32:
if (usingDictionary) {
for (int i = 0; i < recordsReadInThisIteration; i++) {
@@ -253,7 +254,10 @@ public class ParquetFixedWidthDictionaryReaders {
}
setWriteIndex();
} else {
- super.readField(recordsToReadInThisPass);
+ for (int i = 0; i < recordsReadInThisIteration; i++) {
+            byte[] bytes = Ints.toByteArray(pageReader.pageData.getInt((int) readStartInBytes + i * dataTypeLengthInBytes));
+            valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, bytes, 0, dataTypeLengthInBytes);
+ }
}
break;
case INT64:
@@ -264,9 +268,33 @@ public class ParquetFixedWidthDictionaryReaders {
}
setWriteIndex();
} else {
- super.readField(recordsToReadInThisPass);
+ for (int i = 0; i < recordsReadInThisIteration; i++) {
+            byte[] bytes = Longs.toByteArray(pageReader.pageData.getLong((int) readStartInBytes + i * dataTypeLengthInBytes));
+            valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, bytes, 0, dataTypeLengthInBytes);
+ }
}
break;
+ case FIXED_LEN_BYTE_ARRAY:
+ case BINARY:
+ if (usingDictionary) {
+ VarDecimalVector.Mutator mutator = valueVec.getMutator();
+ for (int i = 0; i < recordsReadInThisIteration; i++) {
+            Binary currDictValToWrite = pageReader.dictionaryValueReader.readBytes();
+            mutator.setSafe(valuesReadInCurrentPass + i,
+              currDictValToWrite.toByteBuffer().slice(), 0, currDictValToWrite.length());
+ }
+          // Set the write Index. The next page that gets read might be a page that does not use dictionary encoding
+          // and we will go into the else condition below. The readField method of the parent class requires the
+ // writer index to be set correctly.
+ int writerIndex = valueVec.getBuffer().writerIndex();
+ valueVec.getBuffer().setIndex(0, writerIndex + (int) readLength);
+ } else {
+ for (int i = 0; i < recordsReadInThisIteration; i++) {
+ int start = (int) readStartInBytes + i * dataTypeLengthInBytes;
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i,
+ start, start + dataTypeLengthInBytes, pageReader.pageData);
+ }
+ }
}
}
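The non-dictionary branches above all walk the page buffer with the same arithmetic: value i of a fixed-width column starts at readStartInBytes + i * dataTypeLengthInBytes, where the byte width is the bit width rounded up. A standalone sketch over a plain ByteBuffer (the FixedWidthWalk class and its method names are illustrative):

    import java.nio.ByteBuffer;

    class FixedWidthWalk {
      // Equivalent of the (int) Math.ceil(dataTypeLengthInBits / 8.0) above.
      static int byteWidth(int dataTypeLengthInBits) {
        return (dataTypeLengthInBits + 7) / 8;
      }

      // Copies `count` back-to-back fixed-width values out of a page buffer,
      // mirroring the plain-encoded read loops in readField.
      static byte[][] copy(ByteBuffer page, int readStart, int width, int count) {
        byte[][] out = new byte[count][];
        for (int i = 0; i < count; i++) {
          byte[] value = new byte[width];
          page.position(readStart + i * width);
          page.get(value);
          out[i] = value;
        }
        return out;
      }
    }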
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
index d4a6fcb..232aec9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
@@ -25,6 +25,7 @@ import org.apache.parquet.example.data.GroupFactory;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.api.Binary;
@@ -46,8 +47,7 @@ import static org.apache.drill.exec.store.parquet.ParquetSimpleTestFileGenerator
  * that are supported by Drill. Embedded types specified in the Parquet specification are not covered by the
  * examples but can be added.
  * To create a new parquet file, define a schema, create a GroupWriter based on the schema, then add values
- * for individual records to the GroupWriter.<br>
- * TODO: DRILL-7904. To run this tool please use 28.2-jre <guava.version> instead of 19.0 in main POM file
+ * for individual records to the GroupWriter.
  * @see org.apache.drill.exec.store.parquet.TestFileGenerator TestFileGenerator
  * @see org.apache.parquet.hadoop.example.GroupWriteSupport GroupWriteSupport
  * @see org.apache.parquet.example.Paper Dremel Example
@@ -55,7 +55,7 @@ import static org.apache.drill.exec.store.parquet.ParquetSimpleTestFileGenerator
public class ParquetSimpleTestFileGenerator {
public enum EnumType {
- RANDOM_VALUE, MAX_VALUE, MIN_VALUE;
+ RANDOM_VALUE, MAX_VALUE, MIN_VALUE
}
public static Path root = new Path("file:/tmp/parquet/");
@@ -221,20 +221,16 @@ public class ParquetSimpleTestFileGenerator {
GroupWriteSupport.setSchema(schema, conf);
- ParquetWriter<Group> writer =
- new ParquetWriter<Group>(initFile(fileName),
- ParquetFileWriter.Mode.OVERWRITE,
- new GroupWriteSupport(),
- CompressionCodecName.SNAPPY,
- 1024,
- 1024,
- 512,
- dictEncoding, // enable dictionary encoding,
- false,
- ParquetProperties.WriterVersion.PARQUET_1_0, conf
- );
-
- return writer;
+ return ExampleParquetWriter.builder(initFile(fileName))
+ .withDictionaryEncoding(dictEncoding)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withPageSize(1024)
+ .withDictionaryPageSize(512)
+ .withValidation(false)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
+ .withConf(conf)
+ .build();
}
   public static void writeComplexValues(GroupFactory gf, ParquetWriter<Group> complexWriter, boolean writeNulls) throws IOException {
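For reference, a hypothetical end-to-end use of the builder introduced above (the schema string and output path are invented for illustration; ExampleParquetWriter, Group and GroupWriteSupport are real parquet-mr classes):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroupFactory;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;
    import org.apache.parquet.hadoop.example.GroupWriteSupport;
    import org.apache.parquet.io.api.Binary;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    class WriterSample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // A required fixed_len_byte_array decimal column, the shape DRILL-7948 fixes.
        MessageType schema = MessageTypeParser.parseMessageType(
            "message test { required fixed_len_byte_array(12) Cost (DECIMAL(24,6)); }");
        GroupWriteSupport.setSchema(schema, conf);
        try (ParquetWriter<Group> writer =
            ExampleParquetWriter.builder(new Path("file:/tmp/parquet/decimal_sample.parquet"))
                .withConf(conf)
                .build()) {
          Group group = new SimpleGroupFactory(schema).newGroup();
          group.add("Cost", Binary.fromConstantByteArray(new byte[12])); // 0.000000
          writer.write(group);
        }
      }
    }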
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java
index 5b7e891..efe3f08 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
import org.apache.drill.categories.ParquetTest;
import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.test.BaseTestQuery;
import org.joda.time.Period;
import org.junit.Test;
@@ -700,7 +701,7 @@ public class TestParquetLogicalTypes extends BaseTestQuery {
}
@Test
- public void testDecimalDictionaryEncoding() throws Exception {
+ public void testNullableDecimalDictionaryEncoding() throws Exception {
testBuilder()
.sqlQuery("select RegHrs from cp.`parquet/dict_dec.parquet`")
.ordered()
@@ -708,4 +709,40 @@ public class TestParquetLogicalTypes extends BaseTestQuery {
.baselineValues(new BigDecimal("8.000000"))
.go();
}
+
+ @Test
+ public void testRequiredDecimalDictionaryEncoding() throws Exception {
+ testBuilder()
+ .sqlQuery("select * from
cp.`parquet/SingleRow_RequiredFixedLength_Decimal.parquet`")
+ .ordered()
+ .baselineColumns("Cost", "Sale")
+      .baselineValues(new BigDecimal("550.000000"), new BigDecimal("1050.000000"))
+ .go();
+ }
+
+ @Test
+ public void testRequiredIntervalDictionaryEncoding() throws Exception {
+ testBuilder()
+ .sqlQuery("select * from cp.`parquet/interval_dictionary.parquet`")
+ .unOrdered()
+ .baselineColumns("_INTERVAL_fixed_len_byte_array_12")
+      .baselineValues(Period.months(875770417).plusDays(943142453).plusMillis(1650536505))
+      .baselineValues(Period.months(16843009).plusDays(16843009).plusMillis(16843009))
+ .baselineValues(Period.seconds(0))
+ .go();
+ }
+
+ @Test
+ public void testNullableIntervalDictionaryEncoding() throws Exception {
+ alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
+ testBuilder()
+      .sqlQuery("select * from cp.`parquet/nullable_interval_dictionary.parquet`")
+ .unOrdered()
+ .baselineColumns("_INTERVAL_fixed_len_byte_array_12")
+      .baselineValues(Period.months(875770417).plusDays(943142453).plusMillis(1650536505))
+      .baselineValues(Period.months(16843009).plusDays(16843009).plusMillis(16843009))
+ .baselineValues(Period.seconds(0))
+ .baselineValues((Object) null)
+ .go();
+ }
}
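The interval baselines above decode directly from the 12-byte layout; e.g. twelve 0x01 bytes give 0x01010101 = 16843009 for each of months, days and millis. A quick check (illustrative, not part of the test suite):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.util.Arrays;
    import org.joda.time.Period;

    class IntervalBaselineCheck {
      public static void main(String[] args) {
        byte[] raw = new byte[12];
        Arrays.fill(raw, (byte) 1);
        ByteBuffer buf = ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN);
        Period period = Period.months(buf.getInt(0))
            .plusDays(buf.getInt(4))
            .plusMillis(buf.getInt(8));
        // Matches the second baseline row in the tests above.
        System.out.println(period.equals(
            Period.months(16843009).plusDays(16843009).plusMillis(16843009)));
      }
    }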
diff --git a/exec/java-exec/src/test/resources/parquet/SingleRow_RequiredFixedLength_Decimal.parquet b/exec/java-exec/src/test/resources/parquet/SingleRow_RequiredFixedLength_Decimal.parquet
new file mode 100644
index 0000000..2faac7a
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/SingleRow_RequiredFixedLength_Decimal.parquet differ
diff --git a/exec/java-exec/src/test/resources/parquet/interval_dictionary.parquet b/exec/java-exec/src/test/resources/parquet/interval_dictionary.parquet
new file mode 100644
index 0000000..202314e
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/interval_dictionary.parquet differ
diff --git a/exec/java-exec/src/test/resources/parquet/nullable_interval_dictionary.parquet b/exec/java-exec/src/test/resources/parquet/nullable_interval_dictionary.parquet
new file mode 100644
index 0000000..8bd72c3
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/nullable_interval_dictionary.parquet differ