IMPALA-5842: Write page index in Parquet files

This commit builds on the previous work of
Pooja Nilangekar: https://gerrit.cloudera.org/#/c/7464/

The commit implements the write path of PARQUET-922:
"Add column indexes to parquet.thrift". As specified in the
parquet-format, Impala writes the page indexes just before
the footer. This allows much more efficient page filtering
than using the same information from the 'statistics' field
of DataPageHeader.

I updated Pooja's python tests as well.

Change-Id: Icbacf7fe3b7672e3ce719261ecef445b16f8dec9
Reviewed-on: http://gerrit.cloudera.org:8080/9693
Reviewed-by: Zoltan Borok-Nagy <borokna...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ccf19f9f
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ccf19f9f
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ccf19f9f

Branch: refs/heads/master
Commit: ccf19f9f8f2914639b6997849a56c13cfd2399b8
Parents: 05e0db3
Author: Zoltan Borok-Nagy <borokna...@cloudera.com>
Authored: Mon Apr 9 16:10:00 2018 +0200
Committer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Committed: Thu May 17 20:22:02 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-table-writer.cc        | 227 +++++++++---
 be/src/exec/hdfs-parquet-table-writer.h         |  14 +-
 be/src/exec/parquet-column-stats.h              |  51 ++-
 be/src/exec/parquet-column-stats.inline.h       |  62 +++-
 be/src/util/CMakeLists.txt                      |   2 +
 be/src/util/string-util-test.cc                 |  84 +++++
 be/src/util/string-util.cc                      |  57 +++
 be/src/util/string-util.h                       |  42 +++
 common/thrift/parquet.thrift                    |  85 +++++
 testdata/bin/load-dependent-tables.sql          |  35 +-
 .../queries/QueryTest/stats-extrapolation.test  |  14 +-
 tests/query_test/test_chars.py                  |  33 --
 tests/query_test/test_parquet_page_index.py     | 365 +++++++++++++++++++
 tests/util/get_parquet_metadata.py              |  44 ++-
 14 files changed, 1002 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc 
b/be/src/exec/hdfs-parquet-table-writer.cc
index 6370859..6277a99 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -36,6 +36,7 @@
 #include "util/debug-util.h"
 #include "util/dict-encoding.h"
 #include "util/hdfs-util.h"
+#include "util/string-util.h"
 #include "util/rle-encoding.h"
 
 #include <sstream>
@@ -44,7 +45,6 @@
 
 #include "common/names.h"
 using namespace impala;
-using namespace parquet;
 using namespace apache::thrift;
 
 // Managing file sizes: We need to estimate how big the files being buffered
