This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new f056ea7  DRILL-7948: Unable to query file with required fixed_len_byte_array decimal columns (#2254)
f056ea7 is described below

commit f056ea7962c6d5c3c6898f6bf338bd3ecbf68094
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Wed Jun 16 20:31:07 2021 +0300

    DRILL-7948: Unable to query file with required fixed_len_byte_array decimal columns (#2254)
---
 .../parquet/columnreaders/AsyncPageReader.java     |  24 ++-
 .../parquet/columnreaders/ColumnReaderFactory.java | 225 +++++++++++----------
 .../columnreaders/FixedByteAlignedReader.java      |  32 +--
 .../NullableFixedByteAlignedReaders.java           |   5 +-
 .../store/parquet/columnreaders/PageReader.java    |  97 ++++++++-
 .../ParquetFixedWidthDictionaryReaders.java        |  34 +++-
 .../parquet/ParquetSimpleTestFileGenerator.java    |  30 ++-
 .../store/parquet/TestParquetLogicalTypes.java     |  39 +++-
 .../SingleRow_RequiredFixedLength_Decimal.parquet  | Bin 0 -> 916 bytes
 .../resources/parquet/interval_dictionary.parquet  | Bin 0 -> 396 bytes
 .../parquet/nullable_interval_dictionary.parquet   | Bin 0 -> 402 bytes
 11 files changed, 312 insertions(+), 174 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index 8b5c926..ffb10da 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -441,12 +441,24 @@ class AsyncPageReader extends PageReader {
         bytesRead = compressedSize;
 
         synchronized (parent) {
-          if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
-            readStatus.setIsDictionaryPage(true);
-            valuesRead += 
pageHeader.getDictionary_page_header().getNum_values();
-          } else {
-            valuesRead += pageHeader.getData_page_header().getNum_values();
-            parent.totalPageValuesRead += valuesRead;
+          PageType type = pageHeader.getType() == null ? PageType.DATA_PAGE : 
pageHeader.getType();
+          switch (type) {
+            case DICTIONARY_PAGE:
+              readStatus.setIsDictionaryPage(true);
+              valuesRead += 
pageHeader.getDictionary_page_header().getNum_values();
+              break;
+            case DATA_PAGE_V2:
+              valuesRead += 
pageHeader.getData_page_header_v2().getNum_values();
+              parent.totalPageValuesRead += valuesRead;
+              break;
+            case DATA_PAGE:
+              valuesRead += pageHeader.getData_page_header().getNum_values();
+              parent.totalPageValuesRead += valuesRead;
+              break;
+            default:
+              throw UserException.unsupportedError()
+                  .message("Page type is not supported yet: " + type)
+                  .build(logger);
           }
           long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
           readStatus.setPageHeader(pageHeader);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index e6bd9c3..9db336c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -54,7 +54,6 @@ import org.apache.parquet.column.Encoding;
 import org.apache.parquet.format.ConvertedType;
 import org.apache.parquet.format.SchemaElement;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.schema.PrimitiveType;
 
 public class ColumnReaderFactory {
 
@@ -74,108 +73,7 @@ public class ColumnReaderFactory {
     // if the column is required, or repeated (in which case we just want to 
use this to generate our appropriate
     // ColumnReader for actually transferring data into the data vector inside 
of our repeated vector
     if (descriptor.getMaxDefinitionLevel() == 0 || 
descriptor.getMaxRepetitionLevel() > 0) {
-      if (columnChunkMetaData.getType() == 
PrimitiveType.PrimitiveTypeName.BOOLEAN) {
-        return new BitReader(recordReader, descriptor, columnChunkMetaData,
-            fixedLength, (BitVector) v, schemaElement);
-      } else if 
(!columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY) && (
-          columnChunkMetaData.getType() == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
-              || columnChunkMetaData.getType() == 
PrimitiveType.PrimitiveTypeName.INT96)) {
-        if (convertedType == null) {
-          return new FixedByteAlignedReader.FixedBinaryReader(recordReader, 
descriptor,
-              columnChunkMetaData, (VariableWidthVector) v, schemaElement);
-        }
-        switch (convertedType) {
-          case DECIMAL:
-            return new FixedByteAlignedReader.VarDecimalReader(recordReader, 
descriptor,
-                columnChunkMetaData, fixedLength, (VarDecimalVector) v, 
schemaElement);
-          case INTERVAL:
-            return new FixedByteAlignedReader.IntervalReader(recordReader, 
descriptor,
-                columnChunkMetaData, fixedLength, (IntervalVector) v, 
schemaElement);
-          default:
-            return new FixedByteAlignedReader.FixedBinaryReader(recordReader, 
descriptor,
-                columnChunkMetaData, (VariableWidthVector) v, schemaElement);
-        }
-      } else if (columnChunkMetaData.getType() == 
PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
-        switch(recordReader.getDateCorruptionStatus()) {
-          case META_SHOWS_CORRUPTION:
-            return new FixedByteAlignedReader.CorruptDateReader(recordReader, 
descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
-          case META_SHOWS_NO_CORRUPTION:
-            return new FixedByteAlignedReader.DateReader(recordReader, 
descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
-          case META_UNCLEAR_TEST_VALUES:
-            return new 
FixedByteAlignedReader.CorruptionDetectingDateReader(recordReader, descriptor, 
columnChunkMetaData, fixedLength, (DateVector) v, schemaElement);
-          default:
-            throw new ExecutionSetupException(
-                String.format("Issue setting up parquet reader for date type, 
" +
-                        "unrecognized date corruption status %s. See 
DRILL-4203 for more info.",
-                recordReader.getDateCorruptionStatus()));
-        }
-      } else {
-        if 
(columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
-          switch (columnChunkMetaData.getType()) {
-            case INT32:
-              if (convertedType == null) {
-                return new 
ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, 
descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
-              }
-              switch (convertedType) {
-                case DECIMAL:
-                  return new 
ParquetFixedWidthDictionaryReaders.DictionaryVarDecimalReader(recordReader,
-                      descriptor, columnChunkMetaData, fixedLength, 
(VarDecimalVector) v, schemaElement);
-                case TIME_MILLIS:
-                  return new 
ParquetFixedWidthDictionaryReaders.DictionaryTimeReader(recordReader, 
descriptor, columnChunkMetaData, fixedLength, (TimeVector) v, schemaElement);
-                case INT_8:
-                case INT_16:
-                case INT_32:
-                  return new 
ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, 
descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
-                case UINT_8:
-                case UINT_16:
-                case UINT_32:
-                  return new 
ParquetFixedWidthDictionaryReaders.DictionaryUInt4Reader(recordReader, 
descriptor, columnChunkMetaData, fixedLength, (UInt4Vector) v, schemaElement);
-                default:
-                  throw new ExecutionSetupException("Unsupported dictionary 
converted type " + convertedType + " for primitive type INT32");
-              }
-            case INT64:
-              if (convertedType == null) {
-                return new 
ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, 
descriptor, columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
-              }
-              switch (convertedType) {
-                // DRILL-6670: handle TIMESTAMP_MICROS as INT64 with no 
logical type
-                case INT_64:
-                case TIMESTAMP_MICROS:
-                  return new 
ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, 
descriptor, columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
-                case UINT_64:
-                  return new 
ParquetFixedWidthDictionaryReaders.DictionaryUInt8Reader(recordReader, 
descriptor, columnChunkMetaData, fixedLength, (UInt8Vector) v, schemaElement);
-                case DECIMAL:
-                  return new 
ParquetFixedWidthDictionaryReaders.DictionaryVarDecimalReader(recordReader,
-                      descriptor, columnChunkMetaData, fixedLength, 
(VarDecimalVector) v, schemaElement);
-                case TIMESTAMP_MILLIS:
-                  return new 
ParquetFixedWidthDictionaryReaders.DictionaryTimeStampReader(recordReader, 
descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) v, 
schemaElement);
-                default:
-                  throw new ExecutionSetupException("Unsupported dictionary 
converted type " + convertedType + " for primitive type INT64");
-              }
-            case FLOAT:
-              return new 
ParquetFixedWidthDictionaryReaders.DictionaryFloat4Reader(recordReader, 
descriptor, columnChunkMetaData, fixedLength, (Float4Vector) v, schemaElement);
-            case DOUBLE:
-              return new 
ParquetFixedWidthDictionaryReaders.DictionaryFloat8Reader(recordReader, 
descriptor, columnChunkMetaData, fixedLength, (Float8Vector) v, schemaElement);
-            case FIXED_LEN_BYTE_ARRAY:
-              return new 
ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, 
descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, 
schemaElement);
-            case INT96:
-              if 
(recordReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val)
 {
-                return new 
ParquetFixedWidthDictionaryReaders.DictionaryBinaryAsTimeStampReader(recordReader,
 descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) v, 
schemaElement);
-              } else {
-                return new 
ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, 
descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, 
schemaElement);
-              }
-            default:
-              throw new ExecutionSetupException("Unsupported dictionary column 
type " + descriptor.getType().name() );
-          }
-
-        } else if (convertedType == ConvertedType.DECIMAL) {
-          return new FixedByteAlignedReader.VarDecimalReader(recordReader,
-            descriptor, columnChunkMetaData, fixedLength, (VarDecimalVector) 
v, schemaElement);
-        } else {
-          return new FixedByteAlignedReader<>(recordReader, descriptor, 
columnChunkMetaData,
-              fixedLength, v, schemaElement);
-        }
-      }
+      return getColumnReader(recordReader, fixedLength, descriptor, 
columnChunkMetaData, v, schemaElement, convertedType);
     } else { // if the column is nullable
       return getNullableColumnReader(recordReader, descriptor,
           columnChunkMetaData, fixedLength, v, schemaElement);
@@ -222,6 +120,114 @@ public class ColumnReaderFactory {
     }
   }
 
+  private static ColumnReader<? extends ValueVector> 
getColumnReader(ParquetRecordReader recordReader,
+      boolean fixedLength, ColumnDescriptor descriptor, ColumnChunkMetaData 
columnChunkMetaData, ValueVector v,
+      SchemaElement schemaElement, ConvertedType convertedType) throws 
ExecutionSetupException {
+    switch (columnChunkMetaData.getPrimitiveType().getPrimitiveTypeName()) {
+      case BOOLEAN:
+        return new BitReader(recordReader, descriptor, columnChunkMetaData,
+            fixedLength, (BitVector) v, schemaElement);
+      case INT32:
+        if (convertedType == null) {
+          return new 
ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, descriptor,
+              columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
+        }
+        switch (convertedType) {
+          case DATE:
+            switch(recordReader.getDateCorruptionStatus()) {
+              case META_SHOWS_CORRUPTION:
+                return new 
FixedByteAlignedReader.CorruptDateReader(recordReader, descriptor,
+                    columnChunkMetaData, fixedLength, (DateVector) v, 
schemaElement);
+              case META_SHOWS_NO_CORRUPTION:
+                return new FixedByteAlignedReader.DateReader(recordReader, 
descriptor, columnChunkMetaData,
+                    fixedLength, (DateVector) v, schemaElement);
+              case META_UNCLEAR_TEST_VALUES:
+                return new 
FixedByteAlignedReader.CorruptionDetectingDateReader(recordReader, descriptor,
+                    columnChunkMetaData, fixedLength, (DateVector) v, 
schemaElement);
+              default:
+                throw new ExecutionSetupException(
+                    String.format("Issue setting up parquet reader for date 
type, " +
+                            "unrecognized date corruption status %s. See 
DRILL-4203 for more info.",
+                        recordReader.getDateCorruptionStatus()));
+            }
+          case DECIMAL:
+            return new 
ParquetFixedWidthDictionaryReaders.DictionaryVarDecimalReader(recordReader,
+                descriptor, columnChunkMetaData, fixedLength, 
(VarDecimalVector) v, schemaElement);
+          case TIME_MILLIS:
+            return new 
ParquetFixedWidthDictionaryReaders.DictionaryTimeReader(recordReader, 
descriptor,
+                columnChunkMetaData, fixedLength, (TimeVector) v, 
schemaElement);
+          case INT_8:
+          case INT_16:
+          case INT_32:
+            return new 
ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, descriptor,
+                columnChunkMetaData, fixedLength, (IntVector) v, 
schemaElement);
+          case UINT_8:
+          case UINT_16:
+          case UINT_32:
+            return new 
ParquetFixedWidthDictionaryReaders.DictionaryUInt4Reader(recordReader, 
descriptor,
+                columnChunkMetaData, fixedLength, (UInt4Vector) v, 
schemaElement);
+          default:
+            throw new ExecutionSetupException("Unsupported dictionary 
converted type " + convertedType + " for primitive type INT32");
+        }
+      case INT64:
+        if (convertedType == null) {
+          return new 
ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, 
descriptor,
+              columnChunkMetaData, fixedLength, (BigIntVector) v, 
schemaElement);
+        }
+        switch (convertedType) {
+          // DRILL-6670: handle TIMESTAMP_MICROS as INT64 with no logical type
+          case INT_64:
+          case TIMESTAMP_MICROS:
+            return new 
ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, 
descriptor,
+                columnChunkMetaData, fixedLength, (BigIntVector) v, 
schemaElement);
+          case UINT_64:
+            return new 
ParquetFixedWidthDictionaryReaders.DictionaryUInt8Reader(recordReader, 
descriptor,
+                columnChunkMetaData, fixedLength, (UInt8Vector) v, 
schemaElement);
+          case DECIMAL:
+            return new 
ParquetFixedWidthDictionaryReaders.DictionaryVarDecimalReader(recordReader,
+                descriptor, columnChunkMetaData, fixedLength, 
(VarDecimalVector) v, schemaElement);
+          case TIMESTAMP_MILLIS:
+            return new 
ParquetFixedWidthDictionaryReaders.DictionaryTimeStampReader(recordReader, 
descriptor,
+                columnChunkMetaData, fixedLength, (TimeStampVector) v, 
schemaElement);
+          default:
+            throw new ExecutionSetupException("Unsupported dictionary 
converted type " + convertedType + " for primitive type INT64");
+        }
+      case FLOAT:
+        return new 
ParquetFixedWidthDictionaryReaders.DictionaryFloat4Reader(recordReader, 
descriptor,
+            columnChunkMetaData, fixedLength, (Float4Vector) v, schemaElement);
+      case DOUBLE:
+        return new 
ParquetFixedWidthDictionaryReaders.DictionaryFloat8Reader(recordReader, 
descriptor,
+            columnChunkMetaData, fixedLength, (Float8Vector) v, schemaElement);
+      case FIXED_LEN_BYTE_ARRAY:
+        if (convertedType != null) {
+          switch (convertedType) {
+            case DECIMAL:
+              return new 
ParquetFixedWidthDictionaryReaders.DictionaryVarDecimalReader(recordReader,
+                  descriptor, columnChunkMetaData, fixedLength, 
(VarDecimalVector) v, schemaElement);
+            case INTERVAL:
+              return new FixedByteAlignedReader.IntervalReader(recordReader, 
descriptor,
+                  columnChunkMetaData, fixedLength, (IntervalVector) v, 
schemaElement);
+          }
+        }
+        if 
(columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+          return new 
ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader,
+              descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) 
v, schemaElement);
+        }
+        return new FixedByteAlignedReader.FixedBinaryReader(recordReader, 
descriptor,
+            columnChunkMetaData, (VariableWidthVector) v, schemaElement);
+      case INT96:
+        if 
(recordReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val)
 {
+          return new 
ParquetFixedWidthDictionaryReaders.DictionaryBinaryAsTimeStampReader(recordReader,
+              descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) 
v, schemaElement);
+        } else {
+          return new 
ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader,
+              descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) 
v, schemaElement);
+        }
+      default:
+        throw new ExecutionSetupException("Unsupported dictionary column type 
" + descriptor.getPrimitiveType().getPrimitiveTypeName().name());
+    }
+  }
+
   public static ColumnReader<?> getNullableColumnReader(ParquetRecordReader 
parentReader,
                                                              ColumnDescriptor 
columnDescriptor,
                                                              
ColumnChunkMetaData columnChunkMetaData,
@@ -315,15 +321,14 @@ public class ColumnReaderFactory {
             case INTERVAL:
               return new 
NullableFixedByteAlignedReaders.NullableIntervalReader(parentReader, 
columnDescriptor,
                   columnChunkMetaData, fixedLength, (NullableIntervalVector) 
valueVec, schemaElement);
-            default:
-              if 
(!columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
-                return new 
NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader<>(parentReader,columnDescriptor,
-                    columnChunkMetaData, fixedLength, valueVec, schemaElement);
-              }
           }
         }
-        return new 
NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, 
columnDescriptor,
-            columnChunkMetaData, fixedLength, (NullableVarBinaryVector) 
valueVec, schemaElement);
+        if 
(columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+          return new 
NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, 
columnDescriptor,
+              columnChunkMetaData, fixedLength, (NullableVarBinaryVector) 
valueVec, schemaElement);
+        }
+        return new 
NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader<>(parentReader,columnDescriptor,
+            columnChunkMetaData, fixedLength, valueVec, schemaElement);
       default:
         throw new ExecutionSetupException("Unsupported nullable column type " 
+ columnDescriptor.getPrimitiveType().getPrimitiveTypeName().name());
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
index 82711c2..7c04322 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
@@ -17,14 +17,11 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
-import org.apache.drill.shaded.guava.com.google.common.primitives.Ints;
-import org.apache.drill.shaded.guava.com.google.common.primitives.Longs;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.vector.DateVector;
 import org.apache.drill.exec.vector.IntervalVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarDecimalVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.format.SchemaElement;
