This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c0e718  DRILL-7919: Fix reading parquet with decimal dictionary encoding (#2232)
2c0e718 is described below

commit 2c0e718e42dd332f54e4e626afe097855a89691a
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Fri May 28 17:14:36 2021 +0300

    DRILL-7919: Fix reading parquet with decimal dictionary encoding (#2232)
---
 .../parquet/columnreaders/ColumnReaderFactory.java | 187 ++++++++++-----------
 .../NullableFixedByteAlignedReaders.java           |  46 +++--
 .../store/parquet/TestParquetLogicalTypes.java     |  11 ++
 .../src/test/resources/parquet/dict_dec.parquet    | Bin 0 -> 826 bytes
 4 files changed, 120 insertions(+), 124 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index 25c9904..e6bd9c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -177,38 +177,8 @@ public class ColumnReaderFactory {
         }
       }
     } else { // if the column is nullable
-      if (columnChunkMetaData.getType() == 
PrimitiveType.PrimitiveTypeName.BOOLEAN) {
-        return new NullableBitReader(recordReader, descriptor, 
columnChunkMetaData,
-            fixedLength, (NullableBitVector) v, schemaElement);
-      } else if (columnChunkMetaData.getType() == 
PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE) {
-        switch(recordReader.getDateCorruptionStatus()) {
-          case META_SHOWS_CORRUPTION:
-            return new 
NullableFixedByteAlignedReaders.NullableCorruptDateReader(recordReader, 
descriptor, columnChunkMetaData, fixedLength, (NullableDateVector)v, 
schemaElement);
-          case META_SHOWS_NO_CORRUPTION:
-            return new 
NullableFixedByteAlignedReaders.NullableDateReader(recordReader, descriptor, 
columnChunkMetaData, fixedLength, (NullableDateVector) v, schemaElement);
-          case META_UNCLEAR_TEST_VALUES:
-            return new 
NullableFixedByteAlignedReaders.CorruptionDetectingNullableDateReader(recordReader,
 descriptor, columnChunkMetaData, fixedLength, (NullableDateVector) v, 
schemaElement);
-          default:
-            throw new ExecutionSetupException(
-                String.format("Issue setting up parquet reader for date type, 
" +
-                        "unrecognized date corruption status %s. See 
DRILL-4203 for more info.",
-                    recordReader.getDateCorruptionStatus()));
-        }
-      } else if (columnChunkMetaData.getType() == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
-        if (convertedType == ConvertedType.DECIMAL) {
-          return new 
NullableFixedByteAlignedReaders.NullableVarDecimalReader(recordReader,
-              descriptor, columnChunkMetaData, fixedLength, 
(NullableVarDecimalVector) v, schemaElement);
-        } else if (convertedType == ConvertedType.INTERVAL) {
-          return new 
NullableFixedByteAlignedReaders.NullableIntervalReader(recordReader, descriptor,
-              columnChunkMetaData, fixedLength, (NullableIntervalVector) v, 
schemaElement);
-        } else {
-          return new 
NullableFixedByteAlignedReaders.NullableFixedBinaryReader(recordReader, 
descriptor,
-                  columnChunkMetaData, fixedLength, (NullableVarBinaryVector) 
v, schemaElement);
-        }
-      } else {
-        return getNullableColumnReader(recordReader, descriptor,
-            columnChunkMetaData, fixedLength, v, schemaElement);
-      }
+      return getNullableColumnReader(recordReader, descriptor,
+          columnChunkMetaData, fixedLength, v, schemaElement);
     }
   }
 
@@ -252,7 +222,7 @@ public class ColumnReaderFactory {
     }
   }
 