@@ -102,7 +102,10 @@ class HdfsParquetTableWriter::BaseColumnWriter {
       def_levels_(nullptr),
       values_buffer_len_(DEFAULT_DATA_PAGE_SIZE),
       page_stats_base_(nullptr),
-      row_group_stats_base_(nullptr) {
+      row_group_stats_base_(nullptr),
+      table_sink_mem_tracker_(parent_->parent_->mem_tracker()) {
+    static_assert(std::is_same<decltype(parent_->parent_), 
HdfsTableSink*>::value,
+        "'table_sink_mem_tracker_' must point to the mem tracker of an 
HdfsTableSink");
     def_levels_ = parent_->state_->obj_pool()->Add(
         new 
RleEncoder(parent_->reusable_col_mem_pool_->Allocate(DEFAULT_DATA_PAGE_SIZE),
                        DEFAULT_DATA_PAGE_SIZE, 1));
@@ -145,7 +148,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
 
   // Encodes the row group statistics into a parquet::Statistics object and 
attaches it to
   // 'meta_data'.
-  void EncodeRowGroupStats(ColumnMetaData* meta_data) {
+  void EncodeRowGroupStats(parquet::ColumnMetaData* meta_data) {
     DCHECK(row_group_stats_base_ != nullptr);
     if (row_group_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
       row_group_stats_base_->EncodeToThrift(&meta_data->statistics);
@@ -162,13 +165,21 @@ class HdfsParquetTableWriter::BaseColumnWriter {
     current_page_ = nullptr;
     num_values_ = 0;
     total_compressed_byte_size_ = 0;
-    current_encoding_ = Encoding::PLAIN;
-    next_page_encoding_ = Encoding::PLAIN;
+    current_encoding_ = parquet::Encoding::PLAIN;
+    next_page_encoding_ = parquet::Encoding::PLAIN;
     column_encodings_.clear();
     dict_encoding_stats_.clear();
     data_encoding_stats_.clear();
     // Repetition/definition level encodings are constant. Incorporate them 
here.
-    column_encodings_.insert(Encoding::RLE);
+    column_encodings_.insert(parquet::Encoding::RLE);
+    offset_index_.page_locations.clear();
+    column_index_.null_pages.clear();
+    column_index_.min_values.clear();
+    column_index_.max_values.clear();
+    table_sink_mem_tracker_->Release(page_index_memory_consumption_);
+    page_index_memory_consumption_ = 0;
+    column_index_.null_counts.clear();
+    valid_column_index_ = true;
   }
 
   // Close this writer. This is only called after Flush() and no more rows will
@@ -176,6 +187,9 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   void Close() {
     if (compressor_.get() != nullptr) compressor_->Close();
     if (dict_encoder_base_ != nullptr) dict_encoder_base_->Close();
+    // We must release the memory consumption of this column writer.
+    table_sink_mem_tracker_->Release(page_index_memory_consumption_);
+    page_index_memory_consumption_ = 0;
   }
 
   const ColumnType& type() const { return expr_eval_->root().type(); }
@@ -211,7 +225,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
 
   struct DataPage {
     // Page header.  This is a union of all page types.
-    PageHeader header;
+    parquet::PageHeader header;
 
     // Number of bytes needed to store definition levels.
     int num_def_bytes;
@@ -259,21 +273,21 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   int64_t total_compressed_byte_size_;
   int64_t total_uncompressed_byte_size_;
   // Encoding of the current page.
-  Encoding::type current_encoding_;
+  parquet::Encoding::type current_encoding_;
   // Encoding to use for the next page. By default, the same as 
'current_encoding_'.
   // Used by the column writer to switch encoding while writing a column, e.g. 
if the
   // dictionary overflows.
-  Encoding::type next_page_encoding_;
+  parquet::Encoding::type next_page_encoding_;
 
   // Set of all encodings used in the column chunk
-  unordered_set<Encoding::type> column_encodings_;
+  unordered_set<parquet::Encoding::type> column_encodings_;
 
   // Map from the encoding to the number of pages in the column chunk with 
this encoding
   // These are used to construct the PageEncodingStats, which provide 
information
   // about encoding usage for each different page type. Currently, only 
dictionary
   // and data pages are used.
-  unordered_map<Encoding::type, int> dict_encoding_stats_;
-  unordered_map<Encoding::type, int> data_encoding_stats_;
+  unordered_map<parquet::Encoding::type, int> dict_encoding_stats_;
+  unordered_map<parquet::Encoding::type, int> data_encoding_stats_;
 
   // Created, owned, and set by the derived class.
   DictEncoderBase* dict_encoder_base_;
@@ -292,6 +306,22 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // Pointers to statistics, created, owned, and set by the derived class.
   ColumnStatsBase* page_stats_base_;
   ColumnStatsBase* row_group_stats_base_;
+
+  // OffsetIndex stores the locations of the pages.
+  parquet::OffsetIndex offset_index_;
+
+  // ColumnIndex stores the statistics of the pages.
+  parquet::ColumnIndex column_index_;
+
+  // Pointer to the HdfsTableSink's MemTracker.
+  MemTracker* table_sink_mem_tracker_;
+
+  // Memory consumption of the min/max values in the page index.
+  int64_t page_index_memory_consumption_ = 0;
+
+  // Only write ColumnIndex when 'valid_column_index_' is true. We always need 
to write
+  // the OffsetIndex though.
+  bool valid_column_index_ = true;
 };
 
 // Per type column writer.
@@ -312,8 +342,8 @@ class HdfsParquetTableWriter::ColumnWriter :
     BaseColumnWriter::Reset();
     // Default to dictionary encoding.  If the cardinality ends up being too 
high,
     // it will fall back to plain.
-    current_encoding_ = Encoding::PLAIN_DICTIONARY;
-    next_page_encoding_ = Encoding::PLAIN_DICTIONARY;
+    current_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
+    next_page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
     dict_encoder_.reset(
         new DictEncoder<T>(parent_->per_file_mem_pool_.get(), 
plain_encoded_value_size_,
             parent_->parent_->mem_tracker()));
@@ -328,7 +358,7 @@ class HdfsParquetTableWriter::ColumnWriter :
 
  protected:
   virtual bool ProcessValue(void* value, int64_t* bytes_needed) {
-    if (current_encoding_ == Encoding::PLAIN_DICTIONARY) {
+    if (current_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
       if (UNLIKELY(num_values_since_dict_size_check_ >=
                    DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD)) {
         num_values_since_dict_size_check_ = 0;
@@ -339,11 +369,11 @@ class HdfsParquetTableWriter::ColumnWriter :
       // If the dictionary contains the maximum number of values, switch to 
plain
       // encoding for the next page. The current page is full and must be 
written out.
       if (UNLIKELY(*bytes_needed < 0)) {
-        next_page_encoding_ = Encoding::PLAIN;
+        next_page_encoding_ = parquet::Encoding::PLAIN;
         return false;
       }
       parent_->file_size_estimate_ += *bytes_needed;
-    } else if (current_encoding_ == Encoding::PLAIN) {
+    } else if (current_encoding_ == parquet::Encoding::PLAIN) {
       T* v = CastValue(value);
       *bytes_needed = plain_encoded_value_size_ < 0 ?
           ParquetPlainEncoder::ByteSize<T>(*v) :
@@ -386,8 +416,7 @@ class HdfsParquetTableWriter::ColumnWriter :
   // Temporary string value to hold CHAR(N)
   StringValue temp_;
 
-  // Tracks statistics per page. These are not written out currently but are 
merged into
-  // the row group stats. TODO(IMPALA-5841): Write these to the page index.
+  // Tracks statistics per page. These are written out to the page index.
   scoped_ptr<ColumnStats<T>> page_stats_;
 
   // Tracks statistics per row group. This gets reset when starting a new row 
group.
@@ -424,7 +453,7 @@ class HdfsParquetTableWriter::BoolColumnWriter :
         new BitWriter(values_buffer_, values_buffer_len_));
     // Dictionary encoding doesn't make sense for bools and is not allowed by
     // the format.
-    current_encoding_ = Encoding::PLAIN;
+    current_encoding_ = parquet::Encoding::PLAIN;
     dict_encoder_base_ = nullptr;
 
     page_stats_base_ = &page_stats_;
@@ -455,8 +484,7 @@ class HdfsParquetTableWriter::BoolColumnWriter :
   // Used to encode bools as single bit values. This is reused across pages.
   BitWriter* bool_values_;
 
-  // Tracks statistics per page. These are not written out currently but are 
merged into
-  // the row group stats. TODO(IMPALA-5841): Write these to the page index.
+  // Tracks statistics per page. These are written out to the page index.
   ColumnStats<bool> page_stats_;
 
   // Tracks statistics per row group. This gets reset when starting a new file.
@@ -559,13 +587,13 @@ Status 
HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
   if (dict_encoder_base_ != nullptr) {
     *first_dictionary_page = *file_pos;
     // Write dictionary page header
-    DictionaryPageHeader dict_header;
+    parquet::DictionaryPageHeader dict_header;
     dict_header.num_values = dict_encoder_base_->num_entries();
-    dict_header.encoding = Encoding::PLAIN_DICTIONARY;
+    dict_header.encoding = parquet::Encoding::PLAIN_DICTIONARY;
     ++dict_encoding_stats_[dict_header.encoding];
 
-    PageHeader header;
-    header.type = PageType::DICTIONARY_PAGE;
+    parquet::PageHeader header;
+    header.type = parquet::PageType::DICTIONARY_PAGE;
     header.uncompressed_page_size = dict_encoder_base_->dict_encoded_size();
     header.__set_dictionary_page_header(dict_header);
 
@@ -608,15 +636,26 @@ Status 
HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
   }
 
   *first_data_page = *file_pos;
+  int64_t current_row_group_index = 0;
+  offset_index_.page_locations.resize(num_data_pages_);
+
   // Write data pages
   for (int i = 0; i < num_data_pages_; ++i) {
     DataPage& page = pages_[i];
+    parquet::PageLocation location;
 
     if (page.header.data_page_header.num_values == 0) {
       // Skip empty pages
+      location.offset = -1;
+      location.compressed_page_size = 0;
+      location.first_row_index = -1;
+      offset_index_.page_locations[i] = location;
       continue;
     }
 
+    location.offset = *file_pos;
+    location.first_row_index = current_row_group_index;
+
     // Write data page header
     uint8_t* buffer = nullptr;
     uint32_t len = 0;
@@ -625,9 +664,17 @@ Status 
HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
     RETURN_IF_ERROR(parent_->Write(buffer, len));
     *file_pos += len;
 
+    // Note that the namings are confusing here:
+    // parquet::PageHeader::compressed_page_size is the compressed page size 
in bytes, as
+    // its name suggests. On the other hand, 
parquet::PageLocation::compressed_page_size
+    // also includes the size of the page header.
+    location.compressed_page_size = page.header.compressed_page_size + len;
+    offset_index_.page_locations[i] = location;
+
     // Write the page data
     RETURN_IF_ERROR(parent_->Write(page.data, 
page.header.compressed_page_size));
     *file_pos += page.header.compressed_page_size;
+    current_row_group_index += page.header.data_page_header.num_values;
   }
   return Status::OK();
 }
@@ -639,11 +686,11 @@ Status 
HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
   // If the entire page was NULL, encode it as PLAIN since there is no
   // data anyway. We don't output a useless dictionary page and it works
   // around a parquet MR bug (see IMPALA-759 for more details).
-  if (current_page_->num_non_null == 0) current_encoding_ = Encoding::PLAIN;
+  if (current_page_->num_non_null == 0) current_encoding_ = 
parquet::Encoding::PLAIN;
 
-  if (current_encoding_ == Encoding::PLAIN_DICTIONARY) WriteDictDataPage();
+  if (current_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) 
WriteDictDataPage();
 
-  PageHeader& header = current_page_->header;
+  parquet::PageHeader& header = current_page_->header;
   header.data_page_header.encoding = current_encoding_;
 
   // Accumulate encoding statistics
@@ -698,9 +745,41 @@ Status 
HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
         max_compressed_size - header.compressed_page_size);
   }
 
+  DCHECK(page_stats_base_ != nullptr);
+  parquet::Statistics page_stats;
+  page_stats_base_->EncodeToThrift(&page_stats);
+  {
+    // If pages_stats contains min_value and max_value, then append them to 
min_values_
+    // and max_values_ and also mark the page as not null. In case min and max 
values are
+    // not set, push empty strings to maintain the consistency of the index 
and mark the
+    // page as null. Always push the null_count.
+    string min_val;
+    string max_val;
+    if ((page_stats.__isset.min_value) && (page_stats.__isset.max_value)) {
+      Status s_min = TruncateDown(page_stats.min_value, 
PAGE_INDEX_MAX_STRING_LENGTH,
+          &min_val);
+      Status s_max = TruncateUp(page_stats.max_value, 
PAGE_INDEX_MAX_STRING_LENGTH,
+          &max_val);
+      if (!s_min.ok() || !s_max.ok()) valid_column_index_ = false;
+      column_index_.null_pages.push_back(false);
+    } else {
+      DCHECK(!page_stats.__isset.min_value && !page_stats.__isset.max_value);
+      column_index_.null_pages.push_back(true);
+      DCHECK_EQ(page_stats.null_count, num_values_);
+    }
+    int64_t new_memory_allocation = min_val.capacity() + max_val.capacity();
+    if (UNLIKELY(!table_sink_mem_tracker_->TryConsume(new_memory_allocation))) 
{
+      return table_sink_mem_tracker_->MemLimitExceeded(parent_->state_,
+          "Failed to allocate memory for Parquet page index.", 
new_memory_allocation);
+    }
+    page_index_memory_consumption_ += new_memory_allocation;
+    column_index_.min_values.emplace_back(std::move(min_val));
+    column_index_.max_values.emplace_back(std::move(max_val));
+    column_index_.null_counts.push_back(page_stats.null_count);
+  }
+
   // Update row group statistics from page statistics.
   DCHECK(row_group_stats_base_ != nullptr);
-  DCHECK(page_stats_base_ != nullptr);
   row_group_stats_base_->Merge(*page_stats_base_);
 
   // Add the size of the data page header
@@ -728,13 +807,13 @@ void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
     pages_.push_back(DataPage());
     current_page_ = &pages_[num_data_pages_++];
 
-    DataPageHeader header;
+    parquet::DataPageHeader header;
     header.num_values = 0;
     // The code that populates the column chunk metadata's encodings field
     // relies on these specific values for the definition/repetition level
     // encodings.
-    header.definition_level_encoding = Encoding::RLE;
-    header.repetition_level_encoding = Encoding::RLE;
+    header.definition_level_encoding = parquet::Encoding::RLE;
+    header.repetition_level_encoding = parquet::Encoding::RLE;
     current_page_->header.__set_data_page_header(header);
   }
   current_encoding_ = next_page_encoding_;
@@ -861,14 +940,14 @@ Status HdfsParquetTableWriter::CreateSchema() {
     const ColumnType& type = output_expr_evals_[i]->root().type();
     node.name = table_desc_->col_descs()[i + num_clustering_cols].name();
     node.__set_type(ConvertInternalToParquetType(type.type));
-    node.__set_repetition_type(FieldRepetitionType::OPTIONAL);
+    node.__set_repetition_type(parquet::FieldRepetitionType::OPTIONAL);
     if (type.type == TYPE_DECIMAL) {
       // This column is type decimal. Update the file metadata to include the
       // additional fields:
       //  1) converted_type: indicate this is really a decimal column.
       //  2) type_length: the number of bytes used per decimal value in the 
data
       //  3) precision/scale
-      node.__set_converted_type(ConvertedType::DECIMAL);
+      node.__set_converted_type(parquet::ConvertedType::DECIMAL);
       node.__set_type_length(
           
ParquetPlainEncoder::DecimalSize(output_expr_evals_[i]->root().type()));
       node.__set_scale(output_expr_evals_[i]->root().type().scale);
@@ -876,15 +955,15 @@ Status HdfsParquetTableWriter::CreateSchema() {
     } else if (type.type == TYPE_VARCHAR || type.type == TYPE_CHAR ||
         (type.type == TYPE_STRING &&
          state_->query_options().parquet_annotate_strings_utf8)) {
-      node.__set_converted_type(ConvertedType::UTF8);
+      node.__set_converted_type(parquet::ConvertedType::UTF8);
     } else if (type.type == TYPE_TINYINT) {
-      node.__set_converted_type(ConvertedType::INT_8);
+      node.__set_converted_type(parquet::ConvertedType::INT_8);
     } else if (type.type == TYPE_SMALLINT) {
-      node.__set_converted_type(ConvertedType::INT_16);
+      node.__set_converted_type(parquet::ConvertedType::INT_16);
     } else if (type.type == TYPE_INT) {
-      node.__set_converted_type(ConvertedType::INT_32);
+      node.__set_converted_type(parquet::ConvertedType::INT_32);
     } else if (type.type == TYPE_BIGINT) {
-      node.__set_converted_type(ConvertedType::INT_64);
+      node.__set_converted_type(parquet::ConvertedType::INT_64);
     }
   }
 
@@ -893,14 +972,14 @@ Status HdfsParquetTableWriter::CreateSchema() {
 
 Status HdfsParquetTableWriter::AddRowGroup() {
   if (current_row_group_ != nullptr) RETURN_IF_ERROR(FlushCurrentRowGroup());
-  file_metadata_.row_groups.push_back(RowGroup());
+  file_metadata_.row_groups.push_back(parquet::RowGroup());
   current_row_group_ = 
&file_metadata_.row_groups[file_metadata_.row_groups.size() - 1];
 
   // Initialize new row group metadata.
   int num_clustering_cols = table_desc_->num_clustering_cols();
   current_row_group_->columns.resize(columns_.size());
   for (int i = 0; i < columns_.size(); ++i) {
-    ColumnMetaData metadata;
+    parquet::ColumnMetaData metadata;
     metadata.type = ConvertInternalToParquetType(columns_[i]->type().type);
     metadata.path_in_schema.push_back(
         table_desc_->col_descs()[i + num_clustering_cols].name());
@@ -1029,12 +1108,13 @@ Status HdfsParquetTableWriter::Finalize() {
   file_metadata_.num_rows = row_count_;
 
   // Set the ordering used to write parquet statistics for columns in the file.
-  ColumnOrder col_order = ColumnOrder();
-  col_order.__set_TYPE_ORDER(TypeDefinedOrder());
+  parquet::ColumnOrder col_order = parquet::ColumnOrder();
+  col_order.__set_TYPE_ORDER(parquet::TypeDefinedOrder());
   file_metadata_.column_orders.assign(columns_.size(), col_order);
   file_metadata_.__isset.column_orders = true;
 
   RETURN_IF_ERROR(FlushCurrentRowGroup());
+  RETURN_IF_ERROR(WritePageIndex());
   RETURN_IF_ERROR(WriteFileFooter());
   stats_.__set_parquet_stats(parquet_insert_stats_);
   COUNTER_ADD(parent_->rows_inserted_counter(), row_count_);
@@ -1069,8 +1149,8 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
     RETURN_IF_ERROR(columns_[i]->Flush(&file_pos_, &data_page_offset, 
&dict_page_offset));
     DCHECK_GT(data_page_offset, 0);
 
-    ColumnChunk& col_chunk = current_row_group_->columns[i];
-    ColumnMetaData& col_metadata = col_chunk.meta_data;
+    parquet::ColumnChunk& col_chunk = current_row_group_->columns[i];
+    parquet::ColumnMetaData& col_metadata = col_chunk.meta_data;
     col_metadata.data_page_offset = data_page_offset;
     if (dict_page_offset >= 0) {
       col_metadata.__set_dictionary_page_offset(dict_page_offset);
@@ -1089,23 +1169,23 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
 
     // Write encodings and encoding stats for this column
     col_metadata.encodings.clear();
-    for (Encoding::type encoding : col_writer->column_encodings_) {
+    for (parquet::Encoding::type encoding : col_writer->column_encodings_) {
       col_metadata.encodings.push_back(encoding);
     }
 
-    vector<PageEncodingStats> encoding_stats;
+    vector<parquet::PageEncodingStats> encoding_stats;
     // Add dictionary page encoding stats
     for (const auto& entry: col_writer->dict_encoding_stats_) {
-      PageEncodingStats dict_enc_stat;
-      dict_enc_stat.page_type = PageType::DICTIONARY_PAGE;
+      parquet::PageEncodingStats dict_enc_stat;
+      dict_enc_stat.page_type = parquet::PageType::DICTIONARY_PAGE;
       dict_enc_stat.encoding = entry.first;
       dict_enc_stat.count = entry.second;
       encoding_stats.push_back(dict_enc_stat);
     }
     // Add data page encoding stats
     for (const auto& entry: col_writer->data_encoding_stats_) {
-      PageEncodingStats data_enc_stat;
-      data_enc_stat.page_type = PageType::DATA_PAGE;
+      parquet::PageEncodingStats data_enc_stat;
+      data_enc_stat.page_type = parquet::PageType::DATA_PAGE;
       data_enc_stat.encoding = entry.first;
       data_enc_stat.count = entry.second;
       encoding_stats.push_back(data_enc_stat);
@@ -1129,8 +1209,6 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
         thrift_serializer_->Serialize(&current_row_group_->columns[i], &len, 
&buffer));
     RETURN_IF_ERROR(Write(buffer, len));
     file_pos_ += len;
-
-    col_writer->Reset();
   }
 
   // Populate RowGroup::sorting_columns with all columns specified by the 
Frontend.
@@ -1148,6 +1226,47 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
   return Status::OK();
 }
 
+Status HdfsParquetTableWriter::WritePageIndex() {
+  // Currently Impala only write Parquet files with a single row group. The 
current
+  // page index logic depends on this behavior as it only keeps one row group's
+  // statistics in memory.
+  DCHECK_EQ(file_metadata_.row_groups.size(), 1);
+
+  parquet::RowGroup* row_group = &(file_metadata_.row_groups[0]);
+  // Write out the column indexes.
+  for (int i = 0; i < columns_.size(); ++i) {
+    auto& column = *columns_[i];
+    if (!column.valid_column_index_) continue;
+    column.column_index_.__set_boundary_order(
+        column.row_group_stats_base_->GetBoundaryOrder());
+    // We always set null_counts.
+    column.column_index_.__isset.null_counts = true;
+    uint8_t* buffer = nullptr;
+    uint32_t len = 0;
+    RETURN_IF_ERROR(thrift_serializer_->Serialize(&column.column_index_, &len, 
&buffer));
+    RETURN_IF_ERROR(Write(buffer, len));
+    // Update the column_index_offset and column_index_length of the 
ColumnChunk
+    row_group->columns[i].__set_column_index_offset(file_pos_);
+    row_group->columns[i].__set_column_index_length(len);
+    file_pos_ += len;
+  }
+  // Write out the offset indexes.
+  for (int i = 0; i < columns_.size(); ++i) {
+    auto& column = *columns_[i];
+    uint8_t* buffer = nullptr;
+    uint32_t len = 0;
+    RETURN_IF_ERROR(thrift_serializer_->Serialize(&column.offset_index_, &len, 
&buffer));
+    RETURN_IF_ERROR(Write(buffer, len));
+    // Update the offset_index_offset and offset_index_length of the 
ColumnChunk
+    row_group->columns[i].__set_offset_index_offset(file_pos_);
+    row_group->columns[i].__set_offset_index_length(len);
+    file_pos_ += len;
+  }
+  // Reset column writers.
+  for (auto& column : columns_) column->Reset();
+  return Status::OK();
+}
+
 Status HdfsParquetTableWriter::WriteFileFooter() {
   // Write file_meta_data
   uint32_t file_metadata_len = 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/be/src/exec/hdfs-parquet-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.h 
b/be/src/exec/hdfs-parquet-table-writer.h
index 1334b19..fd77bf7 100644
--- a/be/src/exec/hdfs-parquet-table-writer.h
+++ b/be/src/exec/hdfs-parquet-table-writer.h
@@ -103,6 +103,12 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
   /// as 'parquet-mr'.
   static const int MAX_COLUMN_STATS_SIZE = 4 * 1024;
 
+  /// In parquet::ColumnIndex we store the min and max values for each page.
+  /// However, we don't want to store very long strings, so we truncate them.
+  /// The value of it must not be too small, since we don't want to truncate
+  /// non-string values.
+  static const int PAGE_INDEX_MAX_STRING_LENGTH = 64;
+
   /// Per-column information state.  This contains some metadata as well as the
   /// data buffers.
   class BaseColumnWriter;
@@ -120,10 +126,14 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
   /// table_desc_ into the format in the file metadata
   Status CreateSchema();
 
-  /// Write the file header information to the output file.
+  /// Writes the file header information to the output file.
   Status WriteFileHeader();
 
-  /// Write the file metadata and footer.
+  /// Writes the column index and offset index of each page in the file.
+  /// It also resets the column writers.
+  Status WritePageIndex();
+
+  /// Writes the file metadata and footer.
   Status WriteFileFooter();
 
   /// Flushes the current row group to file.  This will compute the final

http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/be/src/exec/parquet-column-stats.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.h 
b/be/src/exec/parquet-column-stats.h
index 44d4a65..6d2743a 100644
--- a/be/src/exec/parquet-column-stats.h
+++ b/be/src/exec/parquet-column-stats.h
@@ -26,6 +26,8 @@
 #include "runtime/timestamp-value.h"
 #include "runtime/types.h"
 
+#include "gen-cpp/parquet_types.h"
+
 namespace impala {
 
 /// This class, together with its derivatives, is used to update column 
statistics when
@@ -67,6 +69,11 @@ class ColumnStatsBase {
   struct MinMaxTrait {
     static decltype(auto) MinValue(const T& a, const T& b) { return 
std::min(a, b); }
     static decltype(auto) MaxValue(const T& a, const T& b) { return 
std::max(a, b); }
+    static int Compare(const T& a, const T& b) {
+      if (a < b) return -1;
+      if (a > b) return 1;
+      return 0;
+    }
   };
 
   /// min and max functions for floating point types
@@ -74,6 +81,13 @@ class ColumnStatsBase {
   struct MinMaxTrait<T, std::enable_if_t<std::is_floating_point<T>::value>> {
     static decltype(auto) MinValue(const T& a, const T& b) { return 
std::fmin(a, b); }
     static decltype(auto) MaxValue(const T& a, const T& b) { return 
std::fmax(a, b); }
+    static int Compare(const T& a, const T& b) {
+      //TODO: Should be aligned with PARQUET-1222, once resolved
+      if (a == b) return 0;
+      if (std::isnan(a) && std::isnan(b)) return 0;
+      if (MaxValue(a, b) == a) return 1;
+      return -1;
+    }
   };
 
   ColumnStatsBase() : has_min_max_values_(false), null_count_(0) {}
@@ -94,7 +108,8 @@ class ColumnStatsBase {
       int64_t* null_count);
 
   /// Merges this statistics object with values from 'other'. If other has not 
been
-  /// initialized, then this object will not be changed.
+  /// initialized, then this object will not be changed. It maintains internal 
state that
+  /// tracks whether the min/max values are ordered.
   virtual void Merge(const ColumnStatsBase& other) = 0;
 
   /// Copies the contents of this object's statistics values to internal 
buffers. Some
@@ -119,6 +134,18 @@ class ColumnStatsBase {
   /// value is appended to the column or the statistics are merged.
   void IncrementNullCount(int64_t count) { null_count_ += count; }
 
+  /// Returns the boundary order of the pages. That is, whether the lists of 
min/max
+  /// elements inside the ColumnIndex are ordered and if so, in which 
direction.
+  /// If both 'ascending_boundary_order_' and 'descending_boundary_order_' is 
true,
+  /// it means all elements are equal, we choose ascending order in this case.
+  /// If only one flag is true, or both of them is false, then we return the 
identified
+  /// ordering, or unordered.
+  parquet::BoundaryOrder::type GetBoundaryOrder() const {
+    if (ascending_boundary_order_) return parquet::BoundaryOrder::ASCENDING;
+    if (descending_boundary_order_) return parquet::BoundaryOrder::DESCENDING;
+    return parquet::BoundaryOrder::UNORDERED;
+  }
+
  protected:
   // Copies the memory of 'value' into 'buffer' and make 'value' point to 
'buffer'.
   // 'buffer' is reset before making the copy.
@@ -130,6 +157,16 @@ class ColumnStatsBase {
   // Number of null values since the last call to Reset().
   int64_t null_count_;
 
+  // If true, min/max values are ascending.
+  // We assume the values are ascending, so start with true and only make it 
false when
+  // we find a descending value. If not all values are equal, then at least 
one of
+  // 'ascending_boundary_order_' and 'descending_boundary_order_' will be 
false.
+  bool ascending_boundary_order_ = true;
+
+  // If true, min/max values are descending.
+  // See description of 'ascending_boundary_order_'.
+  bool descending_boundary_order_ = true;
+
  private:
   /// Returns true if we support reading statistics stored in the fields 
'min_value' and
   /// 'max_value' in parquet::Statistics for the type 'col_type' and the 
column order
@@ -174,7 +211,9 @@ class ColumnStats : public ColumnStatsBase {
       plain_encoded_value_size_(plain_encoded_value_size),
       mem_pool_(mem_pool),
       min_buffer_(mem_pool),
-      max_buffer_(mem_pool) {}
+      max_buffer_(mem_pool),
+      prev_page_min_buffer_(mem_pool),
+      prev_page_max_buffer_(mem_pool) {}
 
   /// Updates the statistics based on the values min_value and max_value. If 
necessary,
   /// initializes the statistics. It may keep a reference to either value until
@@ -216,12 +255,20 @@ class ColumnStats : public ColumnStatsBase {
   // Maximum value since the last call to Reset().
   T max_value_;
 
+  // Minimum value of the previous page. Need to store that to calculate 
boundary order.
+  T prev_page_min_value_;
+
+  // Maximum value of the previous page. Need to store that to calculate 
boundary order.
+  T prev_page_max_value_;
+
   // Memory pool to allocate from when making copies of the statistics data.
   MemPool* mem_pool_;
 
   // Local buffers to copy statistics data into.
   StringBuffer min_buffer_;
   StringBuffer max_buffer_;
+  StringBuffer prev_page_min_buffer_;
+  StringBuffer prev_page_max_buffer_;
 };
 
 } // end ns impala

http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/be/src/exec/parquet-column-stats.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.inline.h 
b/be/src/exec/parquet-column-stats.inline.h
index 0b618f9..094fadd 100644
--- a/be/src/exec/parquet-column-stats.inline.h
+++ b/be/src/exec/parquet-column-stats.inline.h
@@ -28,6 +28,8 @@ namespace impala {
 inline void ColumnStatsBase::Reset() {
   has_min_max_values_ = false;
   null_count_ = 0;
+  ascending_boundary_order_ = true;
+  descending_boundary_order_ = true;
 }
 
 template <typename T>
@@ -46,7 +48,25 @@ template <typename T>
 inline void ColumnStats<T>::Merge(const ColumnStatsBase& other) {
   DCHECK(dynamic_cast<const ColumnStats<T>*>(&other));
   const ColumnStats<T>* cs = static_cast<const ColumnStats<T>*>(&other);
-  if (cs->has_min_max_values_) Update(cs->min_value_, cs->max_value_);
+  if (cs->has_min_max_values_) {
+    if (has_min_max_values_) {
+      if (ascending_boundary_order_) {
+        if (MinMaxTrait<T>::Compare(prev_page_max_value_, cs->max_value_) > 0 
||
+            MinMaxTrait<T>::Compare(prev_page_min_value_, cs->min_value_) > 0) 
{
+          ascending_boundary_order_ = false;
+        }
+      }
+      if (descending_boundary_order_) {
+        if (MinMaxTrait<T>::Compare(prev_page_max_value_, cs->max_value_) < 0 
||
+            MinMaxTrait<T>::Compare(prev_page_min_value_, cs->min_value_) < 0) 
{
+          descending_boundary_order_ = false;
+        }
+      }
+    }
+    Update(cs->min_value_, cs->max_value_);
+    prev_page_min_value_ = cs->min_value_;
+    prev_page_max_value_ = cs->max_value_;
+  }
   IncrementNullCount(cs->null_count_);
 }
 
@@ -176,12 +196,52 @@ inline void ColumnStats<StringValue>::Update(
   }
 }
 
+template <>
+inline void ColumnStats<StringValue>::Merge(const ColumnStatsBase& other) {
+  DCHECK(dynamic_cast<const ColumnStats<StringValue>*>(&other));
+  const ColumnStats<StringValue>* cs = static_cast<
+      const ColumnStats<StringValue>*>(&other);
+  if (cs->has_min_max_values_) {
+    if (has_min_max_values_) {
+      // Make sure that we copied the previous page's min/max values to their 
own buffer.
+      DCHECK_NE(static_cast<void*>(prev_page_min_value_.ptr),
+                static_cast<void*>(cs->min_value_.ptr));
+      DCHECK_NE(static_cast<void*>(prev_page_max_value_.ptr),
+                static_cast<void*>(cs->max_value_.ptr));
+      if (ascending_boundary_order_) {
+        if (prev_page_max_value_ > cs->max_value_ ||
+            prev_page_min_value_ > cs->min_value_) {
+          ascending_boundary_order_ = false;
+        }
+      }
+      if (descending_boundary_order_) {
+        if (prev_page_max_value_ < cs->max_value_ ||
+            prev_page_min_value_ < cs->min_value_) {
+          descending_boundary_order_ = false;
+        }
+      }
+    }
+    Update(cs->min_value_, cs->max_value_);
+    prev_page_min_value_ = cs->min_value_;
+    prev_page_max_value_ = cs->max_value_;
+    prev_page_min_buffer_.Clear();
+    prev_page_max_buffer_.Clear();
+  }
+  IncrementNullCount(cs->null_count_);
+}
+
 // StringValues need to be copied at the end of processing a row batch, since 
the batch
 // memory will be released.
 template <>
 inline Status 
ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers() {
   if (min_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&min_buffer_, 
&min_value_));
   if (max_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&max_buffer_, 
&max_value_));
+  if (prev_page_min_buffer_.IsEmpty()) {
+    RETURN_IF_ERROR(CopyToBuffer(&prev_page_min_buffer_, 
&prev_page_min_value_));
+  }
+  if (prev_page_max_buffer_.IsEmpty()) {
+    RETURN_IF_ERROR(CopyToBuffer(&prev_page_max_buffer_, 
&prev_page_max_value_));
+  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/be/src/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 546c1c0..d9092ce 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -74,6 +74,7 @@ add_library(Util
   runtime-profile.cc
   simple-logger.cc
   string-parser.cc
+  string-util.cc
   symbols-util.cc
   static-asserts.cc
   summary-util.cc
@@ -135,6 +136,7 @@ ADD_BE_TEST(redactor-unconfigured-test)
 ADD_BE_TEST(rle-test)
 ADD_BE_TEST(runtime-profile-test)
 ADD_BE_TEST(string-parser-test)
+ADD_BE_TEST(string-util-test)
 ADD_BE_TEST(symbols-util-test)
 ADD_BE_TEST(sys-info-test)
 ADD_BE_TEST(thread-pool-test)

http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/be/src/util/string-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/string-util-test.cc b/be/src/util/string-util-test.cc
new file mode 100644
index 0000000..979eb9f
--- /dev/null
+++ b/be/src/util/string-util-test.cc
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "testutil/gtest-util.h"
+
+#include "util/string-util.h"
+#include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+enum Truncation {
+  DOWN,
+  UP
+};
+
+void EvalTruncation(const string& original, const string& expected_result,
+    int32_t max_length, Truncation boundary) {
+  string result;
+  if (boundary == DOWN) {
+    ASSERT_OK(TruncateDown(original, max_length, &result));
+  } else {
+    ASSERT_OK(TruncateUp(original, max_length, &result));
+  }
+  EXPECT_EQ(expected_result, result);
+}
+
+TEST(TruncateDownTest, Basic) {
+  EvalTruncation("0123456789", "0123456789", 100, DOWN);
+  EvalTruncation("0123456789", "0123456789", 10, DOWN);
+  EvalTruncation("0123456789", "01234", 5, DOWN);
+  EvalTruncation("0123456789", "", 0, DOWN);
+  EvalTruncation("", "", 10, DOWN);
+  EvalTruncation(string("\0\0\0", 3), string("\0\0", 2), 2, DOWN);
+  EvalTruncation("asdfghjkl", "asdf", 4, DOWN);
+  char a[] = {'a', CHAR_MAX, CHAR_MIN, 'b', '\0'};
+  char b[] = {'a', CHAR_MAX, '\0'};
+  EvalTruncation(a, b, 2, DOWN);
+}
+
+TEST(TruncateUpTest, Basic) {
+  EvalTruncation("0123456789", "0123456789", 100, UP);
+  EvalTruncation("abcdefghij", "abcdefghij", 10, UP);
+  EvalTruncation("abcdefghij", "abcdefghj", 9, UP);
+  EvalTruncation("abcdefghij", "abcdf", 5, UP);
+
+  string max_string(100, 0xFF);
+  EvalTruncation(max_string, max_string, 100, UP);
+
+  string normal_plus_max = "abcdef" + max_string;
+  EvalTruncation(normal_plus_max, normal_plus_max, 200, UP);
+  EvalTruncation(normal_plus_max, "abcdeg", 10, UP);
+
+  string result;
+  Status s = TruncateUp(max_string, 10, &result);
+  EXPECT_EQ(s.GetDetail(), "TruncateUp() couldn't increase string.\n");
+
+  EvalTruncation("", "", 10, UP);
+  EvalTruncation(string("\0\0\0", 3), string("\0\001", 2), 2, UP);
+  EvalTruncation("asdfghjkl", "asdg", 4, UP);
+  char a[] = {0, (char)0x7F, (char)0xFF, 0};
+  char b[] = {0, (char)0x80, 0};
+  EvalTruncation(a, b, 2, UP);
+}
+
+}
+
+IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/be/src/util/string-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/string-util.cc b/be/src/util/string-util.cc
new file mode 100644
index 0000000..b6c8fb7
--- /dev/null
+++ b/be/src/util/string-util.cc
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+
+#include "gutil/strings/substitute.h"
+#include "util/string-util.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+Status TruncateDown(const string& str, int32_t max_length, string* result) {
+  DCHECK(result != nullptr);
+  *result = str.substr(0, std::min(static_cast<int32_t>(str.length()), 
max_length));
+  return Status::OK();
+}
+
+Status TruncateUp(const string& str, int32_t max_length, string* result) {
+  DCHECK(result != nullptr);
+  if (str.length() <= max_length) {
+    *result = str;
+    return Status::OK();
+  }
+
+  *result = str.substr(0, max_length);
+  int i = max_length - 1;
+  while (i > 0 && static_cast<int32_t>((*result)[i]) == -1) {
+    (*result)[i] += 1;
+    --i;
+  }
+  // We convert it to unsigned because signed overflow results in undefined 
behavior.
+  unsigned char uch = static_cast<unsigned char>((*result)[i]);
+  uch += 1;
+  (*result)[i] = uch;
+  if (i == 0 && (*result)[i] == 0) {
+    return Status("TruncateUp() couldn't increase string.");
+  }
+  result->resize(i + 1);
+  return Status::OK();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/be/src/util/string-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/string-util.h b/be/src/util/string-util.h
new file mode 100644
index 0000000..7e7ab12
--- /dev/null
+++ b/be/src/util/string-util.h
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_UTIL_STRING_UTIL_H
+#define IMPALA_UTIL_STRING_UTIL_H
+
+#include <string>
+
+#include "common/status.h"
+
+namespace impala {
+
+/// 'str' holds the minimum value of some string set. We need to truncate it
+/// if it is longer than 'max_length'.
+WARN_UNUSED_RESULT
+Status TruncateDown(const std::string& str, int32_t max_length, std::string* 
result);
+
+/// 'str' holds the maximum value of some string set. We want to truncate it
+/// to only occupy 'max_length' bytes. We also want to guarantee that the 
truncated
+/// value remains greater than all the strings in the original set, so we need
+/// to increase it after truncation. E.g.: when 'max_length' == 3: AAAAAAA => 
AAB
+/// Returns error if it cannot increase the string value, ie. all bytes are 
0xFF.
+WARN_UNUSED_RESULT
+Status TruncateUp(const std::string& str, int32_t max_length, std::string* 
result);
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/common/thrift/parquet.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/parquet.thrift b/common/thrift/parquet.thrift
index c4afb77..3666a43 100644
--- a/common/thrift/parquet.thrift
+++ b/common/thrift/parquet.thrift
@@ -362,6 +362,16 @@ enum PageType {
   DATA_PAGE_V2 = 3;
 }
 
+/**
+ * Enum to annotate whether lists of min/max elements inside ColumnIndex
+ * are ordered and if so, in which direction.
+ */
+enum BoundaryOrder {
+  UNORDERED = 0;
+  ASCENDING = 1;
+  DESCENDING = 2;
+}
+
 /** Data page header */
 struct DataPageHeader {
   /** Number of values, including NULLs, in this data page. **/
@@ -551,6 +561,18 @@ struct ColumnChunk {
    * metadata.
    **/
   3: optional ColumnMetaData meta_data
+
+  /** File offset of ColumnChunk's OffsetIndex **/
+  4: optional i64 offset_index_offset
+
+  /** Size of ColumnChunk's OffsetIndex, in bytes **/
+  5: optional i32 offset_index_length
+
+  /** File offset of ColumnChunk's ColumnIndex **/
+  6: optional i64 column_index_offset
+
+  /** Size of ColumnChunk's ColumnIndex, in bytes **/
+  7: optional i32 column_index_length
 }
 
 struct RowGroup {
@@ -588,6 +610,69 @@ union ColumnOrder {
   1: TypeDefinedOrder TYPE_ORDER;
 }
 
+struct PageLocation {
+  /** Offset of the page in the file **/
+  1: required i64 offset
+
+  /**
+   * Size of the page, including header. Sum of compressed_page_size and header
+   * length
+   */
+  2: required i32 compressed_page_size
+
+  /**
+   * Index within the RowGroup of the first row of the page; this means pages
+   * change on record boundaries (r = 0).
+   */
+  3: required i64 first_row_index
+}
+
+struct OffsetIndex {
+  /**
+   * PageLocations, ordered by increasing PageLocation.offset. It is required
+   * that page_locations[i].first_row_index < 
page_locations[i+1].first_row_index.
+   */
+  1: required list<PageLocation> page_locations
+}
+
+/**
+ * Description for ColumnIndex.
+ * Each <array-field>[i] refers to the page at OffsetIndex.page_locations[i]
+ */
+struct ColumnIndex {
+  /**
+   * A list of Boolean values to determine the validity of the corresponding
+   * min and max values. If true, a page contains only null values, and writers
+   * have to set the corresponding entries in min_values and max_values to
+   * byte[0], so that all lists have the same length. If false, the
+   * corresponding entries in min_values and max_values must be valid.
+   */
+  1: required list<bool> null_pages
+
+  /**
+   * Two lists containing lower and upper bounds for the values of each page.
+   * These may be the actual minimum and maximum values found on a page, but
+   * can also be (more compact) values that do not exist on a page. For
+   * example, instead of storing ""Blart Versenwald III", a writer may set
+   * min_values[i]="B", max_values[i]="C". Such more compact values must still
+   * be valid values within the column's logical type. Readers must make sure
+   * that list entries are populated before using them by inspecting 
null_pages.
+   */
+  2: required list<binary> min_values
+  3: required list<binary> max_values
+
+  /**
+   * Stores whether both min_values and max_values are orderd and if so, in
+   * which direction. This allows readers to perform binary searches in both
+   * lists. Readers cannot assume that max_values[i] <= min_values[i+1], even
+   * if the lists are ordered.
+   */
+  4: required BoundaryOrder boundary_order
+
+  /** A list containing the number of null values for each page **/
+  5: optional list<i64> null_counts
+}
+
 /**
  * Description for file metadata
  */

http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/testdata/bin/load-dependent-tables.sql
----------------------------------------------------------------------
diff --git a/testdata/bin/load-dependent-tables.sql 
b/testdata/bin/load-dependent-tables.sql
index 7b25375..9de462f 100644
--- a/testdata/bin/load-dependent-tables.sql
+++ b/testdata/bin/load-dependent-tables.sql
@@ -66,6 +66,38 @@ ALTER TABLE alltypesmixedformat PARTITION (year=2009, 
month=2)
 ALTER TABLE alltypesmixedformat PARTITION (year=2009, month=3)
   SET FILEFORMAT RCFILE;
 
+DROP TABLE IF EXISTS functional_parquet.chars_formats;
+CREATE EXTERNAL TABLE functional_parquet.chars_formats
+(cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
+STORED AS PARQUET
+LOCATION '${hiveconf:hive.metastore.warehouse.dir}/chars_formats_parquet';
+
+DROP TABLE IF EXISTS functional_orc_def.chars_formats;
+CREATE EXTERNAL TABLE functional_orc_def.chars_formats
+(cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
+STORED AS ORC
+LOCATION '${hiveconf:hive.metastore.warehouse.dir}/chars_formats_orc_def';
+
+DROP TABLE IF EXISTS functional.chars_formats;
+CREATE EXTERNAL TABLE functional.chars_formats
+(cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
+ROW FORMAT delimited fields terminated by ','  escaped by '\\'
+STORED AS TEXTFILE
+LOCATION '${hiveconf:hive.metastore.warehouse.dir}/chars_formats_text';
+
+DROP TABLE IF EXISTS functional_avro_snap.chars_formats;
+CREATE EXTERNAL TABLE functional_avro_snap.chars_formats
+(cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
+STORED AS AVRO
+LOCATION '${hiveconf:hive.metastore.warehouse.dir}/chars_formats_avro_snap'
+TBLPROPERTIES ('avro.schema.literal'='{"type":"record",
+"name":"CharTypesTest","doc":"Schema generated by Kite",
+"fields":[
+{"name":"cs","type":["null","string"], "doc":"Type inferred"},
+{"name":"cl","type":["null","string"], "doc":"Type inferred"},
+{"name":"vc","type":["null","string"], "doc":"Type inferred"}
+]}');
+
 ---- Unsupported Impala table types
 USE functional;
 CREATE VIEW IF NOT EXISTS hive_view AS SELECT 1 AS int_col FROM alltypes limit 
1;
@@ -74,4 +106,5 @@ USE functional;
 DROP INDEX IF EXISTS hive_index ON alltypes;
 CREATE INDEX hive_index ON TABLE alltypes (int_col)
 AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
-WITH DEFERRED REBUILD IN TABLE hive_index_tbl
+WITH DEFERRED REBUILD IN TABLE hive_index_tbl;
+

http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
 
b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
index ceaf1d0..55ec0a8 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
@@ -33,17 +33,17 @@ show table stats alltypes
 YEAR, MONTH, #ROWS, EXTRAP #ROWS, #FILES, SIZE, BYTES CACHED, CACHE 
REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION
 ---- RESULTS
 '2009','1',-1,308,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=1'
-'2009','2',-1,288,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2'
-'2009','3',-1,308,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=3'
+'2009','2',-1,289,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2'
+'2009','3',-1,307,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=3'
 '2009','4',-1,302,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=4'
-'2009','5',-1,308,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=5'
+'2009','5',-1,307,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=5'
 '2009','6',-1,302,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=6'
-'2009','7',-1,308,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=7'
-'2009','8',-1,308,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=8'
+'2009','7',-1,307,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=7'
+'2009','8',-1,307,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=8'
 '2009','9',-1,302,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=9'
-'2009','10',-1,308,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10'
+'2009','10',-1,307,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10'
 '2009','11',-1,302,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=11'
-'2009','12',-1,308,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12'
+'2009','12',-1,307,1,regex:.*B,'NOT CACHED','NOT 
CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12'
 'Total','',3650,3650,12,regex:.*B,'0B','','','',''
 ---- TYPES
 STRING,STRING,BIGINT,BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING

http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/tests/query_test/test_chars.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_chars.py b/tests/query_test/test_chars.py
index 4444410..86ec095 100644
--- a/tests/query_test/test_chars.py
+++ b/tests/query_test/test_chars.py
@@ -47,39 +47,6 @@ class TestCharFormats(ImpalaTestSuite):
   def get_workload(cls):
     return 'functional-query'
 
-  def setup_method(self, method):
-    self.__create_char_tables()
-
-  def __create_char_tables(self):
-    self.client.execute('''create external table if not exists
-        functional_parquet.chars_formats
-        (cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
-        STORED AS PARQUET
-        LOCATION 
"{0}"'''.format(get_fs_path("/test-warehouse/chars_formats_parquet")))
-    self.client.execute('''create external table if not exists
-        functional_orc_def.chars_formats
-        (cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
-        STORED AS ORC
-        LOCATION 
"{0}"'''.format(get_fs_path("/test-warehouse/chars_formats_orc_def")))
-    self.client.execute('''create external table if not exists
-        functional.chars_formats
-        (cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
-        ROW FORMAT delimited fields terminated by ','  escaped by '\\\\'
-        STORED AS TEXTFILE
-        LOCATION 
"{0}"'''.format(get_fs_path("/test-warehouse/chars_formats_text")))
-    self.client.execute('''create external table if not exists
-        functional_avro_snap.chars_formats
-        (cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
-        STORED AS AVRO
-        LOCATION "{0}"
-        TBLPROPERTIES ('avro.schema.literal'='{{"type":"record",
-        "name":"CharTypesTest","doc":"Schema generated by Kite",
-        "fields":[
-        {{"name":"cs","type":["null","string"], "doc":"Type inferred"}},
-        {{"name":"cl","type":["null","string"], "doc":"Type inferred"}},
-        {{"name":"vc","type":["null","string"],"doc":"Type inferred"}}]}}')
-        '''.format(get_fs_path("/test-warehouse/chars_formats_avro_snap")))
-
   @classmethod
   def add_test_dimensions(cls):
     super(TestCharFormats, cls).add_test_dimensions()

http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/tests/query_test/test_parquet_page_index.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_parquet_page_index.py 
b/tests/query_test/test_parquet_page_index.py
new file mode 100644
index 0000000..51632e5
--- /dev/null
+++ b/tests/query_test/test_parquet_page_index.py
@@ -0,0 +1,365 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Targeted Impala insert tests
+
+import os
+
+from collections import namedtuple
+from subprocess import check_call
+from parquet.ttypes import BoundaryOrder, ColumnIndex, OffsetIndex, 
PageHeader, PageType
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.util.filesystem_utils import get_fs_path
+from tests.util.get_parquet_metadata import (
+    decode_stats_value,
+    get_parquet_metadata,
+    read_serialized_object
+)
+
+PAGE_INDEX_MAX_STRING_LENGTH = 64
+
+
+class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
+  """Since PARQUET-922 page statistics can be written before the footer.
+  The tests in this class checks if Impala writes the page indices correctly.
+  """
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestHdfsParquetTableIndexWriter, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+
+  def _get_row_group_from_file(self, parquet_file):
+    """Returns namedtuples that contain the schema, stats, offset_index, 
column_index,
+    and page_headers for each column in the first row group in file 
'parquet_file'. Fails
+    if the file contains multiple row groups.
+    """
+    ColumnInfo = namedtuple('ColumnInfo', ['schema', 'stats', 'offset_index',
+        'column_index', 'page_headers'])
+
+    file_meta_data = get_parquet_metadata(parquet_file)
+    assert len(file_meta_data.row_groups) == 1
+    # We only support flat schemas, the additional element is the root element.
+    schemas = file_meta_data.schema[1:]
+    row_group = file_meta_data.row_groups[0]
+    assert len(schemas) == len(row_group.columns)
+    row_group_index = []
+    with open(parquet_file) as file_handle:
+      for column, schema in zip(row_group.columns, schemas):
+        column_index_offset = column.column_index_offset
+        column_index_length = column.column_index_length
+        column_index = None
+        if column_index_offset and column_index_length:
+          column_index = read_serialized_object(ColumnIndex, file_handle,
+                                                column_index_offset, 
column_index_length)
+        column_meta_data = column.meta_data
+        stats = None
+        if column_meta_data:
+          stats = column_meta_data.statistics
+
+        offset_index_offset = column.offset_index_offset
+        offset_index_length = column.offset_index_length
+        offset_index = None
+        page_headers = []
+        if offset_index_offset and offset_index_length:
+          offset_index = read_serialized_object(OffsetIndex, file_handle,
+                                                offset_index_offset, 
offset_index_length)
+          for page_loc in offset_index.page_locations:
+            page_header = read_serialized_object(PageHeader, file_handle, 
page_loc.offset,
+                                                 page_loc.compressed_page_size)
+            page_headers.append(page_header)
+
+        column_info = ColumnInfo(schema, stats, offset_index, column_index, 
page_headers)
+        row_group_index.append(column_info)
+    return row_group_index
+
+  def _get_row_groups_from_hdfs_folder(self, hdfs_path, tmpdir):
+    """Returns a list of column infos (containing the schema, stats, 
offset_index,
+    column_index, and page_headers) for the first row group in all parquet 
files in
+    'hdfs_path'.
+    """
+    row_group_indexes = []
+    check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath])
+    for root, subdirs, files in os.walk(tmpdir.strpath):
+      for f in files:
+        parquet_file = os.path.join(root, str(f))
+        row_group_indexes.append(self._get_row_group_from_file(parquet_file))
+    return row_group_indexes
+
+  def _validate_page_locations(self, page_locations):
+    """Validate that the page locations are in order."""
+    for previous_loc, current_loc in zip(page_locations[:-1], 
page_locations[1:]):
+      assert previous_loc.offset < current_loc.offset
+      assert previous_loc.first_row_index < current_loc.first_row_index
+
+  def _validate_null_stats(self, index_size, column_info):
+    """Validates the statistics stored in null_pages and null_counts."""
+    column_index = column_info.column_index
+    column_stats = column_info.stats
+    assert column_index.null_pages is not None
+    assert len(column_index.null_pages) == index_size
+    assert column_index.null_counts is not None
+    assert len(column_index.null_counts) == index_size
+
+    for page_is_null, null_count, page_header in zip(column_index.null_pages,
+        column_index.null_counts, column_info.page_headers):
+      assert page_header.type == PageType.DATA_PAGE
+      num_values = page_header.data_page_header.num_values
+      assert not page_is_null or null_count == num_values
+
+    if column_stats:
+      assert column_stats.null_count == sum(column_index.null_counts)
+
+  def _validate_min_max_values(self, index_size, column_info):
+    """Validate min/max values of the pages in a column chunk."""
+    column_index = column_info.column_index
+    min_values = column_info.column_index.min_values
+    assert len(min_values) == index_size
+    max_values = column_info.column_index.max_values
+    assert len(max_values) == index_size
+
+    if not column_info.stats:
+      return
+
+    column_min_value_str = column_info.stats.min_value
+    column_max_value_str = column_info.stats.max_value
+    if column_min_value_str is None or column_max_value_str is None:
+      # If either is None, then both need to be None.
+      assert column_min_value_str is None and column_max_value_str is None
+      # No min and max value, all pages need to be null
+      for idx, null_page in enumerate(column_index.null_pages):
+        assert null_page, "Page {} of column {} is not null, \
+            but doesn't have min and max values!".format(idx, 
column_index.schema.name)
+      # Everything is None, no further checks needed.
+      return
+
+    column_min_value = decode_stats_value(column_info.schema, 
column_min_value_str)
+    for null_page, page_min_str in zip(column_index.null_pages, min_values):
+      if not null_page:
+        page_min_value = decode_stats_value(column_info.schema, page_min_str)
+        # If type is str, page_min_value might have been truncated.
+        if isinstance(page_min_value, basestring):
+          assert page_min_value >= column_min_value[:len(page_min_value)]
+        else:
+          assert page_min_value >= column_min_value
+
+    column_max_value = decode_stats_value(column_info.schema, 
column_max_value_str)
+    for null_page, page_max_str in zip(column_index.null_pages, max_values):
+      if not null_page:
+        page_max_value = decode_stats_value(column_info.schema, page_max_str)
+        # If type is str, page_max_value might have been truncated and 
incremented.
+        if (isinstance(page_max_value, basestring) and
+            len(page_max_value) == PAGE_INDEX_MAX_STRING_LENGTH):
+          max_val_prefix = page_max_value.rstrip('\0')
+          assert max_val_prefix[:-1] <= column_max_value
+        else:
+          assert page_max_value <= column_max_value
+
+  def _validate_ordering(self, ordering, schema, null_pages, min_values, 
max_values):
+    """Check if the ordering of the values reflects the value of 'ordering'."""
+
+    def is_sorted(l, reverse=False):
+      if not reverse:
+        return all(a <= b for a, b in zip(l, l[1:]))
+      else:
+        return all(a >= b for a, b in zip(l, l[1:]))
+
+    # Filter out null pages and decode the actual min/max values.
+    actual_min_values = [decode_stats_value(schema, min_val)
+                         for min_val, is_null in zip(min_values, null_pages)
+                         if not is_null]
+    actual_max_values = [decode_stats_value(schema, max_val)
+                         for max_val, is_null in zip(max_values, null_pages)
+                         if not is_null]
+
+    # For ASCENDING and DESCENDING, both min and max values need to be sorted.
+    if ordering == BoundaryOrder.ASCENDING:
+      assert is_sorted(actual_min_values)
+      assert is_sorted(actual_max_values)
+    elif ordering == BoundaryOrder.DESCENDING:
+      assert is_sorted(actual_min_values, reverse=True)
+      assert is_sorted(actual_max_values, reverse=True)
+    else:
+      assert ordering == BoundaryOrder.UNORDERED
+      # For UNORDERED, min and max values cannot be both sorted.
+      assert not is_sorted(actual_min_values) or not 
is_sorted(actual_max_values)
+      assert (not is_sorted(actual_min_values, reverse=True) or
+              not is_sorted(actual_max_values, reverse=True))
+
+  def _validate_boundary_order(self, column_info):
+    """Validate that min/max values are really in the order specified by
+    boundary order.
+    """
+    column_index = column_info.column_index
+    self._validate_ordering(column_index.boundary_order, column_info.schema,
+        column_index.null_pages, column_index.min_values, 
column_index.max_values)
+
+  def _validate_parquet_page_index(self, hdfs_path, tmpdir):
+    """Validates that 'hdfs_path' contains exactly one parquet file and that 
the rowgroup
+    index in that file is in the valid format.
+    """
+    row_group_indexes = self._get_row_groups_from_hdfs_folder(hdfs_path, 
tmpdir)
+    for columns in row_group_indexes:
+      for column_info in columns:
+        try:
+          index_size = len(column_info.offset_index.page_locations)
+          assert index_size > 0
+          
self._validate_page_locations(column_info.offset_index.page_locations)
+          self._validate_null_stats(index_size, column_info)
+          self._validate_min_max_values(index_size, column_info)
+          self._validate_boundary_order(column_info)
+        except AssertionError as e:
+          e.args += ("Validation failed on column 
{}.".format(column_info.schema.name),)
+          raise
+
+  def _ctas_table_and_verify_index(self, vector, unique_database, source_table,
+                                   tmpdir, sorting_column=None):
+    """Copies 'source_table' into a parquet table and makes sure that the index
+    in the resulting parquet file is valid.
+    """
+    table_name = "test_hdfs_parquet_table_writer"
+    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
+    hdfs_path = 
get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
+                                                                 table_name))
+    # Setting num_nodes = 1 ensures that the query is executed on the 
coordinator,
+    # resulting in a single parquet file being written.
+    vector.get_value('exec_option')['num_nodes'] = 1
+    self.execute_query("drop table if exists {0}".format(qualified_table_name))
+    if sorting_column is None:
+      query = ("create table {0} stored as parquet as select * from 
{1}").format(
+          qualified_table_name, source_table)
+    else:
+      query = ("create table {0} sort by({1}) stored as parquet as select * 
from {2}"
+               ).format(qualified_table_name, sorting_column, source_table)
+    self.execute_query(query, vector.get_value('exec_option'))
+    self._validate_parquet_page_index(hdfs_path, tmpdir.join(source_table))
+
+  def _create_string_table_with_values(self, vector, unique_database, 
table_name,
+                                       values_sql):
+    """Creates a parquet table that has a single string column, then invokes 
an insert
+    statement on it with the 'values_sql' parameter. E.g. 'values_sql' is 
"('asdf')".
+    It returns the HDFS path for the table.
+    """
+    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
+    self.execute_query("drop table if exists {0}".format(qualified_table_name))
+    vector.get_value('exec_option')['num_nodes'] = 1
+    query = ("create table {0} (str string) stored as 
parquet").format(qualified_table_name)
+    self.execute_query(query, vector.get_value('exec_option'))
+    self.execute_query("insert into {0} values 
{1}".format(qualified_table_name,
+        values_sql), vector.get_value('exec_option'))
+    return get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
+        table_name))
+
+  def test_write_index_alltypes(self, vector, unique_database, tmpdir):
+    """Test that writing a parquet file populates the rowgroup indexes with 
the correct
+    values.
+    """
+    self._ctas_table_and_verify_index(vector, unique_database, 
"functional.alltypes",
+        tmpdir)
+
+  def test_write_index_decimals(self, vector, unique_database, tmpdir):
+    """Test that writing a parquet file populates the rowgroup indexes with 
the correct
+    values, using decimal types.
+    """
+    self._ctas_table_and_verify_index(vector, unique_database, 
"functional.decimal_tbl",
+        tmpdir)
+
+  def test_write_index_chars(self, vector, unique_database, tmpdir):
+    """Test that writing a parquet file populates the rowgroup indexes with 
the correct
+    values, using char types.
+    """
+    self._ctas_table_and_verify_index(vector, unique_database, 
"functional.chars_formats",
+        tmpdir)
+
+  def test_write_index_null(self, vector, unique_database, tmpdir):
+    """Test that we don't write min/max values in the index for null columns.
+    Ensure null_count is set for columns with null values.
+    """
+    self._ctas_table_and_verify_index(vector, unique_database, 
"functional.nulltable",
+        tmpdir)
+
+  def test_write_index_multi_page(self, vector, unique_database, tmpdir):
+    """Test that when a ColumnChunk is written across multiple pages, the 
index is
+    valid.
+    """
+    self._ctas_table_and_verify_index(vector, unique_database, "tpch.customer",
+        tmpdir)
+    self._ctas_table_and_verify_index(vector, unique_database, "tpch.orders",
+        tmpdir)
+
+  def test_write_index_sorting_column(self, vector, unique_database, tmpdir):
+    """Test that when the schema has a sorting column, the index is valid."""
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.zipcode_incomes", tmpdir, "id")
+
+  def test_write_index_wide_table(self, vector, unique_database, tmpdir):
+    """Test table with wide row."""
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.widerow", tmpdir)
+
+  def test_write_index_many_columns_tables(self, vector, unique_database, 
tmpdir):
+    """Test tables with wide rows and many columns."""
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.widetable_250_cols", tmpdir)
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.widetable_500_cols", tmpdir)
+    self._ctas_table_and_verify_index(vector, unique_database,
+        "functional_parquet.widetable_1000_cols", tmpdir)
+
+  def test_max_string_values(self, vector, unique_database, tmpdir):
+    """Test string values that are all 0xFFs or end with 0xFFs."""
+
+    # String value is all of 0xFFs but its length is less than 
PAGE_INDEX_TRUNCATE_LENGTH.
+    short_tbl = "short_tbl"
+    short_hdfs_path = self._create_string_table_with_values(vector, 
unique_database,
+        short_tbl, "(rpad('', {0}, 
chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH - 1))
+    self._validate_parquet_page_index(short_hdfs_path, tmpdir.join(short_tbl))
+
+    # String value is all of 0xFFs and its length is 
PAGE_INDEX_TRUNCATE_LENGTH.
+    fit_tbl = "fit_tbl"
+    fit_hdfs_path = self._create_string_table_with_values(vector, 
unique_database,
+        fit_tbl, "(rpad('', {0}, 
chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH))
+    self._validate_parquet_page_index(fit_hdfs_path, tmpdir.join(fit_tbl))
+
+    # All bytes are 0xFFs and the string is longer then 
PAGE_INDEX_TRUNCATE_LENGTH, so we
+    # should not write page statistics.
+    too_long_tbl = "too_long_tbl"
+    too_long_hdfs_path = self._create_string_table_with_values(vector, 
unique_database,
+        too_long_tbl, "(rpad('', {0}, 
chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
+    row_group_indexes = 
self._get_row_groups_from_hdfs_folder(too_long_hdfs_path,
+        tmpdir.join(too_long_tbl))
+    column = row_group_indexes[0][0]
+    assert column.column_index is None
+    # We always write the offset index
+    assert column.offset_index is not None
+
+    # Test string with value that starts with 'aaa' following with 0xFFs and 
its length is
+    # greater than PAGE_INDEX_TRUNCATE_LENGTH. Max value should be 'aab'.
+    aaa_tbl = "aaa_tbl"
+    aaa_hdfs_path = self._create_string_table_with_values(vector, 
unique_database,
+        aaa_tbl, "(rpad('aaa', {0}, 
chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
+    row_group_indexes = self._get_row_groups_from_hdfs_folder(aaa_hdfs_path,
+        tmpdir.join(aaa_tbl))
+    column = row_group_indexes[0][0]
+    assert len(column.column_index.max_values) == 1
+    max_value = column.column_index.max_values[0]
+    assert max_value == 'aab'

http://git-wip-us.apache.org/repos/asf/impala/blob/ccf19f9f/tests/util/get_parquet_metadata.py
----------------------------------------------------------------------
diff --git a/tests/util/get_parquet_metadata.py 
b/tests/util/get_parquet_metadata.py
index 8cf0405..f6a0e59 100644
--- a/tests/util/get_parquet_metadata.py
+++ b/tests/util/get_parquet_metadata.py
@@ -20,13 +20,21 @@ import struct
 
 from datetime import date, datetime, time, timedelta
 from decimal import Decimal
-from parquet.ttypes import FileMetaData, Type
+from parquet.ttypes import ColumnIndex, FileMetaData, OffsetIndex, PageHeader, 
Type
 from thrift.protocol import TCompactProtocol
 from thrift.transport import TTransport
 
 PARQUET_VERSION_NUMBER = 'PAR1'
 
 
+def create_protocol(serialized_object_buffer):
+  """Creates a thrift protocol object from a memory buffer. The buffer should
+  contain a serialized thrift object.
+  """
+  transport = TTransport.TMemoryBuffer(serialized_object_buffer)
+  return TCompactProtocol.TCompactProtocol(transport)
+
+
 def julian_day_to_date(julian_day):
   """Converts a julian day number into a Gregorian date. The reference date is 
picked
   arbitrarily and can be validated with an online converter like
@@ -71,7 +79,8 @@ def parse_double(s):
 
 def decode_timestamp(s):
   """Reinterprets the string 's' as a 12-byte timestamp as written by Impala 
and decode it
-  into a datetime object."""
+  into a datetime object.
+  """
   # Impala writes timestamps as 12-byte values. The first 8 byte store a
   # boost::posix_time::time_duration, which is the time within the current day 
in
   # nanoseconds stored as int64. The last 4 bytes store a 
boost::gregorian::date,
@@ -99,7 +108,8 @@ def decode_decimal(schema, value):
 def decode_stats_value(schema, value):
   """Decodes 'value' according to 'schema. It expects 'value' to be plain 
encoded. For
   BOOLEAN values, only the least significant bit is parsed and returned. 
Binary arrays are
-  expected to be stored as such, without a preceding length."""
+  expected to be stored as such, without a preceding length.
+  """
   column_type = schema.type
   if column_type == Type.BOOLEAN:
     return parse_boolean(value)
@@ -123,9 +133,22 @@ def decode_stats_value(schema, value):
   return None
 
 
+def read_serialized_object(thrift_class, file, file_pos, length):
+  """Reads an instance of class 'thrift_class' from an already opened file at 
the
+  given position.
+  """
+  file.seek(file_pos)
+  serialized_thrift_object = file.read(length)
+  protocol = create_protocol(serialized_thrift_object)
+  thrift_object = thrift_class()
+  thrift_object.read(protocol)
+  return thrift_object
+
+
 def get_parquet_metadata(filename):
   """Returns a FileMetaData as defined in parquet.thrift. 'filename' must be a 
local
-  file path."""
+  file path.
+  """
   file_size = os.path.getsize(filename)
   with open(filename) as f:
     # Check file starts and ends with magic bytes
@@ -140,13 +163,8 @@ def get_parquet_metadata(filename):
     f.seek(file_size - len(PARQUET_VERSION_NUMBER) - 4)
     metadata_len = parse_int32(f.read(4))
 
-    # Read metadata
-    f.seek(file_size - len(PARQUET_VERSION_NUMBER) - 4 - metadata_len)
-    serialized_metadata = f.read(metadata_len)
+    # Calculate metadata position in file
+    metadata_pos = file_size - len(PARQUET_VERSION_NUMBER) - 4 - metadata_len
 
-    # Deserialize metadata
-    transport = TTransport.TMemoryBuffer(serialized_metadata)
-    protocol = TCompactProtocol.TCompactProtocol(transport)
-    metadata = FileMetaData()
-    metadata.read(protocol)
-    return metadata
+    # Return deserialized FileMetaData object
+    return read_serialized_object(FileMetaData, f, metadata_pos, metadata_len)

Reply via email to