http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/hdfs-parquet-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h index 5a12602..3791ae1 100644 --- a/be/src/exec/hdfs-parquet-scanner.h +++ b/be/src/exec/hdfs-parquet-scanner.h @@ -18,13 +18,27 @@ #include "exec/hdfs-scanner.h" #include "exec/parquet-common.h" +#include "exec/parquet-scratch-tuple-batch.h" +#include "exec/parquet-metadata-utils.h" #include "util/runtime-profile-counters.h" namespace impala { class CollectionValueBuilder; struct HdfsFileDesc; -struct ScratchTupleBatch; + +/// Internal schema representation and resolution. +class SchemaNode; + +/// Class that implements Parquet definition and repetition level decoding. +class ParquetLevelDecoder; + +/// Per column reader. +class ParquetColumnReader; +class CollectionColumnReader; +class BaseScalarColumnReader; +template<typename T, bool MATERIALIZED> class ScalarColumnReader; +class BoolColumnReader; /// This scanner parses Parquet files located in HDFS, and writes the content as tuples in /// the Impala in-memory representation of data, e.g. (tuples, rows, row batches). @@ -305,7 +319,7 @@ class HdfsParquetScanner : public HdfsScanner { public: HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* state); - virtual ~HdfsParquetScanner(); + virtual ~HdfsParquetScanner() {}; virtual Status Prepare(ScannerContext* context); virtual void Close(); virtual Status ProcessSplit(); @@ -315,100 +329,24 @@ class HdfsParquetScanner : public HdfsScanner { static Status IssueInitialRanges(HdfsScanNode* scan_node, const std::vector<HdfsFileDesc*>& files); - struct FileVersion { - /// Application that wrote the file. e.g. "IMPALA" - std::string application; - - /// Version of the application that wrote the file, expressed in three parts - /// (<major>.<minor>.<patch>). 
Unspecified parts default to 0, and extra parts are - /// ignored. e.g.: - /// "1.2.3" => {1, 2, 3} - /// "1.2" => {1, 2, 0} - /// "1.2-cdh5" => {1, 2, 0} - struct { - int major; - int minor; - int patch; - } version; - - /// If true, this file was generated by an Impala internal release - bool is_impala_internal; - - FileVersion() : is_impala_internal(false) { } - - /// Parses the version from the created_by string - FileVersion(const std::string& created_by); - - /// Returns true if version is strictly less than <major>.<minor>.<patch> - bool VersionLt(int major, int minor = 0, int patch = 0) const; - - /// Returns true if version is equal to <major>.<minor>.<patch> - bool VersionEq(int major, int minor, int patch) const; - }; - - private: - /// Internal representation of a column schema (including nested-type columns). - struct SchemaNode { - /// The corresponding schema element defined in the file metadata - const parquet::SchemaElement* element; - - /// The index into the RowGroup::columns list if this column is materialized in the - /// file (i.e. it's a scalar type). -1 for nested types. - int col_idx; - - /// The maximum definition level of this column, i.e., the definition level that - /// corresponds to a non-NULL value. Valid values are >= 0. - int max_def_level; - - /// The maximum repetition level of this column. Valid values are >= 0. - int max_rep_level; - - /// The definition level of the most immediate ancestor of this node with repeated - /// field repetition type. 0 if there are no repeated ancestors. - int def_level_of_immediate_repeated_ancestor; - - /// Any nested schema nodes. Empty for non-nested types. 
- std::vector<SchemaNode> children; - - SchemaNode() : element(NULL), col_idx(-1), max_def_level(-1), max_rep_level(-1), - def_level_of_immediate_repeated_ancestor(-1) { } - - std::string DebugString(int indent = 0) const; - - bool is_repeated() const { - return element->repetition_type == parquet::FieldRepetitionType::REPEATED; - } - }; - - /// Size of the file footer. This is a guess. If this value is too little, we will - /// need to issue another read. - static const int64_t FOOTER_SIZE; - /// The repetition level is set to this value to indicate the end of a row group. - static const int16_t ROW_GROUP_END; + static const int16_t ROW_GROUP_END = numeric_limits<int16_t>::min(); /// Indicates an invalid definition or repetition level. - static const int16_t INVALID_LEVEL; + static const int16_t INVALID_LEVEL = -1; /// Indicates an invalid position value. - static const int16_t INVALID_POS; - - /// Class that implements Parquet definition and repetition level decoding. - class LevelDecoder; + static const int16_t INVALID_POS = -1; - /// Per column reader. - class ColumnReader; - friend class ColumnReader; - - class CollectionColumnReader; + private: + friend class ParquetColumnReader; friend class CollectionColumnReader; - - class BaseScalarColumnReader; friend class BaseScalarColumnReader; - - template<typename T, bool MATERIALIZED> class ScalarColumnReader; template<typename T, bool MATERIALIZED> friend class ScalarColumnReader; - class BoolColumnReader; friend class BoolColumnReader; + /// Size of the file footer. This is a guess. If this value is too little, we will + /// need to issue another read. + static const int64_t FOOTER_SIZE = 1024 * 100; + /// Cached runtime filter contexts, one for each filter that applies to this column. vector<const FilterContext*> filter_ctxs_; @@ -443,7 +381,7 @@ class HdfsParquetScanner : public HdfsScanner { vector<LocalFilterStats> filter_stats_; /// Column reader for each materialized columns for this file. 
- std::vector<ColumnReader*> column_readers_; + std::vector<ParquetColumnReader*> column_readers_; /// Column readers will write slot values into this scratch batch for /// top-level tuples. See AssembleRows(). @@ -453,10 +391,7 @@ class HdfsParquetScanner : public HdfsScanner { parquet::FileMetaData file_metadata_; /// Version of the application that wrote this file. - FileVersion file_version_; - - /// The root schema node for this file. - SchemaNode schema_; + ParquetFileVersion file_version_; /// Scan range for the metadata. const DiskIoMgr::ScanRange* metadata_range_; @@ -493,7 +428,7 @@ class HdfsParquetScanner : public HdfsScanner { /// If 'filters_pass' is set to false by this method, the partition columns associated /// with this row group did not pass all the runtime filters (and therefore only filter /// contexts that apply only to partition columns are checked). - bool AssembleRows(const std::vector<ColumnReader*>& column_readers, + bool AssembleRows(const std::vector<ParquetColumnReader*>& column_readers, int row_group_idx, bool* filters_pass); /// Evaluates runtime filters and conjuncts (if any) against the tuples in @@ -522,7 +457,7 @@ class HdfsParquetScanner : public HdfsScanner { /// - scan node limit was reached /// When false is returned the column_readers are left in an undefined state and /// execution should be aborted immediately by the caller. - bool AssembleCollection(const std::vector<ColumnReader*>& column_readers, + bool AssembleCollection(const std::vector<ParquetColumnReader*>& column_readers, int new_collection_rep_level, CollectionValueBuilder* coll_value_builder); /// Function used by AssembleCollection() to materialize a single collection item @@ -530,17 +465,13 @@ class HdfsParquetScanner : public HdfsScanner { /// otherwise returns true. /// If 'materialize_tuple' is false, only advances the column readers' levels, /// and does not read any data values. 
- inline bool ReadCollectionItem(const std::vector<ColumnReader*>& column_readers, + inline bool ReadCollectionItem(const std::vector<ParquetColumnReader*>& column_readers, bool materialize_tuple, MemPool* pool, Tuple* tuple) const; /// Find and return the last split in the file if it is assigned to this scan node. /// Returns NULL otherwise. static DiskIoMgr::ScanRange* FindFooterSplit(HdfsFileDesc* file); - /// Validate column offsets by checking if the dictionary page comes before the data - /// pages and checking if the column offsets lie within the file. - Status ValidateColumnOffsets(const parquet::RowGroup& row_group); - /// Process the file footer and parse file_metadata_. This should be called with the /// last FOOTER_SIZE bytes in context_. /// *eosr is a return value. If true, the scan range is complete (e.g. select count(*)) @@ -551,23 +482,12 @@ class HdfsParquetScanner : public HdfsScanner { /// well. Fills in the appropriate template tuple slot with NULL for any materialized /// fields missing in the file. Status CreateColumnReaders(const TupleDescriptor& tuple_desc, - std::vector<ColumnReader*>* column_readers); + const ParquetSchemaResolver& schema_resolver, + std::vector<ParquetColumnReader*>* column_readers); /// Returns the total number of scalar column readers in 'column_readers', including /// the children of collection readers. - int CountScalarColumns(const std::vector<ColumnReader*>& column_readers); - - /// Creates a column reader for 'node'. slot_desc may be NULL, in which case the - /// returned column reader can only be used to read def/rep levels. - /// 'is_collection_field' should be set to true if the returned reader is reading a - /// collection. This cannot be determined purely by 'node' because a repeated scalar - /// node represents both an array and the array's items (in this case - /// 'is_collection_field' should be true if the reader reads one value per array, and - /// false if it reads one value per item). 
The reader is added to the runtime state's - /// object pool. Does not create child readers for collection readers; these must be - /// added by the caller. - ColumnReader* CreateReader(const SchemaNode& node, bool is_collection_field, - const SlotDescriptor* slot_desc); + int CountScalarColumns(const std::vector<ParquetColumnReader*>& column_readers); /// Creates a column reader that reads one value for each item in the table or /// collection element corresponding to 'parent_path'. 'parent_path' should point to @@ -578,89 +498,24 @@ class HdfsParquetScanner : public HdfsScanner { /// This is used for counting item values, rather than materializing any values. For /// example, in a count(*) over a collection, there are no values to materialize, but we /// still need to iterate over every item in the collection to count them. - Status CreateCountingReader( - const SchemaPath& parent_path, ColumnReader** reader); + Status CreateCountingReader(const SchemaPath& parent_path, + const ParquetSchemaResolver& schema_resolver, + ParquetColumnReader** reader); /// Walks file_metadata_ and initiates reading the materialized columns. This /// initializes 'column_readers' and issues the reads for the columns. 'column_readers' /// should be the readers used to materialize a single tuple (i.e., column_readers_ or /// the children of a collection node). Status InitColumns( - int row_group_idx, const std::vector<ColumnReader*>& column_readers); - - /// Validates the file metadata - Status ValidateFileMetadata(); - - /// Validates the column metadata to make sure this column is supported (e.g. encoding, - /// type, etc) and matches the type of col_reader's slot desc. - Status ValidateColumn(const BaseScalarColumnReader& col_reader, int row_group_idx); + int row_group_idx, const std::vector<ParquetColumnReader*>& column_readers); /// Performs some validation once we've reached the end of a row group to help detect /// bugs or bad input files. 
- Status ValidateEndOfRowGroup(const std::vector<ColumnReader*>& column_readers, + Status ValidateEndOfRowGroup(const std::vector<ParquetColumnReader*>& column_readers, int row_group_idx, int64_t rows_read); /// Part of the HdfsScanner interface, not used in Parquet. Status InitNewRange() { return Status::OK(); }; - - /// Unflattens the schema metadata from a Parquet file metadata and converts it to our - /// SchemaNode representation. Returns the result in 'n' unless an error status is - /// returned. Does not set the slot_desc field of any SchemaNode. - Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema, - SchemaNode* node) const; - - /// Recursive implementation used internally by the above CreateSchemaTree() function. - Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema, - int max_def_level, int max_rep_level, int ira_def_level, int* idx, int* col_idx, - SchemaNode* node) const; - - /// Traverses 'schema_' according to 'path', returning the result in 'node'. If 'path' - /// does not exist in this file's schema, 'missing_field' is set to true and - /// Status::OK() is returned, otherwise 'missing_field' is set to false. If 'path' - /// resolves to a collecton position field, *pos_field is set to true. Otherwise - /// 'pos_field' is set to false. Returns a non-OK status if 'path' cannot be resolved - /// against the file's schema (e.g., unrecognized collection schema). - /// - /// Tries to resolve assuming either two- or three-level array encoding in - /// 'schema_'. Returns a bad status if resolution fails in both cases. - Status ResolvePath(const SchemaPath& path, SchemaNode** node, bool* pos_field, - bool* missing_field); - - /// The 'array_encoding' parameter determines whether to assume one-, two-, or - /// three-level array encoding. The returned status is not logged (i.e. it's an expected - /// error). 
- enum ArrayEncoding { - ONE_LEVEL, - TWO_LEVEL, - THREE_LEVEL - }; - Status ResolvePathHelper(ArrayEncoding array_encoding, const SchemaPath& path, - SchemaNode** node, bool* pos_field, bool* missing_field); - - /// Helper functions for ResolvePathHelper(). - - /// Advances 'node' to one of its children based on path[next_idx] and - /// 'col_type'. 'col_type' is NULL if 'node' is the root node, otherwise it's the type - /// associated with 'node'. Returns the child node or sets 'missing_field' to true. - SchemaNode* NextSchemaNode(const ColumnType* col_type, const SchemaPath& path, - int next_idx, SchemaNode* node, bool* missing_field); - - /// Returns the index of 'node's child with 'name', or the number of children if not - /// found. - int FindChildWithName(SchemaNode* node, const string& name); - - /// The ResolvePathHelper() logic for arrays. - Status ResolveArray(ArrayEncoding array_encoding, const SchemaPath& path, int idx, - SchemaNode** node, bool* pos_field, bool* missing_field); - - /// The ResolvePathHelper() logic for maps. - Status ResolveMap(const SchemaPath& path, int idx, SchemaNode** node, - bool* missing_field); - - /// The ResolvePathHelper() logic for scalars (just does validation since there's no - /// more actual work to be done). - Status ValidateScalarNode(const SchemaNode& node, const ColumnType& col_type, - const SchemaPath& path, int idx); }; } // namespace impala
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/hdfs-rcfile-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc index d85ad49..3914845 100644 --- a/be/src/exec/hdfs-rcfile-scanner.cc +++ b/be/src/exec/hdfs-rcfile-scanner.cc @@ -525,7 +525,7 @@ Status HdfsRCFileScanner::ProcessRange() { if (error_in_row) { error_in_row = false; ErrorMsg msg(TErrorCode::GENERAL, Substitute("file: $0", stream_->filename())); - RETURN_IF_ERROR(LogOrReturnError(msg)); + RETURN_IF_ERROR(state_->LogOrReturnError(msg)); } current_row->SetTuple(scan_node_->tuple_idx(), tuple); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/hdfs-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc index 5abd346..4f63a73 100644 --- a/be/src/exec/hdfs-scanner.cc +++ b/be/src/exec/hdfs-scanner.cc @@ -671,23 +671,3 @@ void HdfsScanner::ReportColumnParseError(const SlotDescriptor* desc, } } -Status HdfsScanner::LogOrReturnError(const ErrorMsg& message) const { - DCHECK_NE(message.error(), TErrorCode::OK); - // If either abort_on_error=true or the error necessitates execution stops - // immediately, return an error status. - if (state_->abort_on_error() || - message.error() == TErrorCode::MEM_LIMIT_EXCEEDED || - message.error() == TErrorCode::CANCELLED) { - return Status(message); - } - // Otherwise, add the error to the error log and continue. - state_->LogError(message); - return Status::OK(); -} - -string HdfsScanner::PrintPath(const SchemaPath& path, int subpath_idx) const { - SchemaPath::const_iterator subpath_end = - subpath_idx == -1 ? 
path.end() : path.begin() + subpath_idx + 1; - SchemaPath subpath(path.begin(), subpath_end); - return impala::PrintPath(*scan_node_->hdfs_table(), subpath); -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/hdfs-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h index 9069451..7a723b8 100644 --- a/be/src/exec/hdfs-scanner.h +++ b/be/src/exec/hdfs-scanner.h @@ -408,17 +408,6 @@ class HdfsScanner { return reinterpret_cast<TupleRow*>(mem + batch_->row_byte_size()); } - /// Given an error message, determine whether execution should be aborted and, if so, - /// return the corresponding error status. Otherwise, log the error and return - /// Status::OK(). Execution is aborted if the ABORT_ON_ERROR query option is set to - /// true or the error is not recoverable and should be handled upstream. - Status LogOrReturnError(const ErrorMsg& message) const; - - // Convenience function for calling the PrintPath() function in - // debug-util. 'subpath_idx' can be specified in order to truncate the output to end on - // the i-th element of 'path' (inclusive). - string PrintPath(const SchemaPath& path, int subpath_idx = -1) const; - /// Simple wrapper around scanner_conjunct_ctxs_. Used in the codegen'd version of /// WriteCompleteTuple() because it's easier than writing IR to access /// scanner_conjunct_ctxs_. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/hdfs-text-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc index dcd3081..6d45880 100644 --- a/be/src/exec/hdfs-text-scanner.cc +++ b/be/src/exec/hdfs-text-scanner.cc @@ -286,7 +286,8 @@ Status HdfsTextScanner::FinishScanRange() { stringstream ss; ss << "Read failed while trying to finish scan range: " << stream_->filename() << ":" << stream_->file_offset() << endl << status.GetDetail(); - RETURN_IF_ERROR(LogOrReturnError(ErrorMsg(TErrorCode::GENERAL, ss.str()))); + RETURN_IF_ERROR(state_->LogOrReturnError( + ErrorMsg(TErrorCode::GENERAL, ss.str()))); } else if (!partial_tuple_empty_ || !boundary_column_.IsEmpty() || !boundary_row_.IsEmpty() || (delimited_text_parser_->HasUnfinishedTuple() && http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/parquet-column-readers.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc new file mode 100644 index 0000000..280c206 --- /dev/null +++ b/be/src/exec/parquet-column-readers.cc @@ -0,0 +1,1093 @@ +// Copyright 2016 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "parquet-column-readers.h" + +#include <boost/scoped_ptr.hpp> +#include <string> +#include <sstream> +#include <gflags/gflags.h> +#include <gutil/strings/substitute.h> + +#include "exec/hdfs-parquet-scanner.h" +#include "exec/parquet-metadata-utils.h" +#include "exec/parquet-scratch-tuple-batch.h" +#include "exec/read-write-util.h" +#include "gutil/bits.h" +#include "rpc/thrift-util.h" +#include "runtime/collection-value-builder.h" +#include "runtime/tuple-row.h" +#include "runtime/tuple.h" +#include "runtime/runtime-state.h" +#include "runtime/mem-pool.h" +#include "util/codec.h" +#include "util/debug-util.h" +#include "util/dict-encoding.h" +#include "util/rle-encoding.h" + +#include "common/names.h" + +using strings::Substitute; + +// Provide a workaround for IMPALA-1658. +DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false, + "When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will " + "be converted from UTC to local time. Writes are unaffected."); + +// Max data page header size in bytes. This is an estimate and only needs to be an upper +// bound. It is theoretically possible to have a page header of any size due to string +// value statistics, but in practice we'll have trouble reading string values this large. +// Also, this limit is in place to prevent impala from reading corrupt parquet files. 
+DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size in bytes"); + +namespace impala { + +const string PARQUET_MEM_LIMIT_EXCEEDED = "HdfsParquetScanner::$0() failed to allocate " + "$1 bytes for $2."; + +Status ParquetLevelDecoder::Init(const string& filename, + parquet::Encoding::type encoding, MemPool* cache_pool, int cache_size, + int max_level, int num_buffered_values, uint8_t** data, int* data_size) { + encoding_ = encoding; + max_level_ = max_level; + num_buffered_values_ = num_buffered_values; + filename_ = filename; + RETURN_IF_ERROR(InitCache(cache_pool, cache_size)); + + // Return because there is no level data to read, e.g., required field. + if (max_level == 0) return Status::OK(); + + int32_t num_bytes = 0; + switch (encoding) { + case parquet::Encoding::RLE: { + Status status; + if (!ReadWriteUtil::Read(data, data_size, &num_bytes, &status)) { + return status; + } + if (num_bytes < 0) { + return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, num_bytes); + } + int bit_width = Bits::Log2Ceiling64(max_level + 1); + Reset(*data, num_bytes, bit_width); + break; + } + case parquet::Encoding::BIT_PACKED: + num_bytes = BitUtil::Ceil(num_buffered_values, 8); + bit_reader_.Reset(*data, num_bytes); + break; + default: { + stringstream ss; + ss << "Unsupported encoding: " << encoding; + return Status(ss.str()); + } + } + DCHECK_GT(num_bytes, 0); + *data += num_bytes; + *data_size -= num_bytes; + return Status::OK(); +} + +Status ParquetLevelDecoder::InitCache(MemPool* pool, int cache_size) { + num_cached_levels_ = 0; + cached_level_idx_ = 0; + // Memory has already been allocated. 
+ if (cached_levels_ != NULL) { + DCHECK_EQ(cache_size_, cache_size); + return Status::OK(); + } + + cached_levels_ = reinterpret_cast<uint8_t*>(pool->TryAllocate(cache_size)); + if (cached_levels_ == NULL) { + return pool->mem_tracker()->MemLimitExceeded( + NULL, "Definition level cache", cache_size); + } + memset(cached_levels_, 0, cache_size); + cache_size_ = cache_size; + return Status::OK(); +} + +inline int16_t ParquetLevelDecoder::ReadLevel() { + bool valid; + uint8_t level; + if (encoding_ == parquet::Encoding::RLE) { + valid = Get(&level); + } else { + DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED); + valid = bit_reader_.GetValue(1, &level); + } + return LIKELY(valid) ? level : HdfsParquetScanner::INVALID_LEVEL; +} + +Status ParquetLevelDecoder::CacheNextBatch(int batch_size) { + DCHECK_LE(batch_size, cache_size_); + cached_level_idx_ = 0; + if (max_level_ > 0) { + if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_))) { + return Status(decoding_error_code_, num_buffered_values_, filename_); + } + } else { + // No levels to read, e.g., because the field is required. The cache was + // already initialized with all zeros, so we can hand out those values. + DCHECK_EQ(max_level_, 0); + num_cached_levels_ = batch_size; + } + return Status::OK(); +} + +bool ParquetLevelDecoder::FillCache(int batch_size, + int* num_cached_levels) { + DCHECK(num_cached_levels != NULL); + int num_values = 0; + if (encoding_ == parquet::Encoding::RLE) { + while (true) { + // Add RLE encoded values by repeating the current value this number of times. + uint32_t num_repeats_to_set = + min<uint32_t>(repeat_count_, batch_size - num_values); + memset(cached_levels_ + num_values, current_value_, num_repeats_to_set); + num_values += num_repeats_to_set; + repeat_count_ -= num_repeats_to_set; + + // Add remaining literal values, if any. 
+ uint32_t num_literals_to_set = + min<uint32_t>(literal_count_, batch_size - num_values); + int num_values_end = min<uint32_t>(num_values + literal_count_, batch_size); + for (; num_values < num_values_end; ++num_values) { + bool valid = bit_reader_.GetValue(bit_width_, &cached_levels_[num_values]); + if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false; + } + literal_count_ -= num_literals_to_set; + + if (num_values == batch_size) break; + if (UNLIKELY(!NextCounts<int16_t>())) return false; + if (repeat_count_ > 0 && current_value_ > max_level_) return false; + } + } else { + DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED); + for (; num_values < batch_size; ++num_values) { + bool valid = bit_reader_.GetValue(1, &cached_levels_[num_values]); + if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false; + } + } + *num_cached_levels = num_values; + return true; +} + +/// Per column type reader. If MATERIALIZED is true, the column values are materialized +/// into the slot described by slot_desc. If MATERIALIZED is false, the column values +/// are not materialized, but the position can be accessed. +template<typename T, bool MATERIALIZED> +class ScalarColumnReader : public BaseScalarColumnReader { + public: + ScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node, + const SlotDescriptor* slot_desc) + : BaseScalarColumnReader(parent, node, slot_desc), + dict_decoder_init_(false) { + if (!MATERIALIZED) { + // We're not materializing any values, just counting them. No need (or ability) to + // initialize state used to materialize values. 
+ DCHECK(slot_desc_ == NULL); + return; + } + + DCHECK(slot_desc_ != NULL); + DCHECK_NE(slot_desc_->type().type, TYPE_BOOLEAN); + if (slot_desc_->type().type == TYPE_DECIMAL) { + fixed_len_size_ = ParquetPlainEncoder::DecimalSize(slot_desc_->type()); + } else if (slot_desc_->type().type == TYPE_VARCHAR) { + fixed_len_size_ = slot_desc_->type().len; + } else { + fixed_len_size_ = -1; + } + needs_conversion_ = slot_desc_->type().type == TYPE_CHAR || + // TODO: Add logic to detect file versions that have unconverted TIMESTAMP + // values. Currently all versions have converted values. + (FLAGS_convert_legacy_hive_parquet_utc_timestamps && + slot_desc_->type().type == TYPE_TIMESTAMP && + parent->file_version_.application == "parquet-mr"); + } + + virtual ~ScalarColumnReader() { } + + virtual bool ReadValue(MemPool* pool, Tuple* tuple) { + return ReadValue<true>(pool, tuple); + } + + virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) { + return ReadValue<false>(pool, tuple); + } + + virtual bool NeedsSeedingForBatchedReading() const { return false; } + + virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size, + uint8_t* tuple_mem, int* num_values) { + return ReadValueBatch<true>(pool, max_values, tuple_size, tuple_mem, num_values); + } + + virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size, + uint8_t* tuple_mem, int* num_values) { + return ReadValueBatch<false>(pool, max_values, tuple_size, tuple_mem, num_values); + } + + protected: + template <bool IN_COLLECTION> + inline bool ReadValue(MemPool* pool, Tuple* tuple) { + // NextLevels() should have already been called and def and rep levels should be in + // valid range. 
+ DCHECK_GE(rep_level_, 0); + DCHECK_LE(rep_level_, max_rep_level()); + DCHECK_GE(def_level_, 0); + DCHECK_LE(def_level_, max_def_level()); + DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) << + "Caller should have called NextLevels() until we are ready to read a value"; + + if (MATERIALIZED) { + if (def_level_ >= max_def_level()) { + if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) { + if (!ReadSlot<true>(tuple->GetSlot(tuple_offset_), pool)) return false; + } else { + if (!ReadSlot<false>(tuple->GetSlot(tuple_offset_), pool)) return false; + } + } else { + tuple->SetNull(null_indicator_offset_); + } + } + return NextLevels<IN_COLLECTION>(); + } + + /// Implementation of the ReadValueBatch() functions specialized for this + /// column reader type. This function drives the reading of data pages and + /// caching of rep/def levels. Once a data page and cached levels are available, + /// it calls into a more specialized MaterializeValueBatch() for doing the actual + /// value materialization using the level caches. + template<bool IN_COLLECTION> + bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size, + uint8_t* tuple_mem, int* num_values) { + // Repetition level is only present if this column is nested in a collection type. + if (!IN_COLLECTION) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString(); + if (IN_COLLECTION) DCHECK_GT(max_rep_level(), 0) << slot_desc()->DebugString(); + + int val_count = 0; + bool continue_execution = true; + while (val_count < max_values && !RowGroupAtEnd() && continue_execution) { + // Read next page if necessary. + if (num_buffered_values_ == 0) { + if (!NextPage()) { + continue_execution = parent_->parse_status_.ok(); + continue; + } + } + + // Fill def/rep level caches if they are empty. 
+ int level_batch_size = min(parent_->state_->batch_size(), num_buffered_values_); + if (!def_levels_.CacheHasNext()) { + parent_->parse_status_.MergeStatus(def_levels_.CacheNextBatch(level_batch_size)); + } + // We only need the repetition levels for populating the position slot since we + // are only populating top-level tuples. + if (IN_COLLECTION && pos_slot_desc_ != NULL && !rep_levels_.CacheHasNext()) { + parent_->parse_status_.MergeStatus(rep_levels_.CacheNextBatch(level_batch_size)); + } + if (UNLIKELY(!parent_->parse_status_.ok())) return false; + + // This special case is most efficiently handled here directly. + if (!MATERIALIZED && !IN_COLLECTION) { + int vals_to_add = min(def_levels_.CacheRemaining(), max_values - val_count); + val_count += vals_to_add; + def_levels_.CacheSkipLevels(vals_to_add); + num_buffered_values_ -= vals_to_add; + continue; + } + + // Read data page and cached levels to materialize values. + int cache_start_idx = def_levels_.CacheCurrIdx(); + uint8_t* next_tuple = tuple_mem + val_count * tuple_size; + int remaining_val_capacity = max_values - val_count; + int ret_val_count = 0; + if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) { + continue_execution = MaterializeValueBatch<IN_COLLECTION, true>( + pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count); + } else { + continue_execution = MaterializeValueBatch<IN_COLLECTION, false>( + pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count); + } + val_count += ret_val_count; + num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx); + } + *num_values = val_count; + return continue_execution; + } + + /// Helper function for ReadValueBatch() above that performs value materialization. + /// It assumes a data page with remaining values is available, and that the def/rep + /// level caches have been populated. + /// For efficiency, the simple special case of !MATERIALIZED && !IN_COLLECTION is not + /// handled in this function. 
+  /// Returns false as soon as ReadSlot() fails for a defined value (parent_->parse_status_
+  /// has then been set) and leaves '*num_values' unwritten in that case. Otherwise stops
+  /// after 'max_values' tuples or when the def-level cache runs out, writes the number of
+  /// tuples produced to '*num_values' and returns true.
+  template<bool IN_COLLECTION, bool IS_DICT_ENCODED>
+  bool MaterializeValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values) {
+    DCHECK(MATERIALIZED || IN_COLLECTION);
+    DCHECK_GT(num_buffered_values_, 0);
+    DCHECK(def_levels_.CacheHasNext());
+    if (IN_COLLECTION && pos_slot_desc_ != NULL) DCHECK(rep_levels_.CacheHasNext());
+
+    uint8_t* curr_tuple = tuple_mem;
+    int val_count = 0;
+    while (def_levels_.CacheHasNext()) {
+      Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
+      int def_level = def_levels_.CacheGetNext();
+
+      if (IN_COLLECTION) {
+        if (def_level < def_level_of_immediate_repeated_ancestor()) {
+          // A containing repeated field is empty or NULL. Skip the value but
+          // move to the next repetition level if necessary.
+          if (pos_slot_desc_ != NULL) rep_levels_.CacheGetNext();
+          continue;
+        }
+        if (pos_slot_desc_ != NULL) {
+          int rep_level = rep_levels_.CacheGetNext();
+          // Reset position counter if we are at the start of a new parent collection.
+          if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
+          void* pos_slot = tuple->GetSlot(pos_slot_desc()->tuple_offset());
+          *reinterpret_cast<int64_t*>(pos_slot) = pos_current_value_++;
+        }
+      }
+
+      if (MATERIALIZED) {
+        if (def_level >= max_def_level()) {
+          bool continue_execution =
+              ReadSlot<IS_DICT_ENCODED>(tuple->GetSlot(tuple_offset_), pool);
+          if (UNLIKELY(!continue_execution)) return false;
+        } else {
+          tuple->SetNull(null_indicator_offset_);
+        }
+      }
+
+      curr_tuple += tuple_size;
+      ++val_count;
+      if (UNLIKELY(val_count == max_values)) break;
+    }
+    *num_values = val_count;
+    return true;
+  }
+
+  /// Initializes 'dict_decoder_' from the dictionary page bytes in 'values' and returns
+  /// it in '*decoder'. Returns a PARQUET_CORRUPT_DICTIONARY error if the bytes cannot be
+  /// decoded as a dictionary.
+  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
+      DictDecoderBase** decoder) {
+    if (!dict_decoder_.Reset(values, size, fixed_len_size_)) {
+      return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
+          slot_desc_->type().DebugString(), "could not decode dictionary");
+    }
+    dict_decoder_init_ = true;
+    *decoder = &dict_decoder_;
+    return Status::OK();
+  }
+
+  /// True after a successful CreateDictionaryDecoder() until ClearDictionaryDecoder().
+  virtual bool HasDictionaryDecoder() {
+    return dict_decoder_init_;
+  }
+
+  virtual void ClearDictionaryDecoder() {
+    dict_decoder_init_ = false;
+  }
+
+  /// Prepares to decode the values of a new data page ('size' bytes at 'data').
+  /// Only PLAIN and PLAIN_DICTIONARY encodings are accepted; a dictionary-encoded page
+  /// is an error unless a dictionary page has already been decoded.
+  virtual Status InitDataPage(uint8_t* data, int size) {
+    page_encoding_ = current_page_header_.data_page_header.encoding;
+    if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY &&
+        page_encoding_ != parquet::Encoding::PLAIN) {
+      stringstream ss;
+      ss << "File '" << filename() << "' is corrupt: unexpected encoding: "
+         << PrintEncoding(page_encoding_) << " for data page of column '"
+         << schema_element().name << "'.";
+      return Status(ss.str());
+    }
+
+    // If slot_desc_ is NULL, dict_decoder_ is uninitialized
+    if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY && slot_desc_ != NULL) {
+      if (!dict_decoder_init_) {
+        return Status("File corrupt. Missing dictionary page.");
+      }
+      dict_decoder_.SetData(data, size);
+    }
+
+    // TODO: Perform filter selectivity checks here.
+    return Status::OK();
+  }
+
+ private:
+  /// Decodes the next value (from the dictionary if IS_DICT_ENCODED, otherwise as a
+  /// PLAIN value from data_) and writes it into *slot, converting it first via
+  /// ConvertSlot() when NeedsConversion() is true. 'pool' is only used by conversions
+  /// that allocate.
+  ///
+  /// Returns false if the value could not be decoded or converted; parent_->parse_status_
+  /// is set in that case. Otherwise returns true.
+  template<bool IS_DICT_ENCODED>
+  inline bool ReadSlot(void* slot, MemPool* pool) {
+    T val;
+    // Decode straight into the slot unless the value must be converted first.
+    T* val_ptr = NeedsConversion() ? &val : reinterpret_cast<T*>(slot);
+    if (IS_DICT_ENCODED) {
+      DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN_DICTIONARY);
+      if (UNLIKELY(!dict_decoder_.GetValue(val_ptr))) {
+        SetDictDecodeError();
+        return false;
+      }
+    } else {
+      DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN);
+      int encoded_len =
+          ParquetPlainEncoder::Decode<T>(data_, data_end_, fixed_len_size_, val_ptr);
+      if (UNLIKELY(encoded_len < 0)) {
+        SetPlainDecodeError();
+        return false;
+      }
+      data_ += encoded_len;
+    }
+    if (UNLIKELY(NeedsConversion() &&
+        !ConvertSlot(&val, reinterpret_cast<T*>(slot), pool))) {
+      return false;
+    }
+    return true;
+  }
+
+  /// Most column readers never require conversion, so we can avoid branches by
+  /// returning constant false. Column readers for types that require conversion
+  /// must specialize this function.
+  inline bool NeedsConversion() const {
+    DCHECK(!needs_conversion_);
+    return false;
+  }
+
+  /// Converts and writes src into dst based on slot_desc_->type(). Must be specialized
+  /// together with NeedsConversion(); this generic version is never expected to run.
+  bool ConvertSlot(const T* src, T* dst, MemPool* pool) {
+    DCHECK(false);
+    return false;
+  }
+
+  /// Slow-path Status construction for value decode failures, kept out of line
+  /// (noinline) so it does not bloat the inlined ReadSlot() hot path.
+  void __attribute__((noinline)) SetDictDecodeError() {
+    parent_->parse_status_ = Status(TErrorCode::PARQUET_DICT_DECODE_FAILURE, filename(),
+        slot_desc_->type().DebugString(), stream_->file_offset());
+  }
+  void __attribute__((noinline)) SetPlainDecodeError() {
+    parent_->parse_status_ = Status(TErrorCode::PARQUET_CORRUPT_PLAIN_VALUE, filename(),
+        slot_desc_->type().DebugString(), stream_->file_offset());
+  }
+
+  /// Dictionary decoder for decoding column values.
+  DictDecoder<T> dict_decoder_;
+
+  /// True if dict_decoder_ has been initialized with a dictionary page.
+  bool dict_decoder_init_;
+
+  /// true if decoded values must be converted before being written to an output tuple.
+  bool needs_conversion_;
+
+  /// The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY, or
+  /// the max length for VARCHAR columns. Unused otherwise.
+  int fixed_len_size_;
+};
+
+/// String columns may need conversion (see ConvertSlot() below); the flag decides.
+template<>
+inline bool ScalarColumnReader<StringValue, true>::NeedsConversion() const {
+  return needs_conversion_;
+}
+
+/// Copies 'src' into a CHAR(len) value, truncating or space-padding it to exactly 'len'
+/// bytes. Var-len CHAR slots get a buffer from 'pool'; otherwise the characters are
+/// written inline into the slot. Returns false (with parse_status_ set) if the
+/// allocation fails.
+template<>
+bool ScalarColumnReader<StringValue, true>::ConvertSlot(
+    const StringValue* src, StringValue* dst, MemPool* pool) {
+  DCHECK(slot_desc() != NULL);
+  DCHECK(slot_desc()->type().type == TYPE_CHAR);
+  int len = slot_desc()->type().len;
+  StringValue sv;
+  sv.len = len;
+  if (slot_desc()->type().IsVarLenStringType()) {
+    sv.ptr = reinterpret_cast<char*>(pool->TryAllocate(len));
+    if (UNLIKELY(sv.ptr == NULL)) {
+      string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ConvertSlot",
+          len, "StringValue");
+      parent_->parse_status_ =
+          pool->mem_tracker()->MemLimitExceeded(parent_->state_, details, len);
+      return false;
+    }
+  } else {
+    sv.ptr = reinterpret_cast<char*>(dst);
+  }
+  int unpadded_len = min(len, src->len);
+  memcpy(sv.ptr, src->ptr, unpadded_len);
+  StringValue::PadWithSpaces(sv.ptr, len, unpadded_len);
+
+  if (slot_desc()->type().IsVarLenStringType()) *dst = sv;
+  return true;
+}
+
+template<>
+inline bool ScalarColumnReader<TimestampValue, true>::NeedsConversion() const {
+  return needs_conversion_;
+}
+
+/// Converts a UTC timestamp written by legacy Hive to local time.
+template<>
+bool ScalarColumnReader<TimestampValue, true>::ConvertSlot(
+    const TimestampValue* src, TimestampValue* dst, MemPool* pool) {
+  // Conversion should only happen when this flag is enabled.
+  DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);
+  *dst = *src;
+  if (dst->HasDateAndTime()) dst->UtcToLocal();
+  return true;
+}
+
+/// Reader for BOOLEAN columns. Values are read from a bit-packed stream; dictionary
+/// encoding is not supported.
+class BoolColumnReader : public BaseScalarColumnReader {
+ public:
+  BoolColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
+      const SlotDescriptor* slot_desc)
+    : BaseScalarColumnReader(parent, node, slot_desc) {
+    if (slot_desc_ != NULL) DCHECK_EQ(slot_desc_->type().type, TYPE_BOOLEAN);
+  }
+
+  virtual ~BoolColumnReader() { }
+
+  virtual bool ReadValue(MemPool* pool, Tuple* tuple) {
+    return ReadValue<true>(pool, tuple);
+  }
+
+  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
+    return ReadValue<false>(pool, tuple);
+  }
+
+ protected:
+  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
+      DictDecoderBase** decoder) {
+    DCHECK(false) << "Dictionary encoding is not supported for bools. Should never "
+                  << "have gotten this far.";
+    // Never report success here: no decoder exists, and ReadDataPage() may dereference
+    // '*decoder' after an OK status. Fail cleanly in release builds instead.
+    *decoder = NULL;
+    return Status("Dictionary encoding is not supported for bools.");
+  }
+
+  virtual bool HasDictionaryDecoder() {
+    // Decoder should never be created for bools.
+    return false;
+  }
+
+  virtual void ClearDictionaryDecoder() { }
+
+  virtual Status InitDataPage(uint8_t* data, int size) {
+    // Initialize bool decoder
+    bool_values_ = BitReader(data, size);
+    return Status::OK();
+  }
+
+ private:
+  template<bool IN_COLLECTION>
+  inline bool ReadValue(MemPool* pool, Tuple* tuple) {
+    DCHECK(slot_desc_ != NULL);
+    // Def and rep levels should be in valid range.
+    DCHECK_GE(rep_level_, 0);
+    DCHECK_LE(rep_level_, max_rep_level());
+    DCHECK_GE(def_level_, 0);
+    DCHECK_LE(def_level_, max_def_level());
+    DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+        "Caller should have called NextLevels() until we are ready to read a value";
+
+    if (def_level_ >= max_def_level()) {
+      return ReadSlot<IN_COLLECTION>(tuple->GetSlot(tuple_offset_), pool);
+    } else {
+      // Null value
+      tuple->SetNull(null_indicator_offset_);
+      return NextLevels<IN_COLLECTION>();
+    }
+  }
+
+  /// Writes the next bool value into *slot. Also advances def_level_ and rep_level_
+  /// via NextLevels().
+  ///
+  /// Returns false if the value could not be read (parent_->parse_status_ is set to
+  /// "Invalid bool column.") or if NextLevels() returns false. Otherwise returns true.
+  template <bool IN_COLLECTION>
+  inline bool ReadSlot(void* slot, MemPool* pool) {
+    if (!bool_values_.GetValue(1, reinterpret_cast<bool*>(slot))) {
+      parent_->parse_status_ = Status("Invalid bool column.");
+      return false;
+    }
+    return NextLevels<IN_COLLECTION>();
+  }
+
+  BitReader bool_values_;
+};
+
+/// Generic value-at-a-time implementation: skips levels belonging to empty/NULL
+/// containing collections, fills the position slot if requested, and reads one value
+/// per output tuple until 'max_values', the end of the row group, or an error.
+bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
+    int tuple_size, uint8_t* tuple_mem, int* num_values) {
+  int val_count = 0;
+  bool continue_execution = true;
+  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
+    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
+    if (def_level_ < def_level_of_immediate_repeated_ancestor()) {
+      // A containing repeated field is empty or NULL
+      continue_execution = NextLevels();
+      continue;
+    }
+    // Fill in position slot if applicable
+    if (pos_slot_desc_ != NULL) ReadPosition(tuple);
+    continue_execution = ReadValue(pool, tuple);
+    ++val_count;
+  }
+  *num_values = val_count;
+  return continue_execution;
+}
+
+/// Generic value-at-a-time implementation for columns that are not inside a collection.
+bool ParquetColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
+    int max_values, int tuple_size, uint8_t* tuple_mem, int* num_values) {
+  int val_count = 0;
+  bool continue_execution = true;
+  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
+    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
+    continue_execution = ReadNonRepeatedValue(pool, tuple);
+    ++val_count;
+  }
+  *num_values = val_count;
+  return continue_execution;
+}
+
+void ParquetColumnReader::ReadPosition(Tuple* tuple) {
+  DCHECK(pos_slot_desc() != NULL);
+  // NextLevels() should have already been called
+  DCHECK_GE(rep_level_, 0);
+  DCHECK_GE(def_level_, 0);
+  DCHECK_GE(pos_current_value_, 0);
+  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+      "Caller should have called NextLevels() until we are ready to read a value";
+
+  void* slot = tuple->GetSlot(pos_slot_desc()->tuple_offset());
+  *reinterpret_cast<int64_t*>(slot) = pos_current_value_++;
+}
+
+// In 1.1, we had a bug where the dictionary page metadata was not set. Returns true
+// if this matches those versions and compatibility workarounds need to be used.
+static bool RequiresSkippedDictionaryHeaderCheck(
+    const ParquetFileVersion& v) {
+  if (v.application != "impala") return false;
+  return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
+}
+
+/// Reads page headers from 'stream_' until a DATA_PAGE is found, decoding a dictionary
+/// page and skipping other page types on the way. It also stops once all
+/// metadata_->num_values values have been read. After an OK return, num_buffered_values_
+/// is 0 if no further data page exists. Otherwise the level decoders and InitDataPage()
+/// have been set up for the new page.
+Status BaseScalarColumnReader::ReadDataPage() {
+  Status status;
+  uint8_t* buffer;
+
+  // We're about to move to the next data page.  The previous data page is
+  // now complete, pass along the memory allocated for it.
+  parent_->scratch_batch_->mem_pool()->AcquireData(decompressed_data_pool_.get(), false);
+
+  // Read the next data page, skipping page types we don't care about.
+  // We break out of this loop on the non-error case (a data page was found or we read all
+  // the pages).
+  while (true) {
+    DCHECK_EQ(num_buffered_values_, 0);
+    if (num_values_read_ == metadata_->num_values) {
+      // No more pages to read
+      // TODO: should we check for stream_->eosr()?
+      break;
+    } else if (num_values_read_ > metadata_->num_values) {
+      ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
+          metadata_->num_values, num_values_read_, node_.element->name, filename());
+      RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
+      return Status::OK();
+    }
+
+    int64_t buffer_size;
+    RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &buffer_size));
+    if (buffer_size == 0) {
+      // The data pages contain fewer values than stated in the column metadata.
+      DCHECK(stream_->eosr());
+      DCHECK_LT(num_values_read_, metadata_->num_values);
+      // TODO for 2.3: node_.element->name isn't necessarily useful
+      ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
+          metadata_->num_values, num_values_read_, node_.element->name, filename());
+      RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
+      return Status::OK();
+    }
+
+    // We don't know the actual header size until the thrift object is deserialized.  Loop
+    // until we successfully deserialize the header or exceed the maximum header size.
+    uint32_t header_size;
+    while (true) {
+      header_size = buffer_size;
+      status = DeserializeThriftMsg(
+          buffer, &header_size, true, &current_page_header_);
+      if (status.ok()) break;
+
+      if (buffer_size >= FLAGS_max_page_header_size) {
+        stringstream ss;
+        ss << "ParquetScanner: could not read data page because page header exceeded "
+           << "maximum size of "
+           << PrettyPrinter::Print(FLAGS_max_page_header_size, TUnit::BYTES);
+        status.AddDetail(ss.str());
+        return status;
+      }
+
+      // Didn't read entire header, increase buffer size and try again. Use a distinct
+      // name so the outer 'status' is not shadowed.
+      Status get_bytes_status;
+      int64_t new_buffer_size = max<int64_t>(buffer_size * 2, 1024);
+      bool success = stream_->GetBytes(new_buffer_size, &buffer, &new_buffer_size,
+          &get_bytes_status, /* peek */ true);
+      if (!success) {
+        DCHECK(!get_bytes_status.ok());
+        return get_bytes_status;
+      }
+      DCHECK(get_bytes_status.ok());
+
+      if (buffer_size == new_buffer_size) {
+        DCHECK_NE(new_buffer_size, 0);
+        return Status(TErrorCode::PARQUET_HEADER_EOF, filename());
+      }
+      DCHECK_GT(new_buffer_size, buffer_size);
+      buffer_size = new_buffer_size;
+    }
+
+    // Successfully deserialized current_page_header_
+    if (!stream_->SkipBytes(header_size, &status)) return status;
+
+    int data_size = current_page_header_.compressed_page_size;
+    int uncompressed_size = current_page_header_.uncompressed_page_size;
+
+    if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) {
+      if (slot_desc_ == NULL) {
+        // Skip processing the dictionary page if we don't need to decode any values. In
+        // addition to being unnecessary, we are likely unable to successfully decode the
+        // dictionary values because we don't necessarily create the right type of scalar
+        // reader if there's no slot to read into (see ParquetColumnReader::Create()).
+        if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
+        continue;
+      }
+
+      if (HasDictionaryDecoder()) {
+        return Status("Column chunk should not contain two dictionary pages.");
+      }
+      if (node_.element->type == parquet::Type::BOOLEAN) {
+        return Status("Unexpected dictionary page. Dictionary page is not"
+            " supported for booleans.");
+      }
+      const parquet::DictionaryPageHeader* dict_header = NULL;
+      if (current_page_header_.__isset.dictionary_page_header) {
+        dict_header = &current_page_header_.dictionary_page_header;
+      } else {
+        if (!RequiresSkippedDictionaryHeaderCheck(parent_->file_version_)) {
+          return Status("Dictionary page does not have dictionary header set.");
+        }
+      }
+      if (dict_header != NULL &&
+          dict_header->encoding != parquet::Encoding::PLAIN &&
+          dict_header->encoding != parquet::Encoding::PLAIN_DICTIONARY) {
+        return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported "
+            "for dictionary pages.");
+      }
+
+      if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
+      data_end_ = data_ + data_size;
+
+      uint8_t* dict_values = NULL;
+      if (decompressor_.get() != NULL) {
+        dict_values = parent_->dictionary_pool_->TryAllocate(uncompressed_size);
+        if (UNLIKELY(dict_values == NULL)) {
+          string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
+              uncompressed_size, "dictionary");
+          return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
+              parent_->state_, details, uncompressed_size);
+        }
+        RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, data_size, data_,
+            &uncompressed_size, &dict_values));
+        VLOG_FILE << "Decompressed " << data_size << " to " << uncompressed_size;
+        if (current_page_header_.uncompressed_page_size != uncompressed_size) {
+          return Status(Substitute("Error decompressing dictionary page in file '$0'. "
+              "Expected $1 uncompressed bytes but got $2", filename(),
+              current_page_header_.uncompressed_page_size, uncompressed_size));
+        }
+        data_size = uncompressed_size;
+      } else {
+        if (current_page_header_.uncompressed_page_size != data_size) {
+          return Status(Substitute("Error reading dictionary page in file '$0'. "
+              "Expected $1 bytes but got $2", filename(),
+              current_page_header_.uncompressed_page_size, data_size));
+        }
+        // Copy dictionary from io buffer (which will be recycled as we read
+        // more data) to a new buffer
+        dict_values = parent_->dictionary_pool_->TryAllocate(data_size);
+        if (UNLIKELY(dict_values == NULL)) {
+          string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
+              data_size, "dictionary");
+          return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
+              parent_->state_, details, data_size);
+        }
+        memcpy(dict_values, data_, data_size);
+      }
+
+      DictDecoderBase* dict_decoder;
+      RETURN_IF_ERROR(CreateDictionaryDecoder(dict_values, data_size, &dict_decoder));
+      if (dict_header != NULL &&
+          dict_header->num_values != dict_decoder->num_entries()) {
+        return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
+            slot_desc_->type().DebugString(),
+            Substitute("Expected $0 entries but data contained $1 entries",
+            dict_header->num_values, dict_decoder->num_entries()));
+      }
+      // Done with dictionary page, read next page
+      continue;
+    }
+
+    if (current_page_header_.type != parquet::PageType::DATA_PAGE) {
+      // We can safely skip non-data pages
+      if (!stream_->SkipBytes(data_size, &status)) return status;
+      continue;
+    }
+
+    // Read Data Page
+    // TODO: when we start using page statistics, we will need to ignore certain corrupt
+    // statistics. See IMPALA-2208 and PARQUET-251.
+    if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
+    data_end_ = data_ + data_size;
+    num_buffered_values_ = current_page_header_.data_page_header.num_values;
+    num_values_read_ += num_buffered_values_;
+
+    if (decompressor_.get() != NULL) {
+      SCOPED_TIMER(parent_->decompress_timer_);
+      uint8_t* decompressed_buffer =
+          decompressed_data_pool_->TryAllocate(uncompressed_size);
+      if (UNLIKELY(decompressed_buffer == NULL)) {
+        string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
+            uncompressed_size, "decompressed data");
+        return decompressed_data_pool_->mem_tracker()->MemLimitExceeded(
+            parent_->state_, details, uncompressed_size);
+      }
+      RETURN_IF_ERROR(decompressor_->ProcessBlock32(true,
+          current_page_header_.compressed_page_size, data_, &uncompressed_size,
+          &decompressed_buffer));
+      VLOG_FILE << "Decompressed " << current_page_header_.compressed_page_size
+                << " to " << uncompressed_size;
+      if (current_page_header_.uncompressed_page_size != uncompressed_size) {
+        return Status(Substitute("Error decompressing data page in file '$0'. "
+            "Expected $1 uncompressed bytes but got $2", filename(),
+            current_page_header_.uncompressed_page_size, uncompressed_size));
+      }
+      data_ = decompressed_buffer;
+      data_size = current_page_header_.uncompressed_page_size;
+      data_end_ = data_ + data_size;
+    } else {
+      DCHECK_EQ(metadata_->codec, parquet::CompressionCodec::UNCOMPRESSED);
+      if (current_page_header_.compressed_page_size != uncompressed_size) {
+        return Status(Substitute("Error reading data page in file '$0'. "
+            "Expected $1 bytes but got $2", filename(),
+            current_page_header_.compressed_page_size, uncompressed_size));
+      }
+    }
+
+    // Initialize the repetition level data
+    RETURN_IF_ERROR(rep_levels_.Init(filename(),
+        current_page_header_.data_page_header.repetition_level_encoding,
+        parent_->level_cache_pool_.get(), parent_->state_->batch_size(),
+        max_rep_level(), num_buffered_values_,
+        &data_, &data_size));
+
+    // Initialize the definition level data
+    RETURN_IF_ERROR(def_levels_.Init(filename(),
+        current_page_header_.data_page_header.definition_level_encoding,
+        parent_->level_cache_pool_.get(), parent_->state_->batch_size(),
+        max_def_level(), num_buffered_values_, &data_, &data_size));
+
+    // Data can be empty if the column contains all NULLs
+    if (data_size != 0) RETURN_IF_ERROR(InitDataPage(data_, data_size));
+    break;
+  }
+
+  return Status::OK();
+}
+
+/// Advances def_level_ (and rep_level_ if ADVANCE_REP_LEVEL) by one value, reading a
+/// new data page first if the current one is exhausted. Returns parse_status_.ok().
+template <bool ADVANCE_REP_LEVEL>
+bool BaseScalarColumnReader::NextLevels() {
+  if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
+
+  if (UNLIKELY(num_buffered_values_ == 0)) {
+    if (!NextPage()) return parent_->parse_status_.ok();
+  }
+  --num_buffered_values_;
+
+  // Definition level is not present if column and any containing structs are required.
+  def_level_ = max_def_level() == 0 ? 0 : def_levels_.ReadLevel();
+
+  if (ADVANCE_REP_LEVEL && max_rep_level() > 0) {
+    // Repetition level is only present if this column is nested in any collection type.
+    rep_level_ = rep_levels_.ReadLevel();
+    // Reset position counter if we are at the start of a new parent collection.
+    if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0;
+  }
+
+  return parent_->parse_status_.ok();
+}
+
+/// Reads the next data page (result stored in parent_->parse_status_). Returns true if
+/// a page with values was found. At the end of the column chunk, marks the reader with
+/// ROW_GROUP_END / INVALID_LEVEL / INVALID_POS and returns false. The assemble-rows
+/// timer is stopped while reading and restarted only when a page was found.
+bool BaseScalarColumnReader::NextPage() {
+  parent_->assemble_rows_timer_.Stop();
+  parent_->parse_status_ = ReadDataPage();
+  if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+  if (num_buffered_values_ == 0) {
+    rep_level_ = HdfsParquetScanner::ROW_GROUP_END;
+    def_level_ = HdfsParquetScanner::INVALID_LEVEL;
+    pos_current_value_ = HdfsParquetScanner::INVALID_POS;
+    return false;
+  }
+  parent_->assemble_rows_timer_.Start();
+  return true;
+}
+
+/// Advances every child reader past the current collection, i.e. until its rep level
+/// is <= new_collection_rep_level(), then refreshes this reader's levels.
+bool CollectionColumnReader::NextLevels() {
+  DCHECK(!children_.empty());
+  DCHECK_LE(rep_level_, new_collection_rep_level());
+  for (int c = 0; c < children_.size(); ++c) {
+    do {
+      // TODO(skye): verify somewhere that all column readers are at end
+      if (!children_[c]->NextLevels()) return false;
+    } while (children_[c]->rep_level() > new_collection_rep_level());
+  }
+  UpdateDerivedState();
+  return true;
+}
+
+bool CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) {
+  DCHECK_GE(rep_level_, 0);
+  DCHECK_GE(def_level_, 0);
+  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+      "Caller should have called NextLevels() until we are ready to read a value";
+
+  if (tuple_offset_ == -1) {
+    return CollectionColumnReader::NextLevels();
+  } else if (def_level_ >= max_def_level()) {
+    return ReadSlot(tuple->GetSlot(tuple_offset_), pool);
+  } else {
+    // Null value
+    tuple->SetNull(null_indicator_offset_);
+    return CollectionColumnReader::NextLevels();
+  }
+}
+
+bool CollectionColumnReader::ReadNonRepeatedValue(
+    MemPool* pool, Tuple* tuple) {
+  return CollectionColumnReader::ReadValue(pool, tuple);
+}
+
+bool CollectionColumnReader::ReadSlot(void* slot, MemPool* pool) {
+  DCHECK(!children_.empty());
+  DCHECK_LE(rep_level_, new_collection_rep_level());
+
+  // Recursively read the collection into a new CollectionValue.
+  CollectionValue* coll_slot = reinterpret_cast<CollectionValue*>(slot);
+  *coll_slot = CollectionValue();
+  CollectionValueBuilder builder(
+      coll_slot, *slot_desc_->collection_item_descriptor(), pool, parent_->state_);
+  bool continue_execution = parent_->AssembleCollection(
+      children_, new_collection_rep_level(), &builder);
+  if (!continue_execution) return false;
+
+  // AssembleCollection() advances child readers, so we don't need to call NextLevels()
+  UpdateDerivedState();
+  return true;
+}
+
+void CollectionColumnReader::UpdateDerivedState() {
+  // We don't need to cap our def_level_ at max_def_level(). We always check def_level_
+  // >= max_def_level() to check if the collection is defined.
+  // TODO(skye): consider capping def_level_ at max_def_level()
+  def_level_ = children_[0]->def_level();
+  rep_level_ = children_[0]->rep_level();
+
+  // All children should have been advanced to the beginning of the next collection
+  for (int i = 0; i < children_.size(); ++i) {
+    DCHECK_EQ(children_[i]->rep_level(), rep_level_);
+    if (def_level_ < max_def_level()) {
+      // Collection not defined
+      FILE_CHECK_EQ(children_[i]->def_level(), def_level_);
+    } else {
+      // Collection is defined
+      FILE_CHECK_GE(children_[i]->def_level(), max_def_level());
+    }
+  }
+
+  if (RowGroupAtEnd()) {
+    // No more values
+    pos_current_value_ = HdfsParquetScanner::INVALID_POS;
+  } else if (rep_level_ <= max_rep_level() - 2) {
+    // Reset position counter if we are at the start of a new parent collection (i.e.,
+    // the current collection is the first item in a new parent collection).
+    pos_current_value_ = 0;
+  }
+}
+
+ParquetColumnReader* ParquetColumnReader::Create(const SchemaNode& node,
+    bool is_collection_field, const SlotDescriptor* slot_desc, HdfsParquetScanner* parent) {
+  ParquetColumnReader* reader = NULL;
+  if (is_collection_field) {
+    // Create collection reader (note this handles both NULL and non-NULL 'slot_desc')
+    reader = new CollectionColumnReader(parent, node, slot_desc);
+  } else if (slot_desc != NULL) {
+    // Create the appropriate ScalarColumnReader type to read values into 'slot_desc'
+    switch (slot_desc->type().type) {
+      case TYPE_BOOLEAN:
+        reader = new BoolColumnReader(parent, node, slot_desc);
+        break;
+      case TYPE_TINYINT:
+        reader = new ScalarColumnReader<int8_t, true>(parent, node, slot_desc);
+        break;
+      case TYPE_SMALLINT:
+        reader = new ScalarColumnReader<int16_t, true>(parent, node, slot_desc);
+        break;
+      case TYPE_INT:
+        reader = new ScalarColumnReader<int32_t, true>(parent, node, slot_desc);
+        break;
+      case TYPE_BIGINT:
+        reader = new ScalarColumnReader<int64_t, true>(parent, node, slot_desc);
+        break;
+      case TYPE_FLOAT:
+        reader = new ScalarColumnReader<float, true>(parent, node, slot_desc);
+        break;
+      case TYPE_DOUBLE:
+        reader = new ScalarColumnReader<double, true>(parent, node, slot_desc);
+        break;
+      case TYPE_TIMESTAMP:
+        reader = new ScalarColumnReader<TimestampValue, true>(parent, node, slot_desc);
+        break;
+      case TYPE_STRING:
+      case TYPE_VARCHAR:
+      case TYPE_CHAR:
+        reader = new ScalarColumnReader<StringValue, true>(parent, node, slot_desc);
+        break;
+      case TYPE_DECIMAL:
+        switch (slot_desc->type().GetByteSize()) {
+          case 4:
+            reader = new ScalarColumnReader<Decimal4Value, true>(
+                parent, node, slot_desc);
+            break;
+          case 8:
+            reader = new ScalarColumnReader<Decimal8Value, true>(
+                parent, node, slot_desc);
+            break;
+          case 16:
+            reader = new ScalarColumnReader<Decimal16Value, true>(
+                parent, node, slot_desc);
+            break;
+          default:
+            // Unsupported decimal width: no reader would be created.
+            DCHECK(false) << slot_desc->type().DebugString();
+        }
+        break;
+      default:
+        DCHECK(false) << slot_desc->type().DebugString();
+    }
+  } else {
+    // Special case for counting scalar values (e.g. count(*), no materialized columns in
+    // the file, only materializing a position slot). We won't actually read any values,
+    // only the rep and def levels, so it doesn't matter what kind of reader we make.
+    reader = new ScalarColumnReader<int8_t, false>(parent, node, slot_desc);
+  }
+  return parent->obj_pool_.Add(reader);
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
new file mode 100644
index 0000000..3c26084
--- /dev/null
+++ b/be/src/exec/parquet-column-readers.h
@@ -0,0 +1,500 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef IMPALA_PARQUET_COLUMN_READERS_H
+#define IMPALA_PARQUET_COLUMN_READERS_H
+
+#include <boost/scoped_ptr.hpp>
+
+#include "exec/hdfs-parquet-scanner.h"
+#include "util/codec.h"
+#include "util/dict-encoding.h"
+#include "util/rle-encoding.h"
+
+namespace impala {
+
+class Tuple;
+class MemPool;
+
+/// Decoder for all supported Parquet level encodings. Optionally reads, decodes, and
+/// caches level values in batches.
+/// Level values are unsigned 8-bit integers because we support a maximum nesting
+/// depth of 100, as enforced by the FE.
+/// Using a small type saves memory and speeds up
+/// populating the level cache (e.g., with RLE we can memset() repeated values).
+///
+/// Inherits from RleDecoder instead of containing one for performance reasons.
+/// The containment design would require two BitReaders per column reader. The extra
+/// BitReader causes enough bloat for a column reader to require another cache line.
+/// TODO: It is not clear whether the inheritance vs. containment choice still makes
+/// sense with column-wise materialization. The containment design seems cleaner and
+/// we should revisit.
+class ParquetLevelDecoder : public RleDecoder {
+ public:
+  /// 'is_def_level_decoder' selects the error code used when decoding fails:
+  /// PARQUET_DEF_LEVEL_ERROR if true, PARQUET_REP_LEVEL_ERROR otherwise.
+  /// Explicit so that a bool cannot silently convert into a decoder.
+  explicit ParquetLevelDecoder(bool is_def_level_decoder)
+    : cached_levels_(NULL),
+      num_cached_levels_(0),
+      cached_level_idx_(0),
+      encoding_(parquet::Encoding::PLAIN),
+      max_level_(0),
+      cache_size_(0),
+      num_buffered_values_(0),
+      decoding_error_code_(is_def_level_decoder ?
+          TErrorCode::PARQUET_DEF_LEVEL_ERROR : TErrorCode::PARQUET_REP_LEVEL_ERROR) {
+  }
+
+  /// Initialize the ParquetLevelDecoder. Reads and advances the provided data buffer if
+  /// the encoding requires reading metadata from the page header.
+  Status Init(const string& filename, parquet::Encoding::type encoding,
+      MemPool* cache_pool, int cache_size, int max_level, int num_buffered_values,
+      uint8_t** data, int* data_size);
+
+  /// Returns the next level or INVALID_LEVEL if there was an error.
+  inline int16_t ReadLevel();
+
+  /// Decodes and caches the next batch of levels. Resets members associated with the
+  /// cache. Returns a non-ok status if there was a problem decoding a level, or if a
+  /// level was encountered with a value greater than max_level_.
+  Status CacheNextBatch(int batch_size);
+
+  /// Functions for working with the level cache.
+  inline bool CacheHasNext() const { return cached_level_idx_ < num_cached_levels_; }
+  inline uint8_t CacheGetNext() {
+    DCHECK_LT(cached_level_idx_, num_cached_levels_);
+    return cached_levels_[cached_level_idx_++];
+  }
+  inline void CacheSkipLevels(int num_levels) {
+    DCHECK_LE(cached_level_idx_ + num_levels, num_cached_levels_);
+    cached_level_idx_ += num_levels;
+  }
+  inline int CacheSize() const { return num_cached_levels_; }
+  inline int CacheRemaining() const { return num_cached_levels_ - cached_level_idx_; }
+  inline int CacheCurrIdx() const { return cached_level_idx_; }
+
+ private:
+  /// Initializes members associated with the level cache. Allocates memory for
+  /// the cache from pool, if necessary.
+  Status InitCache(MemPool* pool, int cache_size);
+
+  /// Decodes and writes a batch of levels into the cache. Sets the number of
+  /// values written to the cache in *num_cached_levels. Returns false if there was
+  /// an error decoding a level or if there was a level value greater than max_level_.
+  bool FillCache(int batch_size, int* num_cached_levels);
+
+  /// Buffer for a batch of levels. The memory is allocated from, and owned by, the pool
+  /// passed in Init().
+  uint8_t* cached_levels_;
+  /// Number of valid level values in the cache.
+  int num_cached_levels_;
+  /// Current index into cached_levels_.
+  int cached_level_idx_;
+  /// Level encoding of the current page (initialized to PLAIN in the constructor).
+  parquet::Encoding::type encoding_;
+
+  /// For error checking and reporting.
+  int max_level_;
+  /// Number of level values cached_levels_ has memory allocated for.
+  int cache_size_;
+  /// Number of remaining data values in the current data page.
+  int num_buffered_values_;
+  /// Presumably the file name passed to Init(), kept for error messages (TODO confirm).
+  string filename_;
+  /// PARQUET_DEF_LEVEL_ERROR or PARQUET_REP_LEVEL_ERROR, chosen in the constructor.
+  TErrorCode::type decoding_error_code_;
+};
+
+/// Base class for reading a Parquet column. Reads a logical column, not necessarily a
+/// column materialized in the file (e.g. collections). The two subclasses are
+/// BaseScalarColumnReader and CollectionColumnReader. Column readers read one def and rep
+/// level pair at a time.
+/// The current def and rep level are exposed to the user, and the
+/// corresponding value (if defined) can optionally be copied into a slot via
+/// ReadValue(). Can also write position slots.
+class ParquetColumnReader {
+ public:
+  /// Creates a column reader for 'node' and associates it with the given parent scanner.
+  /// Adds the new column reader to the parent's object pool.
+  /// 'slot_desc' may be NULL, in which case the returned column reader can only be used
+  /// to read def/rep levels.
+  /// 'is_collection_field' should be set to true if the returned reader is reading a
+  /// collection. This cannot be determined purely by 'node' because a repeated scalar
+  /// node represents both an array and the array's items (in this case
+  /// 'is_collection_field' should be true if the reader reads one value per array, and
+  /// false if it reads one value per item). The reader is added to the runtime state's
+  /// object pool. Does not create child readers for collection readers; these must be
+  /// added by the caller.
+  static ParquetColumnReader* Create(const SchemaNode& node, bool is_collection_field,
+      const SlotDescriptor* slot_desc, HdfsParquetScanner* parent);
+
+  virtual ~ParquetColumnReader() { }
+
+  int def_level() const { return def_level_; }
+  int rep_level() const { return rep_level_; }
+
+  const SlotDescriptor* slot_desc() const { return slot_desc_; }
+  const parquet::SchemaElement& schema_element() const { return *node_.element; }
+  int16_t max_def_level() const { return max_def_level_; }
+  int16_t max_rep_level() const { return max_rep_level_; }
+  int def_level_of_immediate_repeated_ancestor() const {
+    return node_.def_level_of_immediate_repeated_ancestor;
+  }
+  const SlotDescriptor* pos_slot_desc() const { return pos_slot_desc_; }
+  void set_pos_slot_desc(const SlotDescriptor* pos_slot_desc) {
+    DCHECK(pos_slot_desc_ == NULL);
+    pos_slot_desc_ = pos_slot_desc;
+  }
+
+  /// Returns true if this reader materializes collections (i.e. CollectionValues).
+  virtual bool IsCollectionReader() const { return false; }
+
+  /// Name of the file being scanned, as reported by the parent scanner.
+  const char* filename() const { return parent_->filename(); }
+
+  /// Read the current value (or null) into 'tuple' for this column. This should only be
+  /// called when a value is defined, i.e., def_level() >=
+  /// def_level_of_immediate_repeated_ancestor() (since empty or NULL collections produce
+  /// no output values), otherwise NextLevels() should be called instead.
+  ///
+  /// Advances this column reader to the next value (i.e. NextLevels() doesn't need to be
+  /// called after calling ReadValue()).
+  ///
+  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
+  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
+  /// true.
+  ///
+  /// NextLevels() must be called on this reader before calling ReadValue() for the first
+  /// time. This is to initialize the current value that ReadValue() will read.
+  ///
+  /// TODO: this is the function that needs to be codegen'd (e.g. CodegenReadValue())
+  /// The codegened functions from all the materialized cols will then be combined
+  /// into one function.
+  /// TODO: another option is to materialize col by col for the entire row batch in
+  /// one call. e.g. MaterializeCol would write out 1024 values. Our row batches
+  /// are currently dense so we'll need to figure out something there.
+  virtual bool ReadValue(MemPool* pool, Tuple* tuple) = 0;
+
+  /// Same as ReadValue() but does not advance repetition level. Only valid for columns
+  /// not in collections.
+  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) = 0;
+
+  /// Returns true if this reader needs to be seeded with NextLevels() before
+  /// calling ReadValueBatch() or ReadNonRepeatedValueBatch().
+  /// Note that all readers need to be seeded before calling the non-batched ReadValue().
+  virtual bool NeedsSeedingForBatchedReading() const { return true; }
+
+  /// Batched version of ReadValue() that reads up to max_values at once and materializes
+  /// them into tuples in tuple_mem. Returns the number of values actually materialized
+  /// in *num_values. The return value, error behavior and state changes are generally
+  /// the same as in ReadValue(). For example, if an error occurs in the middle of
+  /// materializing a batch then false is returned, and num_values, tuple_mem, as well as
+  /// this column reader are left in an undefined state, assuming that the caller will
+  /// immediately abort execution.
+  virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values);
+
+  /// Batched version of ReadNonRepeatedValue() that reads up to max_values at once and
+  /// materializes them into tuples in tuple_mem.
+  /// The return value and error behavior are the same as in ReadValueBatch().
+  virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values);
+
+  /// Advances this column reader's def and rep levels to the next logical value, i.e. to
+  /// the next scalar value or the beginning of the next collection, without attempting to
+  /// read the value. This is used to skip past def/rep levels that don't materialize a
+  /// value, such as the def/rep levels corresponding to an empty containing collection.
+  ///
+  /// NextLevels() must be called on this reader before calling ReadValue() for the first
+  /// time. This is to initialize the current value that ReadValue() will read.
+  ///
+  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
+  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
+  /// true.
+  virtual bool NextLevels() = 0;
+
+  /// Should only be called if pos_slot_desc_ is non-NULL. Writes pos_current_value_ to
+  /// 'tuple' (i.e. "reads" the synthetic position field of the parent collection into
+  /// 'tuple') and increments pos_current_value_.
+  void ReadPosition(Tuple* tuple);
+
+  /// Returns true if this column reader has reached the end of the row group.
+  /// Only inspects rep_level_, so it is safe to call on a const reader.
+  inline bool RowGroupAtEnd() const {
+    return rep_level_ == HdfsParquetScanner::ROW_GROUP_END;
+  }
+
+ protected:
+  HdfsParquetScanner* parent_;
+  const SchemaNode& node_;
+  const SlotDescriptor* slot_desc_;
+
+  /// The slot descriptor for the position field of the tuple, if there is one. NULL if
+  /// there's not. Only one column reader for a given tuple desc will have this set.
+  const SlotDescriptor* pos_slot_desc_;
+
+  /// The next value to write into the position slot, if there is one. 64-bit int because
+  /// the pos slot is always a BIGINT. Set to -1 (HdfsParquetScanner::INVALID_POS) when
+  /// this column reader does not have a current rep and def level (i.e. before the first
+  /// NextLevels() call or after the last value in the column has been read).
+  int64_t pos_current_value_;
+
+  /// The current repetition and definition levels of this reader. Advanced via
+  /// ReadValue() and NextLevels(). Set to -1 (HdfsParquetScanner::INVALID_LEVEL) when
+  /// this column reader does not have a current rep and def level (i.e. before the first
+  /// NextLevels() call or after the last value in the column has been read). If this is
+  /// not inside a collection, rep_level_ is always 0.
+  /// int16_t is large enough to hold the valid levels 0-255 and sentinel value -1.
+  /// The maximum values are cached here because they are accessed in inner loops.
+  int16_t rep_level_;
+  int16_t max_rep_level_;
+  int16_t def_level_;
+  int16_t max_def_level_;
+
+  // Cache frequently accessed members of slot_desc_ for perf.
+
+  /// slot_desc_->tuple_offset(). -1 if slot_desc_ is NULL.
+  int tuple_offset_;
+
+  /// slot_desc_->null_indicator_offset(). Invalid if slot_desc_ is NULL.
+  NullIndicatorOffset null_indicator_offset_;
+
+  ParquetColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
+      const SlotDescriptor* slot_desc)
+    : parent_(parent),
+      node_(node),
+      slot_desc_(slot_desc),
+      pos_slot_desc_(NULL),
+      pos_current_value_(HdfsParquetScanner::INVALID_POS),
+      rep_level_(HdfsParquetScanner::INVALID_LEVEL),
+      max_rep_level_(node_.max_rep_level),
+      def_level_(HdfsParquetScanner::INVALID_LEVEL),
+      max_def_level_(node_.max_def_level),
+      tuple_offset_(slot_desc == NULL ? -1 : slot_desc->tuple_offset()),
+      null_indicator_offset_(slot_desc == NULL ? NullIndicatorOffset(-1, -1) :
+          slot_desc->null_indicator_offset()) {
+    DCHECK_GE(node_.max_rep_level, 0);
+    DCHECK_LE(node_.max_rep_level, std::numeric_limits<int16_t>::max());
+    DCHECK_GE(node_.max_def_level, 0);
+    DCHECK_LE(node_.max_def_level, std::numeric_limits<int16_t>::max());
+    // rep_level_ is always valid and equal to 0 if col not in collection.
+    if (max_rep_level() == 0) rep_level_ = 0;
+  }
+};
+
+/// Reader for a single column from the parquet file. It's associated with a
+/// ScannerContext::Stream and is responsible for decoding the data. Super class for
+/// per-type column readers. This contains most of the logic, the type specific functions
+/// must be implemented in the subclass.
+class BaseScalarColumnReader : public ParquetColumnReader {
+ public:
+  BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
+      const SlotDescriptor* slot_desc)
+    : ParquetColumnReader(parent, node, slot_desc),
+      data_(NULL),
+      data_end_(NULL),
+      def_levels_(true),
+      rep_levels_(false),
+      page_encoding_(parquet::Encoding::PLAIN_DICTIONARY),
+      num_buffered_values_(0),
+      num_values_read_(0),
+      metadata_(NULL),
+      stream_(NULL),
+      decompressed_data_pool_(new MemPool(parent->scan_node_->mem_tracker())) {
+    DCHECK_GE(node_.col_idx, 0) << node_.DebugString();
+  }
+
+  virtual ~BaseScalarColumnReader() { }
+
+  /// This is called once for each row group in the file. Rebinds the reader to the
+  /// column chunk described by 'metadata' and read through 'stream', clears all
+  /// per-page and per-level state, and (re)creates the decompressor for this chunk's
+  /// codec. Returns an error status if the decompressor cannot be created.
+  Status Reset(const parquet::ColumnMetaData* metadata, ScannerContext::Stream* stream) {
+    DCHECK(stream != NULL);
+    DCHECK(metadata != NULL);
+
+    num_buffered_values_ = 0;
+    data_ = NULL;
+    data_end_ = NULL;
+    stream_ = stream;
+    metadata_ = metadata;
+    num_values_read_ = 0;
+    def_level_ = HdfsParquetScanner::INVALID_LEVEL;
+    // See ParquetColumnReader constructor.
+    rep_level_ = max_rep_level() == 0 ? 0 : HdfsParquetScanner::INVALID_LEVEL;
+    pos_current_value_ = HdfsParquetScanner::INVALID_POS;
+
+    // The codec comes from this row group's column metadata, so it may differ from the
+    // previous row group's (or this chunk may be uncompressed). Drop any decompressor
+    // left over from the previous row group so a stale codec is never applied to this
+    // chunk, closing it first just as Close() does.
+    if (decompressor_.get() != NULL) {
+      decompressor_->Close();
+      decompressor_.reset();
+    }
+    if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
+      RETURN_IF_ERROR(Codec::CreateDecompressor(
+          NULL, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], &decompressor_));
+    }
+    ClearDictionaryDecoder();
+    return Status::OK();
+  }
+
+  /// Called once when the scanner is complete for final cleanup.
+  void Close() {
+    if (decompressor_.get() != NULL) decompressor_->Close();
+  }
+
+  int64_t total_len() const { return metadata_->total_compressed_size; }
+  int col_idx() const { return node_.col_idx; }
+  THdfsCompression::type codec() const {
+    if (metadata_ == NULL) return THdfsCompression::NONE;
+    return PARQUET_TO_IMPALA_CODEC[metadata_->codec];
+  }
+  MemPool* decompressed_data_pool() const { return decompressed_data_pool_.get(); }
+
+  /// Reads the next definition and repetition levels for this column. Initializes the
+  /// next data page if necessary.
+  virtual bool NextLevels() { return NextLevels<true>(); }
+
+  // TODO: Some encodings might benefit a lot from a SkipValues(int num_rows) if
+  // we know this row can be skipped. This could be very useful with stats and big
+  // sections can be skipped. Implement that when we can benefit from it.
+
+ protected:
+  // Friend parent scanner so it can perform validation (e.g. ValidateEndOfRowGroup())
+  friend class HdfsParquetScanner;
+
+  // Class members that are accessed for every column should be included up here so they
+  // fit in as few cache lines as possible.
+
+  /// Pointer to start of next value in data page
+  uint8_t* data_;
+
+  /// End of the data page.
+  const uint8_t* data_end_;
+
+  /// Decoder for definition levels.
+  ParquetLevelDecoder def_levels_;
+
+  /// Decoder for repetition levels.
+  ParquetLevelDecoder rep_levels_;
+
+  /// Page encoding for values. Cached here for perf.
+  parquet::Encoding::type page_encoding_;
+
+  /// Num values remaining in the current data page
+  int num_buffered_values_;
+
+  // Less frequently used members that are not accessed in inner loop should go below
+  // here so they do not occupy precious cache line space.
+
+  /// The number of values seen so far. Updated per data page.
+  int64_t num_values_read_;
+
+  const parquet::ColumnMetaData* metadata_;
+  /// Decompressor for the current column chunk; NULL when the chunk is uncompressed.
+  boost::scoped_ptr<Codec> decompressor_;
+  ScannerContext::Stream* stream_;
+
+  /// Pool to allocate decompression buffers from.
+  boost::scoped_ptr<MemPool> decompressed_data_pool_;
+
+  /// Header for current data page.
+  parquet::PageHeader current_page_header_;
+
+  /// Read the next data page. If a dictionary page is encountered, that will be read and
+  /// this function will continue reading the next data page.
+  Status ReadDataPage();
+
+  /// Try to move to the next page and buffer more values. Returns false and sets
+  /// rep_level_, def_level_ and pos_current_value_ to -1 if no more pages or an error
+  /// encountered.
+  bool NextPage();
+
+  /// Implementation for NextLevels().
+  template <bool ADVANCE_REP_LEVEL>
+  bool NextLevels();
+
+  /// Creates a dictionary decoder from values/size. 'decoder' is set to point to a
+  /// dictionary decoder stored in this object. Subclass must implement this. Returns
+  /// an error status if the dictionary values could not be decoded successfully.
+  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
+      DictDecoderBase** decoder) = 0;
+
+  /// Return true if the column has an initialized dictionary decoder. Subclass must
+  /// implement this.
+  virtual bool HasDictionaryDecoder() = 0;
+
+  /// Clear the dictionary decoder so HasDictionaryDecoder() will return false. Subclass
+  /// must implement this.
+  virtual void ClearDictionaryDecoder() = 0;
+
+  /// Initializes the reader with the data contents. This is the content for the entire
+  /// decompressed data page. Decoders can initialize state from here.
+  virtual Status InitDataPage(uint8_t* data, int size) = 0;
+
+ private:
+  /// Writes the next value into *slot using pool if necessary. Also advances rep_level_
+  /// and def_level_ via NextLevels().
+  ///
+  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
+  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
+  /// true.
+  template <bool IN_COLLECTION>
+  inline bool ReadSlot(void* slot, MemPool* pool);
+};
+
+/// Collections are not materialized directly in parquet files; only scalar values appear
+/// in the file. CollectionColumnReader uses the definition and repetition levels of child
+/// column readers to figure out the boundaries of each collection in this column.
+class CollectionColumnReader : public ParquetColumnReader {
+ public:
+  CollectionColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
+      const SlotDescriptor* slot_desc)
+    : ParquetColumnReader(parent, node, slot_desc) {
+    DCHECK(node_.is_repeated());
+    if (slot_desc != NULL) DCHECK(slot_desc->type().IsCollectionType());
+  }
+
+  virtual ~CollectionColumnReader() { }
+
+  vector<ParquetColumnReader*>* children() { return &children_; }
+
+  virtual bool IsCollectionReader() const { return true; }
+
+  /// The repetition level indicating that the current value is the first in a new
+  /// collection (meaning the last value read was the final item in the previous
+  /// collection).
+  int new_collection_rep_level() const { return max_rep_level() - 1; }
+
+  /// Materializes CollectionValue into tuple slot (if materializing) and advances to next
+  /// value.
+  virtual bool ReadValue(MemPool* pool, Tuple* tuple);
+
+  /// Same as ReadValue but does not advance repetition level. Only valid for columns not
+  /// in collections.
+  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple);
+
+  /// Advances all child readers to the beginning of the next collection and updates this
+  /// reader's state.
+  virtual bool NextLevels();
+
+  /// This is called once for each row group in the file. Puts the reader back into the
+  /// "no current level" state. Unlike scalar readers, rep_level_ is never forced to 0
+  /// here: the constructor DCHECKs that the node is repeated.
+  void Reset() {
+    def_level_ = HdfsParquetScanner::INVALID_LEVEL;
+    rep_level_ = HdfsParquetScanner::INVALID_LEVEL;
+    pos_current_value_ = HdfsParquetScanner::INVALID_POS;
+  }
+
+ private:
+  /// Column readers of fields contained within this collection. There is at least one
+  /// child reader per collection reader. Child readers either materialize slots in the
+  /// collection item tuples, or there is a single child reader that does not materialize
+  /// any slot and is only used by this reader to read def and rep levels.
+  vector<ParquetColumnReader*> children_;
+
+  /// Updates this reader's def_level_, rep_level_, and pos_current_value_ based on child
+  /// reader's state.
+  void UpdateDerivedState();
+
+  /// Recursively reads from children_ to assemble a single CollectionValue into
+  /// *slot. Also advances rep_level_ and def_level_ via NextLevels().
+  ///
+  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
+  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
+  /// true.
+  inline bool ReadSlot(void* slot, MemPool* pool);
+};
+
+}
+
+#endif