-  public static NullableColumnReader<?> 
getNullableColumnReader(ParquetRecordReader parentReader,
+  public static ColumnReader<?> getNullableColumnReader(ParquetRecordReader 
parentReader,
                                                              ColumnDescriptor 
columnDescriptor,
                                                              
ColumnChunkMetaData columnChunkMetaData,
                                                              boolean 
fixedLength,
@@ -260,83 +230,102 @@ public class ColumnReaderFactory {
                                                              SchemaElement 
schemaElement) throws ExecutionSetupException {
     ConvertedType convertedType = schemaElement.getConverted_type();
 
-    if (! 
columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
-      if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT96) 
{
-         // TODO: check convertedType once parquet support TIMESTAMP_NANOS 
type annotation.
+    switch (columnDescriptor.getPrimitiveType().getPrimitiveTypeName()) {
+      case BOOLEAN:
+        return new NullableBitReader(parentReader, columnDescriptor, 
columnChunkMetaData,
+            fixedLength, (NullableBitVector) valueVec, schemaElement);
+      case INT32:
+        if (convertedType == null) {
+          return new 
NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader, 
columnDescriptor, columnChunkMetaData, fixedLength, (NullableIntVector) 
valueVec, schemaElement);
+        }
+        switch (convertedType) {
+          case INT_8:
+          case INT_16:
+          case INT_32:
+            return new 
NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader,
+              columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableIntVector) valueVec, schemaElement);
+          case UINT_8:
+          case UINT_16:
+          case UINT_32:
+            return new 
NullableFixedByteAlignedReaders.NullableDictionaryUInt4Reader(parentReader,
+              columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableUInt4Vector) valueVec, schemaElement);
+          case DECIMAL:
+            return new 
NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
+                columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableVarDecimalVector) valueVec, schemaElement);
+          case TIME_MILLIS:
+            return new 
NullableFixedByteAlignedReaders.NullableDictionaryTimeReader(parentReader, 
columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeVector) 
valueVec, schemaElement);
+          case DATE:
+            switch (parentReader.getDateCorruptionStatus()) {
+              case META_SHOWS_CORRUPTION:
+                return new 
NullableFixedByteAlignedReaders.NullableCorruptDateReader(parentReader,
+                    columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableDateVector) valueVec, schemaElement);
+              case META_SHOWS_NO_CORRUPTION:
+                return new 
NullableFixedByteAlignedReaders.NullableDateReader(parentReader,
+                    columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableDateVector) valueVec, schemaElement);
+              case META_UNCLEAR_TEST_VALUES:
+                return new 
NullableFixedByteAlignedReaders.CorruptionDetectingNullableDateReader(parentReader,
+                    columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableDateVector) valueVec, schemaElement);
+              default:
+                throw new ExecutionSetupException(
+                    String.format("Issue setting up parquet reader for date 
type, " +
+                            "unrecognized date corruption status %s. See 
DRILL-4203 for more info.",
+                        parentReader.getDateCorruptionStatus()));
+            }
+          default:
+            throw new ExecutionSetupException("Unsupported nullable converted 
type " + convertedType + " for primitive type INT32");
+        }
+      case INT64:
+        if (convertedType == null) {
+          return new 
NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader, 
columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableBigIntVector)valueVec, schemaElement);
+        }
+        switch (convertedType) {
+          case UINT_64:
+            return new 
NullableFixedByteAlignedReaders.NullableDictionaryUInt8Reader(parentReader, 
columnDescriptor,
+              columnChunkMetaData, fixedLength, (NullableUInt8Vector) 
valueVec, schemaElement);
+          case DECIMAL:
+            return new 
NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
+                columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableVarDecimalVector) valueVec, schemaElement);
+          case TIMESTAMP_MILLIS:
+            return new 
NullableFixedByteAlignedReaders.NullableDictionaryTimeStampReader(parentReader, 
columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableTimeStampVector)valueVec, schemaElement);
+          // DRILL-6670: handle TIMESTAMP_MICROS as INT64 with no logical type
+          case TIMESTAMP_MICROS:
+          case INT_64:
+            return new 
NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader,
+              columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableBigIntVector) valueVec, schemaElement);
+          default:
+            throw new ExecutionSetupException("Unsupported nullable converted 
type " + convertedType + " for primitive type INT64");
+        }
+      case INT96:
+        // TODO: check convertedType once parquet support TIMESTAMP_NANOS type 
annotation.
         if 
(parentReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val)
 {
           return new 
NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader,
 columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector) 
valueVec, schemaElement);
         } else {
           return new 
NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, 
columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) 
valueVec, schemaElement);
         }
