This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4261225f65cc952ff544f8e926b72c01c0f8ebfb
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Mon Apr 17 21:35:30 2023 +0200

    IMPALA-6433: Add read support for PageHeaderV2
    
    Parquet v2 means several changes in Parquet files compared to v1:
    
    1. file version = 2 instead of 1
    
    
https://github.com/apache/parquet-format/blob/c185faf0c4fc0c7d3075d1abd4ed0985cdbe5d87/src/main/thrift/parquet.thrift#L1016
    Before this patch Impala rejected Parquet files with version!=1.
    
    2. possible use of DataPageHeaderV2 instead DataPageHeader
    
    
https://github.com/apache/parquet-format/blob/c185faf0c4fc0c7d3075d1abd4ed0985cdbe5d87/src/main/thrift/parquet.thrift#L561
    
    The main differences compared to V1 DataPageHeader:
    a. rep/def levels are not compressed, so the compressed part contains
       only the actual encoded values
    b. rep/def levels must be RLE encoded (Impala only supports RLE encoded
       levels even for V1 pages)
    c. compression can be turned on/off per page (member is_compressed)
    d. number of nulls (member num_nulls) is required - in v1 it was
       included in statistics which is optional
    e. number of rows is required (member num_rows) which can help with
       matching collection items with the top level collection
    
    The patch adds support for understanding v2 data pages but does not
    implement some potential optimizations:
    
    a. would allow an optimization for queries that need only the nullness
    of a column but not the actual value: as the values are not needed the
    decompression of the page data can be skipped. This optimization is not
    implemented - currently Impala materializes both the null bit and the
    value for all columns regardless of whether the value is actually
    needed.
    
    d. could be also used for optimizations / additional validity checks
    but it is not used currently
    
    e. could make skipping rows easier but is not used, as the existing
    scanner has to be able to skip rows efficiently also in v1 files so
    it can't rely on num_rows
    
    3. possible use of new encodings (e.g. DELTA_BINARY_PACKED)
    
    No new encoding is added - when an unsupported encoding is encountered
    Impala returns an error.
    
    parquet-mr uses new encodings (DELTA_BINARY_PACKED, DELTA_BYTE_ARRAY)
    for most types if the file version is 2, so with this patch Impala is
    not yet able to read all v2 Parquet tables written by Hive.
    
    4. Encoding PLAIN_DICTIONARY is deprecated and RLE_DICTIONARY is used
    instead. The semantics of the two encodings are exactly the same.
    
    Additional changes:
    Some responsibilites are moved from ParquetColumnReader to
    ParquetColumnChunkReader:
    - ParquetColumnChunkReader decodes rep/def level sizes to hide v1/v2
      differences (see 2.a.)
    - ParquetColumnChunkReader skips empty data pages in
      ReadNextDataPageHeader
    - the state machine of ParquetColumnChunkReader is simplified by
      separating data page header reading / reading rest of the page
    
    Testing:
    - added 4 v2 Parquet test tables (written by Hive) to cover
      compressed / uncompressed and scalar/complex cases
    - added EE and fuzz tests for the test tables above
    - manual tested v2 Parquet files written by pyarrow
    - ran core tests
    
    Note that no test is added where some pages are compressed while
    some are not. It would be tricky to create such files with existing
    writers. The code should handle this case and it is very unlikely that
    files like this will be encountered.
    
    Change-Id: I282962a6e4611e2b662c04a81592af83ecaf08ca
    Reviewed-on: http://gerrit.cloudera.org:8080/19793
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   |   2 +-
 be/src/exec/parquet/parquet-column-chunk-reader.cc | 206 +++++++++++++++++----
 be/src/exec/parquet/parquet-column-chunk-reader.h  |  71 ++++---
 be/src/exec/parquet/parquet-column-readers.cc      | 125 ++++++-------
 be/src/exec/parquet/parquet-column-readers.h       |  15 +-
 be/src/exec/parquet/parquet-common.h               |   3 +-
 be/src/exec/parquet/parquet-level-decoder.cc       |  75 ++++----
 be/src/exec/parquet/parquet-level-decoder.h        |  18 +-
 be/src/exec/parquet/parquet-metadata-utils.cc      |   2 +-
 .../functional/functional_schema_template.sql      |  86 +++++++++
 .../datasets/functional/schema_constraints.csv     |   5 +
 .../queries/QueryTest/parquet-v2.test              |  97 ++++++++++
 tests/query_test/test_scanners.py                  |  15 ++
 tests/query_test/test_scanners_fuzz.py             |  25 +++
 14 files changed, 562 insertions(+), 183 deletions(-)

diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc 
b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
index 80c0f1a7f..f07921f1e 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
@@ -1280,7 +1280,7 @@ void HdfsParquetTableWriter::ConfigureForIceberg(int 
num_cols) {
 
 Status HdfsParquetTableWriter::Init() {
   // Initialize file metadata
-  file_metadata_.version = PARQUET_CURRENT_VERSION;
+  file_metadata_.version = PARQUET_WRITER_VERSION;
 
   stringstream created_by;
   created_by << "impala version " << GetDaemonBuildVersion()
diff --git a/be/src/exec/parquet/parquet-column-chunk-reader.cc 
b/be/src/exec/parquet/parquet-column-chunk-reader.cc
index 29c2cfb67..61866da2f 100644
--- a/be/src/exec/parquet/parquet-column-chunk-reader.cc
+++ b/be/src/exec/parquet/parquet-column-chunk-reader.cc
@@ -19,6 +19,7 @@
 
 #include <string>
 
+#include "exec/parquet/parquet-level-decoder.h"
 #include "runtime/mem-pool.h"
 #include "runtime/runtime-state.h"
 #include "runtime/scoped-buffer.h"
@@ -44,13 +45,16 @@ static bool RequiresSkippedDictionaryHeaderCheck(
 }
 
 ParquetColumnChunkReader::ParquetColumnChunkReader(HdfsParquetScanner* parent,
-    string schema_name, int slot_id, ValueMemoryType value_mem_type)
+    string schema_name, int slot_id, ValueMemoryType value_mem_type,
+    bool has_rep_level, bool has_def_level)
   : parent_(parent),
     schema_name_(schema_name),
     page_reader_(parent, schema_name),
     slot_id_(slot_id),
     data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())),
-    value_mem_type_(value_mem_type)
+    value_mem_type_(value_mem_type),
+    has_rep_level_(has_rep_level),
+    has_def_level_(has_def_level)
 {
 }
 
@@ -203,79 +207,185 @@ Status 
ParquetColumnChunkReader::ReadDictionaryData(ScopedBuffer* uncompressed_b
   return Status::OK();
 }
 
