IMPALA-3909: Populate min/max statistics in Parquet writer

Change-Id: I8368ee58daa50c07a3b8ef65be70203eb941f619
Reviewed-on: http://gerrit.cloudera.org:8080/5611
Reviewed-by: Lars Volker <l...@cloudera.com>
Tested-by: Impala Public Jenkins
Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6251d8b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6251d8b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6251d8b4

Branch: refs/heads/master
Commit: 6251d8b4ddac3bdd6fb651f000aea15b7a0d1603
Parents: a5b7689
Author: Lars Volker <l...@cloudera.com>
Authored: Mon Dec 5 19:18:51 2016 +0100
Committer: Tim Armstrong <tarmstr...@cloudera.com>
Committed: Thu Feb 2 06:44:48 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-table-writer.cc | 209 ++++++++++-----
 be/src/exec/hdfs-parquet-table-writer.h  |   5 +
 be/src/exec/parquet-column-stats.h       | 201 +++++++++++++++
 be/src/exec/parquet-common.h             |  19 +-
 be/src/runtime/string-value.h            |   2 +-
 be/src/util/dict-test.cc                 |  14 +-
 tests/query_test/test_insert_parquet.py  | 356 ++++++++++++++++++++++++--
 tests/util/get_parquet_metadata.py       | 105 +++++++-
 8 files changed, 792 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 7ae55b7..9e9cb3e 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -18,8 +18,10 @@
 #include "exec/hdfs-parquet-table-writer.h"
 
 #include "common/version.h"
-#include "exprs/expr.h"
+#include "exec/parquet-column-stats.h"
 #include "exprs/expr-context.h"
+#include "exprs/expr.h"
+#include "rpc/thrift-util.h"
 #include "runtime/decimal-value.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.h"
@@ -34,7 +36,6 @@
 #include "util/dict-encoding.h"
 #include "util/hdfs-util.h"
 #include "util/rle-encoding.h"
-#include "rpc/thrift-util.h"
 
 #include <sstream>
 
@@ -88,14 +89,20 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // expr - the expression to generate output values for this column.
   BaseColumnWriter(HdfsParquetTableWriter* parent, ExprContext* expr_ctx,
       const THdfsCompression::type& codec)
-    : parent_(parent), expr_ctx_(expr_ctx), codec_(codec),
-      page_size_(DEFAULT_DATA_PAGE_SIZE), current_page_(NULL), num_values_(0),
+    : parent_(parent),
+      expr_ctx_(expr_ctx),
+      codec_(codec),
+      page_size_(DEFAULT_DATA_PAGE_SIZE),
+      current_page_(nullptr),
+      num_values_(0),
       total_compressed_byte_size_(0),
       total_uncompressed_byte_size_(0),
-      dict_encoder_base_(NULL),
-      def_levels_(NULL),
-      values_buffer_len_(DEFAULT_DATA_PAGE_SIZE) {
-    Codec::CreateCompressor(NULL, false, codec, &compressor_);
+      dict_encoder_base_(nullptr),
+      def_levels_(nullptr),
+      values_buffer_len_(DEFAULT_DATA_PAGE_SIZE),
+      page_stats_base_(nullptr),
+      row_group_stats_base_(nullptr) {
+    Codec::CreateCompressor(nullptr, false, codec, &compressor_);
 
     def_levels_ = parent_->state_->obj_pool()->Add(
         new RleEncoder(parent_->reusable_col_mem_pool_->Allocate(DEFAULT_DATA_PAGE_SIZE),
@@ -122,13 +129,24 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   Status Flush(int64_t* file_pos, int64_t* first_data_page,
       int64_t* first_dictionary_page);
 
+  // Encodes the row group statistics into a parquet::Statistics object and attaches it to
+  // 'meta_data'.
+  void EncodeRowGroupStats(ColumnMetaData* meta_data) {
+    DCHECK(row_group_stats_base_ != nullptr);
+    if (row_group_stats_base_->has_values()
+        && row_group_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
+      row_group_stats_base_->EncodeToThrift(&meta_data->statistics);
+      meta_data->__isset.statistics = true;
+    }
+  }
+
   // Resets all the data accumulated for this column.  Memory can now be reused for
   // the next row group
   // Any data for previous row groups must be reset (e.g. dictionaries).
   // Subclasses must call this if they override this function.
   virtual void Reset() {
     num_data_pages_ = 0;
-    current_page_ = NULL;
+    current_page_ = nullptr;
     num_values_ = 0;
     total_compressed_byte_size_ = 0;
     current_encoding_ = Encoding::PLAIN;
@@ -137,8 +155,8 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // Close this writer. This is only called after Flush() and no more rows will
   // be added.
   void Close() {
-    if (compressor_.get() != NULL) compressor_->Close();
-    if (dict_encoder_base_ != NULL) dict_encoder_base_->ClearIndices();
+    if (compressor_.get() != nullptr) compressor_->Close();
+    if (dict_encoder_base_ != nullptr) dict_encoder_base_->ClearIndices();
   }
 
   const ColumnType& type() const { return expr_ctx_->root()->type(); }
@@ -152,13 +170,14 @@ class HdfsParquetTableWriter::BaseColumnWriter {
  protected:
   friend class HdfsParquetTableWriter;
 
-  // Encode value into the current page output buffer. Returns true if the value fits
-  // on the current page. If this function returned false, the caller should create a
-  // new page and try again with the same value.
+  // Encodes value into the current page output buffer and updates the column statistics
+  // aggregates. Returns true if the value fits on the current page. If this function
+  // returned false, the caller should create a new page and try again with the same
+  // value.
   // *bytes_needed will contain the (estimated) number of bytes needed to successfully
   // encode the value in the page.
   // Implemented in the subclass.
-  virtual bool EncodeValue(void* value, int64_t* bytes_needed) = 0;
+  virtual bool ProcessValue(void* value, int64_t* bytes_needed) = 0;
 
   // Encodes out all data for the current page and updates the metadata.
   virtual void FinalizeCurrentPage();
@@ -194,7 +213,8 @@ class HdfsParquetTableWriter::BaseColumnWriter {
 
   THdfsCompression::type codec_;
 
-  // Compression codec for this column.  If NULL, this column is will not be compressed.
+  // Compression codec for this column.  If nullptr, this column is will not be
+  // compressed.
   scoped_ptr<Codec> compressor_;
 
   vector<DataPage> pages_;
@@ -210,25 +230,31 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // TODO: Consider removing and only creating a single large page as necessary.
   int64_t page_size_;
 
+  // Pointer to the current page in 'pages_'. Not owned.
   DataPage* current_page_;
-  int64_t num_values_; // Total number of values across all pages, including NULLs.
+
+  int64_t num_values_; // Total number of values across all pages, including nullptr.
   int64_t total_compressed_byte_size_;
   int64_t total_uncompressed_byte_size_;
   Encoding::type current_encoding_;
 
-  // Created and set by the base class.
+  // Created, owned, and set by the derived class.
   DictEncoderBase* dict_encoder_base_;
 
-  // Rle encoder object for storing definition levels. For non-nested schemas,
-  // this always uses 1 bit per row.
-  // This is reused across pages since the underlying buffer is copied out when
-  // the page is finalized.
+  // Rle encoder object for storing definition levels, owned by instances of this class.
+  // For non-nested schemas, this always uses 1 bit per row. This is reused across pages
+  // since the underlying buffer is copied out when the page is finalized.
   RleEncoder* def_levels_;
 
-  // Data for buffered values. This is reused across pages.
+  // Data for buffered values. This is owned by instances of this class and gets reused
+  // across pages.
   uint8_t* values_buffer_;
   // The size of values_buffer_.
   int values_buffer_len_;
+
+  // Pointers to statistics, created, owned, and set by the derived class.
+  ColumnStatsBase* page_stats_base_;
+  ColumnStatsBase* row_group_stats_base_;
 };
 
 // Per type column writer.
@@ -237,10 +263,16 @@ class HdfsParquetTableWriter::ColumnWriter :
     public HdfsParquetTableWriter::BaseColumnWriter {
  public:
   ColumnWriter(HdfsParquetTableWriter* parent, ExprContext* ctx,
-      const THdfsCompression::type& codec) : BaseColumnWriter(parent, ctx, codec),
-      num_values_since_dict_size_check_(0) {
+      const THdfsCompression::type& codec)
+    : BaseColumnWriter(parent, ctx, codec),
+      num_values_since_dict_size_check_(0),
+      plain_encoded_value_size_(
+          ParquetPlainEncoder::EncodedByteSize(ctx->root()->type())),
+      page_stats_(plain_encoded_value_size_),
+      row_group_stats_(plain_encoded_value_size_) {
     DCHECK_NE(ctx->root()->type().type, TYPE_BOOLEAN);
-    encoded_value_size_ = ParquetPlainEncoder::ByteSize(ctx->root()->type());
+    page_stats_base_ = &page_stats_;
+    row_group_stats_base_ = &row_group_stats_;
   }
 
   virtual void Reset() {
@@ -249,12 +281,14 @@ class HdfsParquetTableWriter::ColumnWriter :
     // it will fall back to plain.
     current_encoding_ = Encoding::PLAIN_DICTIONARY;
     dict_encoder_.reset(
-        new DictEncoder<T>(parent_->per_file_mem_pool_.get(), encoded_value_size_));
+        new DictEncoder<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_));
     dict_encoder_base_ = dict_encoder_.get();
+    page_stats_.Reset();
+    row_group_stats_.Reset();
   }
 
  protected:
-  virtual bool EncodeValue(void* value, int64_t* bytes_needed) {
+  virtual bool ProcessValue(void* value, int64_t* bytes_needed) {
     if (current_encoding_ == Encoding::PLAIN_DICTIONARY) {
       if (UNLIKELY(num_values_since_dict_size_check_ >=
                    DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD)) {
@@ -273,20 +307,22 @@ class HdfsParquetTableWriter::ColumnWriter :
       parent_->file_size_estimate_ += *bytes_needed;
     } else if (current_encoding_ == Encoding::PLAIN) {
       T* v = CastValue(value);
-      *bytes_needed = encoded_value_size_ < 0 ?
-          ParquetPlainEncoder::ByteSize<T>(*v) : encoded_value_size_;
+      *bytes_needed = plain_encoded_value_size_ < 0 ?
+          ParquetPlainEncoder::ByteSize<T>(*v) :
+          plain_encoded_value_size_;
       if (current_page_->header.uncompressed_page_size + *bytes_needed > page_size_) {
         return false;
       }
       uint8_t* dst_ptr = values_buffer_ + current_page_->header.uncompressed_page_size;
       int64_t written_len =
-          ParquetPlainEncoder::Encode(dst_ptr, encoded_value_size_, *v);
+          ParquetPlainEncoder::Encode(dst_ptr, plain_encoded_value_size_, *v);
       DCHECK_EQ(*bytes_needed, written_len);
       current_page_->header.uncompressed_page_size += written_len;
     } else {
       // TODO: support other encodings here
       DCHECK(false);
     }
+    page_stats_.Update(*CastValue(value));
     return true;
   }
 
@@ -306,12 +342,18 @@ class HdfsParquetTableWriter::ColumnWriter :
   // The number of values added since we last checked the dictionary.
   int num_values_since_dict_size_check_;
 
-  // Size of each encoded value. -1 if the size is type is variable-length.
-  int64_t encoded_value_size_;
+  // Size of each encoded value in plain encoding. -1 if the type is variable-length.
+  int64_t plain_encoded_value_size_;
 
   // Temporary string value to hold CHAR(N)
   StringValue temp_;
 
+  // Tracks statistics per page.
+  ColumnStats<T> page_stats_;
+
+  // Tracks statistics per row group. This gets reset when starting a new row group.
+  ColumnStats<T> row_group_stats_;
+
   // Converts a slot pointer to a raw value suitable for encoding
   inline T* CastValue(void* value) {
     return reinterpret_cast<T*>(value);
@@ -334,23 +376,30 @@ class HdfsParquetTableWriter::BoolColumnWriter :
     public HdfsParquetTableWriter::BaseColumnWriter {
  public:
   BoolColumnWriter(HdfsParquetTableWriter* parent, ExprContext* ctx,
-      const THdfsCompression::type& codec) : BaseColumnWriter(parent, ctx, codec) {
+      const THdfsCompression::type& codec)
+    : BaseColumnWriter(parent, ctx, codec), page_stats_(-1), row_group_stats_(-1) {
     DCHECK_EQ(ctx->root()->type().type, TYPE_BOOLEAN);
     bool_values_ = parent_->state_->obj_pool()->Add(
         new BitWriter(values_buffer_, values_buffer_len_));
     // Dictionary encoding doesn't make sense for bools and is not allowed by
     // the format.
     current_encoding_ = Encoding::PLAIN;
-    dict_encoder_base_ = NULL;
+    dict_encoder_base_ = nullptr;
+
+    page_stats_base_ = &page_stats_;
+    row_group_stats_base_ = &row_group_stats_;
   }
 
  protected:
-  virtual bool EncodeValue(void* value, int64_t* bytes_needed) {
-    return bool_values_->PutValue(*reinterpret_cast<bool*>(value), 1);
+  virtual bool ProcessValue(void* value, int64_t* bytes_needed) {
+    bool v = *reinterpret_cast<bool*>(value);
+    if (!bool_values_->PutValue(v, 1)) return false;
+    page_stats_.Update(v);
+    return true;
   }
 
   virtual void FinalizeCurrentPage() {
-    DCHECK(current_page_ != NULL);
+    DCHECK(current_page_ != nullptr);
     if (current_page_->finalized) return;
     bool_values_->Flush();
     int num_bytes = bool_values_->bytes_written();
@@ -363,6 +412,12 @@ class HdfsParquetTableWriter::BoolColumnWriter :
  private:
   // Used to encode bools as single bit values. This is reused across pages.
   BitWriter* bool_values_;
+
+  // Tracks statistics per page.
+  ColumnStats<bool> page_stats_;
+
+  // Tracks statistics per row group. This gets reset when starting a new file.
+  ColumnStats<bool> row_group_stats_;
 };
 
 }
@@ -370,7 +425,7 @@ class HdfsParquetTableWriter::BoolColumnWriter :
 inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row) {
   ++num_values_;
   void* value = expr_ctx_->GetValue(row);
-  if (current_page_ == NULL) NewPage();
+  if (current_page_ == nullptr) NewPage();
 
   // Ensure that we have enough space for the definition level, but don't write it yet in
   // case we don't have enough space for the value.
@@ -386,10 +441,10 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
   // this won't loop forever.
   while (true) {
     // Nulls don't get encoded.
-    if (value == NULL) break;
+    if (value == nullptr) break;
 
     int64_t bytes_needed = 0;
-    if (EncodeValue(value, &bytes_needed)) {
+    if (ProcessValue(value, &bytes_needed)) {
       ++current_page_->num_non_null;
       break;
     }
@@ -416,7 +471,7 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
   }
 
   // Now that the value has been successfully written, write the definition level.
-  bool ret = def_levels_->Put(value != NULL);
+  bool ret = def_levels_->Put(value != nullptr);
   // Writing the def level will succeed because we ensured there was enough space for it
   // above, and new pages will always have space for at least a single def level.
   DCHECK(ret);
@@ -426,7 +481,7 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
 }
 
 inline void HdfsParquetTableWriter::BaseColumnWriter::WriteDictDataPage() {
-  DCHECK(dict_encoder_base_ != NULL);
+  DCHECK(dict_encoder_base_ != nullptr);
   DCHECK_EQ(current_page_->header.uncompressed_page_size, 0);
   if (current_page_->num_non_null == 0) return;
   int len = dict_encoder_base_->WriteData(values_buffer_, values_buffer_len_);
@@ -443,7 +498,7 @@ inline void HdfsParquetTableWriter::BaseColumnWriter::WriteDictDataPage() {
 
 Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
    int64_t* first_data_page, int64_t* first_dictionary_page) {
-  if (current_page_ == NULL) {
+  if (current_page_ == nullptr) {
     // This column/file is empty
     *first_data_page = *file_pos;
     *first_dictionary_page = -1;
@@ -454,7 +509,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
 
   *first_dictionary_page = -1;
   // First write the dictionary page before any of the data pages.
-  if (dict_encoder_base_ != NULL) {
+  if (dict_encoder_base_ != nullptr) {
     *first_dictionary_page = *file_pos;
     // Write dictionary page header
     DictionaryPageHeader dict_header;
@@ -470,7 +525,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
     uint8_t* dict_buffer = parent_->per_file_mem_pool_->Allocate(
         header.uncompressed_page_size);
     dict_encoder_base_->WriteDict(dict_buffer);
-    if (compressor_.get() != NULL) {
+    if (compressor_.get() != nullptr) {
       SCOPED_TIMER(parent_->parent_->compress_timer());
       int64_t max_compressed_size =
           compressor_->MaxOutputLen(header.uncompressed_page_size);
@@ -515,7 +570,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
     }
 
     // Write data page header
-    uint8_t* buffer = NULL;
+    uint8_t* buffer = nullptr;
     uint32_t len = 0;
     RETURN_IF_ERROR(
         parent_->thrift_serializer_->Serialize(&page.header, &len, &buffer));
@@ -530,10 +585,10 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
 }
 
 void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
-  DCHECK(current_page_ != NULL);
+  DCHECK(current_page_ != nullptr);
   if (current_page_->finalized) return;
 
-  // If the entire page was NULL, encode it as PLAIN since there is no
+  // If the entire page was nullptr, encode it as PLAIN since there is no
   // data anyway. We don't output a useless dictionary page and it works
   // around a parquet MR bug (see IMPALA-759 for more details).
   if (current_page_->num_non_null == 0) current_encoding_ = Encoding::PLAIN;
@@ -549,8 +604,8 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
   header.uncompressed_page_size += current_page_->num_def_bytes;
 
   // At this point we know all the data for the data page.  Combine them into one buffer.
-  uint8_t* uncompressed_data = NULL;
-  if (compressor_.get() == NULL) {
+  uint8_t* uncompressed_data = nullptr;
+  if (compressor_.get() == nullptr) {
     uncompressed_data =
         parent_->per_file_mem_pool_->Allocate(header.uncompressed_page_size);
   } else {
@@ -571,8 +626,8 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
   buffer.Append(values_buffer_, buffer.capacity() - buffer.size());
 
   // Apply compression if necessary
-  if (compressor_.get() == NULL) {
-    current_page_->data = reinterpret_cast<uint8_t*>(uncompressed_data);
+  if (compressor_.get() == nullptr) {
+    current_page_->data = uncompressed_data;
     header.compressed_page_size = header.uncompressed_page_size;
   } else {
     SCOPED_TIMER(parent_->parent_->compress_timer());
@@ -591,6 +646,18 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
         max_compressed_size - header.compressed_page_size);
   }
 
+  // Build page statistics and add them to the header.
+  DCHECK(page_stats_base_ != nullptr);
+  if (page_stats_base_->has_values()
+      && page_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
+    page_stats_base_->EncodeToThrift(&header.data_page_header.statistics);
+    header.data_page_header.__isset.statistics = true;
+  }
+
+  // Update row group statistics from page statistics.
+  DCHECK(row_group_stats_base_ != nullptr);
+  row_group_stats_base_->Merge(*page_stats_base_);
+
   // Add the size of the data page header
   uint8_t* header_buffer;
   uint32_t header_len = 0;
@@ -623,21 +690,20 @@ void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
   }
   current_page_->finalized = false;
   current_page_->num_non_null = 0;
+  page_stats_base_->Reset();
 }
 
 HdfsParquetTableWriter::HdfsParquetTableWriter(HdfsTableSink* parent, RuntimeState* state,
     OutputPartition* output, const HdfsPartitionDescriptor* part_desc,
     const HdfsTableDescriptor* table_desc, const vector<ExprContext*>& output_expr_ctxs)
-    : HdfsTableWriter(
-        parent, state, output, part_desc, table_desc, output_expr_ctxs),
-      thrift_serializer_(new ThriftSerializer(true)),
-      current_row_group_(NULL),
-      row_count_(0),
-      file_size_limit_(0),
-      reusable_col_mem_pool_(new MemPool(parent_->mem_tracker())),
-      per_file_mem_pool_(new MemPool(parent_->mem_tracker())),
-      row_idx_(0) {
-}
+  : HdfsTableWriter(parent, state, output, part_desc, table_desc, output_expr_ctxs),
+    thrift_serializer_(new ThriftSerializer(true)),
+    current_row_group_(nullptr),
+    row_count_(0),
+    file_size_limit_(0),
+    reusable_col_mem_pool_(new MemPool(parent_->mem_tracker())),
+    per_file_mem_pool_(new MemPool(parent_->mem_tracker())),
+    row_idx_(0) {}
 
 HdfsParquetTableWriter::~HdfsParquetTableWriter() {
 }
@@ -671,7 +737,7 @@ Status HdfsParquetTableWriter::Init() {
   columns_.resize(table_desc_->num_cols() - table_desc_->num_clustering_cols());
   // Initialize each column structure.
   for (int i = 0; i < columns_.size(); ++i) {
-    BaseColumnWriter* writer = NULL;
+    BaseColumnWriter* writer = nullptr;
     const ColumnType& type = output_expr_ctxs_[i]->root()->type();
     switch (type.type) {
       case TYPE_BOOLEAN:
@@ -776,7 +842,7 @@ Status HdfsParquetTableWriter::CreateSchema() {
 }
 
 Status HdfsParquetTableWriter::AddRowGroup() {
-  if (current_row_group_ != NULL) RETURN_IF_ERROR(FlushCurrentRowGroup());
+  if (current_row_group_ != nullptr) RETURN_IF_ERROR(FlushCurrentRowGroup());
   file_metadata_.row_groups.push_back(RowGroup());
   current_row_group_ = &file_metadata_.row_groups[file_metadata_.row_groups.size() - 1];
 
@@ -825,7 +891,7 @@ uint64_t HdfsParquetTableWriter::default_block_size() const {
 }
 
 Status HdfsParquetTableWriter::InitNewFile() {
-  DCHECK(current_row_group_ == NULL);
+  DCHECK(current_row_group_ == nullptr);
 
   per_file_mem_pool_->Clear();
 
@@ -939,7 +1005,7 @@ Status HdfsParquetTableWriter::WriteFileHeader() {
 }
 
 Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
-  if (current_row_group_ == NULL) return Status::OK();
+  if (current_row_group_ == nullptr) return Status::OK();
 
   int num_clustering_cols = table_desc_->num_clustering_cols();
   for (int i = 0; i < columns_.size(); ++i) {
@@ -965,6 +1031,9 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
     const string& col_name = table_desc_->col_descs()[i + num_clustering_cols].name();
     parquet_stats_.per_column_size[col_name] += columns_[i]->total_compressed_size();
 
+    // Build column statistics and add them to the header.
+    columns_[i]->EncodeRowGroupStats(&current_row_group_->columns[i].meta_data);
+
     // Since we don't supported complex schemas, all columns should have the same
     // number of values.
     DCHECK_EQ(current_row_group_->columns[0].meta_data.num_values,
@@ -973,7 +1042,7 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
     // Metadata for this column is complete, write it out to file.  The column metadata
     // goes at the end so that when we have collocated files, the column data can be
     // written without buffering.
-    uint8_t* buffer = NULL;
+    uint8_t* buffer = nullptr;
     uint32_t len = 0;
     RETURN_IF_ERROR(
         thrift_serializer_->Serialize(&current_row_group_->columns[i], &len, &buffer));
@@ -983,14 +1052,14 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
     columns_[i]->Reset();
   }
 
-  current_row_group_ = NULL;
+  current_row_group_ = nullptr;
   return Status::OK();
 }
 
 Status HdfsParquetTableWriter::WriteFileFooter() {
   // Write file_meta_data
   uint32_t file_metadata_len = 0;
-  uint8_t* buffer = NULL;
+  uint8_t* buffer = nullptr;
   RETURN_IF_ERROR(
       thrift_serializer_->Serialize(&file_metadata_, &file_metadata_len, &buffer));
   RETURN_IF_ERROR(Write(buffer, file_metadata_len));
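
As a rough illustration of the statistics flow this file implements, here is a minimal
Python sketch; the class and method names below are made up and are not Impala's API.
Per ProcessValue(), each non-NULL value updates the per-page min/max; FinalizeCurrentPage()
merges the page statistics into the row-group statistics; a column that only saw NULLs
produces no statistics at all.

class MinMaxStats(object):
  """Illustrative stand-in for ColumnStats<T>: tracks min/max over non-NULL values."""
  def __init__(self):
    self.has_values = False
    self.min_value = None
    self.max_value = None

  def update(self, v):
    # Like ColumnStats<T>::Update(): initialize on the first value, then fold values in.
    if not self.has_values:
      self.has_values, self.min_value, self.max_value = True, v, v
    else:
      self.min_value = min(self.min_value, v)
      self.max_value = max(self.max_value, v)

  def merge(self, other):
    # Like ColumnStats<T>::Merge(): an uninitialized 'other' leaves this object unchanged.
    if not other.has_values:
      return
    if not self.has_values:
      self.has_values = True
      self.min_value, self.max_value = other.min_value, other.max_value
    else:
      self.min_value = min(self.min_value, other.min_value)
      self.max_value = max(self.max_value, other.max_value)

page_stats, row_group_stats = MinMaxStats(), MinMaxStats()
for v in [3, None, 1, 7]:          # NULLs are skipped, as in AppendRow()/ProcessValue()
  if v is not None:
    page_stats.update(v)
row_group_stats.merge(page_stats)  # done in FinalizeCurrentPage()
assert (row_group_stats.min_value, row_group_stats.max_value) == (1, 7)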

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/be/src/exec/hdfs-parquet-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.h b/be/src/exec/hdfs-parquet-table-writer.h
index 4e16707..94ad932 100644
--- a/be/src/exec/hdfs-parquet-table-writer.h
+++ b/be/src/exec/hdfs-parquet-table-writer.h
@@ -100,6 +100,11 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
   /// Minimum file size.  If the configured size is less, fail.
   static const int HDFS_MIN_FILE_SIZE = 8 * 1024 * 1024;
 
+  /// Maximum statistics size. If the size of a single thrift parquet::Statistics struct
+  /// for a page or row group exceeds this value, we'll not write it. We use the same
+  /// value as 'parquet-mr'.
+  static const int MAX_COLUMN_STATS_SIZE = 4 * 1024;
+
   /// Per-column information state.  This contains some metadata as well as the
   /// data buffers.
   class BaseColumnWriter;
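
As a rough sketch (not Impala code; names are illustrative), the new constant acts as a
gate: statistics are only attached when the column saw at least one non-NULL value and
the plain-encoded min and max together fit within the 4 KB cap, mirroring the checks in
EncodeRowGroupStats() and FinalizeCurrentPage() above.

MAX_COLUMN_STATS_SIZE = 4 * 1024

def should_write_stats(has_values, encoded_min, encoded_max):
  # BytesNeeded() corresponds to the combined plain-encoded size of the min and max.
  return has_values and len(encoded_min) + len(encoded_max) <= MAX_COLUMN_STATS_SIZE

print should_write_stats(True, b'\x00\x00\x00\x00', b'\x83\x1c\x00\x00')  # True (int32 0 and 7299)
print should_write_stats(False, b'', b'')                                 # False (all NULLs)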

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/be/src/exec/parquet-column-stats.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.h b/be/src/exec/parquet-column-stats.h
new file mode 100644
index 0000000..8bcf4de
--- /dev/null
+++ b/be/src/exec/parquet-column-stats.h
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_PARQUET_COLUMN_STATS_H
+#define IMPALA_EXEC_PARQUET_COLUMN_STATS_H
+
+#include <type_traits>
+
+namespace impala {
+
+/// This class, together with its derivatives, is used to track column statistics when
+/// writing parquet files. It provides an interface to populate a parquet::Statistics
+/// object and attach it to an object supplied by the caller.
+///
+/// We currently support tracking 'min' and 'max' values for statistics. The other two
+/// statistical values in parquet.thrift, 'null_count' and 'distinct_count' are not
+/// tracked or populated.
+///
+/// Regarding the ordering of values, we follow the parquet-mr reference implementation.
+///
+/// Numeric values (BOOLEAN, INT, FLOAT, DOUBLE) are ordered by their numeric
+/// value (as opposed to their binary representation).
+///
+/// We currently don't write statistics for DECIMAL values and character array values
+/// (CHAR, VARCHAR, STRING) due to several issues with parquet-mr and subsequently, Hive
+/// (PARQUET-251, PARQUET-686). For those types, the Update() method is empty, so that the
+/// stats are not tracked.
+///
+/// NULL values are not considered for min/max statistics, and if a column consists only
+/// of NULL values, then no min/max statistics are written.
+///
+/// Updating the statistics is handled in derived classes to alleviate the need for
+/// virtual function calls.
+///
+/// TODO: Populate null_count and distinct_count.
+class ColumnStatsBase {
+ public:
+  ColumnStatsBase() : has_values_(false) {}
+  virtual ~ColumnStatsBase() {}
+
+  /// Merges this statistics object with values from 'other'. If other has not been
+  /// initialized, then this object will not be changed.
+  virtual void Merge(const ColumnStatsBase& other) = 0;
+
+  /// Returns the number of bytes needed to encode the current statistics into a
+  /// parquet::Statistics object.
+  virtual int64_t BytesNeeded() const = 0;
+
+  /// Encodes the current values into a Statistics thrift message.
+  virtual void EncodeToThrift(parquet::Statistics* out) const = 0;
+
+  /// Resets the state of this object.
+  void Reset() { has_values_ = false; }
+
+  bool has_values() const { return has_values_; }
+
+ protected:
+  /// Stores whether the current object has been initialized with a set of values.
+  bool has_values_;
+};
+
+/// This class contains the type-specific behavior to track statistics per column.
+template <typename T>
+class ColumnStats : public ColumnStatsBase {
+  // We explicitly require types to be listed here in order to support column statistics.
+  // When adding a type here, users of this class need to ensure that the statistics
+  // follow the ordering semantics of parquet's min/max statistics for the new type.
+  // Details on how the values should be ordered can be found in the 'parquet-format'
+  // project in 'parquet.thrift' and 'LogicalTypes.md'.
+  using value_type = typename std::enable_if<
+      std::is_arithmetic<T>::value
+        || std::is_same<bool, T>::value
+        || std::is_same<StringValue, T>::value
+        || std::is_same<TimestampValue, T>::value
+        || std::is_same<Decimal4Value, T>::value
+        || std::is_same<Decimal8Value, T>::value
+        || std::is_same<Decimal16Value, T>::value,
+      T>::type;
+
+ public:
+  ColumnStats(int plain_encoded_value_size)
+    : ColumnStatsBase(), plain_encoded_value_size_(plain_encoded_value_size) {}
+
+  /// Updates the statistics based on the value 'v'. If necessary, initializes the
+  /// statistics.
+  void Update(const T& v) {
+    if (!has_values_) {
+      has_values_ = true;
+      min_value_ = v;
+      max_value_ = v;
+    } else {
+      min_value_ = min(min_value_, v);
+      max_value_ = max(max_value_, v);
+    }
+  }
+
+  virtual void Merge(const ColumnStatsBase& other) override {
+    DCHECK(dynamic_cast<const ColumnStats<T>*>(&other));
+    const ColumnStats<T>* cs = static_cast<const ColumnStats<T>*>(&other);
+    if (!cs->has_values_) return;
+    if (!has_values_) {
+      has_values_ = true;
+      min_value_ = cs->min_value_;
+      max_value_ = cs->max_value_;
+    } else {
+      min_value_ = min(min_value_, cs->min_value_);
+      max_value_ = max(max_value_, cs->max_value_);
+    }
+  }
+
+  virtual int64_t BytesNeeded() const override {
+    return BytesNeededInternal(min_value_) + BytesNeededInternal(max_value_);
+  }
+
+  virtual void EncodeToThrift(parquet::Statistics* out) const override {
+    DCHECK(has_values_);
+    string min_str;
+    EncodeValueToString(min_value_, &min_str);
+    out->__set_min(move(min_str));
+    string max_str;
+    EncodeValueToString(max_value_, &max_str);
+    out->__set_max(move(max_str));
+  }
+
+ protected:
+  /// Encodes a single value using parquet's PLAIN encoding and stores it into the
+  /// binary string 'out'.
+  void EncodeValueToString(const T& v, string* out) const {
+    int64_t bytes_needed = BytesNeededInternal(v);
+    out->resize(bytes_needed);
+    int64_t bytes_written = ParquetPlainEncoder::Encode(
+        reinterpret_cast<uint8_t*>(&(*out)[0]), bytes_needed, v);
+    DCHECK_EQ(bytes_needed, bytes_written);
+  }
+
+  /// Returns the number of bytes needed to encode value 'v'.
+  int64_t BytesNeededInternal(const T& v) const {
+    return plain_encoded_value_size_ < 0 ? ParquetPlainEncoder::ByteSize<T>(v) :
+        plain_encoded_value_size_;
+  }
+
+  // Size of each encoded value in plain encoding, -1 if the type is variable-length.
+  int plain_encoded_value_size_;
+
+  // Minimum value since the last call to Reset().
+  T min_value_;
+
+  // Maximum value since the last call to Reset().
+  T max_value_;
+};
+
+/// Plain encoding for Boolean values is not handled by the ParquetPlainEncoder and thus
+/// needs special handling here.
+template <>
+void ColumnStats<bool>::EncodeValueToString(const bool& v, string* out) const {
+  char c = v;
+  out->assign(1, c);
+}
+
+template <>
+int64_t ColumnStats<bool>::BytesNeededInternal(const bool& v) const {
+  return 1;
+}
+
+/// parquet-mr and subsequently Hive currently do not handle the following types
+/// correctly (PARQUET-251, PARQUET-686), so we disable support for them.
+/// The relevant Impala Jiras are for
+/// - StringValue    IMPALA-4817
+/// - TimestampValue IMPALA-4819
+/// - DecimalValue   IMPALA-4815
+template <>
+void ColumnStats<StringValue>::Update(const StringValue& v) {}
+
+template <>
+void ColumnStats<TimestampValue>::Update(const TimestampValue& v) {}
+
+template <>
+void ColumnStats<Decimal4Value>::Update(const Decimal4Value& v) {}
+
+template <>
+void ColumnStats<Decimal8Value>::Update(const Decimal8Value& v) {}
+
+template <>
+void ColumnStats<Decimal16Value>::Update(const Decimal16Value& v) {}
+
+} // end ns impala
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/be/src/exec/parquet-common.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-common.h b/be/src/exec/parquet-common.h
index d4ffa3d..ff82fed 100644
--- a/be/src/exec/parquet-common.h
+++ b/be/src/exec/parquet-common.h
@@ -90,7 +90,7 @@ class ParquetPlainEncoder {
 
   /// Returns the encoded size of values of type t. Returns -1 if it is variable
   /// length. This can be different than the slot size of the types.
-  static int ByteSize(const ColumnType& t) {
+  static int EncodedByteSize(const ColumnType& t) {
     switch (t.type) {
       case TYPE_STRING:
       case TYPE_VARCHAR:
@@ -185,20 +185,13 @@ class ParquetPlainEncoder {
     memcpy(v, buffer, byte_size);
     return byte_size;
   }
-
-  /// Encode 't', which must be in the machine endian, to FIXED_LEN_BYTE_ARRAY
-  /// of 'fixed_len_size'. The result is encoded as big endian.
-  template <typename T>
-  static int EncodeToFixedLenByteArray(uint8_t* buffer, int fixed_len_size, const T& t);
-
-  /// Decodes into v assuming buffer is encoded using FIXED_LEN_BYTE_ARRAY of
-  /// 'fixed_len_size'. The bytes in buffer must be big endian and the result stored in
-  /// v is the machine endian format. The caller is responsible for ensuring that
-  /// 'buffer' is at least 'fixed_len_size' bytes long.
-  template<typename T>
-  static int DecodeFromFixedLenByteArray(uint8_t* buffer, int fixed_len_size, T* v);
 };
 
+/// Calling this with arguments of type ColumnType is certainly a programmer error, so we
+/// disallow it.
+template <>
+int ParquetPlainEncoder::ByteSize(const ColumnType& t);
+
 /// Disable for bools. Plain encoding is not used for booleans.
 template<> int ParquetPlainEncoder::ByteSize(const bool& b);
 template<> int ParquetPlainEncoder::Encode(uint8_t*, int fixed_len_size, const bool&);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/be/src/runtime/string-value.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-value.h b/be/src/runtime/string-value.h
index cc905fe..3fce865 100644
--- a/be/src/runtime/string-value.h
+++ b/be/src/runtime/string-value.h
@@ -31,7 +31,7 @@ namespace impala {
 
 /// The format of a string-typed slot.
 /// The returned StringValue of all functions that return StringValue
-/// shares its buffer the parent.
+/// shares its buffer with the parent.
 /// TODO: rename this to be less confusing with impala_udf::StringVal.
 struct StringValue {
   /// The current limitation for a string instance is 1GB character data.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/be/src/util/dict-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc
index 39cba66..a14922c 100644
--- a/be/src/util/dict-test.cc
+++ b/be/src/util/dict-test.cc
@@ -95,7 +95,7 @@ TEST(DictTest, TestTimestamps) {
   values.push_back(tv1);
   values.push_back(tv1);
 
-  ValidateDict(values, ParquetPlainEncoder::ByteSize(ColumnType(TYPE_TIMESTAMP)));
+  ValidateDict(values, ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_TIMESTAMP)));
 }
 
 template<typename T>
@@ -125,12 +125,12 @@ void TestNumbers(int value_byte_size) {
 }
 
 TEST(DictTest, TestNumbers) {
-  TestNumbers<int8_t>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_TINYINT)));
-  TestNumbers<int16_t>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_SMALLINT)));
-  TestNumbers<int32_t>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_INT)));
-  TestNumbers<int64_t>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_BIGINT)));
-  TestNumbers<float>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_FLOAT)));
-  TestNumbers<double>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_DOUBLE)));
+  TestNumbers<int8_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_TINYINT)));
+  TestNumbers<int16_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_SMALLINT)));
+  TestNumbers<int32_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_INT)));
+  TestNumbers<int64_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_BIGINT)));
+  TestNumbers<float>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_FLOAT)));
+  TestNumbers<double>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_DOUBLE)));
 
   for (int i = 1; i <= 16; ++i) {
     if (i <= 4) TestNumbers<Decimal4Value>(i);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/tests/query_test/test_insert_parquet.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index b7cb285..58a3d74 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -18,8 +18,8 @@
 # Targeted Impala insert tests
 
 import os
-import pytest
 
+from collections import namedtuple
 from shutil import rmtree
 from subprocess import check_call
 from tempfile import mkdtemp as make_tmp_dir
@@ -30,16 +30,35 @@ from tests.common.parametrize import UniqueDatabase
 from tests.common.skip import SkipIfIsilon, SkipIfLocal
 from tests.common.test_dimensions import create_exec_option_dimension
 from tests.common.test_vector import ImpalaTestDimension
-from tests.util.filesystem_utils import get_fs_path, WAREHOUSE
+from tests.util.filesystem_utils import get_fs_path
+from tests.util.get_parquet_metadata import get_parquet_metadata, decode_stats_value
 
 # TODO: Add Gzip back.  IMPALA-424
 PARQUET_CODECS = ['none', 'snappy']
 
+
+class RoundFloat():
+  """Class to compare floats after rounding them to a specified number of 
digits. This
+  can be used in scenarios where floating point precision is an issue.
+  """
+  def __init__(self, value, num_digits):
+    self.value = value
+    self.num_digits = num_digits
+
+  def __eq__(self, numeral):
+    """Compares this objects's value to a numeral after rounding it."""
+    return round(self.value, self.num_digits) == round(numeral, self.num_digits)
+
+ColumnStats = namedtuple('ColumnStats', ['name', 'min', 'max'])
+
 # Test a smaller parquet file size as well
 # TODO: these tests take a while so we don't want to go through too many sizes but
 # we should in more exhaustive testing
 PARQUET_FILE_SIZES = [0, 32 * 1024 * 1024]
+
+
 class TestInsertParquetQueries(ImpalaTestSuite):
+
   @classmethod
   def get_workload(self):
     return 'tpch'
@@ -57,14 +76,14 @@ class TestInsertParquetQueries(ImpalaTestSuite):
         sync_ddl=[1]))
 
     cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension("compression_codec", *PARQUET_CODECS));
+        ImpalaTestDimension("compression_codec", *PARQUET_CODECS))
     cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension("file_size", *PARQUET_FILE_SIZES));
+        ImpalaTestDimension("file_size", *PARQUET_FILE_SIZES))
 
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-        v.get_value('table_format').file_format == 'parquet')
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-        v.get_value('table_format').compression_codec == 'none')
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+                                        v.get_value('table_format').file_format == 'parquet')
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+                                        v.get_value('table_format').compression_codec == 'none')
 
   @SkipIfLocal.multiple_impalad
   @UniqueDatabase.parametrize(sync_ddl=True)
@@ -75,7 +94,9 @@ class TestInsertParquetQueries(ImpalaTestSuite):
         vector.get_value('compression_codec')
     self.run_test_case('insert_parquet', vector, unique_database, multiple_impalad=True)
 
+
 class TestInsertParquetInvalidCodec(ImpalaTestSuite):
+
   @classmethod
   def get_workload(self):
     return 'functional-query'
@@ -88,21 +109,22 @@ class TestInsertParquetInvalidCodec(ImpalaTestSuite):
         cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0],
         sync_ddl=[1]))
     cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension("compression_codec", 'bzip2'));
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-        v.get_value('table_format').file_format == 'parquet')
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-        v.get_value('table_format').compression_codec == 'none')
+        ImpalaTestDimension("compression_codec", 'bzip2'))
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+                                        v.get_value('table_format').file_format == 'parquet')
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+                                        v.get_value('table_format').compression_codec == 'none')
 
   @SkipIfLocal.multiple_impalad
   def test_insert_parquet_invalid_codec(self, vector):
     vector.get_value('exec_option')['COMPRESSION_CODEC'] = \
         vector.get_value('compression_codec')
-    self.run_test_case('QueryTest/insert_parquet_invalid_codec', vector,\
+    self.run_test_case('QueryTest/insert_parquet_invalid_codec', vector,
                        multiple_impalad=True)
 
 
 class TestInsertParquetVerifySize(ImpalaTestSuite):
+
   @classmethod
   def get_workload(self):
     return 'tpch'
@@ -114,12 +136,12 @@ class TestInsertParquetVerifySize(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
         cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0],
         sync_ddl=[1]))
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-        v.get_value('table_format').file_format == 'parquet')
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-        v.get_value('table_format').compression_codec == 'none')
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+                                        v.get_value('table_format').file_format == 'parquet')
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+                                        v.get_value('table_format').compression_codec == 'none')
     cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension("compression_codec", *PARQUET_CODECS));
+        ImpalaTestDimension("compression_codec", *PARQUET_CODECS))
 
   @SkipIfIsilon.hdfs_block_size
   @SkipIfLocal.hdfs_client
@@ -149,10 +171,12 @@ class TestInsertParquetVerifySize(ImpalaTestSuite):
       assert size < block_size, "File size greater than expected.\
           Expected: {0}, Got: {1}".format(block_size, size)
       if size < block_size * 0.80:
-        assert found_small_file == False
+        assert not found_small_file
         found_small_file = True
 
+
 class TestHdfsParquetTableWriter(ImpalaTestSuite):
+
   @classmethod
   def get_workload(cls):
     return 'functional-query'
@@ -171,12 +195,12 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
     table_name = "test_hdfs_parquet_table_writer"
     qualified_table_name = "%s.%s" % (unique_database, table_name)
     self.execute_query("create table %s stored as parquet as select 
l_linenumber from "
-        "tpch_parquet.lineitem limit 180000" % qualified_table_name)
+                       "tpch_parquet.lineitem limit 180000" % 
qualified_table_name)
 
     tmp_dir = make_tmp_dir()
     try:
       hdfs_file = get_fs_path('/test-warehouse/%s.db/%s/*.parq'
-          % (unique_database, table_name))
+                              % (unique_database, table_name))
       check_call(['hdfs', 'dfs', '-copyToLocal', hdfs_file, tmp_dir])
 
       for root, subdirs, files in os.walk(tmp_dir):
@@ -184,7 +208,293 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
           if not f.endswith('parq'):
             continue
           check_call([os.path.join(impalad_basedir, 'util/parquet-reader'), '--file',
-              os.path.join(tmp_dir, str(f))])
+                      os.path.join(tmp_dir, str(f))])
     finally:
       self.execute_query("drop table %s" % qualified_table_name)
       rmtree(tmp_dir)
+
+
+class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestHdfsParquetTableStatsWriter, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+
+  def _decode_row_group_stats(self, schemas, row_group_stats):
+    """Decodes and return a list of statistics for a single row group."""
+    decoded = []
+    assert len(schemas) == len(row_group_stats)
+    for schema, stats in zip(schemas, row_group_stats):
+      if stats is None:
+        decoded.append(None)
+        continue
+
+      if stats.min is None and stats.max is None:
+        decoded.append(None)
+        continue
+
+      assert stats.min is not None and stats.max is not None
+      min_value = decode_stats_value(schema, stats.min)
+      max_value = decode_stats_value(schema, stats.max)
+      decoded.append(ColumnStats(schema.name, min_value, max_value))
+
+    assert len(decoded) == len(schemas)
+    return decoded
+
+  def _get_row_group_stats_from_file(self, parquet_file):
+    """Returns a list of statistics for each row group in file 'parquet_file'. 
The result
+    is a two-dimensional list, containing stats by row group and column."""
+    file_meta_data = get_parquet_metadata(parquet_file)
+    # We only support flat schemas, the additional element is the root element.
+    schemas = file_meta_data.schema[1:]
+    file_stats = []
+    for row_group in file_meta_data.row_groups:
+      num_columns = len(row_group.columns)
+      assert num_columns == len(schemas)
+      column_stats = [c.meta_data.statistics for c in row_group.columns]
+      file_stats.append(self._decode_row_group_stats(schemas, column_stats))
+
+    return file_stats
+
+  def _get_row_group_stats_from_hdfs_folder(self, hdfs_path):
+    """Returns a list of statistics for each row group in all parquet files in
+    'hdfs_path'. The result is a two-dimensional list, containing stats by row group and
+    column."""
+    row_group_stats = []
+
+    try:
+      tmp_dir = make_tmp_dir()
+      check_call(['hdfs', 'dfs', '-get', hdfs_path, tmp_dir])
+
+      for root, subdirs, files in os.walk(tmp_dir):
+        for f in files:
+          parquet_file = os.path.join(root, str(f))
+          row_group_stats.extend(self._get_row_group_stats_from_file(parquet_file))
+
+    finally:
+      rmtree(tmp_dir)
+
+    return row_group_stats
+
+  def _validate_min_max_stats(self, hdfs_path, expected_values, skip_col_idxs = None):
+    """Validates that 'hdfs_path' contains exactly one parquet file and that the rowgroup
+    statistics in that file match the values in 'expected_values'. Columns indexed by
+    'skip_col_idx' are excluded from the verification of the expected values.
+    """
+    skip_col_idxs = skip_col_idxs or []
+    # The caller has to make sure that the table fits into a single row group. We enforce
+    # it here to make sure the results are predictable and independent of how the data
+    # could get written across multiple files.
+    row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path)
+    assert(len(row_group_stats)) == 1
+    table_stats = row_group_stats[0]
+
+    num_columns = len(table_stats)
+    assert num_columns == len(expected_values)
+
+    for col_idx, stats, expected in zip(range(num_columns), table_stats, expected_values):
+      if col_idx in skip_col_idxs:
+        continue
+      if not expected:
+        assert not stats
+        continue
+      assert stats == expected
+
+  def _ctas_table_and_verify_stats(self, vector, unique_database, source_table,
+                                   expected_values, hive_skip_col_idx = None):
+    """Copies 'source_table' into a parquet table and makes sure that the row 
group
+    statistics in the resulting parquet file match those in 'expected_values'. 
The
+    comparison is performed against both Hive and Impala. For Hive, columns 
indexed by
+    'hive_skip_col_idx' are excluded from the verification of the expected 
values.
+    """
+    table_name = "test_hdfs_parquet_table_writer"
+    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
+    hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database,
+                                                                 table_name))
+
+    # Validate against Hive.
+    self.execute_query("drop table if exists {0}".format(qualified_table_name))
+    self.run_stmt_in_hive("create table {0} stored as parquet as select * from 
"
+                          "{1}".format(qualified_table_name, source_table))
+    self.execute_query("invalidate metadata {0}".format(qualified_table_name))
+    self._validate_min_max_stats(hdfs_path, expected_values, hive_skip_col_idx)
+
+    # Validate against Impala. Setting exec_single_node_rows_threshold and adding a limit
+    # clause ensures that the query is executed on the coordinator, resulting in a single
+    # parquet file being written.
+    num_rows = self.execute_scalar("select count(*) from {0}".format(source_table))
+    self.execute_query("drop table {0}".format(qualified_table_name))
+    query = ("create table {0} stored as parquet as select * from {1} limit "
+             "{2}").format(qualified_table_name, source_table, num_rows)
+    vector.get_value('exec_option')['EXEC_SINGLE_NODE_ROWS_THRESHOLD'] = num_rows
+    self.execute_query(query, vector.get_value('exec_option'))
+    self._validate_min_max_stats(hdfs_path, expected_values)
+
+  def test_write_statistics_alltypes(self, vector, unique_database):
+    """Test that writing a parquet file populates the rowgroup statistics with 
the correct
+    values.
+    """
+    # Expected values for functional.alltypes
+    expected_min_max_values = [
+        ColumnStats('id', 0, 7299),
+        ColumnStats('bool_col', False, True),
+        ColumnStats('tinyint_col', 0, 9),
+        ColumnStats('smallint_col', 0, 9),
+        ColumnStats('int_col', 0, 9),
+        ColumnStats('bigint_col', 0, 90),
+        ColumnStats('float_col', 0, RoundFloat(9.9, 1)),
+        ColumnStats('double_col', 0, RoundFloat(90.9, 1)),
+        None,
+        None,
+        None,
+        ColumnStats('year', 2009, 2010),
+        ColumnStats('month', 1, 12),
+    ]
+
+    # Skip comparison of unsupported columns types with Hive.
+    hive_skip_col_idx = [8, 9, 10]
+
+    self._ctas_table_and_verify_stats(vector, unique_database, "functional.alltypes",
+                                      expected_min_max_values, hive_skip_col_idx)
+
+  def test_write_statistics_decimal(self, vector, unique_database):
+    """Test that Impala does not write statistics for decimal columns."""
+    # Expected values for functional.decimal_tbl
+    expected_min_max_values = [None, None, None, None, None, None]
+
+    # Skip comparison of unsupported columns types with Hive.
+    hive_skip_col_idx = range(len(expected_min_max_values))
+
+    self._ctas_table_and_verify_stats(vector, unique_database, "functional.decimal_tbl",
+                                      expected_min_max_values, hive_skip_col_idx)
+
+  def test_write_statistics_multi_page(self, vector, unique_database):
+    """Test that writing a parquet file populates the rowgroup statistics with 
the correct
+    values. This test write a single parquet file with several pages per 
column.
+    """
+    # Expected values for tpch_parquet.customer
+    expected_min_max_values = [
+        ColumnStats('c_custkey', 1, 150000),
+        None,
+        None,
+        ColumnStats('c_nationkey', 0, 24),
+        None,
+        None,
+        None,
+        None,
+    ]
+
+    # Skip comparison of unsupported columns types with Hive.
+    hive_skip_col_idx = [1, 2, 4, 5, 6, 7]
+
+    self._ctas_table_and_verify_stats(vector, unique_database, "tpch_parquet.customer",
+                                      expected_min_max_values, hive_skip_col_idx)
+
+  def test_write_statistics_null(self, vector, unique_database):
+    """Test that we don't write min/max statistics for null columns."""
+    expected_min_max_values = [None, None, None, None, None, None, None]
+
+    # Skip comparison of unsupported columns types with Hive.
+    hive_skip_col_idx = range(len(expected_min_max_values))
+
+    self._ctas_table_and_verify_stats(vector, unique_database, "functional.nulltable",
+                                      expected_min_max_values, hive_skip_col_idx)
+
+  def test_write_statistics_char_types(self, vector, unique_database):
+    """Test that Impala does not write statistics for char columns."""
+    expected_min_max_values = [None, None, None]
+
+    # Skip comparison of unsupported columns types with Hive.
+    hive_skip_col_idx = range(len(expected_min_max_values))
+
+    self._ctas_table_and_verify_stats(vector, unique_database, "functional.chars_formats",
+                                      expected_min_max_values, hive_skip_col_idx)
+
+  def test_write_statistics_negative(self, vector, unique_database):
+    """Test that Impala correctly writes statistics for negative values."""
+    view_name = "test_negative_view"
+    qualified_view_name = "{0}.{1}".format(unique_database, view_name)
+
+    # Create a view to generate test data with negative values by negating every other
+    # row.
+    create_view_stmt = """create view {0} as select
+        id * cast(pow(-1, id % 2) as int) as id,
+        int_col * cast(pow(-1, id % 2) as int) as int_col,
+        bigint_col * cast(pow(-1, id % 2) as bigint) as bigint_col,
+        float_col * pow(-1, id % 2) as float_col,
+        double_col * pow(-1, id % 2) as double_col
+        from functional.alltypes""".format(qualified_view_name)
+    self.execute_query(create_view_stmt)
+
+    expected_min_max_values = [
+        ColumnStats('id', -7299, 7298),
+        ColumnStats('int_col', -9, 8),
+        ColumnStats('bigint_col', -90, 80),
+        ColumnStats('float_col', RoundFloat(-9.9, 1), RoundFloat(8.8, 1)),
+        ColumnStats('double_col', RoundFloat(-90.9, 1), RoundFloat(80.8, 1)),
+    ]
+
+    self._ctas_table_and_verify_stats(vector, unique_database, qualified_view_name,
+                                      expected_min_max_values)
+
+  def test_write_statistics_multiple_row_groups(self, vector, unique_database):
+    """Test that writing multiple row groups works as expected. This is done 
by inserting
+    into a table using the sortby() hint and then making sure that the min and 
max values
+    of row groups don't overlap."""
+    source_table = "tpch_parquet.orders"
+    target_table = "test_hdfs_parquet_table_writer"
+    qualified_target_table = "{0}.{1}".format(unique_database, target_table)
+    hdfs_path = get_fs_path("/test-warehouse/{0}.db/{1}/".format(
+        unique_database, target_table))
+
+    # Insert a large amount of data on a single backend with a limited parquet file size.
+    # This will result in several files being written, exercising code that tracks
+    # statistics for row groups.
+    num_rows = self.execute_scalar("select count(*) from {0}".format(source_table))
+    query = "create table {0} like {1} stored as parquet".format(qualified_target_table,
+                                                                 source_table)
+    self.execute_query(query, vector.get_value('exec_option'))
+    query = ("insert into {0} /* +sortby(o_orderkey) */ select * from {1} 
limit"
+             "{2}").format(qualified_target_table, source_table, num_rows)
+    vector.get_value('exec_option')['EXEC_SINGLE_NODE_ROWS_THRESHOLD'] = num_rows
+    vector.get_value('exec_option')['PARQUET_FILE_SIZE'] = 8 * 1024 * 1024
+    self.execute_query(query, vector.get_value('exec_option'))
+
+    # Get all stats for the o_orderkey column
+    row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path)
+    assert len(row_group_stats) > 1
+    orderkey_stats = [s[0] for s in row_group_stats]
+
+    # Make sure that they don't overlap by ordering by the min value, then looking at
+    # boundaries.
+    orderkey_stats.sort(key = lambda s: s.min)
+    for l, r in zip(orderkey_stats, orderkey_stats[1:]):
+      assert l.max <= r.min
+
+  def test_write_statistics_float_infinity(self, vector, unique_database):
+    """Test that statistics for -inf and inf are written correctly."""
+    table_name = "test_float_infinity"
+    qualified_table_name = "{0}.{1}".format(unique_database, table_name)
+
+    create_table_stmt = "create table {0} (f float, d double);".format(
+        qualified_table_name)
+    self.execute_query(create_table_stmt)
+
+    insert_stmt = """insert into {0} values
+        (cast('-inf' as float), cast('-inf' as double)),
+        (cast('inf' as float), cast('inf' as double))""".format(qualified_table_name)
+    self.execute_query(insert_stmt)
+
+    expected_min_max_values = [
+        ColumnStats('f', float('-inf'), float('inf')),
+        ColumnStats('d', float('-inf'), float('inf')),
+    ]
+
+    self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name,
+                                      expected_min_max_values)
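
A small hypothetical usage sketch of the helpers exercised by these tests (not part of the
commit; the file path is a placeholder and the Impala source tree is assumed to be on the
Python path): it reads the footer of a local parquet file via get_parquet_metadata() from
tests/util/get_parquet_metadata.py (see the next diff below) and prints the decoded
min/max values per row group.

from tests.util.get_parquet_metadata import get_parquet_metadata, decode_stats_value

def print_row_group_stats(parquet_file):
  """Prints the decoded min/max statistics of every column in every row group."""
  file_meta_data = get_parquet_metadata(parquet_file)
  schemas = file_meta_data.schema[1:]  # skip the root schema element (flat schemas only)
  for row_group in file_meta_data.row_groups:
    for schema, column in zip(schemas, row_group.columns):
      stats = column.meta_data.statistics
      if stats is None or stats.min is None or stats.max is None:
        print '%s: no min/max statistics' % schema.name
        continue
      print '%s: min=%r max=%r' % (schema.name, decode_stats_value(schema, stats.min),
                                   decode_stats_value(schema, stats.max))

print_row_group_stats('/tmp/example.parq')  # placeholder path to a local parquet file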

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/tests/util/get_parquet_metadata.py
----------------------------------------------------------------------
diff --git a/tests/util/get_parquet_metadata.py b/tests/util/get_parquet_metadata.py
index cb417a5..8cf0405 100644
--- a/tests/util/get_parquet_metadata.py
+++ b/tests/util/get_parquet_metadata.py
@@ -18,15 +18,110 @@
 import os
 import struct
 
-from parquet.ttypes import FileMetaData
+from datetime import date, datetime, time, timedelta
+from decimal import Decimal
+from parquet.ttypes import FileMetaData, Type
 from thrift.protocol import TCompactProtocol
 from thrift.transport import TTransport
 
 PARQUET_VERSION_NUMBER = 'PAR1'
 
-def parse_int(s):
-  """Reinterprets the string 's' as a 4-byte integer."""
-  return struct.unpack('i', s)[0]
+
+def julian_day_to_date(julian_day):
+  """Converts a julian day number into a Gregorian date. The reference date is 
picked
+  arbitrarily and can be validated with an online converter like
+  http://aa.usno.navy.mil/jdconverter?ID=AA&jd=2457755
+  """
+  return date(2017, 01, 01) + timedelta(julian_day - 2457755)
+
+
+def nanos_to_time(nanos):
+  """Converts nanoseconds to time of day."""
+  micros = nanos // 1000  # integer division
+  seconds, micros = divmod(micros, 10**6)
+  minutes, seconds = divmod(seconds, 60)
+  hours, minutes = divmod(minutes, 60)
+  return time(hours, minutes, seconds, micros)
+
+
+def parse_boolean(s):
+  """Parses a single boolean value from a single byte"""
+  return struct.unpack('<?', s)[0]
+
+
+def parse_int32(s):
+  """Reinterprets the string 's' as a signed 4-byte integer."""
+  return struct.unpack('<i', s)[0]
+
+
+def parse_int64(s):
+  """Reinterprets the string 's' as a signed 8-byte integer."""
+  return struct.unpack('<q', s)[0]
+
+
+def parse_float(s):
+  """Reinterprets the string 's' as an IEEE single precision float."""
+  return struct.unpack('<f', s)[0]
+
+
+def parse_double(s):
+  """Reinterprets the string 's' as an IEEE double precision float."""
+  return struct.unpack('<d', s)[0]
+
+
+def decode_timestamp(s):
+  """Reinterprets the string 's' as a 12-byte timestamp as written by Impala 
and decode it
+  into a datetime object."""
+  # Impala writes timestamps as 12-byte values. The first 8 byte store a
+  # boost::posix_time::time_duration, which is the time within the current day 
in
+  # nanoseconds stored as int64. The last 4 bytes store a 
boost::gregorian::date,
+  # which is the Julian date, stored as utin32.
+  day_nanos, julian_day = struct.unpack('<qI', s)
+  return datetime.combine(julian_day_to_date(julian_day), nanos_to_time(day_nanos))
+
+
+def decode_decimal(schema, value):
+  """Decodes 'value' into a decimal by interpreting its contents according to 
'schema'."""
+  assert len(value) > 0
+  assert schema.type_length == len(value)
+  assert schema.type == Type.FIXED_LEN_BYTE_ARRAY
+
+  numeric = Decimal(reduce(lambda x, y: x * 256 + y, map(ord, value)))
+
+  # Compute two's complement for negative values.
+  if (ord(value[0]) > 127):
+    bit_width = 8 * len(value)
+    numeric = numeric - (2 ** bit_width)
+
+  return numeric / 10 ** schema.scale
+
+
+def decode_stats_value(schema, value):
+  """Decodes 'value' according to 'schema. It expects 'value' to be plain 
encoded. For
+  BOOLEAN values, only the least significant bit is parsed and returned. 
Binary arrays are
+  expected to be stored as such, without a preceding length."""
+  column_type = schema.type
+  if column_type == Type.BOOLEAN:
+    return parse_boolean(value)
+  elif column_type == Type.INT32:
+    return parse_int32(value)
+  elif column_type == Type.INT64:
+    return parse_int64(value)
+  elif column_type == Type.INT96:
+    # Impala uses INT96 to store timestamps
+    return decode_timestamp(value)
+  elif column_type == Type.FLOAT:
+    return parse_float(value)
+  elif column_type == Type.DOUBLE:
+    return parse_double(value)
+  elif column_type == Type.BYTE_ARRAY:
+    # In parquet::Statistics, strings are stored as is.
+    return value
+  elif column_type == Type.FIXED_LEN_BYTE_ARRAY:
+    return decode_decimal(schema, value)
+  assert False
+  return None
+
 
 def get_parquet_metadata(filename):
   """Returns a FileMetaData as defined in parquet.thrift. 'filename' must be a 
local
@@ -43,7 +138,7 @@ def get_parquet_metadata(filename):
 
     # Read metadata length
     f.seek(file_size - len(PARQUET_VERSION_NUMBER) - 4)
-    metadata_len = parse_int(f.read(4))
+    metadata_len = parse_int32(f.read(4))
 
     # Read metadata
     f.seek(file_size - len(PARQUET_VERSION_NUMBER) - 4 - metadata_len)
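
A worked example of the decoding helpers above, using made-up byte strings rather than
bytes read from a real file (and assuming the module is importable): a plain-encoded INT32
statistics value is just four little-endian bytes, and an Impala INT96 timestamp is eight
bytes of nanoseconds-of-day followed by a four-byte Julian day.

import struct

from tests.util.get_parquet_metadata import decode_timestamp, parse_int32

print parse_int32(struct.pack('<i', -7299))                 # -7299

# Julian day 2457755 is the reference date above (2017-01-01); 10**9 nanos is one second.
print decode_timestamp(struct.pack('<qI', 10**9, 2457755))  # 2017-01-01 00:00:01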
