This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 2c0e718 DRILL-7919: Fix reading parquet with decimal dictionary encoding (#2232)
2c0e718 is described below
commit 2c0e718e42dd332f54e4e626afe097855a89691a
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Fri May 28 17:14:36 2021 +0300
DRILL-7919: Fix reading parquet with decimal dictionary encoding (#2232)
---
.../parquet/columnreaders/ColumnReaderFactory.java | 187 ++++++++++-----------
.../NullableFixedByteAlignedReaders.java | 46 +++--
.../store/parquet/TestParquetLogicalTypes.java | 11 ++
.../src/test/resources/parquet/dict_dec.parquet | Bin 0 -> 826 bytes
4 files changed, 120 insertions(+), 124 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index 25c9904..e6bd9c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -177,38 +177,8 @@ public class ColumnReaderFactory {
}
}
} else { // if the column is nullable
- if (columnChunkMetaData.getType() ==
PrimitiveType.PrimitiveTypeName.BOOLEAN) {
- return new NullableBitReader(recordReader, descriptor,
columnChunkMetaData,
- fixedLength, (NullableBitVector) v, schemaElement);
- } else if (columnChunkMetaData.getType() ==
PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE) {
- switch(recordReader.getDateCorruptionStatus()) {
- case META_SHOWS_CORRUPTION:
- return new
NullableFixedByteAlignedReaders.NullableCorruptDateReader(recordReader,
descriptor, columnChunkMetaData, fixedLength, (NullableDateVector)v,
schemaElement);
- case META_SHOWS_NO_CORRUPTION:
- return new
NullableFixedByteAlignedReaders.NullableDateReader(recordReader, descriptor,
columnChunkMetaData, fixedLength, (NullableDateVector) v, schemaElement);
- case META_UNCLEAR_TEST_VALUES:
- return new
NullableFixedByteAlignedReaders.CorruptionDetectingNullableDateReader(recordReader,
descriptor, columnChunkMetaData, fixedLength, (NullableDateVector) v,
schemaElement);
- default:
- throw new ExecutionSetupException(
- String.format("Issue setting up parquet reader for date type,
" +
- "unrecognized date corruption status %s. See
DRILL-4203 for more info.",
- recordReader.getDateCorruptionStatus()));
- }
- } else if (columnChunkMetaData.getType() ==
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
- if (convertedType == ConvertedType.DECIMAL) {
- return new
NullableFixedByteAlignedReaders.NullableVarDecimalReader(recordReader,
- descriptor, columnChunkMetaData, fixedLength,
(NullableVarDecimalVector) v, schemaElement);
- } else if (convertedType == ConvertedType.INTERVAL) {
- return new
NullableFixedByteAlignedReaders.NullableIntervalReader(recordReader, descriptor,
- columnChunkMetaData, fixedLength, (NullableIntervalVector) v,
schemaElement);
- } else {
- return new
NullableFixedByteAlignedReaders.NullableFixedBinaryReader(recordReader,
descriptor,
- columnChunkMetaData, fixedLength, (NullableVarBinaryVector)
v, schemaElement);
- }
- } else {
- return getNullableColumnReader(recordReader, descriptor,
- columnChunkMetaData, fixedLength, v, schemaElement);
- }
+ return getNullableColumnReader(recordReader, descriptor,
+ columnChunkMetaData, fixedLength, v, schemaElement);
}
}
@@ -252,7 +222,7 @@ public class ColumnReaderFactory {
}
}
- public static NullableColumnReader<?>
getNullableColumnReader(ParquetRecordReader parentReader,
+ public static ColumnReader<?> getNullableColumnReader(ParquetRecordReader
parentReader,
ColumnDescriptor
columnDescriptor,
ColumnChunkMetaData columnChunkMetaData,
boolean
fixedLength,
@@ -260,83 +230,102 @@ public class ColumnReaderFactory {
SchemaElement
schemaElement) throws ExecutionSetupException {
ConvertedType convertedType = schemaElement.getConverted_type();
- if (!
columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
- if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT96)
{
- // TODO: check convertedType once parquet support TIMESTAMP_NANOS
type annotation.
+ switch (columnDescriptor.getPrimitiveType().getPrimitiveTypeName()) {
+ case BOOLEAN:
+ return new NullableBitReader(parentReader, columnDescriptor,
columnChunkMetaData,
+ fixedLength, (NullableBitVector) valueVec, schemaElement);
+ case INT32:
+ if (convertedType == null) {
+ return new
NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableIntVector)
valueVec, schemaElement);
+ }
+ switch (convertedType) {
+ case INT_8:
+ case INT_16:
+ case INT_32:
+ return new
NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader,
+ columnDescriptor, columnChunkMetaData, fixedLength,
(NullableIntVector) valueVec, schemaElement);
+ case UINT_8:
+ case UINT_16:
+ case UINT_32:
+ return new
NullableFixedByteAlignedReaders.NullableDictionaryUInt4Reader(parentReader,
+ columnDescriptor, columnChunkMetaData, fixedLength,
(NullableUInt4Vector) valueVec, schemaElement);
+ case DECIMAL:
+ return new
NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
+ columnDescriptor, columnChunkMetaData, fixedLength,
(NullableVarDecimalVector) valueVec, schemaElement);
+ case TIME_MILLIS:
+ return new
NullableFixedByteAlignedReaders.NullableDictionaryTimeReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeVector)
valueVec, schemaElement);
+ case DATE:
+ switch (parentReader.getDateCorruptionStatus()) {
+ case META_SHOWS_CORRUPTION:
+ return new
NullableFixedByteAlignedReaders.NullableCorruptDateReader(parentReader,
+ columnDescriptor, columnChunkMetaData, fixedLength,
(NullableDateVector) valueVec, schemaElement);
+ case META_SHOWS_NO_CORRUPTION:
+ return new
NullableFixedByteAlignedReaders.NullableDateReader(parentReader,
+ columnDescriptor, columnChunkMetaData, fixedLength,
(NullableDateVector) valueVec, schemaElement);
+ case META_UNCLEAR_TEST_VALUES:
+ return new
NullableFixedByteAlignedReaders.CorruptionDetectingNullableDateReader(parentReader,
+ columnDescriptor, columnChunkMetaData, fixedLength,
(NullableDateVector) valueVec, schemaElement);
+ default:
+ throw new ExecutionSetupException(
+ String.format("Issue setting up parquet reader for date
type, " +
+ "unrecognized date corruption status %s. See
DRILL-4203 for more info.",
+ parentReader.getDateCorruptionStatus()));
+ }
+ default:
+ throw new ExecutionSetupException("Unsupported nullable converted
type " + convertedType + " for primitive type INT32");
+ }
+ case INT64:
+ if (convertedType == null) {
+ return new
NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength,
(NullableBigIntVector)valueVec, schemaElement);
+ }
+ switch (convertedType) {
+ case UINT_64:
+ return new
NullableFixedByteAlignedReaders.NullableDictionaryUInt8Reader(parentReader,
columnDescriptor,
+ columnChunkMetaData, fixedLength, (NullableUInt8Vector)
valueVec, schemaElement);
+ case DECIMAL:
+ return new
NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
+ columnDescriptor, columnChunkMetaData, fixedLength,
(NullableVarDecimalVector) valueVec, schemaElement);
+ case TIMESTAMP_MILLIS:
+ return new
NullableFixedByteAlignedReaders.NullableDictionaryTimeStampReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength,
(NullableTimeStampVector)valueVec, schemaElement);
+ // DRILL-6670: handle TIMESTAMP_MICROS as INT64 with no logical type
+ case TIMESTAMP_MICROS:
+ case INT_64:
+ return new
NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader,
+ columnDescriptor, columnChunkMetaData, fixedLength,
(NullableBigIntVector) valueVec, schemaElement);
+ default:
+ throw new ExecutionSetupException("Unsupported nullable converted
type " + convertedType + " for primitive type INT64");
+ }
+ case INT96:
+ // TODO: check convertedType once parquet support TIMESTAMP_NANOS type
annotation.
if
(parentReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val)
{
return new
NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader,
columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector)
valueVec, schemaElement);
} else {
return new
NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader,
columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector)
valueVec, schemaElement);
}
- } else if (convertedType == ConvertedType.DECIMAL) {
- // NullableVarDecimalVector allows storing of values with different
width,
- // so every time when the value is added, offset vector should be
updated.
- // Therefore NullableVarDecimalReader is used here instead of
NullableFixedByteAlignedReader.
- return new
NullableFixedByteAlignedReaders.NullableVarDecimalReader(parentReader,
- columnDescriptor, columnChunkMetaData, fixedLength,
(NullableVarDecimalVector) valueVec, schemaElement);
- } else {
- return new
NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader<>(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, valueVec, schemaElement);
- }
- } else {
- switch (columnDescriptor.getType()) {
- case INT32:
- if (convertedType == null) {
- return new
NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableIntVector)
valueVec, schemaElement);
- }
+ case FLOAT:
+ return new
NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength,
(NullableFloat4Vector)valueVec, schemaElement);
+ case DOUBLE:
+ return new
NullableFixedByteAlignedReaders.NullableDictionaryFloat8Reader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength,
(NullableFloat8Vector)valueVec, schemaElement);
+ case FIXED_LEN_BYTE_ARRAY:
+ if (convertedType != null) {
switch (convertedType) {
- case INT_8:
- case INT_16:
- case INT_32:
- return new
NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader,
- columnDescriptor, columnChunkMetaData, fixedLength,
(NullableIntVector) valueVec, schemaElement);
- case UINT_8:
- case UINT_16:
- case UINT_32:
- return new
NullableFixedByteAlignedReaders.NullableDictionaryUInt4Reader(parentReader,
- columnDescriptor, columnChunkMetaData, fixedLength,
(NullableUInt4Vector) valueVec, schemaElement);
case DECIMAL:
return new
NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength,
(NullableVarDecimalVector) valueVec, schemaElement);
- case TIME_MILLIS:
- return new
NullableFixedByteAlignedReaders.NullableDictionaryTimeReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength,
(NullableTimeVector)valueVec, schemaElement);
+ case INTERVAL:
+ return new
NullableFixedByteAlignedReaders.NullableIntervalReader(parentReader,
columnDescriptor,
+ columnChunkMetaData, fixedLength, (NullableIntervalVector)
valueVec, schemaElement);
default:
- throw new ExecutionSetupException("Unsupported nullable
converted type " + convertedType + " for primitive type INT32");
- }
- case INT64:
- if (convertedType == null) {
- return new
NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength,
(NullableBigIntVector)valueVec, schemaElement);
- }
- switch (convertedType) {
- case UINT_64:
- return new
NullableFixedByteAlignedReaders.NullableDictionaryUInt8Reader(parentReader,
columnDescriptor,
- columnChunkMetaData, fixedLength, (NullableUInt8Vector)
valueVec, schemaElement);
- case DECIMAL:
- return new
NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
- columnDescriptor, columnChunkMetaData, fixedLength,
(NullableVarDecimalVector) valueVec, schemaElement);
- case TIMESTAMP_MILLIS:
- return new
NullableFixedByteAlignedReaders.NullableDictionaryTimeStampReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength,
(NullableTimeStampVector)valueVec, schemaElement);
- // DRILL-6670: handle TIMESTAMP_MICROS as INT64 with no logical
type
- case TIMESTAMP_MICROS:
- case INT_64:
- return new
NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader,
- columnDescriptor, columnChunkMetaData, fixedLength,
(NullableBigIntVector) valueVec, schemaElement);
- default:
- throw new ExecutionSetupException("Unsupported nullable
converted type " + convertedType + " for primitive type INT64");
- }
- case INT96:
- // TODO: check convertedType once parquet support TIMESTAMP_NANOS
type annotation.
- if
(parentReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val)
{
- return new
NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader,
columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector)
valueVec, schemaElement);
- } else {
- return new
NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader,
columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector)
valueVec, schemaElement);
+ if
(!columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+ return new
NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader<>(parentReader,columnDescriptor,
+ columnChunkMetaData, fixedLength, valueVec, schemaElement);
+ }
}
- case FLOAT:
- return new
NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength,
(NullableFloat4Vector)valueVec, schemaElement);
- case DOUBLE:
- return new
NullableFixedByteAlignedReaders.NullableDictionaryFloat8Reader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength,
(NullableFloat8Vector)valueVec, schemaElement);
- default:
- throw new ExecutionSetupException("Unsupported nullable column type
" + columnDescriptor.getType().name() );
- }
+ }
+ return new
NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader,
columnDescriptor,
+ columnChunkMetaData, fixedLength, (NullableVarBinaryVector)
valueVec, schemaElement);
+ default:
+ throw new ExecutionSetupException("Unsupported nullable column type "
+ columnDescriptor.getPrimitiveType().getPrimitiveTypeName().name());
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index 94a1f59..9849b7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -318,6 +318,27 @@ public class NullableFixedByteAlignedReaders {
}
}
break;
+ case FIXED_LEN_BYTE_ARRAY:
+ case BINARY:
+ if (usingDictionary) {
+ NullableVarDecimalVector.Mutator mutator = valueVec.getMutator();
+ for (int i = 0; i < recordsReadInThisIteration; i++) {
+ Binary currDictValToWrite =
pageReader.dictionaryValueReader.readBytes();
+ mutator.setSafe(valuesReadInCurrentPass + i,
currDictValToWrite.toByteBuffer().slice(), 0,
+ currDictValToWrite.length());
+ }
+ // Set the write Index. The next page that gets read might be a
page that does not use dictionary encoding
+ // and we will go into the else condition below. The readField
method of the parent class requires the
+ // writer index to be set correctly.
+ int writerIndex = valueVec.getBuffer().writerIndex();
+ valueVec.getBuffer().setIndex(0, writerIndex + (int) readLength);
+ } else {
+ for (int i = 0; i < recordsToReadInThisPass; i++) {
+ Binary valueToWrite = pageReader.valueReader.readBytes();
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i,
valueToWrite.toByteBuffer().slice(), 0,
+ valueToWrite.length());
+ }
+ }
}
}
@@ -471,31 +492,6 @@ public class NullableFixedByteAlignedReaders {
}
}
- public static class NullableVarDecimalReader extends
NullableConvertedReader<NullableVarDecimalVector> {
- NullableVarDecimalReader(ParquetRecordReader parentReader,
ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
- boolean fixedLength, NullableVarDecimalVector v, SchemaElement
schemaElement) throws ExecutionSetupException {
- super(parentReader, descriptor, columnChunkMetaData, fixedLength, v,
schemaElement);
- }
-
- // TODO: allow reading page instead of reading every record separately
- @Override
- void addNext(int start, int index) {
- switch (columnChunkMetaData.getType()) {
- case INT32:
- valueVec.getMutator().setSafe(index,
Ints.toByteArray(bytebuf.getInt(start)), 0, dataTypeLengthInBytes);
- break;
- case INT64:
- valueVec.getMutator().setSafe(index,
Longs.toByteArray(bytebuf.getLong(start)), 0, dataTypeLengthInBytes);
- break;
- case FIXED_LEN_BYTE_ARRAY:
- case BINARY:
- valueVec.getMutator().setSafe(index, 1, start, start +
dataTypeLengthInBytes, bytebuf);
- break;
- }
-
- }
- }
-
public static class NullableIntervalReader extends
NullableConvertedReader<NullableIntervalVector> {
NullableIntervalReader(ParquetRecordReader parentReader, ColumnDescriptor
descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, NullableIntervalVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java
index 0ac7d0a..5b7e891 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.parquet;
import static org.apache.drill.test.TestBuilder.mapOf;
+import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -697,4 +698,14 @@ public class TestParquetLogicalTypes extends BaseTestQuery {
.baselineValues(new Object[]{null})
.go();
}
+
+ @Test
+ public void testDecimalDictionaryEncoding() throws Exception {
+ testBuilder()
+ .sqlQuery("select RegHrs from cp.`parquet/dict_dec.parquet`")
+ .ordered()
+ .baselineColumns("RegHrs")
+ .baselineValues(new BigDecimal("8.000000"))
+ .go();
+ }
}
diff --git a/exec/java-exec/src/test/resources/parquet/dict_dec.parquet b/exec/java-exec/src/test/resources/parquet/dict_dec.parquet
new file mode 100644
index 0000000..5b8dd5a
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/dict_dec.parquet differ