-Status ParquetColumnChunkReader::ReadNextDataPage(
-    bool* eos, uint8_t** data, int* data_size, bool read_data) {
-  // Read the next data page, skipping page types we don't care about. This 
method should
-  // be called after we know that the first page is not a dictionary page. 
Therefore, if
-  // we find a dictionary page, it is an error in the parquet file and we 
return a non-ok
-  // status (returned by page_reader_.ReadPageHeader()).
+Status ParquetColumnChunkReader::ReadNextDataPageHeader(int* num_values) {
+  // Read the next data page header, skipping page types we don't care about 
and empty
+  // data pages. This method should be called after we know that the first 
page is not a
+  // dictionary page. Therefore, if we find a dictionary page, it is an error 
in
+  // the parquet file and we return a non-ok status (returned by
+  // page_reader_.ReadPageHeader()).
+  bool eos = false;
+  *num_values = 0;
   bool next_data_page_found = false;
   while (!next_data_page_found) {
-    RETURN_IF_ERROR(page_reader_.ReadPageHeader(eos));
+    RETURN_IF_ERROR(page_reader_.ReadPageHeader(&eos));
 
-    const parquet::PageHeader current_page_header = CurrentPageHeader();
+    const parquet::PageHeader& header = CurrentPageHeader();
+
+    // If page_reader_.ReadPageHeader() > 0 and the page is a dictionary page 
then
+    // ReadPageHeader would have returned an error.
     DCHECK(page_reader_.PageHeadersRead() > 0
-        || !current_page_header.__isset.dictionary_page_header)
+        || !header.__isset.dictionary_page_header)
         << "Should not call this method on the first page if it is a 
dictionary.";
 
-    if (*eos) return Status::OK();
-
-    if (current_page_header.type == parquet::PageType::DATA_PAGE) {
-      next_data_page_found = true;
+    if (eos) return Status::OK();
+
+    if (header.type== parquet::PageType::DATA_PAGE
+        || header.type == parquet::PageType::DATA_PAGE_V2) {
+      bool is_v2 = header.type == parquet::PageType::DATA_PAGE_V2;
+      int tmp_num_values = is_v2 ? header.data_page_header_v2.num_values
+                                 : header.data_page_header.num_values;
+      if (tmp_num_values < 0) {
+        return Status(Substitute("Error reading data page in Parquet file 
'$0'. "
+              "Invalid number of values in metadata: $1", filename(), 
tmp_num_values));
+      } else if (tmp_num_values == 0) {
+        // Skip pages with 0 values.
+        VLOG_FILE << "Found empty page in " << filename();
+        RETURN_IF_ERROR(SkipPageData());
+      } else {
+        *num_values = tmp_num_values;
+        next_data_page_found = true;
+      }
     } else {
       // We can safely skip non-data pages
       RETURN_IF_ERROR(SkipPageData());
     }
   }
-  if (read_data) {
-    return ReadDataPageData(data, data_size);
-  } else {
-    return Status::OK();
+  return Status::OK();
+}
+
+Status ParquetColumnChunkReader::ProcessRepDefLevelsInDataPageV1(
+    const parquet::DataPageHeader* header_v1,  DataPageInfo* page_info,
+    uint8_t** data, int* data_size) {
+  page_info->rep_level_encoding = header_v1->repetition_level_encoding;
+  page_info->def_level_encoding = header_v1->definition_level_encoding;
+
+  int32_t rep_level_size = 0;
+  if (has_rep_level_) {
+    RETURN_IF_ERROR(ParquetLevelDecoder::ValidateEncoding(
+        filename(), page_info->rep_level_encoding));
+    RETURN_IF_ERROR(ParquetLevelDecoder::ParseRleByteSize(
+        filename(), data, data_size, &rep_level_size));
+  }
+  page_info->rep_level_ptr = *data;
+  page_info->rep_level_size = rep_level_size;
+  *data += rep_level_size;
+  *data_size -= rep_level_size;
+
+  int32_t def_level_size = 0;
+  if (has_def_level_) {
+    RETURN_IF_ERROR(ParquetLevelDecoder::ValidateEncoding(
+        filename(), page_info->def_level_encoding));
+    RETURN_IF_ERROR(ParquetLevelDecoder::ParseRleByteSize(
+        filename(), data, data_size, &def_level_size));
   }
+  page_info->def_level_ptr = *data;
+  page_info->def_level_size = def_level_size;
+  *data += def_level_size;
+  *data_size -= def_level_size;
+
+  return Status::OK();
 }
 
-Status ParquetColumnChunkReader::ReadDataPageData(uint8_t** data, int* 
data_size) {
-  const parquet::PageHeader& current_page_header = CurrentPageHeader();
+Status ParquetColumnChunkReader::ProcessRepDefLevelsInDataPageV2(
+    const parquet::DataPageHeaderV2* header_v2,
+    DataPageInfo* page_info, uint8_t* data, int max_size) {
+  int rep_level_size = header_v2->repetition_levels_byte_length;
+  int def_level_size = header_v2->definition_levels_byte_length;
+  if (rep_level_size < 0 || def_level_size < 0
+      || rep_level_size + def_level_size > max_size) {
+    return Status(Substitute("Corrupt rep/def level sizes in v2 data page in 
file '$0'. "
+        "rep level size: $1 def level size: $2 max size: $3",
+        filename(), rep_level_size, def_level_size, max_size));
+  }
 
-  int compressed_size = current_page_header.compressed_page_size;
-  int uncompressed_size = current_page_header.uncompressed_page_size;
-  uint8_t* compressed_data;
+  page_info->rep_level_size = rep_level_size;
+  page_info->def_level_size = def_level_size;
+  page_info->rep_level_ptr = data;
+  page_info->def_level_ptr = data + rep_level_size;
+  // v2 pages always use RLE for rep/def levels.
+  page_info->rep_level_encoding = parquet::Encoding::RLE;
+  page_info->def_level_encoding = parquet::Encoding::RLE;
+
+  return Status::OK();
+}
+
+Status ParquetColumnChunkReader::ReadDataPageData(DataPageInfo* page_info) {
+  DCHECK(page_info != nullptr);
+  page_info->is_valid = false;
+
+  const parquet::PageHeader& header = CurrentPageHeader();
+  bool is_v2 = header.type == parquet::PageType::DATA_PAGE_V2;
+  DCHECK(is_v2 || header.type == parquet::PageType::DATA_PAGE);
+
+  // In v2 pages if decompressor_ == nullptr it is still possible that 
is_compressed
+  // is true in the header (parquet-mr writes like this if 
compression=UNCOMPRESSED).
+  bool is_compressed = decompressor_.get() != nullptr
+      && (!is_v2 || header.data_page_header_v2.is_compressed);
 
+  int orig_compressed_size = header.compressed_page_size;
+  int orig_uncompressed_size = header.uncompressed_page_size;
+  int compressed_size = orig_compressed_size;
+  int uncompressed_size = orig_uncompressed_size;
+
+  // Read compressed data.
+  uint8_t* compressed_data;
   RETURN_IF_ERROR(page_reader_.ReadPageData(&compressed_data));
 
+  // If v2 data page, fill rep/def level info based on header. For v1 pages 
this will be
+  // done after decompression.
+  if (is_v2) {
+    
RETURN_IF_ERROR(ProcessRepDefLevelsInDataPageV2(&header.data_page_header_v2,
+        page_info, compressed_data, orig_uncompressed_size));
+    // In v2 pages compressed_page_size size also includes the uncompressed
+    // rep/def levels.
+    // 
https://github.com/apache/parquet-format/blob/2a481fe1aad64ff770e21734533bb7ef5a057dac/src/main/thrift/parquet.thrift#L578
+    int levels_size = page_info->rep_level_size + page_info->def_level_size;
+    compressed_size -= levels_size;
+    uncompressed_size -= levels_size;
+    compressed_data += levels_size;
+  }
+
   const bool has_slot_desc = value_mem_type_ != ValueMemoryType::NO_SLOT_DESC;
 
-  *data_size = uncompressed_size;
-  if (decompressor_.get() != nullptr) {
+  int data_size = uncompressed_size;
+  uint8_t* data = nullptr;
+  if (is_compressed) {
     SCOPED_TIMER(parent_->decompress_timer_);
     uint8_t* decompressed_buffer;
     RETURN_IF_ERROR(AllocateUncompressedDataPage(
         uncompressed_size, "decompressed data", &decompressed_buffer));
+    int actual_uncompressed_size = uncompressed_size;
     RETURN_IF_ERROR(decompressor_->ProcessBlock32(true,
-        compressed_size, compressed_data, &uncompressed_size,
+        compressed_size, compressed_data, &actual_uncompressed_size,
         &decompressed_buffer));
     // TODO: can't we call stream_->ReleaseCompletedResources(false); at this 
point?
-    VLOG_FILE << "Decompressed " << current_page_header.compressed_page_size
-              << " to " << uncompressed_size;
-    if (current_page_header.uncompressed_page_size != uncompressed_size) {
+    //       (we can't in v2 data page as the original buffer contains rep/def 
levels)
+    VLOG_FILE << "Decompressed " << compressed_size
+              << " to " << actual_uncompressed_size;
+    if (uncompressed_size != actual_uncompressed_size) {
       return Status(Substitute("Error decompressing data page in file '$0'. "
           "Expected $1 uncompressed bytes but got $2", filename(),
-          current_page_header.uncompressed_page_size, uncompressed_size));
+          uncompressed_size, actual_uncompressed_size));
     }
-    *data = decompressed_buffer;
+    data = decompressed_buffer;
 
     if (has_slot_desc) {
-      parent_->scan_node_->UpdateBytesRead(slot_id_, uncompressed_size, 
compressed_size);
-      parent_->UpdateUncompressedPageSizeCounter(uncompressed_size);
-      parent_->UpdateCompressedPageSizeCounter(compressed_size);
+      // Use original sizes (includes levels in v2) in the profile.
+      parent_->scan_node_->UpdateBytesRead(
+          slot_id_, orig_uncompressed_size, orig_compressed_size);
+      parent_->UpdateUncompressedPageSizeCounter(orig_uncompressed_size);
+      parent_->UpdateCompressedPageSizeCounter(orig_compressed_size);
     }
   } else {
     if (compressed_size != uncompressed_size) {
       return Status(Substitute("Error reading data page in file '$0'. "
-          "Expected $1 bytes but got $2", filename(),
+          "Compressed size ($1) should be the same as uncompressed size ($2) "
+          "in pages without compression.", filename(),
           compressed_size, uncompressed_size));
     }
 
+    // TODO: could skip copying when the data page is dict encoded as strings
+    //       will point to the dictionary instead of the data buffer 
(IMPALA-12137)
     const bool copy_buffer = value_mem_type_ == ValueMemoryType::VAR_LEN_STR;
 
     if (copy_buffer) {
@@ -286,15 +396,31 @@ Status 
ParquetColumnChunkReader::ReadDataPageData(uint8_t** data, int* data_size
       RETURN_IF_ERROR(AllocateUncompressedDataPage(
           uncompressed_size, "uncompressed variable-length data", &buffer));
       memcpy(buffer, compressed_data, uncompressed_size);
-      *data = buffer;
+      data = buffer;
     } else {
-      *data = compressed_data;
+      data = compressed_data;
     }
     if (has_slot_desc) {
-      parent_->scan_node_->UpdateBytesRead(slot_id_, uncompressed_size, 0);
-      parent_->UpdateUncompressedPageSizeCounter(uncompressed_size);
+      // Use original sizes (includes levels in v2) in the profile.
+      parent_->scan_node_->UpdateBytesRead(slot_id_, orig_uncompressed_size, 
0);
+      parent_->UpdateUncompressedPageSizeCounter(orig_uncompressed_size);
     }
+
   }
+  // The buffers to return are ready at this point.
+
+  // If v1 data page, fill rep/def level info by parsing the beginning of the 
data. For
+  // v2 pages this was done before decompression.
+  if (!is_v2) {
+    RETURN_IF_ERROR(ProcessRepDefLevelsInDataPageV1(&header.data_page_header, 
page_info,
+        &data, &data_size));
+  }
+
+  page_info->data_encoding = is_v2 ? header.data_page_header_v2.encoding
+                                   : header.data_page_header.encoding;
+  page_info->data_ptr = data;
+  page_info->data_size = data_size;
+  page_info->is_valid = true;
 
   return Status::OK();
 }
diff --git a/be/src/exec/parquet/parquet-column-chunk-reader.h 
b/be/src/exec/parquet/parquet-column-chunk-reader.h
index dcef83231..0105091e4 100644
--- a/be/src/exec/parquet/parquet-column-chunk-reader.h
+++ b/be/src/exec/parquet/parquet-column-chunk-reader.h
@@ -29,7 +29,8 @@ class MemPool;
 class ScopedBuffer;
 
 /// A class to read data from Parquet pages. It handles the page headers, 
decompression
-/// and the possible copying of the data buffers.
+/// and the possible copying of the data buffers. It also hides the 
differences between
+/// v1 and v2 data pages.
 /// Before reading, InitColumnChunk(), set_io_reservation() and StartScan() 
must be called
 /// in this order.
 class ParquetColumnChunkReader {
@@ -50,23 +51,29 @@ class ParquetColumnChunkReader {
     VAR_LEN_STR
   };
 
-  const char* filename() const { return parent_->filename(); }
+  // Class to hide differences between v1 and v2 data pages.
+  struct DataPageInfo {
+    parquet::Encoding::type rep_level_encoding;
+    parquet::Encoding::type def_level_encoding;
+    parquet::Encoding::type data_encoding;
+    uint8_t* rep_level_ptr = nullptr;
+    uint8_t* def_level_ptr = nullptr;
+    uint8_t* data_ptr  = nullptr;
+    int rep_level_size = -1;
+    int def_level_size = -1;
+    int data_size = -1;
+    bool is_valid = false;
+  };
 
-  const parquet::PageHeader& CurrentPageHeader() const {
-    return page_reader_.CurrentPageHeader();
-  }
+  const char* filename() const { return parent_->filename(); }
 
   io::ScanRange* scan_range() const { return page_reader_.scan_range(); }
-  parquet::PageType::type page_type() const { return CurrentPageHeader().type; 
}
   ScannerContext::Stream* stream() const { return page_reader_.stream(); }
 
-  parquet::Encoding::type encoding() const {
-    return CurrentPageHeader().data_page_header.encoding;
-  }
-
   /// Moved to implementation to be able to forward declare class in 
scoped_ptr.
   ParquetColumnChunkReader(HdfsParquetScanner* parent, std::string schema_name,
-      int slot_id, ValueMemoryType value_mem_type);
+      int slot_id, ValueMemoryType value_mem_type,
+      bool has_rep_level, bool has_def_level);
   ~ParquetColumnChunkReader();
 
   /// Resets the reader for each row group in the file and creates the scan
@@ -113,16 +120,16 @@ class ParquetColumnChunkReader {
       ScopedBuffer* uncompressed_buffer, uint8_t** dict_values,
       int64_t* data_size, int* num_entries);
 
-  /// Reads the next data page to '*data' and '*data_size', if 'read_data' is 
true.
-  /// Else reads page header only, following which client should either call
-  /// 'ReadDataPageData' or 'SkipPageData'.
-  /// Skips other types of pages (except for dictionary) until it finds a data 
page. If it
-  /// finds a dictionary page, returns an error as the dictionary page should 
be the first
-  /// page and this method should only be called if a data page is expected.
-  /// If the stream reaches the end before reading a complete page header, 
'*eos' is set
-  /// to true.
-  Status ReadNextDataPage(
-      bool* eos, uint8_t** data, int* data_size, bool read_data = true);
+  /// Reads the next non-empty data page's page header, following which the 
client
+  /// should either call 'ReadDataPageData' or 'SkipPageData'.
+  /// Skips other types of pages (except for dictionary) and empty data pages 
until it
+  /// finds a non-empty data page. If it finds a dictionary page, returns an 
error as
+  /// the dictionary page should be the first page and this method should only 
be called
+  /// if a data page is expected.
+  /// `num_values` is set to the number of values in the page (including 
nulls).
+  /// If the stream reaches the end before reading a complete page header for 
a non-empty
+  /// data page `num_values` is set to 0 to indicate EOS.
+  Status ReadNextDataPageHeader(int* num_values);
 
   /// If the column type is a variable length string, transfers the remaining 
resources
   /// backing tuples to 'mem_pool' and frees up other resources. Otherwise 
frees all
@@ -132,12 +139,11 @@ class ParquetColumnChunkReader {
   /// Skips the data part of the page. The header must be already read.
   Status SkipPageData();
 
-  /// Reads the data part of the next data page. Sets '*data' to point to the 
buffer and
-  /// '*data_size' to its size.
+  /// Reads the data part of the next data page and fills the members of 
`page_info`.
   /// If the column type is a variable length string, the buffer is allocated 
from
   /// data_page_pool_. Otherwise the returned buffer will be valid only until 
the next
   /// function call that advances the buffer.
-  Status ReadDataPageData(uint8_t** data, int* data_size);
+  Status ReadDataPageData(DataPageInfo* page_info);
 
  private:
   HdfsParquetScanner* parent_;
@@ -160,6 +166,11 @@ class ParquetColumnChunkReader {
 
   boost::scoped_ptr<Codec> decompressor_;
 
+  ValueMemoryType value_mem_type_;
+
+  const bool has_rep_level_;
+  const bool has_def_level_;
+
   /// See TryReadDictionaryPage() for information about the parameters.
   Status ReadDictionaryData(ScopedBuffer* uncompressed_buffer, uint8_t** 
dict_values,
       int64_t* data_size, int* num_entries);
@@ -170,7 +181,17 @@ class ParquetColumnChunkReader {
   Status AllocateUncompressedDataPage(
       int64_t size, const char* err_ctx, uint8_t** buffer);
 
-  ValueMemoryType value_mem_type_;
+  const parquet::PageHeader& CurrentPageHeader() const {
+    return page_reader_.CurrentPageHeader();
+  }
+
+  // Fills rep/def level related members in page_info by parsing the start of 
the buffer.
+  Status ProcessRepDefLevelsInDataPageV1(const parquet::DataPageHeader* 
header_v1,
+      DataPageInfo* page_info, uint8_t** data, int* data_size);
+
+  // Fills rep/def level related members in page_info based on the header.
+  Status ProcessRepDefLevelsInDataPageV2(const parquet::DataPageHeaderV2* 
header_v2,
+      DataPageInfo* page_info, uint8_t* data, int max_size);
 };
 
 } // namespace impala
diff --git a/be/src/exec/parquet/parquet-column-readers.cc 
b/be/src/exec/parquet/parquet-column-readers.cc
index 15bd9bfbe..7e58b1c24 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -184,7 +184,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
     dict_decoder_init_ = false;
   }
 
-  virtual Status InitDataPage(uint8_t* data, int size) override;
+  virtual Status InitDataDecoder(uint8_t* data, int size) override;
 
   virtual bool SkipEncodedValuesInPage(int64_t num_values) override;
 
@@ -344,18 +344,23 @@ inline bool ScalarColumnReader<InternalType, 
PARQUET_TYPE, MATERIALIZED>
 
 // TODO: consider performing filter selectivity checks in this function.
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool 
MATERIALIZED>
-Status ScalarColumnReader<InternalType, PARQUET_TYPE, 
MATERIALIZED>::InitDataPage(
+Status ScalarColumnReader<InternalType, PARQUET_TYPE, 
MATERIALIZED>::InitDataDecoder(
     uint8_t* data, int size) {
   // Data can be empty if the column contains all NULLs
   DCHECK_GE(size, 0);
   DCHECK(slot_desc_ == nullptr || slot_desc_->type().type != TYPE_BOOLEAN)
       << "Bool has specialized impl";
-  page_encoding_ = col_chunk_reader_.encoding();
   if (!IsDictionaryEncoding(page_encoding_)
       && page_encoding_ != parquet::Encoding::PLAIN) {
     return GetUnsupportedDecodingError();
   }
 
+  // PLAIN_DICTIONARY is deprecated in Parquet V2. It means the same as 
RLE_DICTIONARY
+  // so internally PLAIN_DICTIONARY can be used to represent both encodings.
+  if (page_encoding_ == parquet::Encoding::RLE_DICTIONARY) {
+    page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
+  }
+
   // If slot_desc_ is NULL, we don't need to decode any values so 
dict_decoder_ does
   // not need to be initialized.
   if (IsDictionaryEncoding(page_encoding_) && slot_desc_ != nullptr) {
@@ -379,11 +384,10 @@ Status ScalarColumnReader<InternalType, PARQUET_TYPE, 
MATERIALIZED>::InitDataPag
 }
 
 template <>
-Status ScalarColumnReader<bool, parquet::Type::BOOLEAN, true>::InitDataPage(
+Status ScalarColumnReader<bool, parquet::Type::BOOLEAN, true>::InitDataDecoder(
     uint8_t* data, int size) {
   // Data can be empty if the column contains all NULLs
   DCHECK_GE(size, 0);
-  page_encoding_ = col_chunk_reader_.encoding();
 
   /// Boolean decoding is delegated to 'bool_decoder_'.
   if (bool_decoder_->SetData(page_encoding_, data, size)) return Status::OK();
@@ -1156,34 +1160,20 @@ Status BaseScalarColumnReader::ReadDataPage() {
     return Status::OK();
   }
 
-  bool eos;
-  int data_size;
-  RETURN_IF_ERROR(col_chunk_reader_.ReadNextDataPage(&eos, &data_, 
&data_size));
-  if (eos) return HandleTooEarlyEos();
-  data_end_ = data_ + data_size;
-  const parquet::PageHeader& current_page_header = 
col_chunk_reader_.CurrentPageHeader();
-  int num_values = current_page_header.data_page_header.num_values;
-  if (num_values < 0) {
-    return Status(Substitute("Error reading data page in Parquet file '$0'. "
-          "Invalid number of values in metadata: $1", filename(), num_values));
-  }
-  num_buffered_values_ = num_values;
+  // Read the next header, return if not found.
+  
RETURN_IF_ERROR(col_chunk_reader_.ReadNextDataPageHeader(&num_buffered_values_));
+  if (num_buffered_values_ == 0) return HandleTooEarlyEos();
+  DCHECK_GT(num_buffered_values_, 0);
+
+  // Read the data in the data page.
+  ParquetColumnChunkReader::DataPageInfo page;
+  RETURN_IF_ERROR(col_chunk_reader_.ReadDataPageData(&page));
+  DCHECK(page.is_valid);
+
   num_values_read_ += num_buffered_values_;
 
-  /// TODO: Move the level decoder initialisation to ParquetPageReader to 
abstract away
-  /// the differences between Parquet header V1 and V2.
-  // Initialize the repetition level data
-  RETURN_IF_ERROR(rep_levels_.Init(filename(),
-        &current_page_header.data_page_header.repetition_level_encoding,
-        parent_->perm_pool_.get(), parent_->state_->batch_size(), 
max_rep_level(), &data_,
-        &data_size));
-  // Initialize the definition level data
-  RETURN_IF_ERROR(def_levels_.Init(filename(),
-        &current_page_header.data_page_header.definition_level_encoding,
-        parent_->perm_pool_.get(), parent_->state_->batch_size(), 
max_def_level(), &data_,
-        &data_size));
-  // Data can be empty if the column contains all NULLs
-  RETURN_IF_ERROR(InitDataPage(data_, data_size));
+  RETURN_IF_ERROR(InitDataPageDecoders(page));
+
   // Skip rows if needed.
   RETURN_IF_ERROR(StartPageFiltering());
 
@@ -1208,45 +1198,46 @@ Status BaseScalarColumnReader::ReadNextDataPageHeader() 
{
     return Status::OK();
   }
 
-  bool eos;
-  int data_size;
-  RETURN_IF_ERROR(col_chunk_reader_.ReadNextDataPage(&eos, &data_, &data_size,
-      false /*Read next data page's header only*/));
-  if (eos) return HandleTooEarlyEos();
-  const parquet::PageHeader& current_page_header = 
col_chunk_reader_.CurrentPageHeader();
-  int num_values = current_page_header.data_page_header.num_values;
-  if (UNLIKELY(num_values < 0)) {
-    return Status(Substitute("Error reading data page in Parquet file '$0'. "
-                             "Invalid number of values in metadata: $1",
-        filename(), num_values));
-  }
-  num_buffered_values_ = num_values;
+  
RETURN_IF_ERROR(col_chunk_reader_.ReadNextDataPageHeader(&num_buffered_values_));
+  if (num_buffered_values_ == 0) return HandleTooEarlyEos();
+  DCHECK_GT(num_buffered_values_, 0);
+
   num_values_read_ += num_buffered_values_;
   if (parent_->candidate_ranges_.empty()) 
COUNTER_ADD(parent_->num_pages_counter_, 1);
   return Status::OK();
 }
 
 Status BaseScalarColumnReader::ReadCurrentDataPage() {
-  int data_size;
-  RETURN_IF_ERROR(col_chunk_reader_.ReadDataPageData(&data_, &data_size));
-  data_end_ = data_ + data_size;
-  const parquet::PageHeader& current_page_header = 
col_chunk_reader_.CurrentPageHeader();
-  /// TODO: Move the level decoder initialisation to ParquetPageReader to 
abstract away
-  /// the differences between Parquet header V1 and V2.
+  ParquetColumnChunkReader::DataPageInfo page;
+  RETURN_IF_ERROR(col_chunk_reader_.ReadDataPageData(&page));
+  DCHECK(page.is_valid);
+
+  RETURN_IF_ERROR(InitDataPageDecoders(page));
+
+  // Skip rows if needed.
+  RETURN_IF_ERROR(StartPageFiltering());
+  return Status::OK();
+}
+
+Status BaseScalarColumnReader::InitDataPageDecoders(
+    const ParquetColumnChunkReader::DataPageInfo& page) {
   // Initialize the repetition level data
+  DCHECK(page.rep_level_encoding == Encoding::RLE || page.rep_level_size == 0 
);
   RETURN_IF_ERROR(rep_levels_.Init(filename(),
-      &current_page_header.data_page_header.repetition_level_encoding,
-      parent_->perm_pool_.get(), parent_->state_->batch_size(), 
max_rep_level(), &data_,
-      &data_size));
+      parent_->perm_pool_.get(), parent_->state_->batch_size(), 
max_rep_level(),
+      page.rep_level_ptr, page.rep_level_size));
+
   // Initialize the definition level data
+  DCHECK(page.def_level_encoding == Encoding::RLE || page.def_level_size == 0 
);
   RETURN_IF_ERROR(def_levels_.Init(filename(),
-      &current_page_header.data_page_header.definition_level_encoding,
-      parent_->perm_pool_.get(), parent_->state_->batch_size(), 
max_def_level(), &data_,
-      &data_size));
+      parent_->perm_pool_.get(), parent_->state_->batch_size(), 
max_def_level(),
+      page.def_level_ptr, page.def_level_size));
+
+  page_encoding_ = page.data_encoding;
+  data_ = page.data_ptr;
+  data_end_ = data_ + page.data_size;
   // Data can be empty if the column contains all NULLs
-  RETURN_IF_ERROR(InitDataPage(data_, data_size));
-  // Skip rows if needed.
-  RETURN_IF_ERROR(StartPageFiltering());
+  RETURN_IF_ERROR(InitDataDecoder(page.data_ptr, page.data_size));
   return Status::OK();
 }
 
