IMPALA-2736: Optimized ReadValueBatch() for Parquet scalar column readers.

This change builds on top of the recent move to column-wise
materialization of scalar values in the Parquet scanner.
The goal of this patch is to improve scan efficiency and to show the
future direction for all column readers.

Major TODO: The current patch has minor code duplication/redundancy, and
the new ReadValueBatch() departs from (but improves on) the existing
column reader control flow. To improve code reuse and readability we
should overhaul all column readers to be more uniform.

Summary of changes:
- refactor ReadValueBatch() to simplify control flow
- introduce caching of def/rep levels for faster level decoding, and for
  a tighter value materialization loop
- new templated function for value materialization that takes the value
  encoding as a template argument

Mini benchmark vs. cdh5-trunk:
I ran the following queries on a single impalad before and after my
change using a synthetic 'huge_lineitem' table. I modified
hdfs-scan-node.cc to set the number of rows of any row batch to 0 to
focus the measurement on the scan time.

Query options:
set num_scanner_threads=1;
set disable_codegen=true;
set num_nodes=1;

select * from huge_lineitem;
Before: 22.39s
After: 13.62s

select * from huge_lineitem where l_linenumber < 0;
Before: 25.11s
After: 17.73s

select * from huge_lineitem where l_linenumber % 2 = 0;
Before: 26.32s
After: 16.68s

select l_linenumber from huge_lineitem;
Before: 1.74s
After: 0.92s

Testing: I ran a private exhaustive build and all tests passed.
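The RLE fast path behind the level caching (repeated level values are written into the cache with a single memset() instead of a per-value decode) can be illustrated with a small standalone sketch. This is hypothetical illustration code, not the Impala implementation: `RleRun` and `FillLevelCache` are invented names, and the real decoder pulls runs from a BitReader rather than from a pre-parsed vector.

```cpp
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <vector>

// One decoded RLE run: 'repeat_count' copies of 'value'.
struct RleRun {
  uint8_t value;
  int repeat_count;
};

// Fills 'cache' (capacity 'batch_size') from a sequence of RLE runs.
// Each run is expanded with one memset() rather than a per-value loop.
// Returns the number of levels written, or -1 if a level exceeds
// 'max_level' (corrupt level data).
int FillLevelCache(const std::vector<RleRun>& runs, int batch_size,
                   int max_level, uint8_t* cache) {
  int num_values = 0;
  for (const RleRun& run : runs) {
    if (run.value > max_level) return -1;
    int n = std::min(run.repeat_count, batch_size - num_values);
    memset(cache + num_values, run.value, n);
    num_values += n;
    if (num_values == batch_size) break;
  }
  return num_values;
}
```

Because levels are stored as unsigned 8-bit integers (as the patch notes), a run of identical levels really is a memset() of a byte value, which is what makes this path cheap.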
Change-Id: I21fa9b050a45f2dd45cc0091ea5b008d3c0a3f30
Reviewed-on: http://gerrit.cloudera.org:8080/2843
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Alex Behm <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/14cdb049
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/14cdb049
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/14cdb049

Branch: refs/heads/master
Commit: 14cdb0497c52f102623b386bc35482f4c2fa7f8b
Parents: df8bf3a
Author: Alex Behm <[email protected]>
Authored: Thu Apr 21 23:39:21 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Thu May 12 14:18:05 2016 -0700

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 468 ++++++++++++++++++++++---------
 be/src/exec/hdfs-parquet-scanner.h  |  11 +
 be/src/util/rle-encoding.h          |  10 +-
 3 files changed, 356 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/14cdb049/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 52fe3fa..47e19a6 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -64,6 +64,9 @@ DEFINE_double(parquet_min_filter_reject_ratio, 0.1, "(Advanced) If the percentag
     "rows rejected by a runtime filter drops below this value, the filter is disabled.");
 
 const int64_t HdfsParquetScanner::FOOTER_SIZE = 100 * 1024;
+const int16_t HdfsParquetScanner::ROW_GROUP_END = numeric_limits<int16_t>::min();
+const int16_t HdfsParquetScanner::INVALID_LEVEL = -1;
+const int16_t HdfsParquetScanner::INVALID_POS = -1;
 
 // Max data page header size in bytes.
This is an estimate and only needs to be an upper
 // bound. It is theoretically possible to have a page header of any size due to string
@@ -230,22 +233,87 @@ HdfsParquetScanner::~HdfsParquetScanner() {
 
 // TODO for 2.3: move column readers to separate file
 
-/// Decoder for all supported Parquet level encodings.
-/// Overrides RleDecoder so it can use RLE decoding and internal bit reader.
-class HdfsParquetScanner::LevelDecoder : protected RleDecoder {
+/// Decoder for all supported Parquet level encodings. Optionally reads, decodes, and
+/// caches level values in batches.
+/// Level values are unsigned 8-bit integers because we support a maximum nesting
+/// depth of 100, as enforced by the FE. Using a small type saves memory and speeds up
+/// populating the level cache (e.g., with RLE we can memset() repeated values).
+///
+/// Inherits from RleDecoder instead of containing one for performance reasons.
+/// The containment design would require two BitReaders per column reader. The extra
+/// BitReader causes enough bloat for a column reader to require another cache line.
+/// TODO: It is not clear whether the inheritance vs. containment choice still makes
+/// sense with column-wise materialization. The containment design seems cleaner and
+/// we should revisit.
+class HdfsParquetScanner::LevelDecoder : public RleDecoder {
  public:
-  LevelDecoder() {};
+  LevelDecoder(bool is_def_level_decoder)
+    : cached_levels_(NULL),
+      num_cached_levels_(0),
+      cached_level_idx_(0),
+      encoding_(parquet::Encoding::PLAIN),
+      max_level_(0),
+      cache_size_(0),
+      num_buffered_values_(0),
+      decoding_error_code_(is_def_level_decoder ?
+          TErrorCode::PARQUET_DEF_LEVEL_ERROR : TErrorCode::PARQUET_REP_LEVEL_ERROR) {
+  }
 
   /// Initialize the LevelDecoder. Reads and advances the provided data buffer if the
   /// encoding requires reading metadata from the page header.
-  Status Init(const string& filename, parquet::Encoding::type encoding, int max_level,
-      int num_buffered_values, uint8_t** data, int* data_size);
+  Status Init(const string& filename, parquet::Encoding::type encoding,
+      MemPool* cache_pool, int cache_size, int max_level, int num_buffered_values,
+      uint8_t** data, int* data_size);
 
-  /// Reads the next level.
+  /// Returns the next level or INVALID_LEVEL if there was an error.
   inline int16_t ReadLevel();
 
+  /// Decodes and caches the next batch of levels. Resets members associated with the
+  /// cache. Returns a non-ok status if there was a problem decoding a level, or if a
+  /// level was encountered with a value greater than max_level_.
+  Status CacheNextBatch(int batch_size);
+
+  /// Functions for working with the level cache.
+  inline bool CacheHasNext() const { return cached_level_idx_ < num_cached_levels_; }
+  inline uint8_t CacheGetNext() {
+    DCHECK_LT(cached_level_idx_, num_cached_levels_);
+    return cached_levels_[cached_level_idx_++];
+  }
+  inline void CacheSkipLevels(int num_levels) {
+    DCHECK_LE(cached_level_idx_ + num_levels, num_cached_levels_);
+    cached_level_idx_ += num_levels;
+  }
+  inline int CacheSize() const { return num_cached_levels_; }
+  inline int CacheRemaining() const { return num_cached_levels_ - cached_level_idx_; }
+  inline int CacheCurrIdx() const { return cached_level_idx_; }
+
+ private:
+  /// Initializes members associated with the level cache. Allocates memory for
+  /// the cache from pool, if necessary.
+  Status InitCache(MemPool* pool, int cache_size);
+
+  /// Decodes and writes a batch of levels into the cache. Sets the number of
+  /// values written to the cache in *num_cached_levels. Returns false if there was
+  /// an error decoding a level or if there was a level value greater than max_level_.
+  bool FillCache(int batch_size, int* num_cached_levels);
+
+  /// Buffer for a batch of levels. The memory is allocated and owned by a pool in
+  /// passed in Init().
+  uint8_t* cached_levels_;
+  /// Number of valid level values in the cache.
+  int num_cached_levels_;
+  /// Current index into cached_levels_.
+  int cached_level_idx_;
 
   parquet::Encoding::type encoding_;
+
+  /// For error checking and reporting.
+  int max_level_;
+  /// Number of level values cached_levels_ has memory allocated for.
+  int cache_size_;
+  /// Number of remaining data values in the current data page.
+  int num_buffered_values_;
+  string filename_;
+  TErrorCode::type decoding_error_code_;
 };
 
 /// Base class for reading a column. Reads a logical column, not necessarily a column
@@ -306,6 +374,11 @@ class HdfsParquetScanner::ColumnReader {
   /// not in collections.
   virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) = 0;
 
+  /// Returns true if this reader needs to be seeded with NextLevels() before
+  /// calling ReadValueBatch() or ReadNonRepeatedValueBatch().
+  /// Note that all readers need to be seeded before calling the non-batched ReadValue().
+  virtual bool NeedsSeedingForBatchedReading() const { return true; }
+
   /// Batched version of ReadValue() that reads up to max_values at once and materializes
   /// them into tuples in tuple_mem. Returns the number of values actually materialized
   /// in *num_values. The return value, error behavior and state changes are generally
@@ -340,9 +413,8 @@ class HdfsParquetScanner::ColumnReader {
   /// 'tuple') and increments pos_current_value_.
   void ReadPosition(Tuple* tuple);
 
-  /// Returns true if this column reader has reached the end of the row group, or
-  /// if this column reader has not been seeded with a first NextLevels().
-  inline bool RowGroupAtEnd() { return rep_level_ == -1; }
+  /// Returns true if this column reader has reached the end of the row group.
+  inline bool RowGroupAtEnd() { return rep_level_ == ROW_GROUP_END; }
 
  protected:
   HdfsParquetScanner* parent_;
@@ -385,10 +457,10 @@ class HdfsParquetScanner::ColumnReader {
       node_(node),
       slot_desc_(slot_desc),
       pos_slot_desc_(NULL),
-      pos_current_value_(-1),
-      rep_level_(-1),
+      pos_current_value_(INVALID_POS),
+      rep_level_(INVALID_LEVEL),
       max_rep_level_(node_.max_rep_level),
-      def_level_(-1),
+      def_level_(INVALID_LEVEL),
       max_def_level_(node_.max_def_level),
       tuple_offset_(slot_desc == NULL ? -1 : slot_desc->tuple_offset()),
       null_indicator_offset_(slot_desc == NULL ? NullIndicatorOffset(-1, -1) :
@@ -402,7 +474,6 @@ class HdfsParquetScanner::ColumnReader {
   }
 };
 
-
 /// Collections are not materialized directly in parquet files; only scalar values appear
 /// in the file. CollectionColumnReader uses the definition and repetition levels of child
 /// column readers to figure out the boundaries of each collection in this column.
@@ -476,6 +547,8 @@ class HdfsParquetScanner::BaseScalarColumnReader :
   BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
       const SlotDescriptor* slot_desc)
     : ColumnReader(parent, node, slot_desc),
+      def_levels_(true),
+      rep_levels_(false),
       num_buffered_values_(0),
       num_values_read_(0),
       metadata_(NULL),
@@ -577,18 +650,10 @@ class HdfsParquetScanner::BaseScalarColumnReader :
   /// def_level_ and pos_current_value_ to -1 if no more pages or an error encountered.
   bool NextPage();
 
-  /// Implementation for NextLevels() and NextDefLevel().
+  /// Implementation for NextLevels().
   template <bool ADVANCE_REP_LEVEL>
   bool NextLevels();
 
-  /// Returns the definition level for the next value
-  /// Returns -1 and sets parse_status_ if there was a error parsing it.
-  int16_t ReadDefinitionLevel();
-
-  /// Returns the repetition level for the next value
-  /// Returns -1 and sets parse_status_ if there was a error parsing it.
-  int16_t ReadRepetitionLevel();
-
   /// Creates a dictionary decoder from values/size and store in class.
Subclass must
   /// implement this.
   virtual DictDecoderBase* CreateDictionaryDecoder(uint8_t* values, int size) = 0;
@@ -606,12 +671,6 @@
   virtual Status InitDataPage(uint8_t* data, int size) = 0;
 
  private:
-  // Pull out slow-path Status construction code from ReadRepetitionLevel()/
-  // ReadDefinitionLevel() for performance.
-  void __attribute__((noinline)) SetLevelError(TErrorCode::type error_code) {
-    parent_->parse_status_ = Status(error_code, num_buffered_values_, filename());
-  }
-
   /// Writes the next value into *slot using pool if necessary. Also advances rep_level_
   /// and def_level_ via NextLevels().
   ///
@@ -667,11 +726,17 @@ class HdfsParquetScanner::ScalarColumnReader :
     return ReadValue<false>(pool, tuple);
   }
 
+  virtual bool NeedsSeedingForBatchedReading() const { return false; }
+
   virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values);
+      uint8_t* tuple_mem, int* num_values) {
+    return ReadValueBatch<true>(pool, max_values, tuple_size, tuple_mem, num_values);
+  }
 
   virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values);
+      uint8_t* tuple_mem, int* num_values) {
+    return ReadValueBatch<false>(pool, max_values, tuple_size, tuple_mem, num_values);
+  }
 
  protected:
   template <bool IN_COLLECTION>
@@ -685,15 +750,134 @@
     DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor())
         << "Caller should have called NextLevels() until we are ready to read a value";
 
-    if (!MATERIALIZED) {
-      return NextLevels<IN_COLLECTION>();
-    } else if (def_level_ >= max_def_level()) {
-      return ReadSlot<IN_COLLECTION>(tuple->GetSlot(tuple_offset_), pool);
-    } else {
-      // Null value
-      tuple->SetNull(null_indicator_offset_);
-      return NextLevels<IN_COLLECTION>();
+    if (MATERIALIZED) {
+      if (def_level_ >= max_def_level()) {
+        if (page_encoding_ ==
            parquet::Encoding::PLAIN_DICTIONARY) {
+          if (!ReadSlot<true>(tuple->GetSlot(tuple_offset_), pool)) return false;
+        } else {
+          if (!ReadSlot<false>(tuple->GetSlot(tuple_offset_), pool)) return false;
+        }
+      } else {
+        tuple->SetNull(null_indicator_offset_);
+      }
     }
+    return NextLevels<IN_COLLECTION>();
+  }
+
+  /// Implementation of the ReadValueBatch() functions specialized for this
+  /// column reader type. This function drives the reading of data pages and
+  /// caching of rep/def levels. Once a data page and cached levels are available,
+  /// it calls into a more specialized MaterializeValueBatch() for doing the actual
+  /// value materialization using the level caches.
+  template<bool IN_COLLECTION>
+  bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values) {
+    // Repetition level is only present if this column is nested in a collection type.
+    if (!IN_COLLECTION) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
+    if (IN_COLLECTION) DCHECK_GT(max_rep_level(), 0) << slot_desc()->DebugString();
+
+    int val_count = 0;
+    bool continue_execution = true;
+    while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
+      // Read next page if necessary.
+      if (num_buffered_values_ == 0) {
+        if (!NextPage()) {
+          continue_execution = parent_->parse_status_.ok();
+          continue;
+        }
+      }
+
+      // Fill def/rep level caches if they are empty.
+      int level_batch_size = min(parent_->state_->batch_size(), num_buffered_values_);
+      if (!def_levels_.CacheHasNext()) {
+        parent_->parse_status_.MergeStatus(def_levels_.CacheNextBatch(level_batch_size));
+      }
+      // We only need the repetition levels for populating the position slot since we
+      // are only populating top-level tuples.
+      if (IN_COLLECTION && pos_slot_desc_ != NULL && !rep_levels_.CacheHasNext()) {
+        parent_->parse_status_.MergeStatus(rep_levels_.CacheNextBatch(level_batch_size));
+      }
+      if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+
+      // This special case is most efficiently handled here directly.
+      if (!MATERIALIZED && !IN_COLLECTION) {
+        int vals_to_add = min(def_levels_.CacheRemaining(), max_values - val_count);
+        val_count += vals_to_add;
+        def_levels_.CacheSkipLevels(vals_to_add);
+        num_buffered_values_ -= vals_to_add;
+        continue;
+      }
+
+      // Read data page and cached levels to materialize values.
+      int cache_start_idx = def_levels_.CacheCurrIdx();
+      uint8_t* next_tuple = tuple_mem + val_count * tuple_size;
+      int remaining_val_capacity = max_values - val_count;
+      int ret_val_count = 0;
+      if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
+        continue_execution = MaterializeValueBatch<IN_COLLECTION, true>(
+            pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
+      } else {
+        continue_execution = MaterializeValueBatch<IN_COLLECTION, false>(
+            pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
+      }
+      val_count += ret_val_count;
+      num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
+    }
+    *num_values = val_count;
+    return continue_execution;
+  }
+
+  /// Helper function for ReadValueBatch() above that performs value materialization.
+  /// It assumes a data page with remaining values is available, and that the def/rep
+  /// level caches have been populated.
+  /// For efficiency, the simple special case of !MATERIALIZED && !IN_COLLECTION is not
+  /// handled in this function.
+  template<bool IN_COLLECTION, bool IS_DICT_ENCODED>
+  bool MaterializeValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values) {
+    DCHECK(MATERIALIZED || IN_COLLECTION);
+    DCHECK_GT(num_buffered_values_, 0);
+    DCHECK(def_levels_.CacheHasNext());
+    if (IN_COLLECTION && pos_slot_desc_ != NULL) DCHECK(rep_levels_.CacheHasNext());
+
+    uint8_t* curr_tuple = tuple_mem;
+    int val_count = 0;
+    while (def_levels_.CacheHasNext()) {
+      Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
+      int def_level = def_levels_.CacheGetNext();
+
+      if (IN_COLLECTION) {
+        if (def_level < def_level_of_immediate_repeated_ancestor()) {
+          // A containing repeated field is empty or NULL. Skip the value but
+          // move to the next repetition level if necessary.
+          if (pos_slot_desc_ != NULL) rep_levels_.CacheGetNext();
+          continue;
+        }
+        if (pos_slot_desc_ != NULL) {
+          int rep_level = rep_levels_.CacheGetNext();
+          // Reset position counter if we are at the start of a new parent collection.
+          if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
+          void* pos_slot = tuple->GetSlot(pos_slot_desc()->tuple_offset());
+          *reinterpret_cast<int64_t*>(pos_slot) = pos_current_value_++;
+        }
+      }
+
+      if (MATERIALIZED) {
+        if (def_level >= max_def_level()) {
+          bool continue_execution =
+              ReadSlot<IS_DICT_ENCODED>(tuple->GetSlot(tuple_offset_), pool);
+          if (UNLIKELY(!continue_execution)) break;
+        } else {
+          tuple->SetNull(null_indicator_offset_);
+        }
+      }
+
+      curr_tuple += tuple_size;
+      ++val_count;
+      if (UNLIKELY(val_count == max_values)) break;
+    }
+    *num_values = val_count;
+    return true;
   }
 
   virtual DictDecoderBase* CreateDictionaryDecoder(uint8_t* values, int size) {
@@ -734,17 +918,17 @@ class HdfsParquetScanner::ScalarColumnReader :
   }
 
  private:
-  /// Writes the next value into *slot using pool if necessary. Also advances def_level_
-  /// and rep_level_ via NextLevels().
+  /// Writes the next value into *slot using pool if necessary.
   ///
   /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
   /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
   /// true.
-  template <bool IN_COLLECTION>
+  template<bool IS_DICT_ENCODED>
   inline bool ReadSlot(void* slot, MemPool* pool) {
     T val;
     T* val_ptr = NeedsConversion() ? &val : reinterpret_cast<T*>(slot);
-    if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
+    if (IS_DICT_ENCODED) {
+      DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN_DICTIONARY);
       if (UNLIKELY(!dict_decoder_.GetValue(val_ptr))) {
         SetDictDecodeError();
         return false;
@@ -757,7 +941,7 @@ class HdfsParquetScanner::ScalarColumnReader :
         !ConvertSlot(&val, reinterpret_cast<T*>(slot), pool))) {
       return false;
     }
-    return NextLevels<IN_COLLECTION>();
+    return true;
   }
 
   /// Most column readers never require conversion, so we can avoid branches by
@@ -794,48 +978,6 @@ class HdfsParquetScanner::ScalarColumnReader :
   int fixed_len_size_;
 };
 
-/// Implementations of the batched ReadValue() functions specialized for this
-/// column reader type.
-/// TODO: The code is only a proof-of-concept. It is almost identical to the
-/// generic ColumnReader::ReadValueBatch() function, except that this version
-/// avoids the virtual function calls for NextLevels()/ReadValue().
-/// This function needs to be optimized further.
-template<typename T, bool MATERIALIZED>
-bool HdfsParquetScanner::ScalarColumnReader<T, MATERIALIZED>::ReadValueBatch(
-    MemPool* pool, int max_values, int tuple_size, uint8_t* tuple_mem, int* num_values) {
-  int val_count = 0;
-  bool continue_execution = true;
-  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
-    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
-    if (def_level_ < def_level_of_immediate_repeated_ancestor()) {
-      // A containing repeated field is empty or NULL
-      continue_execution = NextLevels<true>();
-      continue;
-    }
-    // Fill in position slot if applicable
-    if (pos_slot_desc_ != NULL) ReadPosition(tuple);
-    continue_execution = ReadValue<true>(pool, tuple);
-    ++val_count;
-  }
-  *num_values = val_count;
-  return continue_execution;
-}
-
-template<typename T, bool MATERIALIZED>
-bool HdfsParquetScanner::ScalarColumnReader<T, MATERIALIZED>::ReadNonRepeatedValueBatch(
-    MemPool* pool, int max_values, int tuple_size,
-    uint8_t* tuple_mem, int* num_values) {
-  int val_count = 0;
-  bool continue_execution = true;
-  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
-    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
-    continue_execution = ReadValue<false>(pool, tuple);
-    ++val_count;
-  }
-  *num_values = val_count;
-  return continue_execution;
-}
-
 template<>
 inline bool HdfsParquetScanner::ScalarColumnReader<StringValue, true>::NeedsConversion()
     const {
   return needs_conversion_;
@@ -974,6 +1116,8 @@ Status HdfsParquetScanner::Prepare(ScannerContext* context) {
     scan_node_->IncNumScannersCodegenDisabled();
 
+  level_cache_pool_.reset(new MemPool(scan_node_->mem_tracker()));
+
   for (int i = 0; i < context->filter_ctxs().size(); ++i) {
     const FilterContext* ctx = &context->filter_ctxs()[i];
     DCHECK(ctx->filter != NULL);
@@ -1023,6 +1167,11 @@ void HdfsParquetScanner::Close() {
   assemble_rows_timer_.Stop();
   assemble_rows_timer_.ReleaseCounter();
 
+  if
 (level_cache_pool_.get() != NULL) {
+    level_cache_pool_->FreeAll();
+    level_cache_pool_.reset(NULL);
+  }
+
   for (int i = 0; i < filter_ctxs_.size(); ++i) {
     const FilterStats* stats = filter_ctxs_[i]->stats;
     const LocalFilterStats& local = filter_stats_[i];
@@ -1338,19 +1487,18 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
       FILE_CHECK_EQ(current_page_header_.compressed_page_size, uncompressed_size);
     }
 
-    if (max_rep_level() > 0) {
-      // Initialize the repetition level data
-      rep_levels_.Init(filename(),
-          current_page_header_.data_page_header.repetition_level_encoding,
-          max_rep_level(), num_buffered_values_, &data_, &data_size);
-    }
+    // Initialize the repetition level data
+    rep_levels_.Init(filename(),
+        current_page_header_.data_page_header.repetition_level_encoding,
+        parent_->level_cache_pool_.get(), parent_->state_->batch_size(),
+        max_rep_level(), num_buffered_values_,
+        &data_, &data_size);
 
-    if (max_def_level() > 0) {
-      // Initialize the definition level data
-      def_levels_.Init(filename(),
-          current_page_header_.data_page_header.definition_level_encoding,
-          max_def_level(), num_buffered_values_, &data_, &data_size);
-    }
+    // Initialize the definition level data
+    def_levels_.Init(filename(),
+        current_page_header_.data_page_header.definition_level_encoding,
+        parent_->level_cache_pool_.get(), parent_->state_->batch_size(),
+        max_def_level(), num_buffered_values_, &data_, &data_size);
 
     // Data can be empty if the column contains all NULLs
     if (data_size != 0) RETURN_IF_ERROR(InitDataPage(data_, data_size));
@@ -1361,9 +1509,17 @@
 }
 
 Status HdfsParquetScanner::LevelDecoder::Init(const string& filename,
-    parquet::Encoding::type encoding, int max_level,
-    int num_buffered_values, uint8_t** data, int* data_size) {
   encoding_ =
 encoding;
+  max_level_ = max_level;
+  num_buffered_values_ = num_buffered_values;
+  filename_ = filename;
+  RETURN_IF_ERROR(InitCache(cache_pool, cache_size));
+
+  // Return because there is no level data to read, e.g., required field.
+  if (max_level == 0) return Status::OK();
+
   int32_t num_bytes = 0;
   switch (encoding) {
     case parquet::Encoding::RLE: {
@@ -1394,7 +1550,25 @@ Status HdfsParquetScanner::LevelDecoder::Init(const string& filename,
   return Status::OK();
 }
 
-// TODO More codegen here as well.
+Status HdfsParquetScanner::LevelDecoder::InitCache(MemPool* pool, int cache_size) {
+  num_cached_levels_ = 0;
+  cached_level_idx_ = 0;
+  // Memory has already been allocated.
+  if (cached_levels_ != NULL) {
+    DCHECK_EQ(cache_size_, cache_size);
+    return Status::OK();
+  }
+
+  cached_levels_ = reinterpret_cast<uint8_t*>(pool->TryAllocate(cache_size));
+  if (cached_levels_ == NULL) {
+    return pool->mem_tracker()->MemLimitExceeded(
+        NULL, "Definition level cache", cache_size);
+  }
+  memset(cached_levels_, 0, cache_size);
+  cache_size_ = cache_size;
+  return Status::OK();
+}
+
 inline int16_t HdfsParquetScanner::LevelDecoder::ReadLevel() {
   bool valid;
   uint8_t level;
@@ -1404,31 +1578,60 @@ inline int16_t HdfsParquetScanner::LevelDecoder::ReadLevel() {
     DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED);
     valid = bit_reader_.GetValue(1, &level);
   }
-  return LIKELY(valid) ? level : -1;
+  return LIKELY(valid) ?
 level : INVALID_LEVEL;
 }
 
-// TODO(skye): try reading + caching many levels at once to avoid error checking etc on
-// each call (here and RLE decoder) for perf
-inline int16_t HdfsParquetScanner::BaseScalarColumnReader::ReadDefinitionLevel() {
-  DCHECK_GT(max_def_level(), 0);
-  int16_t def_level = def_levels_.ReadLevel();
-
-  if (UNLIKELY(def_level < 0 || def_level > max_def_level())) {
-    SetLevelError(TErrorCode::PARQUET_DEF_LEVEL_ERROR);
-    return -1;
+Status HdfsParquetScanner::LevelDecoder::CacheNextBatch(int batch_size) {
+  DCHECK_LE(batch_size, cache_size_);
+  cached_level_idx_ = 0;
+  if (max_level_ > 0) {
+    if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_))) {
+      return Status(decoding_error_code_, num_buffered_values_, filename_);
+    }
+  } else {
+    // No levels to read, e.g., because the field is required. The cache was
+    // already initialized with all zeros, so we can hand out those values.
+    DCHECK_EQ(max_level_, 0);
+    num_cached_levels_ = batch_size;
   }
-  return def_level;
+  return Status::OK();
 }
 
-inline int16_t HdfsParquetScanner::BaseScalarColumnReader::ReadRepetitionLevel() {
-  DCHECK_GT(max_rep_level(), 0);
-  int16_t rep_level = rep_levels_.ReadLevel();
+bool HdfsParquetScanner::LevelDecoder::FillCache(int batch_size,
+    int* num_cached_levels) {
+  DCHECK(num_cached_levels != NULL);
+  int num_values = 0;
+  if (encoding_ == parquet::Encoding::RLE) {
+    while (true) {
+      // Add RLE encoded values by repeating the current value this number of times.
+      uint32_t num_repeats_to_set =
+          min<uint32_t>(repeat_count_, batch_size - num_values);
+      memset(cached_levels_ + num_values, current_value_, num_repeats_to_set);
+      num_values += num_repeats_to_set;
+      repeat_count_ -= num_repeats_to_set;
+
+      // Add remaining literal values, if any.
+      uint32_t num_literals_to_set =
+          min<uint32_t>(literal_count_, batch_size - num_values);
+      int num_values_end = min<uint32_t>(num_values + literal_count_, batch_size);
+      for (; num_values < num_values_end; ++num_values) {
+        bool valid = bit_reader_.GetValue(bit_width_, &cached_levels_[num_values]);
+        if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false;
+      }
+      literal_count_ -= num_literals_to_set;
 
-  if (UNLIKELY(rep_level < 0 || rep_level > max_rep_level())) {
-    SetLevelError(TErrorCode::PARQUET_REP_LEVEL_ERROR);
-    return -1;
+      if (num_values == batch_size || !NextCounts<int16_t>()) break;
+      if (repeat_count_ > 0 && current_value_ > max_level_) return false;
+    }
+  } else {
+    DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED);
+    for (; num_values < batch_size; ++num_values) {
+      bool valid = bit_reader_.GetValue(1, &cached_levels_[num_values]);
+      if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false;
+    }
   }
-  return rep_level;
+  *num_cached_levels = num_values;
+  return true;
 }
 
 template <bool ADVANCE_REP_LEVEL>
@@ -1441,11 +1644,11 @@ bool HdfsParquetScanner::BaseScalarColumnReader::NextLevels() {
   --num_buffered_values_;
 
   // Definition level is not present if column and any containing structs are required.
-  def_level_ = max_def_level() == 0 ? 0 : ReadDefinitionLevel();
+  def_level_ = max_def_level() == 0 ? 0 : def_levels_.ReadLevel();
 
   if (ADVANCE_REP_LEVEL && max_rep_level() > 0) {
     // Repetition level is only present if this column is nested in any collection type.
-    rep_level_ = ReadRepetitionLevel();
+    rep_level_ = rep_levels_.ReadLevel();
     // Reset position counter if we are at the start of a new parent collection.
     if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0;
   }
 
@@ -1458,9 +1661,9 @@ bool HdfsParquetScanner::BaseScalarColumnReader::NextPage() {
   parent_->parse_status_ = ReadDataPage();
   if (UNLIKELY(!parent_->parse_status_.ok())) return false;
   if (num_buffered_values_ == 0) {
-    rep_level_ = -1;
-    def_level_ = -1;
-    pos_current_value_ = -1;
+    rep_level_ = ROW_GROUP_END;
+    def_level_ = INVALID_LEVEL;
+    pos_current_value_ = INVALID_POS;
     return false;
   }
   parent_->assemble_rows_timer_.Start();
@@ -1541,7 +1744,7 @@ void HdfsParquetScanner::CollectionColumnReader::UpdateDerivedState() {
   if (RowGroupAtEnd()) {
     // No more values
-    pos_current_value_ = -1;
+    pos_current_value_ = INVALID_POS;
   } else if (rep_level_ <= max_rep_level() - 2) {
     // Reset position counter if we are at the start of a new parent collection (i.e.,
     // the current collection is the first item in a new parent collection).
@@ -1669,14 +1872,23 @@ Status HdfsParquetScanner::ProcessSplit() {
     // Prepare column readers for first read
     bool continue_execution = true;
     for (ColumnReader* col_reader: column_readers_) {
-      continue_execution = col_reader->NextLevels();
+      // Seed collection and boolean column readers with NextLevel().
+      // The ScalarColumnReaders use an optimized ReadValueBatch() that
+      // should not be seeded.
+      // TODO: Refactor the column readers to look more like the optimized
+      // ScalarColumnReader::ReadValueBatch() which does not need seeding. This
+      // will allow better sharing of code between the row-wise and column-wise
+      // materialization strategies.
+      if (col_reader->NeedsSeedingForBatchedReading()) {
+        continue_execution = col_reader->NextLevels();
+      }
       if (!continue_execution) break;
       DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
     }
 
     bool filters_pass = true;
     if (continue_execution) {
-      AssembleRows(column_readers_, i, &filters_pass);
+      continue_execution = AssembleRows(column_readers_, i, &filters_pass);
       assemble_rows_timer_.Stop();
     }
 
@@ -1698,7 +1910,7 @@ Status HdfsParquetScanner::ProcessSplit() {
     DCHECK(continue_execution || !state_->abort_on_error());
 
     // We should be at the end of the row group if we get this far with no parse error
-    if (parse_status_.ok()) DCHECK_EQ(column_readers_[0]->rep_level(), -1);
+    if (parse_status_.ok()) DCHECK(column_readers_[0]->RowGroupAtEnd());
     // Reset parse_status_ for the next row group.
     parse_status_ = Status::OK();
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/14cdb049/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 3935332..0700cbe 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -383,6 +383,13 @@ class HdfsParquetScanner : public HdfsScanner {
   /// need to issue another read.
   static const int64_t FOOTER_SIZE;
 
+  /// The repetition level is set to this value to indicate the end of a row group.
+  static const int16_t ROW_GROUP_END;
+  /// Indicates an invalid definition or repetition level.
+  static const int16_t INVALID_LEVEL;
+  /// Indicates an invalid position value.
+  static const int16_t INVALID_POS;
+
   /// Class that implements Parquet definition and repetition level decoding.
   class LevelDecoder;
 
@@ -425,6 +432,10 @@ class HdfsParquetScanner : public HdfsScanner {
     LocalFilterStats() : considered(0), rejected(0), total_possible(0), enabled(1) { }
   };
 
+  /// Pool used for allocating caches of definition/repetition levels that are
+  /// populated by the level readers. The pool is freed in Close().
+  boost::scoped_ptr<MemPool> level_cache_pool_;
+
   /// Track statistics of each filter (one for each filter in filter_ctxs_) per scanner so
   /// that expensive aggregation up to the scan node can be performed once, during
   /// Close().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/14cdb049/be/src/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h
index 73472f0..c6686e3 100644
--- a/be/src/util/rle-encoding.h
+++ b/be/src/util/rle-encoding.h
@@ -105,17 +105,17 @@ class RleDecoder {
   bool Get(T* val);
 
  protected:
+  /// Fills literal_count_ and repeat_count_ with next values. Returns false if there
+  /// are no more.
+  template<typename T>
+  bool NextCounts();
+
   BitReader bit_reader_;
   /// Number of bits needed to encode the value. Must be between 0 and 64.
   int bit_width_;
   uint64_t current_value_;
   uint32_t repeat_count_;
   uint32_t literal_count_;
-
- private:
-  /// Fills literal_count_ and repeat_count_ with next values. Returns false if there
-  /// are no more.
-  template<typename T>
-  bool NextCounts();
 };

 /// Class to incrementally build the rle data. This class does not allocate any memory.
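The "value encoding as a template argument" change in the diff boils down to a branch-hoisting pattern: test the page encoding once per batch, then run a loop instantiation in which the check is a compile-time constant that the compiler can eliminate. A minimal sketch under invented names (`DecodeBatch`/`Decode` are illustrative only; the Impala code dispatches `MaterializeValueBatch<IN_COLLECTION, IS_DICT_ENCODED>` in the same way):

```cpp
#include <vector>

// Specialized loop: with IS_DICT_ENCODED known at compile time, each
// instantiation contains only one side of the branch.
template <bool IS_DICT_ENCODED>
int DecodeBatch(const std::vector<int>& input, const std::vector<int>& dict,
                std::vector<int>* out) {
  for (int v : input) {
    // Dict-encoded values are indices into 'dict'; plain values are literal.
    out->push_back(IS_DICT_ENCODED ? dict[v] : v);
  }
  return static_cast<int>(out->size());
}

// Single runtime branch per batch selects the specialized instantiation.
int Decode(bool is_dict, const std::vector<int>& input,
           const std::vector<int>& dict, std::vector<int>* out) {
  return is_dict ? DecodeBatch<true>(input, dict, out)
                 : DecodeBatch<false>(input, dict, out);
}
```

Moving the encoding test out of ReadSlot() and into a template parameter is what lets the patch keep a tight inner materialization loop without re-checking `page_encoding_` per value.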
