IMPALA-3909: Populate min/max statistics in Parquet writer Change-Id: I8368ee58daa50c07a3b8ef65be70203eb941f619 Reviewed-on: http://gerrit.cloudera.org:8080/5611 Reviewed-by: Lars Volker <l...@cloudera.com> Tested-by: Impala Public Jenkins Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6251d8b4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6251d8b4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6251d8b4 Branch: refs/heads/master Commit: 6251d8b4ddac3bdd6fb651f000aea15b7a0d1603 Parents: a5b7689 Author: Lars Volker <l...@cloudera.com> Authored: Mon Dec 5 19:18:51 2016 +0100 Committer: Tim Armstrong <tarmstr...@cloudera.com> Committed: Thu Feb 2 06:44:48 2017 +0000 ---------------------------------------------------------------------- be/src/exec/hdfs-parquet-table-writer.cc | 209 ++++++++++----- be/src/exec/hdfs-parquet-table-writer.h | 5 + be/src/exec/parquet-column-stats.h | 201 +++++++++++++++ be/src/exec/parquet-common.h | 19 +- be/src/runtime/string-value.h | 2 +- be/src/util/dict-test.cc | 14 +- tests/query_test/test_insert_parquet.py | 356 ++++++++++++++++++++++++-- tests/util/get_parquet_metadata.py | 105 +++++++- 8 files changed, 792 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/be/src/exec/hdfs-parquet-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc index 7ae55b7..9e9cb3e 100644 --- a/be/src/exec/hdfs-parquet-table-writer.cc +++ b/be/src/exec/hdfs-parquet-table-writer.cc @@ -18,8 +18,10 @@ #include "exec/hdfs-parquet-table-writer.h" #include "common/version.h" -#include "exprs/expr.h" +#include "exec/parquet-column-stats.h" #include "exprs/expr-context.h" +#include "exprs/expr.h" +#include "rpc/thrift-util.h" #include "runtime/decimal-value.h" #include "runtime/mem-tracker.h" #include "runtime/raw-value.h" @@ -34,7 +36,6 @@ #include "util/dict-encoding.h" #include "util/hdfs-util.h" #include "util/rle-encoding.h" -#include "rpc/thrift-util.h" #include <sstream> @@ -88,14 +89,20 @@ class HdfsParquetTableWriter::BaseColumnWriter { // expr - the expression to generate output values for this column. BaseColumnWriter(HdfsParquetTableWriter* parent, ExprContext* expr_ctx, const THdfsCompression::type& codec) - : parent_(parent), expr_ctx_(expr_ctx), codec_(codec), - page_size_(DEFAULT_DATA_PAGE_SIZE), current_page_(NULL), num_values_(0), + : parent_(parent), + expr_ctx_(expr_ctx), + codec_(codec), + page_size_(DEFAULT_DATA_PAGE_SIZE), + current_page_(nullptr), + num_values_(0), total_compressed_byte_size_(0), total_uncompressed_byte_size_(0), - dict_encoder_base_(NULL), - def_levels_(NULL), - values_buffer_len_(DEFAULT_DATA_PAGE_SIZE) { - Codec::CreateCompressor(NULL, false, codec, &compressor_); + dict_encoder_base_(nullptr), + def_levels_(nullptr), + values_buffer_len_(DEFAULT_DATA_PAGE_SIZE), + page_stats_base_(nullptr), + row_group_stats_base_(nullptr) { + Codec::CreateCompressor(nullptr, false, codec, &compressor_); def_levels_ = parent_->state_->obj_pool()->Add( new RleEncoder(parent_->reusable_col_mem_pool_->Allocate(DEFAULT_DATA_PAGE_SIZE), @@ -122,13 +129,24 @@ class HdfsParquetTableWriter::BaseColumnWriter { Status Flush(int64_t* file_pos, int64_t* first_data_page, int64_t* first_dictionary_page); + // Encodes the row group statistics into a parquet::Statistics object and attaches it to + // 'meta_data'. 
+ void EncodeRowGroupStats(ColumnMetaData* meta_data) { + DCHECK(row_group_stats_base_ != nullptr); + if (row_group_stats_base_->has_values() + && row_group_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) { + row_group_stats_base_->EncodeToThrift(&meta_data->statistics); + meta_data->__isset.statistics = true; + } + } + // Resets all the data accumulated for this column. Memory can now be reused for // the next row group // Any data for previous row groups must be reset (e.g. dictionaries). // Subclasses must call this if they override this function. virtual void Reset() { num_data_pages_ = 0; - current_page_ = NULL; + current_page_ = nullptr; num_values_ = 0; total_compressed_byte_size_ = 0; current_encoding_ = Encoding::PLAIN; @@ -137,8 +155,8 @@ class HdfsParquetTableWriter::BaseColumnWriter { // Close this writer. This is only called after Flush() and no more rows will // be added. void Close() { - if (compressor_.get() != NULL) compressor_->Close(); - if (dict_encoder_base_ != NULL) dict_encoder_base_->ClearIndices(); + if (compressor_.get() != nullptr) compressor_->Close(); + if (dict_encoder_base_ != nullptr) dict_encoder_base_->ClearIndices(); } const ColumnType& type() const { return expr_ctx_->root()->type(); } @@ -152,13 +170,14 @@ class HdfsParquetTableWriter::BaseColumnWriter { protected: friend class HdfsParquetTableWriter; - // Encode value into the current page output buffer. Returns true if the value fits - // on the current page. If this function returned false, the caller should create a - // new page and try again with the same value. + // Encodes value into the current page output buffer and updates the column statistics + // aggregates. Returns true if the value fits on the current page. If this function + // returned false, the caller should create a new page and try again with the same + // value. // *bytes_needed will contain the (estimated) number of bytes needed to successfully // encode the value in the page. // Implemented in the subclass. - virtual bool EncodeValue(void* value, int64_t* bytes_needed) = 0; + virtual bool ProcessValue(void* value, int64_t* bytes_needed) = 0; // Encodes out all data for the current page and updates the metadata. virtual void FinalizeCurrentPage(); @@ -194,7 +213,8 @@ class HdfsParquetTableWriter::BaseColumnWriter { THdfsCompression::type codec_; - // Compression codec for this column. If NULL, this column is will not be compressed. + // Compression codec for this column. If nullptr, this column is will not be + // compressed. scoped_ptr<Codec> compressor_; vector<DataPage> pages_; @@ -210,25 +230,31 @@ class HdfsParquetTableWriter::BaseColumnWriter { // TODO: Consider removing and only creating a single large page as necessary. int64_t page_size_; + // Pointer to the current page in 'pages_'. Not owned. DataPage* current_page_; - int64_t num_values_; // Total number of values across all pages, including NULLs. + + int64_t num_values_; // Total number of values across all pages, including nullptr. int64_t total_compressed_byte_size_; int64_t total_uncompressed_byte_size_; Encoding::type current_encoding_; - // Created and set by the base class. + // Created, owned, and set by the derived class. DictEncoderBase* dict_encoder_base_; - // Rle encoder object for storing definition levels. For non-nested schemas, - // this always uses 1 bit per row. - // This is reused across pages since the underlying buffer is copied out when - // the page is finalized. 
+ // Rle encoder object for storing definition levels, owned by instances of this class. + // For non-nested schemas, this always uses 1 bit per row. This is reused across pages + // since the underlying buffer is copied out when the page is finalized. RleEncoder* def_levels_; - // Data for buffered values. This is reused across pages. + // Data for buffered values. This is owned by instances of this class and gets reused + // across pages. uint8_t* values_buffer_; // The size of values_buffer_. int values_buffer_len_; + + // Pointers to statistics, created, owned, and set by the derived class. + ColumnStatsBase* page_stats_base_; + ColumnStatsBase* row_group_stats_base_; }; // Per type column writer. @@ -237,10 +263,16 @@ class HdfsParquetTableWriter::ColumnWriter : public HdfsParquetTableWriter::BaseColumnWriter { public: ColumnWriter(HdfsParquetTableWriter* parent, ExprContext* ctx, - const THdfsCompression::type& codec) : BaseColumnWriter(parent, ctx, codec), - num_values_since_dict_size_check_(0) { + const THdfsCompression::type& codec) + : BaseColumnWriter(parent, ctx, codec), + num_values_since_dict_size_check_(0), + plain_encoded_value_size_( + ParquetPlainEncoder::EncodedByteSize(ctx->root()->type())), + page_stats_(plain_encoded_value_size_), + row_group_stats_(plain_encoded_value_size_) { DCHECK_NE(ctx->root()->type().type, TYPE_BOOLEAN); - encoded_value_size_ = ParquetPlainEncoder::ByteSize(ctx->root()->type()); + page_stats_base_ = &page_stats_; + row_group_stats_base_ = &row_group_stats_; } virtual void Reset() { @@ -249,12 +281,14 @@ class HdfsParquetTableWriter::ColumnWriter : // it will fall back to plain. current_encoding_ = Encoding::PLAIN_DICTIONARY; dict_encoder_.reset( - new DictEncoder<T>(parent_->per_file_mem_pool_.get(), encoded_value_size_)); + new DictEncoder<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_)); dict_encoder_base_ = dict_encoder_.get(); + page_stats_.Reset(); + row_group_stats_.Reset(); } protected: - virtual bool EncodeValue(void* value, int64_t* bytes_needed) { + virtual bool ProcessValue(void* value, int64_t* bytes_needed) { if (current_encoding_ == Encoding::PLAIN_DICTIONARY) { if (UNLIKELY(num_values_since_dict_size_check_ >= DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD)) { @@ -273,20 +307,22 @@ class HdfsParquetTableWriter::ColumnWriter : parent_->file_size_estimate_ += *bytes_needed; } else if (current_encoding_ == Encoding::PLAIN) { T* v = CastValue(value); - *bytes_needed = encoded_value_size_ < 0 ? - ParquetPlainEncoder::ByteSize<T>(*v) : encoded_value_size_; + *bytes_needed = plain_encoded_value_size_ < 0 ? + ParquetPlainEncoder::ByteSize<T>(*v) : + plain_encoded_value_size_; if (current_page_->header.uncompressed_page_size + *bytes_needed > page_size_) { return false; } uint8_t* dst_ptr = values_buffer_ + current_page_->header.uncompressed_page_size; int64_t written_len = - ParquetPlainEncoder::Encode(dst_ptr, encoded_value_size_, *v); + ParquetPlainEncoder::Encode(dst_ptr, plain_encoded_value_size_, *v); DCHECK_EQ(*bytes_needed, written_len); current_page_->header.uncompressed_page_size += written_len; } else { // TODO: support other encodings here DCHECK(false); } + page_stats_.Update(*CastValue(value)); return true; } @@ -306,12 +342,18 @@ class HdfsParquetTableWriter::ColumnWriter : // The number of values added since we last checked the dictionary. int num_values_since_dict_size_check_; - // Size of each encoded value. -1 if the size is type is variable-length. 
- int64_t encoded_value_size_; + // Size of each encoded value in plain encoding. -1 if the type is variable-length. + int64_t plain_encoded_value_size_; // Temporary string value to hold CHAR(N) StringValue temp_; + // Tracks statistics per page. + ColumnStats<T> page_stats_; + + // Tracks statistics per row group. This gets reset when starting a new row group. + ColumnStats<T> row_group_stats_; + // Converts a slot pointer to a raw value suitable for encoding inline T* CastValue(void* value) { return reinterpret_cast<T*>(value); @@ -334,23 +376,30 @@ class HdfsParquetTableWriter::BoolColumnWriter : public HdfsParquetTableWriter::BaseColumnWriter { public: BoolColumnWriter(HdfsParquetTableWriter* parent, ExprContext* ctx, - const THdfsCompression::type& codec) : BaseColumnWriter(parent, ctx, codec) { + const THdfsCompression::type& codec) + : BaseColumnWriter(parent, ctx, codec), page_stats_(-1), row_group_stats_(-1) { DCHECK_EQ(ctx->root()->type().type, TYPE_BOOLEAN); bool_values_ = parent_->state_->obj_pool()->Add( new BitWriter(values_buffer_, values_buffer_len_)); // Dictionary encoding doesn't make sense for bools and is not allowed by // the format. current_encoding_ = Encoding::PLAIN; - dict_encoder_base_ = NULL; + dict_encoder_base_ = nullptr; + + page_stats_base_ = &page_stats_; + row_group_stats_base_ = &row_group_stats_; } protected: - virtual bool EncodeValue(void* value, int64_t* bytes_needed) { - return bool_values_->PutValue(*reinterpret_cast<bool*>(value), 1); + virtual bool ProcessValue(void* value, int64_t* bytes_needed) { + bool v = *reinterpret_cast<bool*>(value); + if (!bool_values_->PutValue(v, 1)) return false; + page_stats_.Update(v); + return true; } virtual void FinalizeCurrentPage() { - DCHECK(current_page_ != NULL); + DCHECK(current_page_ != nullptr); if (current_page_->finalized) return; bool_values_->Flush(); int num_bytes = bool_values_->bytes_written(); @@ -363,6 +412,12 @@ class HdfsParquetTableWriter::BoolColumnWriter : private: // Used to encode bools as single bit values. This is reused across pages. BitWriter* bool_values_; + + // Tracks statistics per page. + ColumnStats<bool> page_stats_; + + // Tracks statistics per row group. This gets reset when starting a new file. + ColumnStats<bool> row_group_stats_; }; } @@ -370,7 +425,7 @@ class HdfsParquetTableWriter::BoolColumnWriter : inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row) { ++num_values_; void* value = expr_ctx_->GetValue(row); - if (current_page_ == NULL) NewPage(); + if (current_page_ == nullptr) NewPage(); // Ensure that we have enough space for the definition level, but don't write it yet in // case we don't have enough space for the value. @@ -386,10 +441,10 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row) // this won't loop forever. while (true) { // Nulls don't get encoded. - if (value == NULL) break; + if (value == nullptr) break; int64_t bytes_needed = 0; - if (EncodeValue(value, &bytes_needed)) { + if (ProcessValue(value, &bytes_needed)) { ++current_page_->num_non_null; break; } @@ -416,7 +471,7 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row) } // Now that the value has been successfully written, write the definition level. - bool ret = def_levels_->Put(value != NULL); + bool ret = def_levels_->Put(value != nullptr); // Writing the def level will succeed because we ensured there was enough space for it // above, and new pages will always have space for at least a single def level. 
DCHECK(ret); @@ -426,7 +481,7 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row) } inline void HdfsParquetTableWriter::BaseColumnWriter::WriteDictDataPage() { - DCHECK(dict_encoder_base_ != NULL); + DCHECK(dict_encoder_base_ != nullptr); DCHECK_EQ(current_page_->header.uncompressed_page_size, 0); if (current_page_->num_non_null == 0) return; int len = dict_encoder_base_->WriteData(values_buffer_, values_buffer_len_); @@ -443,7 +498,7 @@ inline void HdfsParquetTableWriter::BaseColumnWriter::WriteDictDataPage() { Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos, int64_t* first_data_page, int64_t* first_dictionary_page) { - if (current_page_ == NULL) { + if (current_page_ == nullptr) { // This column/file is empty *first_data_page = *file_pos; *first_dictionary_page = -1; @@ -454,7 +509,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos, *first_dictionary_page = -1; // First write the dictionary page before any of the data pages. - if (dict_encoder_base_ != NULL) { + if (dict_encoder_base_ != nullptr) { *first_dictionary_page = *file_pos; // Write dictionary page header DictionaryPageHeader dict_header; @@ -470,7 +525,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos, uint8_t* dict_buffer = parent_->per_file_mem_pool_->Allocate( header.uncompressed_page_size); dict_encoder_base_->WriteDict(dict_buffer); - if (compressor_.get() != NULL) { + if (compressor_.get() != nullptr) { SCOPED_TIMER(parent_->parent_->compress_timer()); int64_t max_compressed_size = compressor_->MaxOutputLen(header.uncompressed_page_size); @@ -515,7 +570,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos, } // Write data page header - uint8_t* buffer = NULL; + uint8_t* buffer = nullptr; uint32_t len = 0; RETURN_IF_ERROR( parent_->thrift_serializer_->Serialize(&page.header, &len, &buffer)); @@ -530,10 +585,10 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos, } void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() { - DCHECK(current_page_ != NULL); + DCHECK(current_page_ != nullptr); if (current_page_->finalized) return; - // If the entire page was NULL, encode it as PLAIN since there is no + // If the entire page was nullptr, encode it as PLAIN since there is no // data anyway. We don't output a useless dictionary page and it works // around a parquet MR bug (see IMPALA-759 for more details). if (current_page_->num_non_null == 0) current_encoding_ = Encoding::PLAIN; @@ -549,8 +604,8 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() { header.uncompressed_page_size += current_page_->num_def_bytes; // At this point we know all the data for the data page. Combine them into one buffer. 
- uint8_t* uncompressed_data = NULL; - if (compressor_.get() == NULL) { + uint8_t* uncompressed_data = nullptr; + if (compressor_.get() == nullptr) { uncompressed_data = parent_->per_file_mem_pool_->Allocate(header.uncompressed_page_size); } else { @@ -571,8 +626,8 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() { buffer.Append(values_buffer_, buffer.capacity() - buffer.size()); // Apply compression if necessary - if (compressor_.get() == NULL) { - current_page_->data = reinterpret_cast<uint8_t*>(uncompressed_data); + if (compressor_.get() == nullptr) { + current_page_->data = uncompressed_data; header.compressed_page_size = header.uncompressed_page_size; } else { SCOPED_TIMER(parent_->parent_->compress_timer()); @@ -591,6 +646,18 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() { max_compressed_size - header.compressed_page_size); } + // Build page statistics and add them to the header. + DCHECK(page_stats_base_ != nullptr); + if (page_stats_base_->has_values() + && page_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) { + page_stats_base_->EncodeToThrift(&header.data_page_header.statistics); + header.data_page_header.__isset.statistics = true; + } + + // Update row group statistics from page statistics. + DCHECK(row_group_stats_base_ != nullptr); + row_group_stats_base_->Merge(*page_stats_base_); + // Add the size of the data page header uint8_t* header_buffer; uint32_t header_len = 0; @@ -623,21 +690,20 @@ void HdfsParquetTableWriter::BaseColumnWriter::NewPage() { } current_page_->finalized = false; current_page_->num_non_null = 0; + page_stats_base_->Reset(); } HdfsParquetTableWriter::HdfsParquetTableWriter(HdfsTableSink* parent, RuntimeState* state, OutputPartition* output, const HdfsPartitionDescriptor* part_desc, const HdfsTableDescriptor* table_desc, const vector<ExprContext*>& output_expr_ctxs) - : HdfsTableWriter( - parent, state, output, part_desc, table_desc, output_expr_ctxs), - thrift_serializer_(new ThriftSerializer(true)), - current_row_group_(NULL), - row_count_(0), - file_size_limit_(0), - reusable_col_mem_pool_(new MemPool(parent_->mem_tracker())), - per_file_mem_pool_(new MemPool(parent_->mem_tracker())), - row_idx_(0) { -} + : HdfsTableWriter(parent, state, output, part_desc, table_desc, output_expr_ctxs), + thrift_serializer_(new ThriftSerializer(true)), + current_row_group_(nullptr), + row_count_(0), + file_size_limit_(0), + reusable_col_mem_pool_(new MemPool(parent_->mem_tracker())), + per_file_mem_pool_(new MemPool(parent_->mem_tracker())), + row_idx_(0) {} HdfsParquetTableWriter::~HdfsParquetTableWriter() { } @@ -671,7 +737,7 @@ Status HdfsParquetTableWriter::Init() { columns_.resize(table_desc_->num_cols() - table_desc_->num_clustering_cols()); // Initialize each column structure. 
for (int i = 0; i < columns_.size(); ++i) { - BaseColumnWriter* writer = NULL; + BaseColumnWriter* writer = nullptr; const ColumnType& type = output_expr_ctxs_[i]->root()->type(); switch (type.type) { case TYPE_BOOLEAN: @@ -776,7 +842,7 @@ Status HdfsParquetTableWriter::CreateSchema() { } Status HdfsParquetTableWriter::AddRowGroup() { - if (current_row_group_ != NULL) RETURN_IF_ERROR(FlushCurrentRowGroup()); + if (current_row_group_ != nullptr) RETURN_IF_ERROR(FlushCurrentRowGroup()); file_metadata_.row_groups.push_back(RowGroup()); current_row_group_ = &file_metadata_.row_groups[file_metadata_.row_groups.size() - 1]; @@ -825,7 +891,7 @@ uint64_t HdfsParquetTableWriter::default_block_size() const { } Status HdfsParquetTableWriter::InitNewFile() { - DCHECK(current_row_group_ == NULL); + DCHECK(current_row_group_ == nullptr); per_file_mem_pool_->Clear(); @@ -939,7 +1005,7 @@ Status HdfsParquetTableWriter::WriteFileHeader() { } Status HdfsParquetTableWriter::FlushCurrentRowGroup() { - if (current_row_group_ == NULL) return Status::OK(); + if (current_row_group_ == nullptr) return Status::OK(); int num_clustering_cols = table_desc_->num_clustering_cols(); for (int i = 0; i < columns_.size(); ++i) { @@ -965,6 +1031,9 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() { const string& col_name = table_desc_->col_descs()[i + num_clustering_cols].name(); parquet_stats_.per_column_size[col_name] += columns_[i]->total_compressed_size(); + // Build column statistics and add them to the header. + columns_[i]->EncodeRowGroupStats(¤t_row_group_->columns[i].meta_data); + // Since we don't supported complex schemas, all columns should have the same // number of values. DCHECK_EQ(current_row_group_->columns[0].meta_data.num_values, @@ -973,7 +1042,7 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() { // Metadata for this column is complete, write it out to file. The column metadata // goes at the end so that when we have collocated files, the column data can be // written without buffering. - uint8_t* buffer = NULL; + uint8_t* buffer = nullptr; uint32_t len = 0; RETURN_IF_ERROR( thrift_serializer_->Serialize(¤t_row_group_->columns[i], &len, &buffer)); @@ -983,14 +1052,14 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() { columns_[i]->Reset(); } - current_row_group_ = NULL; + current_row_group_ = nullptr; return Status::OK(); } Status HdfsParquetTableWriter::WriteFileFooter() { // Write file_meta_data uint32_t file_metadata_len = 0; - uint8_t* buffer = NULL; + uint8_t* buffer = nullptr; RETURN_IF_ERROR( thrift_serializer_->Serialize(&file_metadata_, &file_metadata_len, &buffer)); RETURN_IF_ERROR(Write(buffer, file_metadata_len)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/be/src/exec/hdfs-parquet-table-writer.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-table-writer.h b/be/src/exec/hdfs-parquet-table-writer.h index 4e16707..94ad932 100644 --- a/be/src/exec/hdfs-parquet-table-writer.h +++ b/be/src/exec/hdfs-parquet-table-writer.h @@ -100,6 +100,11 @@ class HdfsParquetTableWriter : public HdfsTableWriter { /// Minimum file size. If the configured size is less, fail. static const int HDFS_MIN_FILE_SIZE = 8 * 1024 * 1024; + /// Maximum statistics size. If the size of a single thrift parquet::Statistics struct + /// for a page or row group exceed this value, we'll not write it. We use the same value + /// as 'parquet-mr'. 
+ static const int MAX_COLUMN_STATS_SIZE = 4 * 1024; + /// Per-column information state. This contains some metadata as well as the /// data buffers. class BaseColumnWriter; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/be/src/exec/parquet-column-stats.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-stats.h b/be/src/exec/parquet-column-stats.h new file mode 100644 index 0000000..8bcf4de --- /dev/null +++ b/be/src/exec/parquet-column-stats.h @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_EXEC_PARQUET_COLUMN_STATS_H +#define IMPALA_EXEC_PARQUET_COLUMN_STATS_H + +#include <type_traits> + +namespace impala { + +/// This class, together with its derivatives, is used to track column statistics when +/// writing parquet files. It provides an interface to populate a parquet::Statistics +/// object and attach it to an object supplied by the caller. +/// +/// We currently support tracking 'min' and 'max' values for statistics. The other two +/// statistical values in parquet.thrift, 'null_count' and 'distinct_count' are not +/// tracked or populated. +/// +/// Regarding the ordering of values, we follow the parquet-mr reference implementation. +/// +/// Numeric values (BOOLEAN, INT, FLOAT, DOUBLE) are ordered by their numeric +/// value (as opposed to their binary representation). +/// +/// We currently don't write statistics for DECIMAL values and character array values +/// (CHAR, VARCHAR, STRING) due to several issues with parquet-mr and subsequently, Hive +/// (PARQUET-251, PARQUET-686). For those types, the Update() method is empty, so that the +/// stats are not tracked. +/// +/// NULL values are not considered for min/max statistics, and if a column consists only +/// of NULL values, then no min/max statistics are written. +/// +/// Updating the statistics is handled in derived classes to alleviate the need for +/// virtual function calls. +/// +/// TODO: Populate null_count and distinct_count. +class ColumnStatsBase { + public: + ColumnStatsBase() : has_values_(false) {} + virtual ~ColumnStatsBase() {} + + /// Merges this statistics object with values from 'other'. If other has not been + /// initialized, then this object will not be changed. + virtual void Merge(const ColumnStatsBase& other) = 0; + + /// Returns the number of bytes needed to encode the current statistics into a + /// parquet::Statistics object. + virtual int64_t BytesNeeded() const = 0; + + /// Encodes the current values into a Statistics thrift message. + virtual void EncodeToThrift(parquet::Statistics* out) const = 0; + + /// Resets the state of this object. 
+ void Reset() { has_values_ = false; } + + bool has_values() const { return has_values_; } + + protected: + /// Stores whether the current object has been initialized with a set of values. + bool has_values_; +}; + +/// This class contains the type-specific behavior to track statistics per column. +template <typename T> +class ColumnStats : public ColumnStatsBase { + // We explicitly require types to be listed here in order to support column statistics. + // When adding a type here, users of this class need to ensure that the statistics + // follow the ordering semantics of parquet's min/max statistics for the new type. + // Details on how the values should be ordered can be found in the 'parquet-format' + // project in 'parquet.thrift' and 'LogicalTypes.md'. + using value_type = typename std::enable_if< + std::is_arithmetic<T>::value + || std::is_same<bool, T>::value + || std::is_same<StringValue, T>::value + || std::is_same<TimestampValue, T>::value + || std::is_same<Decimal4Value, T>::value + || std::is_same<Decimal8Value, T>::value + || std::is_same<Decimal16Value, T>::value, + T>::type; + + public: + ColumnStats(int plain_encoded_value_size) + : ColumnStatsBase(), plain_encoded_value_size_(plain_encoded_value_size) {} + + /// Updates the statistics based on the value 'v'. If necessary, initializes the + /// statistics. + void Update(const T& v) { + if (!has_values_) { + has_values_ = true; + min_value_ = v; + max_value_ = v; + } else { + min_value_ = min(min_value_, v); + max_value_ = max(max_value_, v); + } + } + + virtual void Merge(const ColumnStatsBase& other) override { + DCHECK(dynamic_cast<const ColumnStats<T>*>(&other)); + const ColumnStats<T>* cs = static_cast<const ColumnStats<T>*>(&other); + if (!cs->has_values_) return; + if (!has_values_) { + has_values_ = true; + min_value_ = cs->min_value_; + max_value_ = cs->max_value_; + } else { + min_value_ = min(min_value_, cs->min_value_); + max_value_ = max(max_value_, cs->max_value_); + } + } + + virtual int64_t BytesNeeded() const override { + return BytesNeededInternal(min_value_) + BytesNeededInternal(max_value_); + } + + virtual void EncodeToThrift(parquet::Statistics* out) const override { + DCHECK(has_values_); + string min_str; + EncodeValueToString(min_value_, &min_str); + out->__set_min(move(min_str)); + string max_str; + EncodeValueToString(max_value_, &max_str); + out->__set_max(move(max_str)); + } + + protected: + /// Encodes a single value using parquet's PLAIN encoding and stores it into the + /// binary string 'out'. + void EncodeValueToString(const T& v, string* out) const { + int64_t bytes_needed = BytesNeededInternal(v); + out->resize(bytes_needed); + int64_t bytes_written = ParquetPlainEncoder::Encode( + reinterpret_cast<uint8_t*>(&(*out)[0]), bytes_needed, v); + DCHECK_EQ(bytes_needed, bytes_written); + } + + /// Returns the number of bytes needed to encode value 'v'. + int64_t BytesNeededInternal(const T& v) const { + return plain_encoded_value_size_ < 0 ? ParquetPlainEncoder::ByteSize<T>(v) : + plain_encoded_value_size_; + } + + // Size of each encoded value in plain encoding, -1 if the type is variable-length. + int plain_encoded_value_size_; + + // Minimum value since the last call to Reset(). + T min_value_; + + // Maximum value since the last call to Reset(). + T max_value_; +}; + +/// Plain encoding for Boolean values is not handled by the ParquetPlainEncoder and thus +/// needs special handling here. 
+template <> +void ColumnStats<bool>::EncodeValueToString(const bool& v, string* out) const { + char c = v; + out->assign(1, c); +} + +template <> +int64_t ColumnStats<bool>::BytesNeededInternal(const bool& v) const { + return 1; +} + +/// parquet-mr and subsequently Hive currently do not handle the following types +/// correctly (PARQUET-251, PARQUET-686), so we disable support for them. +/// The relevant Impala Jiras are for +/// - StringValue IMPALA-4817 +/// - TimestampValue IMPALA-4819 +/// - DecimalValue IMPALA-4815 +template <> +void ColumnStats<StringValue>::Update(const StringValue& v) {} + +template <> +void ColumnStats<TimestampValue>::Update(const TimestampValue& v) {} + +template <> +void ColumnStats<Decimal4Value>::Update(const Decimal4Value& v) {} + +template <> +void ColumnStats<Decimal8Value>::Update(const Decimal8Value& v) {} + +template <> +void ColumnStats<Decimal16Value>::Update(const Decimal16Value& v) {} + +} // end ns impala +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/be/src/exec/parquet-common.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-common.h b/be/src/exec/parquet-common.h index d4ffa3d..ff82fed 100644 --- a/be/src/exec/parquet-common.h +++ b/be/src/exec/parquet-common.h @@ -90,7 +90,7 @@ class ParquetPlainEncoder { /// Returns the encoded size of values of type t. Returns -1 if it is variable /// length. This can be different than the slot size of the types. - static int ByteSize(const ColumnType& t) { + static int EncodedByteSize(const ColumnType& t) { switch (t.type) { case TYPE_STRING: case TYPE_VARCHAR: @@ -185,20 +185,13 @@ class ParquetPlainEncoder { memcpy(v, buffer, byte_size); return byte_size; } - - /// Encode 't', which must be in the machine endian, to FIXED_LEN_BYTE_ARRAY - /// of 'fixed_len_size'. The result is encoded as big endian. - template <typename T> - static int EncodeToFixedLenByteArray(uint8_t* buffer, int fixed_len_size, const T& t); - - /// Decodes into v assuming buffer is encoded using FIXED_LEN_BYTE_ARRAY of - /// 'fixed_len_size'. The bytes in buffer must be big endian and the result stored in - /// v is the machine endian format. The caller is responsible for ensuring that - /// 'buffer' is at least 'fixed_len_size' bytes long. - template<typename T> - static int DecodeFromFixedLenByteArray(uint8_t* buffer, int fixed_len_size, T* v); }; +/// Calling this with arguments of type ColumnType is certainly a programmer error, so we +/// disallow it. +template <> +int ParquetPlainEncoder::ByteSize(const ColumnType& t); + /// Disable for bools. Plain encoding is not used for booleans. template<> int ParquetPlainEncoder::ByteSize(const bool& b); template<> int ParquetPlainEncoder::Encode(uint8_t*, int fixed_len_size, const bool&); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/be/src/runtime/string-value.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/string-value.h b/be/src/runtime/string-value.h index cc905fe..3fce865 100644 --- a/be/src/runtime/string-value.h +++ b/be/src/runtime/string-value.h @@ -31,7 +31,7 @@ namespace impala { /// The format of a string-typed slot. /// The returned StringValue of all functions that return StringValue -/// shares its buffer the parent. +/// shares its buffer with the parent. /// TODO: rename this to be less confusing with impala_udf::StringVal. 
struct StringValue { /// The current limitation for a string instance is 1GB character data. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/be/src/util/dict-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc index 39cba66..a14922c 100644 --- a/be/src/util/dict-test.cc +++ b/be/src/util/dict-test.cc @@ -95,7 +95,7 @@ TEST(DictTest, TestTimestamps) { values.push_back(tv1); values.push_back(tv1); - ValidateDict(values, ParquetPlainEncoder::ByteSize(ColumnType(TYPE_TIMESTAMP))); + ValidateDict(values, ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_TIMESTAMP))); } template<typename T> @@ -125,12 +125,12 @@ void TestNumbers(int value_byte_size) { } TEST(DictTest, TestNumbers) { - TestNumbers<int8_t>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_TINYINT))); - TestNumbers<int16_t>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_SMALLINT))); - TestNumbers<int32_t>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_INT))); - TestNumbers<int64_t>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_BIGINT))); - TestNumbers<float>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_FLOAT))); - TestNumbers<double>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_DOUBLE))); + TestNumbers<int8_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_TINYINT))); + TestNumbers<int16_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_SMALLINT))); + TestNumbers<int32_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_INT))); + TestNumbers<int64_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_BIGINT))); + TestNumbers<float>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_FLOAT))); + TestNumbers<double>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_DOUBLE))); for (int i = 1; i <= 16; ++i) { if (i <= 4) TestNumbers<Decimal4Value>(i); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/tests/query_test/test_insert_parquet.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py index b7cb285..58a3d74 100644 --- a/tests/query_test/test_insert_parquet.py +++ b/tests/query_test/test_insert_parquet.py @@ -18,8 +18,8 @@ # Targeted Impala insert tests import os -import pytest +from collections import namedtuple from shutil import rmtree from subprocess import check_call from tempfile import mkdtemp as make_tmp_dir @@ -30,16 +30,35 @@ from tests.common.parametrize import UniqueDatabase from tests.common.skip import SkipIfIsilon, SkipIfLocal from tests.common.test_dimensions import create_exec_option_dimension from tests.common.test_vector import ImpalaTestDimension -from tests.util.filesystem_utils import get_fs_path, WAREHOUSE +from tests.util.filesystem_utils import get_fs_path +from tests.util.get_parquet_metadata import get_parquet_metadata, decode_stats_value # TODO: Add Gzip back. IMPALA-424 PARQUET_CODECS = ['none', 'snappy'] + +class RoundFloat(): + """Class to compare floats after rounding them to a specified number of digits. This + can be used in scenarios where floating point precision is an issue. 
+ """ + def __init__(self, value, num_digits): + self.value = value + self.num_digits = num_digits + + def __eq__(self, numeral): + """Compares this objects's value to a numeral after rounding it.""" + return round(self.value, self.num_digits) == round(numeral, self.num_digits) + +ColumnStats = namedtuple('ColumnStats', ['name', 'min', 'max']) + # Test a smaller parquet file size as well # TODO: these tests take a while so we don't want to go through too many sizes but # we should in more exhaustive testing PARQUET_FILE_SIZES = [0, 32 * 1024 * 1024] + + class TestInsertParquetQueries(ImpalaTestSuite): + @classmethod def get_workload(self): return 'tpch' @@ -57,14 +76,14 @@ class TestInsertParquetQueries(ImpalaTestSuite): sync_ddl=[1])) cls.ImpalaTestMatrix.add_dimension( - ImpalaTestDimension("compression_codec", *PARQUET_CODECS)); + ImpalaTestDimension("compression_codec", *PARQUET_CODECS)) cls.ImpalaTestMatrix.add_dimension( - ImpalaTestDimension("file_size", *PARQUET_FILE_SIZES)); + ImpalaTestDimension("file_size", *PARQUET_FILE_SIZES)) - cls.ImpalaTestMatrix.add_constraint(lambda v:\ - v.get_value('table_format').file_format == 'parquet') - cls.ImpalaTestMatrix.add_constraint(lambda v:\ - v.get_value('table_format').compression_codec == 'none') + cls.ImpalaTestMatrix.add_constraint(lambda v: + v.get_value('table_format').file_format == 'parquet') + cls.ImpalaTestMatrix.add_constraint(lambda v: + v.get_value('table_format').compression_codec == 'none') @SkipIfLocal.multiple_impalad @UniqueDatabase.parametrize(sync_ddl=True) @@ -75,7 +94,9 @@ class TestInsertParquetQueries(ImpalaTestSuite): vector.get_value('compression_codec') self.run_test_case('insert_parquet', vector, unique_database, multiple_impalad=True) + class TestInsertParquetInvalidCodec(ImpalaTestSuite): + @classmethod def get_workload(self): return 'functional-query' @@ -88,21 +109,22 @@ class TestInsertParquetInvalidCodec(ImpalaTestSuite): cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0], sync_ddl=[1])) cls.ImpalaTestMatrix.add_dimension( - ImpalaTestDimension("compression_codec", 'bzip2')); - cls.ImpalaTestMatrix.add_constraint(lambda v:\ - v.get_value('table_format').file_format == 'parquet') - cls.ImpalaTestMatrix.add_constraint(lambda v:\ - v.get_value('table_format').compression_codec == 'none') + ImpalaTestDimension("compression_codec", 'bzip2')) + cls.ImpalaTestMatrix.add_constraint(lambda v: + v.get_value('table_format').file_format == 'parquet') + cls.ImpalaTestMatrix.add_constraint(lambda v: + v.get_value('table_format').compression_codec == 'none') @SkipIfLocal.multiple_impalad def test_insert_parquet_invalid_codec(self, vector): vector.get_value('exec_option')['COMPRESSION_CODEC'] = \ vector.get_value('compression_codec') - self.run_test_case('QueryTest/insert_parquet_invalid_codec', vector,\ + self.run_test_case('QueryTest/insert_parquet_invalid_codec', vector, multiple_impalad=True) class TestInsertParquetVerifySize(ImpalaTestSuite): + @classmethod def get_workload(self): return 'tpch' @@ -114,12 +136,12 @@ class TestInsertParquetVerifySize(ImpalaTestSuite): cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension( cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0], sync_ddl=[1])) - cls.ImpalaTestMatrix.add_constraint(lambda v:\ - v.get_value('table_format').file_format == 'parquet') - cls.ImpalaTestMatrix.add_constraint(lambda v:\ - v.get_value('table_format').compression_codec == 'none') + cls.ImpalaTestMatrix.add_constraint(lambda v: + 
v.get_value('table_format').file_format == 'parquet') + cls.ImpalaTestMatrix.add_constraint(lambda v: + v.get_value('table_format').compression_codec == 'none') cls.ImpalaTestMatrix.add_dimension( - ImpalaTestDimension("compression_codec", *PARQUET_CODECS)); + ImpalaTestDimension("compression_codec", *PARQUET_CODECS)) @SkipIfIsilon.hdfs_block_size @SkipIfLocal.hdfs_client @@ -149,10 +171,12 @@ class TestInsertParquetVerifySize(ImpalaTestSuite): assert size < block_size, "File size greater than expected.\ Expected: {0}, Got: {1}".format(block_size, size) if size < block_size * 0.80: - assert found_small_file == False + assert not found_small_file found_small_file = True + class TestHdfsParquetTableWriter(ImpalaTestSuite): + @classmethod def get_workload(cls): return 'functional-query' @@ -171,12 +195,12 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite): table_name = "test_hdfs_parquet_table_writer" qualified_table_name = "%s.%s" % (unique_database, table_name) self.execute_query("create table %s stored as parquet as select l_linenumber from " - "tpch_parquet.lineitem limit 180000" % qualified_table_name) + "tpch_parquet.lineitem limit 180000" % qualified_table_name) tmp_dir = make_tmp_dir() try: hdfs_file = get_fs_path('/test-warehouse/%s.db/%s/*.parq' - % (unique_database, table_name)) + % (unique_database, table_name)) check_call(['hdfs', 'dfs', '-copyToLocal', hdfs_file, tmp_dir]) for root, subdirs, files in os.walk(tmp_dir): @@ -184,7 +208,293 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite): if not f.endswith('parq'): continue check_call([os.path.join(impalad_basedir, 'util/parquet-reader'), '--file', - os.path.join(tmp_dir, str(f))]) + os.path.join(tmp_dir, str(f))]) finally: self.execute_query("drop table %s" % qualified_table_name) rmtree(tmp_dir) + + +class TestHdfsParquetTableStatsWriter(ImpalaTestSuite): + + @classmethod + def get_workload(cls): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(TestHdfsParquetTableStatsWriter, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_constraint( + lambda v: v.get_value('table_format').file_format == 'parquet') + + def _decode_row_group_stats(self, schemas, row_group_stats): + """Decodes and return a list of statistics for a single row group.""" + decoded = [] + assert len(schemas) == len(row_group_stats) + for schema, stats in zip(schemas, row_group_stats): + if stats is None: + decoded.append(None) + continue + + if stats.min is None and stats.max is None: + decoded.append(None) + continue + + assert stats.min is not None and stats.max is not None + min_value = decode_stats_value(schema, stats.min) + max_value = decode_stats_value(schema, stats.max) + decoded.append(ColumnStats(schema.name, min_value, max_value)) + + assert len(decoded) == len(schemas) + return decoded + + def _get_row_group_stats_from_file(self, parquet_file): + """Returns a list of statistics for each row group in file 'parquet_file'. The result + is a two-dimensional list, containing stats by row group and column.""" + file_meta_data = get_parquet_metadata(parquet_file) + # We only support flat schemas, the additional element is the root element. 
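The stats.min and stats.max fields collected here are the raw, plain-encoded byte strings written by the backend; decode_stats_value turns them back into Python values. As an illustrative aside (not part of this change; FakeSchema below is only a hypothetical stand-in for the Thrift SchemaElement, of which decode_stats_value consults just the 'type' field for fixed-width numeric types):

  import struct
  from collections import namedtuple
  from parquet.ttypes import Type
  from tests.util.get_parquet_metadata import decode_stats_value

  FakeSchema = namedtuple('FakeSchema', ['name', 'type', 'type_length', 'scale'])
  int_schema = FakeSchema('id', Type.INT32, None, None)
  # The writer plain-encodes INT32 min/max values as 4 little-endian bytes.
  assert decode_stats_value(int_schema, struct.pack('<i', 7299)) == 7299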
+ schemas = file_meta_data.schema[1:] + file_stats = [] + for row_group in file_meta_data.row_groups: + num_columns = len(row_group.columns) + assert num_columns == len(schemas) + column_stats = [c.meta_data.statistics for c in row_group.columns] + file_stats.append(self._decode_row_group_stats(schemas, column_stats)) + + return file_stats + + def _get_row_group_stats_from_hdfs_folder(self, hdfs_path): + """Returns a list of statistics for each row group in all parquet files in + 'hdfs_path'. The result is a two-dimensional list, containing stats by row group and + column.""" + row_group_stats = [] + + try: + tmp_dir = make_tmp_dir() + check_call(['hdfs', 'dfs', '-get', hdfs_path, tmp_dir]) + + for root, subdirs, files in os.walk(tmp_dir): + for f in files: + parquet_file = os.path.join(root, str(f)) + row_group_stats.extend(self._get_row_group_stats_from_file(parquet_file)) + + finally: + rmtree(tmp_dir) + + return row_group_stats + + def _validate_min_max_stats(self, hdfs_path, expected_values, skip_col_idxs = None): + """Validates that 'hdfs_path' contains exactly one parquet file and that the rowgroup + statistics in that file match the values in 'expected_values'. Columns indexed by + 'skip_col_idx' are excluded from the verification of the expected values. + """ + skip_col_idxs = skip_col_idxs or [] + # The caller has to make sure that the table fits into a single row group. We enforce + # it here to make sure the results are predictable and independent of how the data + # could get written across multiple files. + row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path) + assert(len(row_group_stats)) == 1 + table_stats = row_group_stats[0] + + num_columns = len(table_stats) + assert num_columns == len(expected_values) + + for col_idx, stats, expected in zip(range(num_columns), table_stats, expected_values): + if col_idx in skip_col_idxs: + continue + if not expected: + assert not stats + continue + assert stats == expected + + def _ctas_table_and_verify_stats(self, vector, unique_database, source_table, + expected_values, hive_skip_col_idx = None): + """Copies 'source_table' into a parquet table and makes sure that the row group + statistics in the resulting parquet file match those in 'expected_values'. The + comparison is performed against both Hive and Impala. For Hive, columns indexed by + 'hive_skip_col_idx' are excluded from the verification of the expected values. + """ + table_name = "test_hdfs_parquet_table_writer" + qualified_table_name = "{0}.{1}".format(unique_database, table_name) + hdfs_path = get_fs_path('/test-warehouse/{0}.db/{1}/'.format(unique_database, + table_name)) + + # Validate against Hive. + self.execute_query("drop table if exists {0}".format(qualified_table_name)) + self.run_stmt_in_hive("create table {0} stored as parquet as select * from " + "{1}".format(qualified_table_name, source_table)) + self.execute_query("invalidate metadata {0}".format(qualified_table_name)) + self._validate_min_max_stats(hdfs_path, expected_values, hive_skip_col_idx) + + # Validate against Impala. Setting exec_single_node_rows_threshold and adding a limit + # clause ensures that the query is executed on the coordinator, resulting in a single + # parquet file being written. 
+ num_rows = self.execute_scalar("select count(*) from {0}".format(source_table)) + self.execute_query("drop table {0}".format(qualified_table_name)) + query = ("create table {0} stored as parquet as select * from {1} limit " + "{2}").format(qualified_table_name, source_table, num_rows) + vector.get_value('exec_option')['EXEC_SINGLE_NODE_ROWS_THRESHOLD'] = num_rows + self.execute_query(query, vector.get_value('exec_option')) + self._validate_min_max_stats(hdfs_path, expected_values) + + def test_write_statistics_alltypes(self, vector, unique_database): + """Test that writing a parquet file populates the rowgroup statistics with the correct + values. + """ + # Expected values for functional.alltypes + expected_min_max_values = [ + ColumnStats('id', 0, 7299), + ColumnStats('bool_col', False, True), + ColumnStats('tinyint_col', 0, 9), + ColumnStats('smallint_col', 0, 9), + ColumnStats('int_col', 0, 9), + ColumnStats('bigint_col', 0, 90), + ColumnStats('float_col', 0, RoundFloat(9.9, 1)), + ColumnStats('double_col', 0, RoundFloat(90.9, 1)), + None, + None, + None, + ColumnStats('year', 2009, 2010), + ColumnStats('month', 1, 12), + ] + + # Skip comparison of unsupported columns types with Hive. + hive_skip_col_idx = [8, 9, 10] + + self._ctas_table_and_verify_stats(vector, unique_database, "functional.alltypes", + expected_min_max_values, hive_skip_col_idx) + + def test_write_statistics_decimal(self, vector, unique_database): + """Test that Impala does not write statistics for decimal columns.""" + # Expected values for functional.decimal_tbl + expected_min_max_values = [None, None, None, None, None, None] + + # Skip comparison of unsupported columns types with Hive. + hive_skip_col_idx = range(len(expected_min_max_values)) + + self._ctas_table_and_verify_stats(vector, unique_database, "functional.decimal_tbl", + expected_min_max_values, hive_skip_col_idx) + + def test_write_statistics_multi_page(self, vector, unique_database): + """Test that writing a parquet file populates the rowgroup statistics with the correct + values. This test write a single parquet file with several pages per column. + """ + # Expected values for tpch_parquet.customer + expected_min_max_values = [ + ColumnStats('c_custkey', 1, 150000), + None, + None, + ColumnStats('c_nationkey', 0, 24), + None, + None, + None, + None, + ] + + # Skip comparison of unsupported columns types with Hive. + hive_skip_col_idx = [1, 2, 4, 5, 6, 7] + + self._ctas_table_and_verify_stats(vector, unique_database, "tpch_parquet.customer", + expected_min_max_values, hive_skip_col_idx) + + def test_write_statistics_null(self, vector, unique_database): + """Test that we don't write min/max statistics for null columns.""" + expected_min_max_values = [None, None, None, None, None, None, None] + + # Skip comparison of unsupported columns types with Hive. + hive_skip_col_idx = range(len(expected_min_max_values)) + + self._ctas_table_and_verify_stats(vector, unique_database, "functional.nulltable", + expected_min_max_values, hive_skip_col_idx) + + def test_write_statistics_char_types(self, vector, unique_database): + """Test that Impala does not write statistics for char columns.""" + expected_min_max_values = [None, None, None] + + # Skip comparison of unsupported columns types with Hive. 
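The RoundFloat wrappers in the expected values above exist because FLOAT statistics are plain-encoded 4-byte IEEE singles, so a literal like 9.9 does not survive the round trip exactly. A minimal illustration (not part of this change, assuming RoundFloat from this module is in scope):

  import struct

  read_back = struct.unpack('<f', struct.pack('<f', 9.9))[0]
  assert read_back != 9.9                 # comes back as 9.899999618530273
  assert RoundFloat(9.9, 1) == read_back  # equal after rounding to one digit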
+ hive_skip_col_idx = range(len(expected_min_max_values)) + + self._ctas_table_and_verify_stats(vector, unique_database, "functional.chars_formats", + expected_min_max_values, hive_skip_col_idx) + + def test_write_statistics_negative(self, vector, unique_database): + """Test that Impala correctly writes statistics for negative values.""" + view_name = "test_negative_view" + qualified_view_name = "{0}.{1}".format(unique_database, view_name) + + # Create a view to generate test data with negative values by negating every other + # row. + create_view_stmt = """create view {0} as select + id * cast(pow(-1, id % 2) as int) as id, + int_col * cast(pow(-1, id % 2) as int) as int_col, + bigint_col * cast(pow(-1, id % 2) as bigint) as bigint_col, + float_col * pow(-1, id % 2) as float_col, + double_col * pow(-1, id % 2) as double_col + from functional.alltypes""".format(qualified_view_name) + self.execute_query(create_view_stmt) + + expected_min_max_values = [ + ColumnStats('id', -7299, 7298), + ColumnStats('int_col', -9, 8), + ColumnStats('bigint_col', -90, 80), + ColumnStats('float_col', RoundFloat(-9.9, 1), RoundFloat(8.8, 1)), + ColumnStats('double_col', RoundFloat(-90.9, 1), RoundFloat(80.8, 1)), + ] + + self._ctas_table_and_verify_stats(vector, unique_database, qualified_view_name, + expected_min_max_values) + + def test_write_statistics_multiple_row_groups(self, vector, unique_database): + """Test that writing multiple row groups works as expected. This is done by inserting + into a table using the sortby() hint and then making sure that the min and max values + of row groups don't overlap.""" + source_table = "tpch_parquet.orders" + target_table = "test_hdfs_parquet_table_writer" + qualified_target_table = "{0}.{1}".format(unique_database, target_table) + hdfs_path = get_fs_path("/test-warehouse/{0}.db/{1}/".format( + unique_database, target_table)) + + # Insert a large amount of data on a single backend with a limited parquet file size. + # This will result in several files being written, exercising code that tracks + # statistics for row groups. + num_rows = self.execute_scalar("select count(*) from {0}".format(source_table)) + query = "create table {0} like {1} stored as parquet".format(qualified_target_table, + source_table) + self.execute_query(query, vector.get_value('exec_option')) + query = ("insert into {0} /* +sortby(o_orderkey) */ select * from {1} limit" + "{2}").format(qualified_target_table, source_table, num_rows) + vector.get_value('exec_option')['EXEC_SINGLE_NODE_ROWS_THRESHOLD'] = num_rows + vector.get_value('exec_option')['PARQUET_FILE_SIZE'] = 8 * 1024 * 1024 + self.execute_query(query, vector.get_value('exec_option')) + + # Get all stats for the o_orderkey column + row_group_stats = self._get_row_group_stats_from_hdfs_folder(hdfs_path) + assert len(row_group_stats) > 1 + orderkey_stats = [s[0] for s in row_group_stats] + + # Make sure that they don't overlap by ordering by the min value, then looking at + # boundaries. 
+ orderkey_stats.sort(key = lambda s: s.min) + for l, r in zip(orderkey_stats, orderkey_stats[1:]): + assert l.max <= r.min + + def test_write_statistics_float_infinity(self, vector, unique_database): + """Test that statistics for -inf and inf are written correctly.""" + table_name = "test_float_infinity" + qualified_table_name = "{0}.{1}".format(unique_database, table_name) + + create_table_stmt = "create table {0} (f float, d double);".format( + qualified_table_name) + self.execute_query(create_table_stmt) + + insert_stmt = """insert into {0} values + (cast('-inf' as float), cast('-inf' as double)), + (cast('inf' as float), cast('inf' as double))""".format(qualified_table_name) + self.execute_query(insert_stmt) + + expected_min_max_values = [ + ColumnStats('f', float('-inf'), float('inf')), + ColumnStats('d', float('-inf'), float('inf')), + ] + + self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name, + expected_min_max_values) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6251d8b4/tests/util/get_parquet_metadata.py ---------------------------------------------------------------------- diff --git a/tests/util/get_parquet_metadata.py b/tests/util/get_parquet_metadata.py index cb417a5..8cf0405 100644 --- a/tests/util/get_parquet_metadata.py +++ b/tests/util/get_parquet_metadata.py @@ -18,15 +18,110 @@ import os import struct -from parquet.ttypes import FileMetaData +from datetime import date, datetime, time, timedelta +from decimal import Decimal +from parquet.ttypes import FileMetaData, Type from thrift.protocol import TCompactProtocol from thrift.transport import TTransport PARQUET_VERSION_NUMBER = 'PAR1' -def parse_int(s): - """Reinterprets the string 's' as a 4-byte integer.""" - return struct.unpack('i', s)[0] + +def julian_day_to_date(julian_day): + """Converts a julian day number into a Gregorian date. The reference date is picked + arbitrarily and can be validated with an online converter like + http://aa.usno.navy.mil/jdconverter?ID=AA&jd=2457755 + """ + return date(2017, 01, 01) + timedelta(julian_day - 2457755) + + +def nanos_to_time(nanos): + """Converts nanoseconds to time of day.""" + micros = nanos // 1000 # integer division + seconds, micros = divmod(micros, 10**6) + minutes, seconds = divmod(seconds, 60) + hours, minutes = divmod(minutes, 60) + return time(hours, minutes, seconds, micros) + + +def parse_boolean(s): + """Parses a single boolean value from a single byte""" + return struct.unpack('<?', s)[0] + + +def parse_int32(s): + """Reinterprets the string 's' as a signed 4-byte integer.""" + return struct.unpack('<i', s)[0] + + +def parse_int64(s): + """Reinterprets the string 's' as a signed 8-byte integer.""" + return struct.unpack('<q', s)[0] + + +def parse_float(s): + """Reinterprets the string 's' as an IEEE single precision float.""" + return struct.unpack('<f', s)[0] + + +def parse_double(s): + """Reinterprets the string 's' as an IEEE double precision float.""" + return struct.unpack('<d', s)[0] + + +def decode_timestamp(s): + """Reinterprets the string 's' as a 12-byte timestamp as written by Impala and decode it + into a datetime object.""" + # Impala writes timestamps as 12-byte values. The first 8 byte store a + # boost::posix_time::time_duration, which is the time within the current day in + # nanoseconds stored as int64. The last 4 bytes store a boost::gregorian::date, + # which is the Julian date, stored as utin32. 
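As a concrete illustration of this 12-byte layout (a sketch, not part of this change; it only uses the conversion helpers defined above in this module), 11:30:00 on 2017-01-01 packs and converts as follows:

  import struct
  from datetime import date, time
  from tests.util.get_parquet_metadata import julian_day_to_date, nanos_to_time

  day_nanos = (11 * 3600 + 30 * 60) * 10**9  # 11:30:00 as nanoseconds within the day
  julian_day = 2457755                       # Julian day number of 2017-01-01
  # Stored as a little-endian int64 followed by a uint32, 12 bytes in total.
  assert len(struct.pack('<qI', day_nanos, julian_day)) == 12
  assert julian_day_to_date(julian_day) == date(2017, 1, 1)
  assert nanos_to_time(day_nanos) == time(11, 30)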
+ day_nanos, julian_day = struct.unpack('<qI', s) + return datetime.combine(julian_day_to_date(julian_day), nanos_to_time(day_nanos)) + + +def decode_decimal(schema, value): + """Decodes 'value' into a decimal by interpreting its contents according to 'schema'.""" + assert len(value) > 0 + assert schema.type_length == len(value) + assert schema.type == Type.FIXED_LEN_BYTE_ARRAY + + numeric = Decimal(reduce(lambda x, y: x * 256 + y, map(ord, value))) + + # Compute two's complement for negative values. + if (ord(value[0]) > 127): + bit_width = 8 * len(value) + numeric = numeric - (2 ** bit_width) + + return numeric / 10 ** schema.scale + + +def decode_stats_value(schema, value): + """Decodes 'value' according to 'schema. It expects 'value' to be plain encoded. For + BOOLEAN values, only the least significant bit is parsed and returned. Binary arrays are + expected to be stored as such, without a preceding length.""" + column_type = schema.type + if column_type == Type.BOOLEAN: + return parse_boolean(value) + elif column_type == Type.INT32: + return parse_int32(value) + elif column_type == Type.INT64: + return parse_int64(value) + elif column_type == Type.INT96: + # Impala uses INT96 to store timestamps + return decode_timestamp(value) + elif column_type == Type.FLOAT: + return parse_float(value) + elif column_type == Type.DOUBLE: + return parse_double(value) + elif column_type == Type.BYTE_ARRAY: + # In parquet::Statistics, strings are stored as is. + return value + elif column_type == Type.FIXED_LEN_BYTE_ARRAY: + return decode_decimal(schema, value) + assert False + return None + def get_parquet_metadata(filename): """Returns a FileMetaData as defined in parquet.thrift. 'filename' must be a local @@ -43,7 +138,7 @@ def get_parquet_metadata(filename): # Read metadata length f.seek(file_size - len(PARQUET_VERSION_NUMBER) - 4) - metadata_len = parse_int(f.read(4)) + metadata_len = parse_int32(f.read(4)) # Read metadata f.seek(file_size - len(PARQUET_VERSION_NUMBER) - 4 - metadata_len)
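Outside the test framework, the new helpers in get_parquet_metadata.py can be combined into a small standalone script to inspect the statistics this change writes. An illustrative sketch (not part of this change; the file path is hypothetical, and the file must first be copied out of HDFS, e.g. with 'hdfs dfs -get', since get_parquet_metadata only reads local files):

  from tests.util.get_parquet_metadata import get_parquet_metadata, decode_stats_value

  def print_row_group_stats(parquet_file):
      """Prints the decoded min/max statistics of every column in every row group."""
      file_meta_data = get_parquet_metadata(parquet_file)
      # Flat schemas only; the first schema element is the root.
      schemas = file_meta_data.schema[1:]
      for rg_idx, row_group in enumerate(file_meta_data.row_groups):
          for schema, column in zip(schemas, row_group.columns):
              stats = column.meta_data.statistics
              if stats is None or stats.min is None or stats.max is None:
                  print('row group %d, %s: no min/max stats' % (rg_idx, schema.name))
                  continue
              print('row group %d, %s: min=%s max=%s' % (rg_idx, schema.name,
                  decode_stats_value(schema, stats.min), decode_stats_value(schema, stats.max)))

  print_row_group_stats('/tmp/example.parq')  # hypothetical local path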