-      } else if (convertedType == ConvertedType.DECIMAL) {
-        // NullableVarDecimalVector allows storing of values with different 
width,
-        // so every time when the value is added, offset vector should be 
updated.
-        // Therefore NullableVarDecimalReader is used here instead of 
NullableFixedByteAlignedReader.
-        return new 
NullableFixedByteAlignedReaders.NullableVarDecimalReader(parentReader,
-            columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableVarDecimalVector) valueVec, schemaElement);
-      } else {
-        return new 
NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader<>(parentReader, 
columnDescriptor, columnChunkMetaData, fixedLength, valueVec, schemaElement);
-      }
-    } else {
-      switch (columnDescriptor.getType()) {
-        case INT32:
-          if (convertedType == null) {
-            return new 
NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader, 
columnDescriptor, columnChunkMetaData, fixedLength, (NullableIntVector) 
valueVec, schemaElement);
-          }
+      case FLOAT:
+        return new 
NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader, 
columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableFloat4Vector)valueVec, schemaElement);
+      case DOUBLE:
+        return new 
NullableFixedByteAlignedReaders.NullableDictionaryFloat8Reader(parentReader, 
columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableFloat8Vector)valueVec, schemaElement);
+      case FIXED_LEN_BYTE_ARRAY:
+        if (convertedType != null) {
           switch (convertedType) {
-            case INT_8:
-            case INT_16:
-            case INT_32:
-              return new 
NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader,
-                columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableIntVector) valueVec, schemaElement);
-            case UINT_8:
-            case UINT_16:
-            case UINT_32:
-              return new 
NullableFixedByteAlignedReaders.NullableDictionaryUInt4Reader(parentReader,
-                columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableUInt4Vector) valueVec, schemaElement);
             case DECIMAL:
               return new 
NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
                   columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableVarDecimalVector) valueVec, schemaElement);
-            case TIME_MILLIS:
-              return new 
NullableFixedByteAlignedReaders.NullableDictionaryTimeReader(parentReader, 
columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableTimeVector)valueVec, schemaElement);
+            case INTERVAL:
+              return new 
NullableFixedByteAlignedReaders.NullableIntervalReader(parentReader, 
columnDescriptor,
+                  columnChunkMetaData, fixedLength, (NullableIntervalVector) 
valueVec, schemaElement);
             default:
-              throw new ExecutionSetupException("Unsupported nullable 
converted type " + convertedType + " for primitive type INT32");
-          }
-        case INT64:
-          if (convertedType == null) {
-            return new 
NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader, 
columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableBigIntVector)valueVec, schemaElement);
-          }
-          switch (convertedType) {
-            case UINT_64:
-              return new 
NullableFixedByteAlignedReaders.NullableDictionaryUInt8Reader(parentReader, 
columnDescriptor,
-                columnChunkMetaData, fixedLength, (NullableUInt8Vector) 
valueVec, schemaElement);
-            case DECIMAL:
-              return new 
NullableFixedByteAlignedReaders.NullableDictionaryVarDecimalReader(parentReader,
-                  columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableVarDecimalVector) valueVec, schemaElement);
-            case TIMESTAMP_MILLIS:
-              return new 
NullableFixedByteAlignedReaders.NullableDictionaryTimeStampReader(parentReader, 
columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableTimeStampVector)valueVec, schemaElement);
-            // DRILL-6670: handle TIMESTAMP_MICROS as INT64 with no logical 
type
-            case TIMESTAMP_MICROS:
-            case INT_64:
-              return new 
NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader,
-                columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableBigIntVector) valueVec, schemaElement);
-            default:
-              throw new ExecutionSetupException("Unsupported nullable 
converted type " + convertedType + " for primitive type INT64");
-          }
-        case INT96:
-          // TODO: check convertedType once parquet support TIMESTAMP_NANOS 
type annotation.
-          if 
(parentReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val)
 {
-            return new 
NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader,
 columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector) 
valueVec, schemaElement);
-          } else {
-            return new 
NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, 
columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) 
valueVec, schemaElement);
+              if 
(!columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+                return new 
NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader<>(parentReader,columnDescriptor,
+                    columnChunkMetaData, fixedLength, valueVec, schemaElement);
+              }
           }