@@ -195,30 +192,6 @@ class FixedByteAlignedReader<V extends ValueVector> 
extends ColumnReader<V> {
 
   }
 
-  public static class VarDecimalReader extends 
ConvertedReader<VarDecimalVector> {
-
-    VarDecimalReader(ParquetRecordReader parentReader, ColumnDescriptor 
descriptor, ColumnChunkMetaData columnChunkMetaData,
-        boolean fixedLength, VarDecimalVector v, SchemaElement schemaElement) 
throws ExecutionSetupException {
-      super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, 
schemaElement);
-    }
-
-    @Override
-    void addNext(int start, int index) {
-      switch (columnChunkMetaData.getType()) {
-        case INT32:
-          valueVec.getMutator().setSafe(index, 
Ints.toByteArray(bytebuf.getInt(start)), 0, dataTypeLengthInBytes);
-          break;
-        case INT64:
-          valueVec.getMutator().setSafe(index, 
Longs.toByteArray(bytebuf.getLong(start)), 0, dataTypeLengthInBytes);
-          break;
-        case FIXED_LEN_BYTE_ARRAY:
-        case BINARY:
-          valueVec.getMutator().setSafe(index, start, start + 
dataTypeLengthInBytes, bytebuf);
-          break;
-      }
-    }
-  }
-
   public static class IntervalReader extends ConvertedReader<IntervalVector> {
     IntervalReader(ParquetRecordReader parentReader, ColumnDescriptor 
descriptor, ColumnChunkMetaData columnChunkMetaData,
                    boolean fixedLength, IntervalVector v, SchemaElement 
schemaElement) throws ExecutionSetupException {
@@ -229,12 +202,13 @@ class FixedByteAlignedReader<V extends ValueVector> 
extends ColumnReader<V> {
     void addNext(int start, int index) {
       if (usingDictionary) {
         byte[] input = pageReader.dictionaryValueReader.readBytes().getBytes();
-        valueVec.getMutator().setSafe(index * 12,
+        valueVec.getMutator().setSafe(index,
             ParquetReaderUtility.getIntFromLEBytes(input, 0),
             ParquetReaderUtility.getIntFromLEBytes(input, 4),
             ParquetReaderUtility.getIntFromLEBytes(input, 8));
+      } else {
+        valueVec.getMutator().setSafe(index, bytebuf.getInt(start), 
bytebuf.getInt(start + 4), bytebuf.getInt(start + 8));
       }
-      valueVec.getMutator().setSafe(index, bytebuf.getInt(start), 
bytebuf.getInt(start + 4), bytebuf.getInt(start + 8));
     }
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index 389962f..14d665c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -504,12 +504,13 @@ public class NullableFixedByteAlignedReaders {
     void addNext(int start, int index) {
       if (usingDictionary) {
         byte[] input = pageReader.dictionaryValueReader.readBytes().getBytes();
-        valueVec.getMutator().setSafe(index * 12, 1,
+        valueVec.getMutator().setSafe(index, 1,
             ParquetReaderUtility.getIntFromLEBytes(input, 0),
             ParquetReaderUtility.getIntFromLEBytes(input, 4),
             ParquetReaderUtility.getIntFromLEBytes(input, 8));
+      } else {
+        valueVec.getMutator().set(index, 1, bytebuf.getInt(start), 
bytebuf.getInt(start + 4), bytebuf.getInt(start + 8));
       }
-      valueVec.getMutator().set(index, 1, bytebuf.getInt(start), 
bytebuf.getInt(start + 4), bytebuf.getInt(start + 8));
     }
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index b50f6b3..79c272a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import io.netty.buffer.ByteBufUtil;
@@ -39,6 +40,8 @@ import org.apache.parquet.column.ValuesType;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.dictionary.DictionaryValuesReader;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
 import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.PageType;
 import org.apache.parquet.format.Util;
@@ -313,11 +316,12 @@ class PageReader {
     }
 
     timer.start();
-    currentPageCount = pageHeader.data_page_header.num_values;
+    PageHeaderInfoProvider pageHeaderInfoProvider = 
pageHeaderInfoProviderBuilder(pageHeader);
+    currentPageCount = pageHeaderInfoProvider.getNumValues();
 
-    final Encoding rlEncoding = 
METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding);
-    final Encoding dlEncoding = 
METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.definition_level_encoding);
-    final Encoding valueEncoding = 
METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.encoding);
+    final Encoding rlEncoding = 
METADATA_CONVERTER.getEncoding(pageHeaderInfoProvider.getRepetitionLevelEncoding());
+    final Encoding dlEncoding = 
METADATA_CONVERTER.getEncoding(pageHeaderInfoProvider.getDefinitionLevelEncoding());
+    final Encoding valueEncoding = 
METADATA_CONVERTER.getEncoding(pageHeaderInfoProvider.getEncoding());
 
     byteLength = pageHeader.uncompressed_page_size;
 
@@ -453,8 +457,9 @@ class PageReader {
     
Preconditions.checkState(parentColumnReader.columnDescriptor.getMaxDefinitionLevel()
 == 1);
     Preconditions.checkState(currentPageCount > 0);
 
-    final Encoding rlEncoding = 
METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding);
-    final Encoding dlEncoding = 
METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.definition_level_encoding);
+    PageHeaderInfoProvider pageHeaderInfoProvider = 
pageHeaderInfoProviderBuilder(pageHeader);
+    final Encoding rlEncoding = 
METADATA_CONVERTER.getEncoding(pageHeaderInfoProvider.getRepetitionLevelEncoding());
+    final Encoding dlEncoding = 
METADATA_CONVERTER.getEncoding(pageHeaderInfoProvider.getDefinitionLevelEncoding());
 
     final ByteBufferInputStream in = 
ByteBufferInputStream.wrap(pageData.nioBuffer(0, pageData.capacity()));
 
@@ -475,4 +480,84 @@ class PageReader {
       definitionLevels.skip();
     }
   }
+
+  /**
+   * Common interface for wrappers of {@link DataPageHeader} and {@link 
DataPageHeaderV2} classes.
+   */
+  private interface PageHeaderInfoProvider {
+    int getNumValues();
+
+    org.apache.parquet.format.Encoding getEncoding();
+
+    org.apache.parquet.format.Encoding getDefinitionLevelEncoding();
+
+    org.apache.parquet.format.Encoding getRepetitionLevelEncoding();
+  }
+
+  private static class DataPageHeaderV1InfoProvider implements 
PageHeaderInfoProvider {
+    private final DataPageHeader dataPageHeader;
+
+    private DataPageHeaderV1InfoProvider(DataPageHeader dataPageHeader) {
+      this.dataPageHeader = dataPageHeader;
+    }
+
+    @Override
+    public int getNumValues() {
+      return dataPageHeader.getNum_values();
+    }
+
+    @Override
+    public org.apache.parquet.format.Encoding getEncoding() {
+      return dataPageHeader.getEncoding();
+    }
+
+    @Override
+    public org.apache.parquet.format.Encoding getDefinitionLevelEncoding() {
+      return dataPageHeader.getDefinition_level_encoding();
+    }
+
+    @Override
+    public org.apache.parquet.format.Encoding getRepetitionLevelEncoding() {
+      return dataPageHeader.getRepetition_level_encoding();
+    }
+  }
+
+  private static class DataPageHeaderV2InfoProvider implements 
PageHeaderInfoProvider {
+    private final DataPageHeaderV2 dataPageHeader;
+
+    private DataPageHeaderV2InfoProvider(DataPageHeaderV2 dataPageHeader) {
+      this.dataPageHeader = dataPageHeader;
+    }
+
+    @Override
+    public int getNumValues() {
+      return dataPageHeader.getNum_values();
+    }
+
+    @Override
+    public org.apache.parquet.format.Encoding getEncoding() {
+      return dataPageHeader.getEncoding();
+    }
+
+    @Override
+    public org.apache.parquet.format.Encoding getDefinitionLevelEncoding() {
+      return org.apache.parquet.format.Encoding.PLAIN;
+    }
+
+    @Override
+    public org.apache.parquet.format.Encoding getRepetitionLevelEncoding() {
+      return org.apache.parquet.format.Encoding.PLAIN;
+    }
+  }
+
+  private static PageHeaderInfoProvider 
pageHeaderInfoProviderBuilder(PageHeader pageHeader) {
+    switch (pageHeader.getType()) {
+      case DATA_PAGE:
+        return new 
DataPageHeaderV1InfoProvider(pageHeader.getData_page_header());
+      case DATA_PAGE_V2:
+        return new 
DataPageHeaderV2InfoProvider(pageHeader.getData_page_header_v2());
+      default:
+        throw new DrillRuntimeException("Unsupported page header type:" + 
pageHeader.getType());
+    }
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
index 9e019ee..4c25b1a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
@@ -240,11 +240,12 @@ public class ParquetFixedWidthDictionaryReaders {
     // this method is called by its superclass during a read loop
     @Override
     protected void readField(long recordsToReadInThisPass) {
+      int dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
       recordsReadInThisIteration =
           Math.min(pageReader.currentPageCount - pageReader.valuesRead,
               recordsToReadInThisPass - valuesReadInCurrentPass);
 
-      switch (columnDescriptor.getType()) {
+      switch (columnDescriptor.getPrimitiveType().getPrimitiveTypeName()) {
         case INT32:
           if (usingDictionary) {
             for (int i = 0; i < recordsReadInThisIteration; i++) {
@@ -253,7 +254,10 @@ public class ParquetFixedWidthDictionaryReaders {
             }
             setWriteIndex();
           } else {
-            super.readField(recordsToReadInThisPass);
+            for (int i = 0; i < recordsReadInThisIteration; i++) {
+              byte[] bytes = Ints.toByteArray(pageReader.pageData.getInt((int) readStartInBytes + i * dataTypeLengthInBytes));
+              valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, bytes, 0, dataTypeLengthInBytes);
+            }
           }
           break;
         case INT64:
@@ -264,9 +268,33 @@ public class ParquetFixedWidthDictionaryReaders {
             }
             setWriteIndex();
           } else {
-            super.readField(recordsToReadInThisPass);
+            for (int i = 0; i < recordsReadInThisIteration; i++) {
+              byte[] bytes = Longs.toByteArray(pageReader.pageData.getLong((int) readStartInBytes + i * dataTypeLengthInBytes));
+              valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, bytes, 0, dataTypeLengthInBytes);
+            }
           }
           break;
+        case FIXED_LEN_BYTE_ARRAY:
+        case BINARY:
+          if (usingDictionary) {
+            VarDecimalVector.Mutator mutator = valueVec.getMutator();
+            for (int i = 0; i < recordsReadInThisIteration; i++) {
+              Binary currDictValToWrite = pageReader.dictionaryValueReader.readBytes();
+              mutator.setSafe(valuesReadInCurrentPass + i,
+                  currDictValToWrite.toByteBuffer().slice(), 0, currDictValToWrite.length());
+            }
+            // Set the write Index. The next page that gets read might be a page that does not use dictionary encoding
+            // and we will go into the else condition below. The readField method of the parent class requires the
+            // writer index to be set correctly.
+            int writerIndex = valueVec.getBuffer().writerIndex();
+            valueVec.getBuffer().setIndex(0, writerIndex + (int) readLength);
+          } else {
+            for (int i = 0; i < recordsReadInThisIteration; i++) {
+              int start = (int) readStartInBytes + i * dataTypeLengthInBytes;
+              valueVec.getMutator().setSafe(valuesReadInCurrentPass + i,
+                  start, start + dataTypeLengthInBytes, pageReader.pageData);
+            }
+          }
       }
     }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
index d4a6fcb..232aec9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
@@ -25,6 +25,7 @@ import org.apache.parquet.example.data.GroupFactory;
 import org.apache.parquet.example.data.simple.SimpleGroupFactory;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.hadoop.example.GroupWriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.io.api.Binary;
@@ -46,8 +47,7 @@ import static org.apache.drill.exec.store.parquet.ParquetSimpleTestFileGenerator
  * that are supported by Drill. Embedded types specified in the Parquet specification are not covered by the
  * examples but can be added.
  * To create a new parquet file, define a schema, create a GroupWriter based on the schema, then add values
- * for individual records to the GroupWriter.<br>
- *     TODO: DRILL-7904. To run this tool please use 28.2-jre <guava.version> instead of 19.0 in main POM file
+ * for individual records to the GroupWriter.
  * @see  org.apache.drill.exec.store.parquet.TestFileGenerator TestFileGenerator
  * @see org.apache.parquet.hadoop.example.GroupWriteSupport GroupWriteSupport
  * @see org.apache.parquet.example.Paper Dremel Example
@@ -55,7 +55,7 @@ import static org.apache.drill.exec.store.parquet.ParquetSimpleTestFileGenerator
 public class ParquetSimpleTestFileGenerator {
 
   public enum EnumType {
-    RANDOM_VALUE, MAX_VALUE, MIN_VALUE;
+    RANDOM_VALUE, MAX_VALUE, MIN_VALUE
   }
 
   public static Path root = new Path("file:/tmp/parquet/");
@@ -221,20 +221,16 @@ public class ParquetSimpleTestFileGenerator {
 
     GroupWriteSupport.setSchema(schema, conf);
 
-    ParquetWriter<Group> writer =
-        new ParquetWriter<Group>(initFile(fileName),
-            ParquetFileWriter.Mode.OVERWRITE,
-            new GroupWriteSupport(),
-            CompressionCodecName.SNAPPY,
-            1024,
-            1024,
-            512,
-            dictEncoding, // enable dictionary encoding,
-            false,
-            ParquetProperties.WriterVersion.PARQUET_1_0, conf
-        );
-
-    return writer;
+    return ExampleParquetWriter.builder(initFile(fileName))
+        .withDictionaryEncoding(dictEncoding)
+        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+        .withCompressionCodec(CompressionCodecName.SNAPPY)
+        .withPageSize(1024)
+        .withDictionaryPageSize(512)
+        .withValidation(false)
+        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
+        .withConf(conf)
+        .build();
   }
 
   public static void writeComplexValues(GroupFactory gf, ParquetWriter<Group> complexWriter, boolean writeNulls) throws IOException {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java
index 5b7e891..efe3f08 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
 
 import org.apache.drill.categories.ParquetTest;
 import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.test.BaseTestQuery;
 import org.joda.time.Period;
 import org.junit.Test;
@@ -700,7 +701,7 @@ public class TestParquetLogicalTypes extends BaseTestQuery {
   }
 
   @Test
-  public void testDecimalDictionaryEncoding() throws Exception {
+  public void testNullableDecimalDictionaryEncoding() throws Exception {
     testBuilder()
         .sqlQuery("select RegHrs from cp.`parquet/dict_dec.parquet`")
         .ordered()
@@ -708,4 +709,40 @@ public class TestParquetLogicalTypes extends BaseTestQuery {
         .baselineValues(new BigDecimal("8.000000"))
         .go();
   }
+
+  @Test
+  public void testRequiredDecimalDictionaryEncoding() throws Exception {
+    testBuilder()
+        .sqlQuery("select * from cp.`parquet/SingleRow_RequiredFixedLength_Decimal.parquet`")
+        .ordered()
+        .baselineColumns("Cost", "Sale")
+        .baselineValues(new BigDecimal("550.000000"), new BigDecimal("1050.000000"))
+        .go();
+  }
+
+  @Test
+  public void testRequiredIntervalDictionaryEncoding() throws Exception {
+    testBuilder()
+        .sqlQuery("select * from cp.`parquet/interval_dictionary.parquet`")
+        .unOrdered()
+        .baselineColumns("_INTERVAL_fixed_len_byte_array_12")
+        .baselineValues(Period.months(875770417).plusDays(943142453).plusMillis(1650536505))
+        .baselineValues(Period.months(16843009).plusDays(16843009).plusMillis(16843009))
+        .baselineValues(Period.seconds(0))
+        .go();
+  }
+
+  @Test
+  public void testNullableIntervalDictionaryEncoding() throws Exception {
+    alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
+    testBuilder()
+        .sqlQuery("select * from cp.`parquet/nullable_interval_dictionary.parquet`")
+        .unOrdered()
+        .baselineColumns("_INTERVAL_fixed_len_byte_array_12")
+        .baselineValues(Period.months(875770417).plusDays(943142453).plusMillis(1650536505))
+        .baselineValues(Period.months(16843009).plusDays(16843009).plusMillis(16843009))
+        .baselineValues(Period.seconds(0))
+        .baselineValues((Object) null)
+        .go();
+  }
 }
diff --git a/exec/java-exec/src/test/resources/parquet/SingleRow_RequiredFixedLength_Decimal.parquet b/exec/java-exec/src/test/resources/parquet/SingleRow_RequiredFixedLength_Decimal.parquet
new file mode 100644
index 0000000..2faac7a
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/SingleRow_RequiredFixedLength_Decimal.parquet differ
diff --git a/exec/java-exec/src/test/resources/parquet/interval_dictionary.parquet b/exec/java-exec/src/test/resources/parquet/interval_dictionary.parquet
new file mode 100644
index 0000000..202314e
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/interval_dictionary.parquet differ
diff --git a/exec/java-exec/src/test/resources/parquet/nullable_interval_dictionary.parquet b/exec/java-exec/src/test/resources/parquet/nullable_interval_dictionary.parquet
new file mode 100644
index 0000000..8bd72c3
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/nullable_interval_dictionary.parquet differ

Reply via email to