@@ -1661,24 +1652,18 @@ bool BaseScalarColumnReader::SkipRowsInternal(int64_t 
num_rows, int64_t skip_row
       if (!AdvanceNextPageHeader()) {
         return false;
       }
-      const parquet::PageHeader& current_page_header =
-          col_chunk_reader_.CurrentPageHeader();
-      int32_t current_page_values = 
current_page_header.data_page_header.num_values;
-      if (UNLIKELY(current_page_values <= 0)) {
-        return false;
-      }
+      DCHECK_GT(num_buffered_values_, 0);
       // Keep advancing to next page header if rows to be skipped are more 
than number
       // of values in the page. Note we will just be reading headers and 
skipping
       // pages without decompressing them as we advance.
-      while (num_rows > current_page_values) {
+      while (num_rows > num_buffered_values_) {
         
COUNTER_ADD(parent_->num_pages_skipped_by_late_materialization_counter_, 1);
-        num_rows -= current_page_values;
-        current_row_ += current_page_values;
+        num_rows -= num_buffered_values_;
+        current_row_ += num_buffered_values_;
         if (!col_chunk_reader_.SkipPageData().ok() || 
!AdvanceNextPageHeader()) {
           return false;
         }
-        current_page_values =
-            col_chunk_reader_.CurrentPageHeader().data_page_header.num_values;
+        DCHECK_GT(num_buffered_values_, 0);
       }
       // Read the data page (includes decompressing them if required).
       Status page_read = ReadCurrentDataPage();
diff --git a/be/src/exec/parquet/parquet-column-readers.h 
b/be/src/exec/parquet/parquet-column-readers.h
index 76e0825b3..7e5f21432 100644
--- a/be/src/exec/parquet/parquet-column-readers.h
+++ b/be/src/exec/parquet/parquet-column-readers.h
@@ -301,7 +301,9 @@ class BaseScalarColumnReader : public ParquetColumnReader {
       const SlotDescriptor* slot_desc)
     : ParquetColumnReader(parent, node, slot_desc),
       col_chunk_reader_(parent, node.element->name,
-        slot_desc != nullptr ? slot_desc->id() : -1, 
PageReaderValueMemoryType()) {
+        slot_desc != nullptr ? slot_desc->id() : -1, 
PageReaderValueMemoryType(),
+        max_rep_level() > 0,
+        max_def_level() > 0) {
     DCHECK_GE(node_.col_idx, 0) << node_.DebugString();
   }
 
@@ -407,7 +409,11 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   ParquetLevelDecoder rep_levels_{false};
 
   /// Page encoding for values of the current data page. Cached here for perf. 
Set in
-  /// InitDataPage().
+  /// InitDataPageDecoders().
+  ///
+  /// Parquet V2 deprecated PLAIN_DICTIONARY and RLE_DICTIONARY should be used 
instead.
+  /// In this member PLAIN_DICTIONARY is used both for pages with 
PLAIN_DICTIONARY and
+  /// RLE_DICTIONARY as the encodings mean the same.
   parquet::Encoding::type page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
 
   /// Num values remaining in the current data page
@@ -519,7 +525,10 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// decompressed data page. Decoders can initialize state from here. The 
caller must
   /// validate the input such that 'size' is non-negative and that 'data' has 
at least
   /// 'size' bytes remaining.
-  virtual Status InitDataPage(uint8_t* data, int size) = 0;
+  virtual Status InitDataDecoder(uint8_t* data, int size) = 0;
+
+  /// Initializes decoders for rep/def levels and data.
+  Status InitDataPageDecoders(const ParquetColumnChunkReader::DataPageInfo& 
page_info);
 
   ParquetColumnChunkReader::ValueMemoryType PageReaderValueMemoryType() {
     if (slot_desc_ == nullptr) {
diff --git a/be/src/exec/parquet/parquet-common.h 
b/be/src/exec/parquet/parquet-common.h
index 0e5477961..c204871b8 100644
--- a/be/src/exec/parquet/parquet-common.h
+++ b/be/src/exec/parquet/parquet-common.h
@@ -37,7 +37,8 @@
 namespace impala {
 
 const uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
-const uint32_t PARQUET_CURRENT_VERSION = 1;
+const uint32_t PARQUET_MAX_SUPPORTED_VERSION = 2;
+const uint32_t PARQUET_WRITER_VERSION = 1;
 
 /// Struct that specifies an inclusive range of rows.
 struct RowRange {
diff --git a/be/src/exec/parquet/parquet-level-decoder.cc 
b/be/src/exec/parquet/parquet-level-decoder.cc
index e4c2abc40..7bfdc6068 100644
--- a/be/src/exec/parquet/parquet-level-decoder.cc
+++ b/be/src/exec/parquet/parquet-level-decoder.cc
@@ -33,53 +33,54 @@ const int16_t ParquetLevel::ROW_GROUP_END;
 const int16_t ParquetLevel::INVALID_LEVEL;
 const int16_t ParquetLevel::INVALID_POS;
 
-Status ParquetLevelDecoder::Init(const string& filename, const Encoding::type* 
encoding,
-    MemPool* cache_pool, int cache_size, int max_level, uint8_t** data, int* 
data_size) {
-  DCHECK(*data != nullptr);
-  DCHECK_GE(*data_size, 0);
-  DCHECK_GT(cache_size, 0);
-  cache_size = BitUtil::RoundUpToPowerOf2(cache_size, 32);
-  max_level_ = max_level;
-  filename_ = filename;
-  RETURN_IF_ERROR(InitCache(cache_pool, cache_size));
-
-  // Return because there is no level data to read, e.g., required field.
-  if (max_level == 0) return Status::OK();
-
-  int32_t num_bytes = 0;
-  if (Ubsan::EnumToInt(encoding) > Encoding::MAX_ENUM_VALUE) {
+Status ParquetLevelDecoder::ValidateEncoding(const string& filename,
+    const Encoding::type encoding) {
+  if (Ubsan::EnumToInt(&encoding) > Encoding::MAX_ENUM_VALUE) {
     stringstream ss;
-    ss << "Unsupported encoding: " << Ubsan::EnumToInt(encoding);
+    ss << "Unsupported encoding: " << Ubsan::EnumToInt(&encoding);
     return Status(ss.str());
   }
-  switch (*encoding) {
-    case Encoding::RLE: {
-      Status status;
-      if (!ReadWriteUtil::Read(data, data_size, &num_bytes, &status)) {
-        return status;
-      }
-      if (num_bytes < 0 || num_bytes > *data_size) {
-        return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, 
num_bytes);
-      }
-      int bit_width = BitUtil::Log2Ceiling64(max_level + 1);
-      rle_decoder_.Reset(*data, num_bytes, bit_width);
-      break;
-    }
-    case parquet::Encoding::BIT_PACKED:
+  switch (encoding) {
+    case Encoding::RLE:
+      return Status::OK();
+    case Encoding::BIT_PACKED:
       return Status(TErrorCode::PARQUET_BIT_PACKED_LEVELS, filename);
     default: {
       stringstream ss;
-      ss << "Unsupported encoding: " << *encoding;
+      ss << "Unsupported encoding: " << encoding;
       return Status(ss.str());
     }
   }
-  if (UNLIKELY(num_bytes < 0 || num_bytes > *data_size)) {
-    return Status(Substitute("Corrupt Parquet file '$0': $1 bytes of encoded 
levels but "
-                             "only $2 bytes left in page",
-        filename, num_bytes, *data_size));
+}
+
+Status ParquetLevelDecoder::ParseRleByteSize(const string& filename,
+    uint8_t** data, int* total_data_size, int32_t* num_bytes) {
+  Status status;
+  if (!ReadWriteUtil::Read(data, total_data_size, num_bytes, &status)) {
+    return status;
+  }
+  if (*num_bytes < 0 || *num_bytes > *total_data_size) {
+    return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, *num_bytes);
   }
-  *data += num_bytes;
-  *data_size -= num_bytes;
+  return Status::OK();
+}
+
+Status ParquetLevelDecoder::Init(const string& filename, MemPool* cache_pool,
+    int cache_size, int max_level, uint8_t* data, int32_t num_bytes) {
+  DCHECK(data != nullptr);
+  DCHECK_GE(num_bytes, 0);
+  DCHECK_GT(cache_size, 0);
+  cache_size = BitUtil::RoundUpToPowerOf2(cache_size, 32);
+  max_level_ = max_level;
+  filename_ = filename;
+  RETURN_IF_ERROR(InitCache(cache_pool, cache_size));
+
+  // Return because there is no level data to read, e.g., required field.
+  if (max_level == 0) return Status::OK();
+
+  int bit_width = BitUtil::Log2Ceiling64(max_level + 1);
+  rle_decoder_.Reset(data, num_bytes, bit_width);
+
   return Status::OK();
 }
 
diff --git a/be/src/exec/parquet/parquet-level-decoder.h 
b/be/src/exec/parquet/parquet-level-decoder.h
index 8b3bc5818..1200b1373 100644
--- a/be/src/exec/parquet/parquet-level-decoder.h
+++ b/be/src/exec/parquet/parquet-level-decoder.h
@@ -49,11 +49,19 @@ class ParquetLevelDecoder {
     : decoding_error_code_(is_def_level_decoder ? 
TErrorCode::PARQUET_DEF_LEVEL_ERROR :
                                                   
TErrorCode::PARQUET_REP_LEVEL_ERROR) {}
 
-  /// Initialize the LevelDecoder. Reads and advances the provided data buffer 
if the
-  /// encoding requires reading metadata from the page header. 'cache_size' 
will be
-  /// rounded up to a multiple of 32 internally.
-  Status Init(const std::string& filename, const parquet::Encoding::type* 
encoding,
-      MemPool* cache_pool, int cache_size, int max_level, uint8_t** data, int* 
data_size);
+  /// Initialize the LevelDecoder. Assumes that data is RLE encoded.
+  /// 'cache_size' will be rounded up to a multiple of 32 internally.
+  Status Init(const std::string& filename, MemPool* cache_pool, int cache_size,
+      int max_level, uint8_t* data, int32_t num_bytes);
+
+  /// Parses the number of bytes used for level encoding from the buffer and 
moves
+  /// 'data' forward.
+  static Status ParseRleByteSize(const string& filename,
+      uint8_t** data, int* total_data_size, int32_t* num_bytes);
+
+  // Validates that encoding is RLE.
+  static Status ValidateEncoding(const string& filename,
+      const parquet::Encoding::type encoding);
 
   /// Returns the next level or INVALID_LEVEL if there was an error. Not as 
efficient
   /// as batched methods.
diff --git a/be/src/exec/parquet/parquet-metadata-utils.cc 
b/be/src/exec/parquet/parquet-metadata-utils.cc
index c23dd2ae7..99c2d2319 100644
--- a/be/src/exec/parquet/parquet-metadata-utils.cc
+++ b/be/src/exec/parquet/parquet-metadata-utils.cc
@@ -253,7 +253,7 @@ const std::vector<ParquetSchemaResolver::ArrayEncoding>
 
 Status ParquetMetadataUtils::ValidateFileVersion(
     const parquet::FileMetaData& file_metadata, const char* filename) {
-  if (file_metadata.version > PARQUET_CURRENT_VERSION) {
+  if (file_metadata.version > PARQUET_MAX_SUPPORTED_VERSION) {
     stringstream ss;
     ss << "File: " << filename << " is of an unsupported version. "
        << "file version: " << file_metadata.version;
diff --git a/testdata/datasets/functional/functional_schema_template.sql 
b/testdata/datasets/functional/functional_schema_template.sql
index 90e842ea8..87f0c9b39 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -4068,3 +4068,89 @@ INSERT INTO TABLE {db_name}{db_suffix}.{table_name} 
VALUES (6);
 transactional=true
 transactional_properties=insert_only
 ====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+alltypesagg_parquet_v2_uncompressed
+---- PARTITION_COLUMNS
+year int
+month int
+day int
+---- COLUMNS
+id int
+bool_col boolean
+tinyint_col tinyint
+smallint_col smallint
+int_col int
+bigint_col bigint
+float_col float
+double_col double
+date_string_col string
+string_col string
+timestamp_col timestamp
+---- DEPENDENT_LOAD_HIVE
+INSERT OVERWRITE {db_name}{db_suffix}.{table_name} select * from 
functional.alltypesagg;
+---- TABLE_PROPERTIES
+parquet.writer.version=v2
+parquet.compression=UNCOMPRESSED
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+alltypesagg_parquet_v2_snappy
+---- PARTITION_COLUMNS
+year int
+month int
+day int
+---- COLUMNS
+id int
+bool_col boolean
+tinyint_col tinyint
+smallint_col smallint
+int_col int
+bigint_col bigint
+float_col float
+double_col double
+date_string_col string
+string_col string
+timestamp_col timestamp
+---- DEPENDENT_LOAD_HIVE
+INSERT OVERWRITE {db_name}{db_suffix}.{table_name} select * from 
functional.alltypesagg;
+---- TABLE_PROPERTIES
+parquet.writer.version=v2
+parquet.compression=SNAPPY
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+complextypestbl_parquet_v2_uncompressed
+---- COLUMNS
+id bigint
+int_array array<int>
+int_array_array array<array<int>>
+int_map map<string, int>
+int_map_array array<map<string, int>>
+nested_struct struct<a: int, b: array<int>, c: struct<d: array<array<struct<e: 
int, f: string>>>>, g: map<string, struct<h: struct<i: array<double>>>>>
+---- DEPENDENT_LOAD_HIVE
+INSERT OVERWRITE {db_name}{db_suffix}.{table_name} select * from 
functional_parquet.complextypestbl;
+---- TABLE_PROPERTIES
+parquet.writer.version=v2
+parquet.compression=UNCOMPRESSED
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+complextypestbl_parquet_v2_snappy
+---- COLUMNS
+id bigint
+int_array array<int>
+int_array_array array<array<int>>
+int_map map<string, int>
+int_map_array array<map<string, int>>
+nested_struct struct<a: int, b: array<int>, c: struct<d: array<array<struct<e: 
int, f: string>>>>, g: map<string, struct<h: struct<i: array<double>>>>>
+---- DEPENDENT_LOAD_HIVE
+INSERT OVERWRITE {db_name}{db_suffix}.{table_name} select * from 
functional_parquet.complextypestbl;
+---- TABLE_PROPERTIES
+parquet.writer.version=v2
+parquet.compression=SNAPPY
+====
\ No newline at end of file
diff --git a/testdata/datasets/functional/schema_constraints.csv 
b/testdata/datasets/functional/schema_constraints.csv
index ee8bee76b..7f2af5b6d 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -372,3 +372,8 @@ table_name:insert_only_major_and_minor_compacted, 
constraint:restrict_to, table_
 
 # The table is used in large scale metadata test. File format doesn't matter 
so restrict to text only
 table_name:widetable_2000_cols_partitioned, constraint:restrict_to, 
table_format:text/none/none
+
+table_name:alltypesagg_parquet_v2_uncompressed, constraint:restrict_to, 
table_format:parquet/none/none
+table_name:alltypesagg_parquet_v2_snappy, constraint:restrict_to, 
table_format:parquet/none/none
+table_name:complextypestbl_parquet_v2_uncompressed, constraint:restrict_to, 
table_format:parquet/none/none
+table_name:complextypestbl_parquet_v2_snappy, constraint:restrict_to, 
table_format:parquet/none/none
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/parquet-v2.test 
b/testdata/workloads/functional-query/queries/QueryTest/parquet-v2.test
new file mode 100644
index 000000000..3f6ddbb8d
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-v2.test
@@ -0,0 +1,97 @@
+====
+---- QUERY
+# Check if count(*) optimization works correctly.
+select count(*) from alltypesagg_parquet_v2_uncompressed
+---- RESULTS
+11000
+---- TYPES
+BIGINT
+====
+---- QUERY
+select count(*) from alltypesagg_parquet_v2_snappy
+---- RESULTS
+11000
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Check that definition levels are decoded correctly.
+select count(double_col) from alltypesagg_parquet_v2_uncompressed
+---- RESULTS
+10980
+---- TYPES
+BIGINT
+====
+---- QUERY
+select count(double_col) from alltypesagg_parquet_v2_snappy
+---- RESULTS
+10980
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Check that values are decoded correctly.
+select distinct double_col from alltypesagg_parquet_v2_uncompressed
+  order by double_col limit 5
+---- RESULTS
+10.1
+20.2
+30.3
+40.4
+50.5
+---- TYPES
+double
+====
+---- QUERY
+select distinct double_col from alltypesagg_parquet_v2_snappy
+  order by double_col limit 5
+---- RESULTS
+10.1
+20.2
+30.3
+40.4
+50.5
+---- TYPES
+double
+====
+---- QUERY
+# Check that repetition levels are decoded correctly.
+select int_array from complextypestbl_parquet_v2_uncompressed
+---- RESULTS
+'[1,2,3]'
+'[null,1,2,null,3,null]'
+'[]'
+'NULL'
+'NULL'
+'NULL'
+'NULL'
+'[-1]'
+---- TYPES
+string
+====
+---- QUERY
+select int_array from complextypestbl_parquet_v2_snappy
+---- RESULTS
+'[1,2,3]'
+'[null,1,2,null,3,null]'
+'[]'
+'NULL'
+'NULL'
+'NULL'
+'NULL'
+'[-1]'
+---- TYPES
+string
+====
+---- QUERY
+# Check that DELTA_BINARY_PACKED encoding returns an error message.
+select count(id) from alltypesagg_parquet_v2_snappy
+---- CATCH
+unsupported encoding: DELTA_BINARY_PACKED for column 'id'
+====
+---- QUERY
+# Check that DELTA_BYTE_ARRAY encoding returns an error message.
+select count(string_col) from alltypesagg_parquet_v2_snappy
+---- CATCH
+unsupported encoding: DELTA_BYTE_ARRAY for column 'string_col'
+====
diff --git a/tests/query_test/test_scanners.py 
b/tests/query_test/test_scanners.py
index 45072e47a..66edaf880 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -1909,3 +1909,18 @@ class TestBinaryType(ImpalaTestSuite):
 
   def test_binary_type(self, vector):
     self.run_test_case('QueryTest/binary-type', vector)
+
+
+class TestParquetV2(ImpalaTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestParquetV2, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(
+      lambda v: v.get_value('table_format').file_format == 'parquet')
+
+  def test_parquet_v2(self, vector):
+    self.run_test_case('QueryTest/parquet-v2', vector)
diff --git a/tests/query_test/test_scanners_fuzz.py 
b/tests/query_test/test_scanners_fuzz.py
index bda50f982..adea46bae 100644
--- a/tests/query_test/test_scanners_fuzz.py
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -159,6 +159,28 @@ class TestScannersFuzzing(ImpalaTestSuite):
         self.run_fuzz_test(vector, "functional_orc_def", src_table_name, 
unique_database,
                            fuzz_table_name, 10)
 
+  def test_fuzz_parquet_v2(self, vector, unique_database):
+    table_format = vector.get_value('table_format')
+    if table_format.file_format != 'parquet': pytest.skip()
+
+    tables = ["alltypesagg_parquet_v2_uncompressed", 
"alltypesagg_parquet_v2_snappy"]
+    for table_name in tables:
+      custom_queries = [
+        "select avg(float_col), avg(double_col), avg(timestamp_col)"
+        "  from %s where bool_col;" % table_name
+      ]
+      self.run_fuzz_test(vector, "functional_parquet", table_name, 
unique_database,
+                      table_name, 10, custom_queries)
+
+    tables = ["complextypestbl_parquet_v2_uncompressed",
+              "complextypestbl_parquet_v2_snappy"]
+    for table_name in tables:
+      custom_queries = [
+        "select int_array from %s;" % table_name
+      ]
+      self.run_fuzz_test(vector, "functional_parquet", table_name, 
unique_database,
+                  table_name, 10, custom_queries)
+
   # TODO: add test coverage for additional data types like char and varchar
 
   def run_fuzz_test(self, vector, src_db, src_table, fuzz_db, fuzz_table, 
num_copies=1,
@@ -311,6 +333,9 @@ class TestScannersFuzzing(ImpalaTestSuite):
           not suffix.startswith('base_') and
           not suffix.startswith('delta_') and
           not suffix.startswith('delete_delta_')):
+        # Null partitions are stored as __HIVE_DEFAULT_PARTITION__ but 
expected as null
+        # in ALTER TABLE ADD PARTITION.
+        suffix = suffix.replace("__HIVE_DEFAULT_PARTITION__", "null")
         reversed_partitions.append(suffix)
     return reversed(reversed_partitions)
 

Reply via email to