-        case FLOAT:
-          return new 
NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader, 
columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableFloat4Vector)valueVec, schemaElement);
-        case DOUBLE:
-          return new 
NullableFixedByteAlignedReaders.NullableDictionaryFloat8Reader(parentReader, 
columnDescriptor, columnChunkMetaData, fixedLength, 
(NullableFloat8Vector)valueVec, schemaElement);
-        default:
-          throw new ExecutionSetupException("Unsupported nullable column type 
" + columnDescriptor.getType().name() );
-      }
+        }
+        return new 
NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, 
columnDescriptor,
+            columnChunkMetaData, fixedLength, (NullableVarBinaryVector) 
valueVec, schemaElement);
+      default:
+        throw new ExecutionSetupException("Unsupported nullable column type " 
+ columnDescriptor.getPrimitiveType().getPrimitiveTypeName().name());
     }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index 94a1f59..9849b7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -318,6 +318,27 @@ public class NullableFixedByteAlignedReaders {
             }
           }
           break;
+        case FIXED_LEN_BYTE_ARRAY:
+        case BINARY:
+          if (usingDictionary) {
+            NullableVarDecimalVector.Mutator mutator = valueVec.getMutator();
+            for (int i = 0; i < recordsReadInThisIteration; i++) {
+              Binary currDictValToWrite = 
pageReader.dictionaryValueReader.readBytes();
+              mutator.setSafe(valuesReadInCurrentPass + i, 
currDictValToWrite.toByteBuffer().slice(), 0,
+                  currDictValToWrite.length());
+            }
+            // Set the write Index. The next page that gets read might be a 
page that does not use dictionary encoding
+            // and we will go into the else condition below. The readField 
method of the parent class requires the
+            // writer index to be set correctly.
+            int writerIndex = valueVec.getBuffer().writerIndex();
+            valueVec.getBuffer().setIndex(0, writerIndex + (int) readLength);
+          } else {
+            for (int i = 0; i < recordsToReadInThisPass; i++) {
+              Binary valueToWrite = pageReader.valueReader.readBytes();
+              valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, 
valueToWrite.toByteBuffer().slice(), 0,
+                  valueToWrite.length());
+            }
+          }
       }
     }
 
@@ -471,31 +492,6 @@ public class NullableFixedByteAlignedReaders {
     }
   }
 
-  public static class NullableVarDecimalReader extends 
NullableConvertedReader<NullableVarDecimalVector> {
-    NullableVarDecimalReader(ParquetRecordReader parentReader, 
ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-        boolean fixedLength, NullableVarDecimalVector v, SchemaElement 
schemaElement) throws ExecutionSetupException {
-      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, 
schemaElement);
-    }
-
-    // TODO: allow reading page instead of reading every record separately
-    @Override
-    void addNext(int start, int index) {
-      switch (columnChunkMetaData.getType()) {
-        case INT32:
-          valueVec.getMutator().setSafe(index, 
Ints.toByteArray(bytebuf.getInt(start)), 0, dataTypeLengthInBytes);
-          break;
-        case INT64:
-          valueVec.getMutator().setSafe(index, 
Longs.toByteArray(bytebuf.getLong(start)), 0, dataTypeLengthInBytes);
-          break;
-        case FIXED_LEN_BYTE_ARRAY:
-        case BINARY:
-          valueVec.getMutator().setSafe(index, 1, start, start + 
dataTypeLengthInBytes, bytebuf);
-          break;
-      }
-
-    }
-  }
-
   public static class NullableIntervalReader extends 
NullableConvertedReader<NullableIntervalVector> {
     NullableIntervalReader(ParquetRecordReader parentReader, ColumnDescriptor 
descriptor, ColumnChunkMetaData columnChunkMetaData,
                    boolean fixedLength, NullableIntervalVector v, 
SchemaElement schemaElement) throws ExecutionSetupException {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java
index 0ac7d0a..5b7e891 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.parquet;
 
 import static org.apache.drill.test.TestBuilder.mapOf;
 
+import java.math.BigDecimal;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -697,4 +698,14 @@ public class TestParquetLogicalTypes extends BaseTestQuery {
             .baselineValues(new Object[]{null})
             .go();
   }
+
+  @Test
+  public void testDecimalDictionaryEncoding() throws Exception {
+    testBuilder()
+        .sqlQuery("select RegHrs from cp.`parquet/dict_dec.parquet`")
+        .ordered()
+        .baselineColumns("RegHrs")
+        .baselineValues(new BigDecimal("8.000000"))
+        .go();
+  }
 }
diff --git a/exec/java-exec/src/test/resources/parquet/dict_dec.parquet b/exec/java-exec/src/test/resources/parquet/dict_dec.parquet
new file mode 100644
index 0000000..5b8dd5a
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/dict_dec.parquet differ

Reply via email to