http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/hdfs-parquet-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index fa64f84..1ebb650 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ b/be/src/exec/hdfs-parquet-scanner.cc @@ -17,17 +17,15 @@ #include <limits> // for std::numeric_limits #include <queue> -#include <boost/algorithm/string.hpp> #include <gflags/gflags.h> #include <gutil/strings/substitute.h> -#include "common/object-pool.h" #include "common/logging.h" +#include "exec/hdfs-scanner.h" #include "exec/hdfs-scan-node.h" +#include "exec/parquet-column-readers.h" #include "exec/scanner-context.inline.h" -#include "exec/read-write-util.h" #include "exprs/expr.h" -#include "gutil/bits.h" #include "runtime/collection-value-builder.h" #include "runtime/descriptors.h" #include "runtime/runtime-state.h" @@ -37,47 +35,18 @@ #include "runtime/tuple-row.h" #include "runtime/tuple.h" #include "runtime/string-value.h" -#include "util/bitmap.h" -#include "util/bit-util.h" -#include "util/decompress.h" #include "util/debug-util.h" #include "util/error-util.h" -#include "util/dict-encoding.h" -#include "util/rle-encoding.h" -#include "util/runtime-profile-counters.h" #include "rpc/thrift-util.h" #include "common/names.h" -using boost::algorithm::is_any_of; -using boost::algorithm::split; -using boost::algorithm::token_compress_on; +using strings::Substitute; using namespace impala; -using namespace strings; - -// Provide a workaround for IMPALA-1658. -DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false, - "When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will " - "be converted from UTC to local time. Writes are unaffected."); DEFINE_double(parquet_min_filter_reject_ratio, 0.1, "(Advanced) If the percentage of " "rows rejected by a runtime filter drops below this value, the filter is disabled."); -const int64_t HdfsParquetScanner::FOOTER_SIZE = 100 * 1024; -const int16_t HdfsParquetScanner::ROW_GROUP_END = numeric_limits<int16_t>::min(); -const int16_t HdfsParquetScanner::INVALID_LEVEL = -1; -const int16_t HdfsParquetScanner::INVALID_POS = -1; - -// Max data page header size in bytes. This is an estimate and only needs to be an upper -// bound. It is theoretically possible to have a page header of any size due to string -// value statistics, but in practice we'll have trouble reading string values this large. -// Also, this limit is in place to prevent impala from reading corrupt parquet files. -DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size in bytes"); - -// Max dictionary page header size in bytes. This is an estimate and only needs to be an -// upper bound. -const int MAX_DICT_HEADER_SIZE = 100; - // The number of rows between checks to see if a filter is not effective, and should be // disabled. Must be a power of two. const int ROWS_PER_FILTER_SELECTIVITY_CHECK = 16 * 1024; @@ -85,19 +54,14 @@ static_assert( !(ROWS_PER_FILTER_SELECTIVITY_CHECK & (ROWS_PER_FILTER_SELECTIVITY_CHECK - 1)), "ROWS_PER_FILTER_SELECTIVITY_CHECK must be a power of two"); -// FILE_CHECKs are conditions that we expect to be true but could fail due to a malformed -// input file. They differentiate these cases from DCHECKs, which indicate conditions that -// are true unless there's a bug in Impala. 
We would ideally always return a bad Status -// instead of failing a FILE_CHECK, but in many cases we use FILE_CHECK instead because -// there's a performance cost to doing the check in a release build, or just due to legacy -// code. -#define FILE_CHECK(a) DCHECK(a) -#define FILE_CHECK_EQ(a, b) DCHECK_EQ(a, b) -#define FILE_CHECK_NE(a, b) DCHECK_NE(a, b) -#define FILE_CHECK_GT(a, b) DCHECK_GT(a, b) -#define FILE_CHECK_LT(a, b) DCHECK_LT(a, b) -#define FILE_CHECK_GE(a, b) DCHECK_GE(a, b) -#define FILE_CHECK_LE(a, b) DCHECK_LE(a, b) +// Max dictionary page header size in bytes. This is an estimate and only needs to be an +// upper bound. +const int MAX_DICT_HEADER_SIZE = 100; + +const int64_t HdfsParquetScanner::FOOTER_SIZE; +const int16_t HdfsParquetScanner::ROW_GROUP_END; +const int16_t HdfsParquetScanner::INVALID_LEVEL; +const int16_t HdfsParquetScanner::INVALID_POS; Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNode* scan_node, const std::vector<HdfsFileDesc*>& files) { @@ -168,967 +132,18 @@ DiskIoMgr::ScanRange* HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) { namespace impala { -/// Helper struct that holds a batch of tuples allocated from a mem pool, as well -/// as state associated with iterating over its tuples and transferring -/// them to an output batch in TransferScratchTuples(). -struct ScratchTupleBatch { - // Memory backing the batch of tuples. Allocated from batch's tuple data pool. - uint8_t* tuple_mem; - // Keeps track of the current tuple index. - int tuple_idx; - // Number of valid tuples in tuple_mem. - int num_tuples; - // Cached for convenient access. - const int tuple_byte_size; - - // Helper batch for safely allocating tuple_mem from its tuple data pool using - // ResizeAndAllocateTupleBuffer(). - RowBatch batch; - - ScratchTupleBatch( - const RowDescriptor& row_desc, int batch_size, MemTracker* mem_tracker) - : tuple_mem(NULL), - tuple_idx(0), - num_tuples(0), - tuple_byte_size(row_desc.GetRowSize()), - batch(row_desc, batch_size, mem_tracker) { - DCHECK_EQ(row_desc.tuple_descriptors().size(), 1); - } - - Status Reset(RuntimeState* state) { - tuple_idx = 0; - num_tuples = 0; - // Buffer size is not needed. 
- int64_t buffer_size; - RETURN_IF_ERROR(batch.ResizeAndAllocateTupleBuffer(state, &buffer_size, &tuple_mem)); - return Status::OK(); - } - - inline Tuple* GetTuple(int tuple_idx) const { - return reinterpret_cast<Tuple*>(tuple_mem + tuple_idx * tuple_byte_size); - } - - inline MemPool* mem_pool() { return batch.tuple_data_pool(); } - inline int capacity() const { return batch.capacity(); } - inline uint8_t* CurrTuple() const { return tuple_mem + tuple_idx * tuple_byte_size; } - inline uint8_t* TupleEnd() const { return tuple_mem + num_tuples * tuple_byte_size; } - inline bool AtEnd() const { return tuple_idx == num_tuples; } -}; - -const string PARQUET_MEM_LIMIT_EXCEEDED = "HdfsParquetScanner::$0() failed to allocate " - "$1 bytes for $2."; - HdfsParquetScanner::HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* state) : HdfsScanner(scan_node, state), scratch_batch_(new ScratchTupleBatch( scan_node->row_desc(), state_->batch_size(), scan_node->mem_tracker())), metadata_range_(NULL), dictionary_pool_(new MemPool(scan_node->mem_tracker())), - assemble_rows_timer_(scan_node_->materialize_tuple_timer()) { + assemble_rows_timer_(scan_node_->materialize_tuple_timer()), + num_cols_counter_(NULL), + num_row_groups_counter_(NULL) { assemble_rows_timer_.Stop(); } -HdfsParquetScanner::~HdfsParquetScanner() { -} - -// TODO for 2.3: move column readers to separate file - -/// Decoder for all supported Parquet level encodings. Optionally reads, decodes, and -/// caches level values in batches. -/// Level values are unsigned 8-bit integers because we support a maximum nesting -/// depth of 100, as enforced by the FE. Using a small type saves memory and speeds up -/// populating the level cache (e.g., with RLE we can memset() repeated values). -/// -/// Inherits from RleDecoder instead of containing one for performance reasons. -/// The containment design would require two BitReaders per column reader. The extra -/// BitReader causes enough bloat for a column reader to require another cache line. -/// TODO: It is not clear whether the inheritance vs. containment choice still makes -/// sense with column-wise materialization. The containment design seems cleaner and -/// we should revisit. -class HdfsParquetScanner::LevelDecoder : public RleDecoder { - public: - LevelDecoder(bool is_def_level_decoder) - : cached_levels_(NULL), - num_cached_levels_(0), - cached_level_idx_(0), - encoding_(parquet::Encoding::PLAIN), - max_level_(0), - cache_size_(0), - num_buffered_values_(0), - decoding_error_code_(is_def_level_decoder ? - TErrorCode::PARQUET_DEF_LEVEL_ERROR : TErrorCode::PARQUET_REP_LEVEL_ERROR) { - } - - /// Initialize the LevelDecoder. Reads and advances the provided data buffer if the - /// encoding requires reading metadata from the page header. - Status Init(const string& filename, parquet::Encoding::type encoding, - MemPool* cache_pool, int cache_size, int max_level, int num_buffered_values, - uint8_t** data, int* data_size); - - /// Returns the next level or INVALID_LEVEL if there was an error. - inline int16_t ReadLevel(); - - /// Decodes and caches the next batch of levels. Resets members associated with the - /// cache. Returns a non-ok status if there was a problem decoding a level, or if a - /// level was encountered with a value greater than max_level_. - Status CacheNextBatch(int batch_size); - - /// Functions for working with the level cache. 
- inline bool CacheHasNext() const { return cached_level_idx_ < num_cached_levels_; } - inline uint8_t CacheGetNext() { - DCHECK_LT(cached_level_idx_, num_cached_levels_); - return cached_levels_[cached_level_idx_++]; - } - inline void CacheSkipLevels(int num_levels) { - DCHECK_LE(cached_level_idx_ + num_levels, num_cached_levels_); - cached_level_idx_ += num_levels; - } - inline int CacheSize() const { return num_cached_levels_; } - inline int CacheRemaining() const { return num_cached_levels_ - cached_level_idx_; } - inline int CacheCurrIdx() const { return cached_level_idx_; } - - private: - /// Initializes members associated with the level cache. Allocates memory for - /// the cache from pool, if necessary. - Status InitCache(MemPool* pool, int cache_size); - - /// Decodes and writes a batch of levels into the cache. Sets the number of - /// values written to the cache in *num_cached_levels. Returns false if there was - /// an error decoding a level or if there was a level value greater than max_level_. - bool FillCache(int batch_size, int* num_cached_levels); - - /// Buffer for a batch of levels. The memory is allocated and owned by a pool in - /// passed in Init(). - uint8_t* cached_levels_; - /// Number of valid level values in the cache. - int num_cached_levels_; - /// Current index into cached_levels_. - int cached_level_idx_; - parquet::Encoding::type encoding_; - - /// For error checking and reporting. - int max_level_; - /// Number of level values cached_levels_ has memory allocated for. - int cache_size_; - /// Number of remaining data values in the current data page. - int num_buffered_values_; - string filename_; - TErrorCode::type decoding_error_code_; -}; - -/// Base class for reading a column. Reads a logical column, not necessarily a column -/// materialized in the file (e.g. collections). The two subclasses are -/// BaseScalarColumnReader and CollectionColumnReader. Column readers read one def and rep -/// level pair at a time. The current def and rep level are exposed to the user, and the -/// corresponding value (if defined) can optionally be copied into a slot via -/// ReadValue(). Can also write position slots. -class HdfsParquetScanner::ColumnReader { - public: - virtual ~ColumnReader() { } - - int def_level() const { return def_level_; } - int rep_level() const { return rep_level_; } - - const SlotDescriptor* slot_desc() const { return slot_desc_; } - const parquet::SchemaElement& schema_element() const { return *node_.element; } - int16_t max_def_level() const { return max_def_level_; } - int16_t max_rep_level() const { return max_rep_level_; } - int def_level_of_immediate_repeated_ancestor() const { - return node_.def_level_of_immediate_repeated_ancestor; - } - const SlotDescriptor* pos_slot_desc() const { return pos_slot_desc_; } - void set_pos_slot_desc(const SlotDescriptor* pos_slot_desc) { - DCHECK(pos_slot_desc_ == NULL); - pos_slot_desc_ = pos_slot_desc; - } - - /// Returns true if this reader materializes collections (i.e. CollectionValues). - virtual bool IsCollectionReader() const { return false; } - - const char* filename() const { return parent_->filename(); }; - - /// Read the current value (or null) into 'tuple' for this column. This should only be - /// called when a value is defined, i.e., def_level() >= - /// def_level_of_immediate_repeated_ancestor() (since empty or NULL collections produce - /// no output values), otherwise NextLevels() should be called instead. - /// - /// Advances this column reader to the next value (i.e. 
NextLevels() doesn't need to be - /// called after calling ReadValue()). - /// - /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is - /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns - /// true. - /// - /// NextLevels() must be called on this reader before calling ReadValue() for the first - /// time. This is to initialize the current value that ReadValue() will read. - /// - /// TODO: this is the function that needs to be codegen'd (e.g. CodegenReadValue()) - /// The codegened functions from all the materialized cols will then be combined - /// into one function. - /// TODO: another option is to materialize col by col for the entire row batch in - /// one call. e.g. MaterializeCol would write out 1024 values. Our row batches - /// are currently dense so we'll need to figure out something there. - virtual bool ReadValue(MemPool* pool, Tuple* tuple) = 0; - - /// Same as ReadValue() but does not advance repetition level. Only valid for columns - /// not in collections. - virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) = 0; - - /// Returns true if this reader needs to be seeded with NextLevels() before - /// calling ReadValueBatch() or ReadNonRepeatedValueBatch(). - /// Note that all readers need to be seeded before calling the non-batched ReadValue(). - virtual bool NeedsSeedingForBatchedReading() const { return true; } - - /// Batched version of ReadValue() that reads up to max_values at once and materializes - /// them into tuples in tuple_mem. Returns the number of values actually materialized - /// in *num_values. The return value, error behavior and state changes are generally - /// the same as in ReadValue(). For example, if an error occurs in the middle of - /// materializing a batch then false is returned, and num_values, tuple_mem, as well as - /// this column reader are left in an undefined state, assuming that the caller will - /// immediately abort execution. - virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size, - uint8_t* tuple_mem, int* num_values); - - /// Batched version of ReadNonRepeatedValue() that reads up to max_values at once and - /// materializes them into tuples in tuple_mem. - /// The return value and error behavior are the same as in ReadValueBatch(). - virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size, - uint8_t* tuple_mem, int* num_values); - - /// Advances this column reader's def and rep levels to the next logical value, i.e. to - /// the next scalar value or the beginning of the next collection, without attempting to - /// read the value. This is used to skip past def/rep levels that don't materialize a - /// value, such as the def/rep levels corresponding to an empty containing collection. - /// - /// NextLevels() must be called on this reader before calling ReadValue() for the first - /// time. This is to initialize the current value that ReadValue() will read. - /// - /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is - /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns - /// true. - virtual bool NextLevels() = 0; - - /// Should only be called if pos_slot_desc_ is non-NULL. Writes pos_current_value_ to - /// 'tuple' (i.e. "reads" the synthetic position field of the parent collection into - /// 'tuple') and increments pos_current_value_. 
- void ReadPosition(Tuple* tuple); - - /// Returns true if this column reader has reached the end of the row group. - inline bool RowGroupAtEnd() { return rep_level_ == ROW_GROUP_END; } - - protected: - HdfsParquetScanner* parent_; - const SchemaNode& node_; - const SlotDescriptor* slot_desc_; - - /// The slot descriptor for the position field of the tuple, if there is one. NULL if - /// there's not. Only one column reader for a given tuple desc will have this set. - const SlotDescriptor* pos_slot_desc_; - - /// The next value to write into the position slot, if there is one. 64-bit int because - /// the pos slot is always a BIGINT Set to -1 when this column reader does not have a - /// current rep and def level (i.e. before the first NextLevels() call or after the last - /// value in the column has been read). - int64_t pos_current_value_; - - /// The current repetition and definition levels of this reader. Advanced via - /// ReadValue() and NextLevels(). Set to -1 when this column reader does not have a - /// current rep and def level (i.e. before the first NextLevels() call or after the last - /// value in the column has been read). If this is not inside a collection, rep_level_ is - /// always 0. - /// int16_t is large enough to hold the valid levels 0-255 and sentinel value -1. - /// The maximum values are cached here because they are accessed in inner loops. - int16_t rep_level_; - int16_t max_rep_level_; - int16_t def_level_; - int16_t max_def_level_; - - // Cache frequently accessed members of slot_desc_ for perf. - - /// slot_desc_->tuple_offset(). -1 if slot_desc_ is NULL. - int tuple_offset_; - - /// slot_desc_->null_indicator_offset(). Invalid if slot_desc_ is NULL. - NullIndicatorOffset null_indicator_offset_; - - ColumnReader(HdfsParquetScanner* parent, const SchemaNode& node, - const SlotDescriptor* slot_desc) - : parent_(parent), - node_(node), - slot_desc_(slot_desc), - pos_slot_desc_(NULL), - pos_current_value_(INVALID_POS), - rep_level_(INVALID_LEVEL), - max_rep_level_(node_.max_rep_level), - def_level_(INVALID_LEVEL), - max_def_level_(node_.max_def_level), - tuple_offset_(slot_desc == NULL ? -1 : slot_desc->tuple_offset()), - null_indicator_offset_(slot_desc == NULL ? NullIndicatorOffset(-1, -1) : - slot_desc->null_indicator_offset()) { - DCHECK_GE(node_.max_rep_level, 0); - DCHECK_LE(node_.max_rep_level, std::numeric_limits<int16_t>::max()); - DCHECK_GE(node_.max_def_level, 0); - DCHECK_LE(node_.max_def_level, std::numeric_limits<int16_t>::max()); - // rep_level_ is always valid and equal to 0 if col not in collection. - if (max_rep_level() == 0) rep_level_ = 0; - } -}; - -/// Collections are not materialized directly in parquet files; only scalar values appear -/// in the file. CollectionColumnReader uses the definition and repetition levels of child -/// column readers to figure out the boundaries of each collection in this column. 
-class HdfsParquetScanner::CollectionColumnReader : - public HdfsParquetScanner::ColumnReader { - public: - CollectionColumnReader(HdfsParquetScanner* parent, const SchemaNode& node, - const SlotDescriptor* slot_desc) - : ColumnReader(parent, node, slot_desc) { - DCHECK(node_.is_repeated()); - if (slot_desc != NULL) DCHECK(slot_desc->type().IsCollectionType()); - } - - virtual ~CollectionColumnReader() { } - - vector<ColumnReader*>* children() { return &children_; } - - virtual bool IsCollectionReader() const { return true; } - - /// The repetition level indicating that the current value is the first in a new - /// collection (meaning the last value read was the final item in the previous - /// collection). - int new_collection_rep_level() const { return max_rep_level() - 1; } - - /// Materializes CollectionValue into tuple slot (if materializing) and advances to next - /// value. - virtual bool ReadValue(MemPool* pool, Tuple* tuple); - - /// Same as ReadValue but does not advance repetition level. Only valid for columns not - /// in collections. - virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple); - - /// Advances all child readers to the beginning of the next collection and updates this - /// reader's state. - virtual bool NextLevels(); - - /// This is called once for each row group in the file. - void Reset() { - def_level_ = -1; - rep_level_ = -1; - pos_current_value_ = -1; - } - - private: - /// Column readers of fields contained within this collection. There is at least one - /// child reader per collection reader. Child readers either materialize slots in the - /// collection item tuples, or there is a single child reader that does not materialize - /// any slot and is only used by this reader to read def and rep levels. - vector<ColumnReader*> children_; - - /// Updates this reader's def_level_, rep_level_, and pos_current_value_ based on child - /// reader's state. - void UpdateDerivedState(); - - /// Recursively reads from children_ to assemble a single CollectionValue into - /// *slot. Also advances rep_level_ and def_level_ via NextLevels(). - /// - /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is - /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns - /// true. - inline bool ReadSlot(void* slot, MemPool* pool); -}; - -/// Reader for a single column from the parquet file. It's associated with a -/// ScannerContext::Stream and is responsible for decoding the data. Super class for -/// per-type column readers. This contains most of the logic, the type specific functions -/// must be implemented in the subclass. -class HdfsParquetScanner::BaseScalarColumnReader : - public HdfsParquetScanner::ColumnReader { - public: - BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node, - const SlotDescriptor* slot_desc) - : ColumnReader(parent, node, slot_desc), - def_levels_(true), - rep_levels_(false), - num_buffered_values_(0), - num_values_read_(0), - metadata_(NULL), - stream_(NULL), - decompressed_data_pool_(new MemPool(parent->scan_node_->mem_tracker())) { - DCHECK_GE(node_.col_idx, 0) << node_.DebugString(); - - } - - virtual ~BaseScalarColumnReader() { } - - /// This is called once for each row group in the file. 
- Status Reset(const parquet::ColumnMetaData* metadata, ScannerContext::Stream* stream) { - DCHECK(stream != NULL); - DCHECK(metadata != NULL); - - num_buffered_values_ = 0; - data_ = NULL; - data_end_ = NULL; - stream_ = stream; - metadata_ = metadata; - num_values_read_ = 0; - def_level_ = -1; - // See ColumnReader constructor. - rep_level_ = max_rep_level() == 0 ? 0 : -1; - pos_current_value_ = -1; - - if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) { - RETURN_IF_ERROR(Codec::CreateDecompressor( - NULL, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], &decompressor_)); - } - ClearDictionaryDecoder(); - return Status::OK(); - } - - /// Called once when the scanner is complete for final cleanup. - void Close() { - if (decompressor_.get() != NULL) decompressor_->Close(); - } - - int64_t total_len() const { return metadata_->total_compressed_size; } - int col_idx() const { return node_.col_idx; } - THdfsCompression::type codec() const { - if (metadata_ == NULL) return THdfsCompression::NONE; - return PARQUET_TO_IMPALA_CODEC[metadata_->codec]; - } - MemPool* decompressed_data_pool() const { return decompressed_data_pool_.get(); } - - /// Reads the next definition and repetition levels for this column. Initializes the - /// next data page if necessary. - virtual bool NextLevels() { return NextLevels<true>(); } - - // TODO: Some encodings might benefit a lot from a SkipValues(int num_rows) if - // we know this row can be skipped. This could be very useful with stats and big - // sections can be skipped. Implement that when we can benefit from it. - - protected: - // Friend parent scanner so it can perform validation (e.g. ValidateEndOfRowGroup()) - friend class HdfsParquetScanner; - - // Class members that are accessed for every column should be included up here so they - // fit in as few cache lines as possible. - - /// Pointer to start of next value in data page - uint8_t* data_; - - /// End of the data page. - const uint8_t* data_end_; - - /// Decoder for definition levels. - LevelDecoder def_levels_; - - /// Decoder for repetition levels. - LevelDecoder rep_levels_; - - /// Page encoding for values. Cached here for perf. - parquet::Encoding::type page_encoding_; - - /// Num values remaining in the current data page - int num_buffered_values_; - - // Less frequently used members that are not accessed in inner loop should go below - // here so they do not occupy precious cache line space. - - /// The number of values seen so far. Updated per data page. - int64_t num_values_read_; - - const parquet::ColumnMetaData* metadata_; - scoped_ptr<Codec> decompressor_; - ScannerContext::Stream* stream_; - - /// Pool to allocate decompression buffers from. - boost::scoped_ptr<MemPool> decompressed_data_pool_; - - /// Header for current data page. - parquet::PageHeader current_page_header_; - - /// Read the next data page. If a dictionary page is encountered, that will be read and - /// this function will continue reading the next data page. - Status ReadDataPage(); - - /// Try to move the the next page and buffer more values. Return false and sets rep_level_, - /// def_level_ and pos_current_value_ to -1 if no more pages or an error encountered. - bool NextPage(); - - /// Implementation for NextLevels(). - template <bool ADVANCE_REP_LEVEL> - bool NextLevels(); - - /// Creates a dictionary decoder from values/size. 'decoder' is set to point to a - /// dictionary decoder stored in this object. Subclass must implement this. 
Returns - /// an error status if the dictionary values could not be decoded successfully. - virtual Status CreateDictionaryDecoder(uint8_t* values, int size, - DictDecoderBase** decoder) = 0; - - /// Return true if the column has an initialized dictionary decoder. Subclass must - /// implement this. - virtual bool HasDictionaryDecoder() = 0; - - /// Clear the dictionary decoder so HasDictionaryDecoder() will return false. Subclass - /// must implement this. - virtual void ClearDictionaryDecoder() = 0; - - /// Initializes the reader with the data contents. This is the content for the entire - /// decompressed data page. Decoders can initialize state from here. - virtual Status InitDataPage(uint8_t* data, int size) = 0; - - private: - /// Writes the next value into *slot using pool if necessary. Also advances rep_level_ - /// and def_level_ via NextLevels(). - /// - /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is - /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns - /// true. - template <bool IN_COLLECTION> - inline bool ReadSlot(void* slot, MemPool* pool); -}; - -/// Per column type reader. If MATERIALIZED is true, the column values are materialized -/// into the slot described by slot_desc. If MATERIALIZED is false, the column values -/// are not materialized, but the position can be accessed. -template<typename T, bool MATERIALIZED> -class HdfsParquetScanner::ScalarColumnReader : - public HdfsParquetScanner::BaseScalarColumnReader { - public: - ScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node, - const SlotDescriptor* slot_desc) - : BaseScalarColumnReader(parent, node, slot_desc), - dict_decoder_init_(false) { - if (!MATERIALIZED) { - // We're not materializing any values, just counting them. No need (or ability) to - // initialize state used to materialize values. - DCHECK(slot_desc_ == NULL); - return; - } - - DCHECK(slot_desc_ != NULL); - DCHECK_NE(slot_desc_->type().type, TYPE_BOOLEAN); - if (slot_desc_->type().type == TYPE_DECIMAL) { - fixed_len_size_ = ParquetPlainEncoder::DecimalSize(slot_desc_->type()); - } else if (slot_desc_->type().type == TYPE_VARCHAR) { - fixed_len_size_ = slot_desc_->type().len; - } else { - fixed_len_size_ = -1; - } - needs_conversion_ = slot_desc_->type().type == TYPE_CHAR || - // TODO: Add logic to detect file versions that have unconverted TIMESTAMP - // values. Currently all versions have converted values. 
- (FLAGS_convert_legacy_hive_parquet_utc_timestamps && - slot_desc_->type().type == TYPE_TIMESTAMP && - parent->file_version_.application == "parquet-mr"); - } - - virtual ~ScalarColumnReader() { } - - virtual bool ReadValue(MemPool* pool, Tuple* tuple) { - return ReadValue<true>(pool, tuple); - } - - virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) { - return ReadValue<false>(pool, tuple); - } - - virtual bool NeedsSeedingForBatchedReading() const { return false; } - - virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size, - uint8_t* tuple_mem, int* num_values) { - return ReadValueBatch<true>(pool, max_values, tuple_size, tuple_mem, num_values); - } - - virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size, - uint8_t* tuple_mem, int* num_values) { - return ReadValueBatch<false>(pool, max_values, tuple_size, tuple_mem, num_values); - } - - protected: - template <bool IN_COLLECTION> - inline bool ReadValue(MemPool* pool, Tuple* tuple) { - // NextLevels() should have already been called and def and rep levels should be in - // valid range. - DCHECK_GE(rep_level_, 0); - DCHECK_LE(rep_level_, max_rep_level()); - DCHECK_GE(def_level_, 0); - DCHECK_LE(def_level_, max_def_level()); - DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) << - "Caller should have called NextLevels() until we are ready to read a value"; - - if (MATERIALIZED) { - if (def_level_ >= max_def_level()) { - if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) { - if (!ReadSlot<true>(tuple->GetSlot(tuple_offset_), pool)) return false; - } else { - if (!ReadSlot<false>(tuple->GetSlot(tuple_offset_), pool)) return false; - } - } else { - tuple->SetNull(null_indicator_offset_); - } - } - return NextLevels<IN_COLLECTION>(); - } - - /// Implementation of the ReadValueBatch() functions specialized for this - /// column reader type. This function drives the reading of data pages and - /// caching of rep/def levels. Once a data page and cached levels are available, - /// it calls into a more specialized MaterializeValueBatch() for doing the actual - /// value materialization using the level caches. - template<bool IN_COLLECTION> - bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size, - uint8_t* tuple_mem, int* num_values) { - // Repetition level is only present if this column is nested in a collection type. - if (!IN_COLLECTION) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString(); - if (IN_COLLECTION) DCHECK_GT(max_rep_level(), 0) << slot_desc()->DebugString(); - - int val_count = 0; - bool continue_execution = true; - while (val_count < max_values && !RowGroupAtEnd() && continue_execution) { - // Read next page if necessary. - if (num_buffered_values_ == 0) { - if (!NextPage()) { - continue_execution = parent_->parse_status_.ok(); - continue; - } - } - - // Fill def/rep level caches if they are empty. - int level_batch_size = min(parent_->state_->batch_size(), num_buffered_values_); - if (!def_levels_.CacheHasNext()) { - parent_->parse_status_.MergeStatus(def_levels_.CacheNextBatch(level_batch_size)); - } - // We only need the repetition levels for populating the position slot since we - // are only populating top-level tuples. - if (IN_COLLECTION && pos_slot_desc_ != NULL && !rep_levels_.CacheHasNext()) { - parent_->parse_status_.MergeStatus(rep_levels_.CacheNextBatch(level_batch_size)); - } - if (UNLIKELY(!parent_->parse_status_.ok())) return false; - - // This special case is most efficiently handled here directly. 
- if (!MATERIALIZED && !IN_COLLECTION) { - int vals_to_add = min(def_levels_.CacheRemaining(), max_values - val_count); - val_count += vals_to_add; - def_levels_.CacheSkipLevels(vals_to_add); - num_buffered_values_ -= vals_to_add; - continue; - } - - // Read data page and cached levels to materialize values. - int cache_start_idx = def_levels_.CacheCurrIdx(); - uint8_t* next_tuple = tuple_mem + val_count * tuple_size; - int remaining_val_capacity = max_values - val_count; - int ret_val_count = 0; - if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) { - continue_execution = MaterializeValueBatch<IN_COLLECTION, true>( - pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count); - } else { - continue_execution = MaterializeValueBatch<IN_COLLECTION, false>( - pool, remaining_val_capacity, tuple_size, next_tuple, &ret_val_count); - } - val_count += ret_val_count; - num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx); - } - *num_values = val_count; - return continue_execution; - } - - /// Helper function for ReadValueBatch() above that performs value materialization. - /// It assumes a data page with remaining values is available, and that the def/rep - /// level caches have been populated. - /// For efficiency, the simple special case of !MATERIALIZED && !IN_COLLECTION is not - /// handled in this function. - template<bool IN_COLLECTION, bool IS_DICT_ENCODED> - bool MaterializeValueBatch(MemPool* pool, int max_values, int tuple_size, - uint8_t* tuple_mem, int* num_values) { - DCHECK(MATERIALIZED || IN_COLLECTION); - DCHECK_GT(num_buffered_values_, 0); - DCHECK(def_levels_.CacheHasNext()); - if (IN_COLLECTION && pos_slot_desc_ != NULL) DCHECK(rep_levels_.CacheHasNext()); - - uint8_t* curr_tuple = tuple_mem; - int val_count = 0; - while (def_levels_.CacheHasNext()) { - Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple); - int def_level = def_levels_.CacheGetNext(); - - if (IN_COLLECTION) { - if (def_level < def_level_of_immediate_repeated_ancestor()) { - // A containing repeated field is empty or NULL. Skip the value but - // move to the next repetition level if necessary. - if (pos_slot_desc_ != NULL) rep_levels_.CacheGetNext(); - continue; - } - if (pos_slot_desc_ != NULL) { - int rep_level = rep_levels_.CacheGetNext(); - // Reset position counter if we are at the start of a new parent collection. 
- if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0; - void* pos_slot = tuple->GetSlot(pos_slot_desc()->tuple_offset()); - *reinterpret_cast<int64_t*>(pos_slot) = pos_current_value_++; - } - } - - if (MATERIALIZED) { - if (def_level >= max_def_level()) { - bool continue_execution = - ReadSlot<IS_DICT_ENCODED>(tuple->GetSlot(tuple_offset_), pool); - if (UNLIKELY(!continue_execution)) return false; - } else { - tuple->SetNull(null_indicator_offset_); - } - } - - curr_tuple += tuple_size; - ++val_count; - if (UNLIKELY(val_count == max_values)) break; - } - *num_values = val_count; - return true; - } - - virtual Status CreateDictionaryDecoder(uint8_t* values, int size, - DictDecoderBase** decoder) { - if (!dict_decoder_.Reset(values, size, fixed_len_size_)) { - return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(), - slot_desc_->type().DebugString(), "could not decode dictionary"); - } - dict_decoder_init_ = true; - *decoder = &dict_decoder_; - return Status::OK(); - } - - virtual bool HasDictionaryDecoder() { - return dict_decoder_init_; - } - - virtual void ClearDictionaryDecoder() { - dict_decoder_init_ = false; - } - - virtual Status InitDataPage(uint8_t* data, int size) { - page_encoding_ = current_page_header_.data_page_header.encoding; - if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY && - page_encoding_ != parquet::Encoding::PLAIN) { - stringstream ss; - ss << "File '" << filename() << "' is corrupt: unexpected encoding: " - << PrintEncoding(page_encoding_) << " for data page of column '" - << schema_element().name << "'."; - return Status(ss.str()); - } - - // If slot_desc_ is NULL, dict_decoder_ is uninitialized - if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY && slot_desc_ != NULL) { - if (!dict_decoder_init_) { - return Status("File corrupt. Missing dictionary page."); - } - dict_decoder_.SetData(data, size); - } - - // TODO: Perform filter selectivity checks here. - return Status::OK(); - } - - private: - /// Writes the next value into *slot using pool if necessary. - /// - /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is - /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns - /// true. - template<bool IS_DICT_ENCODED> - inline bool ReadSlot(void* slot, MemPool* pool) { - T val; - T* val_ptr = NeedsConversion() ? &val : reinterpret_cast<T*>(slot); - if (IS_DICT_ENCODED) { - DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN_DICTIONARY); - if (UNLIKELY(!dict_decoder_.GetValue(val_ptr))) { - SetDictDecodeError(); - return false; - } - } else { - DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN); - int encoded_len = - ParquetPlainEncoder::Decode<T>(data_, data_end_, fixed_len_size_, val_ptr); - if (UNLIKELY(encoded_len < 0)) { - SetPlainDecodeError(); - return false; - } - data_ += encoded_len; - } - if (UNLIKELY(NeedsConversion() && - !ConvertSlot(&val, reinterpret_cast<T*>(slot), pool))) { - return false; - } - return true; - } - - /// Most column readers never require conversion, so we can avoid branches by - /// returning constant false. Column readers for types that require conversion - /// must specialize this function. 
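A hedged, standalone illustration of the dispatch pattern described above (the class, types and values here are invented for the example, not Impala's): the primary template returns a constant false, so after inlining the conversion branch in the read path is dead code, while readers for types that do need post-processing provide specializations.

#include <iostream>
#include <string>

template <typename T>
struct DemoReader {
  // Constant false for most types; the optimizer folds the branch away.
  bool NeedsConversion() const { return false; }
  T Convert(const T& raw) const { return raw; }  // generic no-op placeholder
  void ReadSlot(const T& raw, T* slot) const {
    if (NeedsConversion()) {
      *slot = Convert(raw);  // only taken by specialized readers
    } else {
      *slot = raw;           // fast path
    }
  }
};

struct FixedChar { char buf[4]; };

// Only the reader for a type that needs post-processing specializes these.
template <> bool DemoReader<FixedChar>::NeedsConversion() const { return true; }
template <> FixedChar DemoReader<FixedChar>::Convert(const FixedChar& raw) const {
  FixedChar out = raw;  // a real reader would e.g. space-pad a CHAR(n) payload here
  return out;
}

int main() {
  DemoReader<int> int_reader;
  int v = 0;
  int_reader.ReadSlot(42, &v);
  std::cout << v << "\n";  // 42, conversion branch never taken

  DemoReader<FixedChar> char_reader;
  FixedChar src = {{'a', 'b', 'c', 'd'}}, dst = {};
  char_reader.ReadSlot(src, &dst);
  std::cout << std::string(dst.buf, sizeof(dst.buf)) << "\n";  // "abcd"
  return 0;
}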
- inline bool NeedsConversion() const { - DCHECK(!needs_conversion_); - return false; - } - - /// Converts and writes src into dst based on desc_->type() - bool ConvertSlot(const T* src, T* dst, MemPool* pool) { - DCHECK(false); - return false; - } - - /// Pull out slow-path Status construction code from ReadRepetitionLevel()/ - /// ReadDefinitionLevel() for performance. - void __attribute__((noinline)) SetDictDecodeError() { - parent_->parse_status_ = Status(TErrorCode::PARQUET_DICT_DECODE_FAILURE, filename(), - slot_desc_->type().DebugString(), stream_->file_offset()); - } - void __attribute__((noinline)) SetPlainDecodeError() { - parent_->parse_status_ = Status(TErrorCode::PARQUET_CORRUPT_PLAIN_VALUE, filename(), - slot_desc_->type().DebugString(), stream_->file_offset()); - } - - /// Dictionary decoder for decoding column values. - DictDecoder<T> dict_decoder_; - - /// True if dict_decoder_ has been initialized with a dictionary page. - bool dict_decoder_init_; - - /// true if decoded values must be converted before being written to an output tuple. - bool needs_conversion_; - - /// The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY, or - /// the max length for VARCHAR columns. Unused otherwise. - int fixed_len_size_; -}; - -template<> -inline bool HdfsParquetScanner::ScalarColumnReader<StringValue, true>::NeedsConversion() const { - return needs_conversion_; -} - -template<> -bool HdfsParquetScanner::ScalarColumnReader<StringValue, true>::ConvertSlot( - const StringValue* src, StringValue* dst, MemPool* pool) { - DCHECK(slot_desc() != NULL); - DCHECK(slot_desc()->type().type == TYPE_CHAR); - int len = slot_desc()->type().len; - StringValue sv; - sv.len = len; - if (slot_desc()->type().IsVarLenStringType()) { - sv.ptr = reinterpret_cast<char*>(pool->TryAllocate(len)); - if (UNLIKELY(sv.ptr == NULL)) { - string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ConvertSlot", - len, "StringValue"); - parent_->parse_status_ = - pool->mem_tracker()->MemLimitExceeded(parent_->state_, details, len); - return false; - } - } else { - sv.ptr = reinterpret_cast<char*>(dst); - } - int unpadded_len = min(len, src->len); - memcpy(sv.ptr, src->ptr, unpadded_len); - StringValue::PadWithSpaces(sv.ptr, len, unpadded_len); - - if (slot_desc()->type().IsVarLenStringType()) *dst = sv; - return true; -} - -template<> -inline bool HdfsParquetScanner::ScalarColumnReader<TimestampValue, true>::NeedsConversion() const { - return needs_conversion_; -} - -template<> -bool HdfsParquetScanner::ScalarColumnReader<TimestampValue, true>::ConvertSlot( - const TimestampValue* src, TimestampValue* dst, MemPool* pool) { - // Conversion should only happen when this flag is enabled. 
- DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps); - *dst = *src; - if (dst->HasDateAndTime()) dst->UtcToLocal(); - return true; -} - -class HdfsParquetScanner::BoolColumnReader : - public HdfsParquetScanner::BaseScalarColumnReader { - public: - BoolColumnReader(HdfsParquetScanner* parent, const SchemaNode& node, - const SlotDescriptor* slot_desc) - : BaseScalarColumnReader(parent, node, slot_desc) { - if (slot_desc_ != NULL) DCHECK_EQ(slot_desc_->type().type, TYPE_BOOLEAN); - } - - virtual ~BoolColumnReader() { } - - virtual bool ReadValue(MemPool* pool, Tuple* tuple) { - return ReadValue<true>(pool, tuple); - } - - virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) { - return ReadValue<false>(pool, tuple); - } - - protected: - virtual Status CreateDictionaryDecoder(uint8_t* values, int size, - DictDecoderBase** decoder) { - DCHECK(false) << "Dictionary encoding is not supported for bools. Should never " - << "have gotten this far."; - return Status::OK(); - } - - virtual bool HasDictionaryDecoder() { - // Decoder should never be created for bools. - return false; - } - - virtual void ClearDictionaryDecoder() { } - - virtual Status InitDataPage(uint8_t* data, int size) { - // Initialize bool decoder - bool_values_ = BitReader(data, size); - return Status::OK(); - } - - private: - template<bool IN_COLLECTION> - inline bool ReadValue(MemPool* pool, Tuple* tuple) { - DCHECK(slot_desc_ != NULL); - // Def and rep levels should be in valid range. - DCHECK_GE(rep_level_, 0); - DCHECK_LE(rep_level_, max_rep_level()); - DCHECK_GE(def_level_, 0); - DCHECK_LE(def_level_, max_def_level()); - DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) << - "Caller should have called NextLevels() until we are ready to read a value"; - - if (def_level_ >= max_def_level()) { - return ReadSlot<IN_COLLECTION>(tuple->GetSlot(tuple_offset_), pool); - } else { - // Null value - tuple->SetNull(null_indicator_offset_); - return NextLevels<IN_COLLECTION>(); - } - } - - /// Writes the next value into *slot using pool if necessary. Also advances def_level_ - /// and rep_level_ via NextLevels(). - /// - /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is - /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns - /// true. - template <bool IN_COLLECTION> - inline bool ReadSlot(void* slot, MemPool* pool) { - if (!bool_values_.GetValue(1, reinterpret_cast<bool*>(slot))) { - parent_->parse_status_ = Status("Invalid bool column."); - return false; - } - return NextLevels<IN_COLLECTION>(); - } - - BitReader bool_values_; -}; - -} - Status HdfsParquetScanner::Prepare(ScannerContext* context) { RETURN_IF_ERROR(HdfsScanner::Prepare(context)); metadata_range_ = stream_->scan_range(); @@ -1154,16 +169,16 @@ void HdfsParquetScanner::Close() { vector<THdfsCompression::type> compression_types; // Visit each column reader, including collection reader children. 
- stack<ColumnReader*> readers; - for (ColumnReader* r: column_readers_) readers.push(r); + stack<ParquetColumnReader*> readers; + for (ParquetColumnReader* r: column_readers_) readers.push(r); while (!readers.empty()) { - ColumnReader* col_reader = readers.top(); + ParquetColumnReader* col_reader = readers.top(); readers.pop(); if (col_reader->IsCollectionReader()) { CollectionColumnReader* collection_reader = static_cast<CollectionColumnReader*>(col_reader); - for (ColumnReader* r: *collection_reader->children()) readers.push(r); + for (ParquetColumnReader* r: *collection_reader->children()) readers.push(r); continue; } @@ -1207,629 +222,6 @@ void HdfsParquetScanner::Close() { HdfsScanner::Close(); } -HdfsParquetScanner::ColumnReader* HdfsParquetScanner::CreateReader( - const SchemaNode& node, bool is_collection_field, const SlotDescriptor* slot_desc) { - ColumnReader* reader = NULL; - if (is_collection_field) { - // Create collection reader (note this handles both NULL and non-NULL 'slot_desc') - reader = new CollectionColumnReader(this, node, slot_desc); - } else if (slot_desc != NULL) { - // Create the appropriate ScalarColumnReader type to read values into 'slot_desc' - switch (slot_desc->type().type) { - case TYPE_BOOLEAN: - reader = new BoolColumnReader(this, node, slot_desc); - break; - case TYPE_TINYINT: - reader = new ScalarColumnReader<int8_t, true>(this, node, slot_desc); - break; - case TYPE_SMALLINT: - reader = new ScalarColumnReader<int16_t, true>(this, node, slot_desc); - break; - case TYPE_INT: - reader = new ScalarColumnReader<int32_t, true>(this, node, slot_desc); - break; - case TYPE_BIGINT: - reader = new ScalarColumnReader<int64_t, true>(this, node, slot_desc); - break; - case TYPE_FLOAT: - reader = new ScalarColumnReader<float, true>(this, node, slot_desc); - break; - case TYPE_DOUBLE: - reader = new ScalarColumnReader<double, true>(this, node, slot_desc); - break; - case TYPE_TIMESTAMP: - reader = new ScalarColumnReader<TimestampValue, true>(this, node, slot_desc); - break; - case TYPE_STRING: - case TYPE_VARCHAR: - case TYPE_CHAR: - reader = new ScalarColumnReader<StringValue, true>(this, node, slot_desc); - break; - case TYPE_DECIMAL: - switch (slot_desc->type().GetByteSize()) { - case 4: - reader = new ScalarColumnReader<Decimal4Value, true>(this, node, slot_desc); - break; - case 8: - reader = new ScalarColumnReader<Decimal8Value, true>(this, node, slot_desc); - break; - case 16: - reader = new ScalarColumnReader<Decimal16Value, true>(this, node, slot_desc); - break; - } - break; - default: - DCHECK(false) << slot_desc->type().DebugString(); - } - } else { - // Special case for counting scalar values (e.g. count(*), no materialized columns in - // the file, only materializing a position slot). We won't actually read any values, - // only the rep and def levels, so it doesn't matter what kind of reader we make. 
- reader = new ScalarColumnReader<int8_t, false>(this, node, slot_desc); - } - return obj_pool_.Add(reader); -} - -bool HdfsParquetScanner::ColumnReader::ReadValueBatch(MemPool* pool, int max_values, - int tuple_size, uint8_t* tuple_mem, int* num_values) { - int val_count = 0; - bool continue_execution = true; - while (val_count < max_values && !RowGroupAtEnd() && continue_execution) { - Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size); - if (def_level_ < def_level_of_immediate_repeated_ancestor()) { - // A containing repeated field is empty or NULL - continue_execution = NextLevels(); - continue; - } - // Fill in position slot if applicable - if (pos_slot_desc_ != NULL) ReadPosition(tuple); - continue_execution = ReadValue(pool, tuple); - ++val_count; - } - *num_values = val_count; - return continue_execution; -} - -bool HdfsParquetScanner::ColumnReader::ReadNonRepeatedValueBatch(MemPool* pool, - int max_values, int tuple_size, uint8_t* tuple_mem, int* num_values) { - int val_count = 0; - bool continue_execution = true; - while (val_count < max_values && !RowGroupAtEnd() && continue_execution) { - Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size); - continue_execution = ReadNonRepeatedValue(pool, tuple); - ++val_count; - } - *num_values = val_count; - return continue_execution; -} - -void HdfsParquetScanner::ColumnReader::ReadPosition(Tuple* tuple) { - DCHECK(pos_slot_desc() != NULL); - // NextLevels() should have already been called - DCHECK_GE(rep_level_, 0); - DCHECK_GE(def_level_, 0); - DCHECK_GE(pos_current_value_, 0); - DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) << - "Caller should have called NextLevels() until we are ready to read a value"; - - void* slot = tuple->GetSlot(pos_slot_desc()->tuple_offset()); - *reinterpret_cast<int64_t*>(slot) = pos_current_value_++; -} - -// In 1.1, we had a bug where the dictionary page metadata was not set. Returns true -// if this matches those versions and compatibility workarounds need to be used. -static bool RequiresSkippedDictionaryHeaderCheck( - const HdfsParquetScanner::FileVersion& v) { - if (v.application != "impala") return false; - return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal); -} - -Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() { - Status status; - uint8_t* buffer; - - // We're about to move to the next data page. The previous data page is - // now complete, pass along the memory allocated for it. - parent_->scratch_batch_->mem_pool()->AcquireData(decompressed_data_pool_.get(), false); - - // Read the next data page, skipping page types we don't care about. - // We break out of this loop on the non-error case (a data page was found or we read all - // the pages). - while (true) { - DCHECK_EQ(num_buffered_values_, 0); - if (num_values_read_ == metadata_->num_values) { - // No more pages to read - // TODO: should we check for stream_->eosr()? - break; - } else if (num_values_read_ > metadata_->num_values) { - ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID, - metadata_->num_values, num_values_read_, node_.element->name, filename()); - RETURN_IF_ERROR(parent_->LogOrReturnError(msg)); - return Status::OK(); - } - - int64_t buffer_size; - RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &buffer_size)); - if (buffer_size == 0) { - // The data pages contain fewer values than stated in the column metadata. 
- DCHECK(stream_->eosr()); - DCHECK_LT(num_values_read_, metadata_->num_values); - // TODO for 2.3: node_.element->name isn't necessarily useful - ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID, - metadata_->num_values, num_values_read_, node_.element->name, filename()); - RETURN_IF_ERROR(parent_->LogOrReturnError(msg)); - return Status::OK(); - } - - // We don't know the actual header size until the thrift object is deserialized. Loop - // until we successfully deserialize the header or exceed the maximum header size. - uint32_t header_size; - while (true) { - header_size = buffer_size; - status = DeserializeThriftMsg( - buffer, &header_size, true, ¤t_page_header_); - if (status.ok()) break; - - if (buffer_size >= FLAGS_max_page_header_size) { - stringstream ss; - ss << "ParquetScanner: could not read data page because page header exceeded " - << "maximum size of " - << PrettyPrinter::Print(FLAGS_max_page_header_size, TUnit::BYTES); - status.AddDetail(ss.str()); - return status; - } - - // Didn't read entire header, increase buffer size and try again - Status status; - int64_t new_buffer_size = max<int64_t>(buffer_size * 2, 1024); - bool success = stream_->GetBytes( - new_buffer_size, &buffer, &new_buffer_size, &status, /* peek */ true); - if (!success) { - DCHECK(!status.ok()); - return status; - } - DCHECK(status.ok()); - - if (buffer_size == new_buffer_size) { - DCHECK_NE(new_buffer_size, 0); - return Status(TErrorCode::PARQUET_HEADER_EOF, filename()); - } - DCHECK_GT(new_buffer_size, buffer_size); - buffer_size = new_buffer_size; - } - - // Successfully deserialized current_page_header_ - if (!stream_->SkipBytes(header_size, &status)) return status; - - int data_size = current_page_header_.compressed_page_size; - int uncompressed_size = current_page_header_.uncompressed_page_size; - - if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) { - if (slot_desc_ == NULL) { - // Skip processing the dictionary page if we don't need to decode any values. In - // addition to being unnecessary, we are likely unable to successfully decode the - // dictionary values because we don't necessarily create the right type of scalar - // reader if there's no slot to read into (see CreateReader()). - if (!stream_->ReadBytes(data_size, &data_, &status)) return status; - continue; - } - - if (HasDictionaryDecoder()) { - return Status("Column chunk should not contain two dictionary pages."); - } - if (node_.element->type == parquet::Type::BOOLEAN) { - return Status("Unexpected dictionary page. 
Dictionary page is not" - " supported for booleans."); - } - const parquet::DictionaryPageHeader* dict_header = NULL; - if (current_page_header_.__isset.dictionary_page_header) { - dict_header = ¤t_page_header_.dictionary_page_header; - } else { - if (!RequiresSkippedDictionaryHeaderCheck(parent_->file_version_)) { - return Status("Dictionary page does not have dictionary header set."); - } - } - if (dict_header != NULL && - dict_header->encoding != parquet::Encoding::PLAIN && - dict_header->encoding != parquet::Encoding::PLAIN_DICTIONARY) { - return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported " - "for dictionary pages."); - } - - if (!stream_->ReadBytes(data_size, &data_, &status)) return status; - data_end_ = data_ + data_size; - - uint8_t* dict_values = NULL; - if (decompressor_.get() != NULL) { - dict_values = parent_->dictionary_pool_->TryAllocate(uncompressed_size); - if (UNLIKELY(dict_values == NULL)) { - string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage", - uncompressed_size, "dictionary"); - return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded( - parent_->state_, details, uncompressed_size); - } - RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, data_size, data_, - &uncompressed_size, &dict_values)); - VLOG_FILE << "Decompressed " << data_size << " to " << uncompressed_size; - if (current_page_header_.uncompressed_page_size != uncompressed_size) { - return Status(Substitute("Error decompressing dictionary page in file '$0'. " - "Expected $1 uncompressed bytes but got $2", filename(), - current_page_header_.uncompressed_page_size, uncompressed_size)); - } - data_size = uncompressed_size; - } else { - if (current_page_header_.uncompressed_page_size != data_size) { - return Status(Substitute("Error reading dictionary page in file '$0'. " - "Expected $1 bytes but got $2", filename(), - current_page_header_.uncompressed_page_size, data_size)); - } - // Copy dictionary from io buffer (which will be recycled as we read - // more data) to a new buffer - dict_values = parent_->dictionary_pool_->TryAllocate(data_size); - if (UNLIKELY(dict_values == NULL)) { - string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage", - data_size, "dictionary"); - return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded( - parent_->state_, details, data_size); - } - memcpy(dict_values, data_, data_size); - } - - DictDecoderBase* dict_decoder; - RETURN_IF_ERROR(CreateDictionaryDecoder(dict_values, data_size, &dict_decoder)); - if (dict_header != NULL && - dict_header->num_values != dict_decoder->num_entries()) { - return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(), - slot_desc_->type().DebugString(), - Substitute("Expected $0 entries but data contained $1 entries", - dict_header->num_values, dict_decoder->num_entries())); - } - // Done with dictionary page, read next page - continue; - } - - if (current_page_header_.type != parquet::PageType::DATA_PAGE) { - // We can safely skip non-data pages - if (!stream_->SkipBytes(data_size, &status)) return status; - continue; - } - - // Read Data Page - // TODO: when we start using page statistics, we will need to ignore certain corrupt - // statistics. See IMPALA-2208 and PARQUET-251. 
- if (!stream_->ReadBytes(data_size, &data_, &status)) return status; - data_end_ = data_ + data_size; - num_buffered_values_ = current_page_header_.data_page_header.num_values; - num_values_read_ += num_buffered_values_; - - if (decompressor_.get() != NULL) { - SCOPED_TIMER(parent_->decompress_timer_); - uint8_t* decompressed_buffer = - decompressed_data_pool_->TryAllocate(uncompressed_size); - if (UNLIKELY(decompressed_buffer == NULL)) { - string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage", - uncompressed_size, "decompressed data"); - return decompressed_data_pool_->mem_tracker()->MemLimitExceeded( - parent_->state_, details, uncompressed_size); - } - RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, - current_page_header_.compressed_page_size, data_, &uncompressed_size, - &decompressed_buffer)); - VLOG_FILE << "Decompressed " << current_page_header_.compressed_page_size - << " to " << uncompressed_size; - if (current_page_header_.uncompressed_page_size != uncompressed_size) { - return Status(Substitute("Error decompressing data page in file '$0'. " - "Expected $1 uncompressed bytes but got $2", filename(), - current_page_header_.uncompressed_page_size, uncompressed_size)); - } - data_ = decompressed_buffer; - data_size = current_page_header_.uncompressed_page_size; - data_end_ = data_ + data_size; - } else { - DCHECK_EQ(metadata_->codec, parquet::CompressionCodec::UNCOMPRESSED); - if (current_page_header_.compressed_page_size != uncompressed_size) { - return Status(Substitute("Error reading data page in file '$0'. " - "Expected $1 bytes but got $2", filename(), - current_page_header_.compressed_page_size, uncompressed_size)); - } - } - - // Initialize the repetition level data - RETURN_IF_ERROR(rep_levels_.Init(filename(), - current_page_header_.data_page_header.repetition_level_encoding, - parent_->level_cache_pool_.get(), parent_->state_->batch_size(), - max_rep_level(), num_buffered_values_, - &data_, &data_size)); - - // Initialize the definition level data - RETURN_IF_ERROR(def_levels_.Init(filename(), - current_page_header_.data_page_header.definition_level_encoding, - parent_->level_cache_pool_.get(), parent_->state_->batch_size(), - max_def_level(), num_buffered_values_, &data_, &data_size)); - - // Data can be empty if the column contains all NULLs - if (data_size != 0) RETURN_IF_ERROR(InitDataPage(data_, data_size)); - break; - } - - return Status::OK(); -} - -Status HdfsParquetScanner::LevelDecoder::Init(const string& filename, - parquet::Encoding::type encoding, MemPool* cache_pool, int cache_size, - int max_level, int num_buffered_values, uint8_t** data, int* data_size) { - encoding_ = encoding; - max_level_ = max_level; - num_buffered_values_ = num_buffered_values; - filename_ = filename; - RETURN_IF_ERROR(InitCache(cache_pool, cache_size)); - - // Return because there is no level data to read, e.g., required field. 
- if (max_level == 0) return Status::OK(); - - int32_t num_bytes = 0; - switch (encoding) { - case parquet::Encoding::RLE: { - Status status; - if (!ReadWriteUtil::Read(data, data_size, &num_bytes, &status)) { - return status; - } - if (num_bytes < 0) { - return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, num_bytes); - } - int bit_width = Bits::Log2Ceiling64(max_level + 1); - Reset(*data, num_bytes, bit_width); - break; - } - case parquet::Encoding::BIT_PACKED: - num_bytes = BitUtil::Ceil(num_buffered_values, 8); - bit_reader_.Reset(*data, num_bytes); - break; - default: { - stringstream ss; - ss << "Unsupported encoding: " << encoding; - return Status(ss.str()); - } - } - DCHECK_GT(num_bytes, 0); - *data += num_bytes; - *data_size -= num_bytes; - return Status::OK(); -} - -Status HdfsParquetScanner::LevelDecoder::InitCache(MemPool* pool, int cache_size) { - num_cached_levels_ = 0; - cached_level_idx_ = 0; - // Memory has already been allocated. - if (cached_levels_ != NULL) { - DCHECK_EQ(cache_size_, cache_size); - return Status::OK(); - } - - cached_levels_ = reinterpret_cast<uint8_t*>(pool->TryAllocate(cache_size)); - if (cached_levels_ == NULL) { - return pool->mem_tracker()->MemLimitExceeded( - NULL, "Definition level cache", cache_size); - } - memset(cached_levels_, 0, cache_size); - cache_size_ = cache_size; - return Status::OK(); -} - -inline int16_t HdfsParquetScanner::LevelDecoder::ReadLevel() { - bool valid; - uint8_t level; - if (encoding_ == parquet::Encoding::RLE) { - valid = Get(&level); - } else { - DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED); - valid = bit_reader_.GetValue(1, &level); - } - return LIKELY(valid) ? level : INVALID_LEVEL; -} - -Status HdfsParquetScanner::LevelDecoder::CacheNextBatch(int batch_size) { - DCHECK_LE(batch_size, cache_size_); - cached_level_idx_ = 0; - if (max_level_ > 0) { - if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_))) { - return Status(decoding_error_code_, num_buffered_values_, filename_); - } - } else { - // No levels to read, e.g., because the field is required. The cache was - // already initialized with all zeros, so we can hand out those values. - DCHECK_EQ(max_level_, 0); - num_cached_levels_ = batch_size; - } - return Status::OK(); -} - -bool HdfsParquetScanner::LevelDecoder::FillCache(int batch_size, - int* num_cached_levels) { - DCHECK(num_cached_levels != NULL); - int num_values = 0; - if (encoding_ == parquet::Encoding::RLE) { - while (true) { - // Add RLE encoded values by repeating the current value this number of times. - uint32_t num_repeats_to_set = - min<uint32_t>(repeat_count_, batch_size - num_values); - memset(cached_levels_ + num_values, current_value_, num_repeats_to_set); - num_values += num_repeats_to_set; - repeat_count_ -= num_repeats_to_set; - - // Add remaining literal values, if any. 
- uint32_t num_literals_to_set = - min<uint32_t>(literal_count_, batch_size - num_values); - int num_values_end = min<uint32_t>(num_values + literal_count_, batch_size); - for (; num_values < num_values_end; ++num_values) { - bool valid = bit_reader_.GetValue(bit_width_, &cached_levels_[num_values]); - if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false; - } - literal_count_ -= num_literals_to_set; - - if (num_values == batch_size) break; - if (UNLIKELY(!NextCounts<int16_t>())) return false; - if (repeat_count_ > 0 && current_value_ > max_level_) return false; - } - } else { - DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED); - for (; num_values < batch_size; ++num_values) { - bool valid = bit_reader_.GetValue(1, &cached_levels_[num_values]); - if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false; - } - } - *num_cached_levels = num_values; - return true; -} - -template <bool ADVANCE_REP_LEVEL> -bool HdfsParquetScanner::BaseScalarColumnReader::NextLevels() { - if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString(); - - if (UNLIKELY(num_buffered_values_ == 0)) { - if (!NextPage()) return parent_->parse_status_.ok(); - } - --num_buffered_values_; - - // Definition level is not present if column and any containing structs are required. - def_level_ = max_def_level() == 0 ? 0 : def_levels_.ReadLevel(); - - if (ADVANCE_REP_LEVEL && max_rep_level() > 0) { - // Repetition level is only present if this column is nested in any collection type. - rep_level_ = rep_levels_.ReadLevel(); - // Reset position counter if we are at the start of a new parent collection. - if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0; - } - - return parent_->parse_status_.ok(); -} - -bool HdfsParquetScanner::BaseScalarColumnReader::NextPage() { - parent_->assemble_rows_timer_.Stop(); - parent_->parse_status_ = ReadDataPage(); - if (UNLIKELY(!parent_->parse_status_.ok())) return false; - if (num_buffered_values_ == 0) { - rep_level_ = ROW_GROUP_END; - def_level_ = INVALID_LEVEL; - pos_current_value_ = INVALID_POS; - return false; - } - parent_->assemble_rows_timer_.Start(); - return true; -} - -bool HdfsParquetScanner::CollectionColumnReader::NextLevels() { - DCHECK(!children_.empty()); - DCHECK_LE(rep_level_, new_collection_rep_level()); - for (int c = 0; c < children_.size(); ++c) { - do { - // TODO(skye): verify somewhere that all column readers are at end - if (!children_[c]->NextLevels()) return false; - } while (children_[c]->rep_level() > new_collection_rep_level()); - } - UpdateDerivedState(); - return true; -} - -bool HdfsParquetScanner::CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) { - DCHECK_GE(rep_level_, 0); - DCHECK_GE(def_level_, 0); - DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) << - "Caller should have called NextLevels() until we are ready to read a value"; - - if (tuple_offset_ == -1) { - return CollectionColumnReader::NextLevels(); - } else if (def_level_ >= max_def_level()) { - return ReadSlot(tuple->GetSlot(tuple_offset_), pool); - } else { - // Null value - tuple->SetNull(null_indicator_offset_); - return CollectionColumnReader::NextLevels(); - } -} - -bool HdfsParquetScanner::CollectionColumnReader::ReadNonRepeatedValue( - MemPool* pool, Tuple* tuple) { - return CollectionColumnReader::ReadValue(pool, tuple); -} - -bool HdfsParquetScanner::CollectionColumnReader::ReadSlot(void* slot, MemPool* pool) { - DCHECK(!children_.empty()); - DCHECK_LE(rep_level_, 
new_collection_rep_level()); - - // Recursively read the collection into a new CollectionValue. - CollectionValue* coll_slot = reinterpret_cast<CollectionValue*>(slot); - *coll_slot = CollectionValue(); - CollectionValueBuilder builder( - coll_slot, *slot_desc_->collection_item_descriptor(), pool, parent_->state_); - bool continue_execution = parent_->AssembleCollection( - children_, new_collection_rep_level(), &builder); - if (!continue_execution) return false; - - // AssembleCollection() advances child readers, so we don't need to call NextLevels() - UpdateDerivedState(); - return true; -} - -void HdfsParquetScanner::CollectionColumnReader::UpdateDerivedState() { - // We don't need to cap our def_level_ at max_def_level(). We always check def_level_ - // >= max_def_level() to check if the collection is defined. - // TODO(skye): consider capping def_level_ at max_def_level() - def_level_ = children_[0]->def_level(); - rep_level_ = children_[0]->rep_level(); - - // All children should have been advanced to the beginning of the next collection - for (int i = 0; i < children_.size(); ++i) { - DCHECK_EQ(children_[i]->rep_level(), rep_level_); - if (def_level_ < max_def_level()) { - // Collection not defined - FILE_CHECK_EQ(children_[i]->def_level(), def_level_); - } else { - // Collection is defined - FILE_CHECK_GE(children_[i]->def_level(), max_def_level()); - } - } - - if (RowGroupAtEnd()) { - // No more values - pos_current_value_ = INVALID_POS; - } else if (rep_level_ <= max_rep_level() - 2) { - // Reset position counter if we are at the start of a new parent collection (i.e., - // the current collection is the first item in a new parent collection). - pos_current_value_ = 0; - } -} - -Status HdfsParquetScanner::ValidateColumnOffsets(const parquet::RowGroup& row_group) { - const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename()); - for (int i = 0; i < row_group.columns.size(); ++i) { - const parquet::ColumnChunk& col_chunk = row_group.columns[i]; - int64_t col_start = col_chunk.meta_data.data_page_offset; - // The file format requires that if a dictionary page exists, it be before data pages. - if (col_chunk.meta_data.__isset.dictionary_page_offset) { - if (col_chunk.meta_data.dictionary_page_offset >= col_start) { - stringstream ss; - ss << "File " << file_desc->filename << ": metadata is corrupt. " - << "Dictionary page (offset=" << col_chunk.meta_data.dictionary_page_offset - << ") must come before any data pages (offset=" << col_start << ")."; - return Status(ss.str()); - } - col_start = col_chunk.meta_data.dictionary_page_offset; - } - int64_t col_len = col_chunk.meta_data.total_compressed_size; - int64_t col_end = col_start + col_len; - if (col_end <= 0 || col_end > file_desc->file_length) { - stringstream ss; - ss << "File " << file_desc->filename << ": metadata is corrupt. " - << "Column " << i << " has invalid column offsets " - << "(offset=" << col_start << ", size=" << col_len << ", " - << "file_size=" << file_desc->file_length << ")."; - return Status(ss.str()); - } - } - return Status::OK(); -} - // Get the start of the column. 
static int64_t GetColumnStartOffset(const parquet::ColumnMetaData& column) { if (column.__isset.dictionary_page_offset) { @@ -1851,18 +243,18 @@ static int64_t GetRowGroupMidOffset(const parquet::RowGroup& row_group) { return start_offset + (end_offset - start_offset) / 2; } -int HdfsParquetScanner::CountScalarColumns(const vector<ColumnReader*>& column_readers) { +int HdfsParquetScanner::CountScalarColumns(const vector<ParquetColumnReader*>& column_readers) { DCHECK(!column_readers.empty()); int num_columns = 0; - stack<ColumnReader*> readers; - for (ColumnReader* r: column_readers_) readers.push(r); + stack<ParquetColumnReader*> readers; + for (ParquetColumnReader* r: column_readers_) readers.push(r); while (!readers.empty()) { - ColumnReader* col_reader = readers.top(); + ParquetColumnReader* col_reader = readers.top(); readers.pop(); if (col_reader->IsCollectionReader()) { CollectionColumnReader* collection_reader = static_cast<CollectionColumnReader*>(col_reader); - for (ColumnReader* r: *collection_reader->children()) readers.push(r); + for (ParquetColumnReader* r: *collection_reader->children()) readers.push(r); continue; } ++num_columns; @@ -1875,11 +267,16 @@ Status HdfsParquetScanner::ProcessSplit() { // First process the file metadata in the footer bool eosr; RETURN_IF_ERROR(ProcessFooter(&eosr)); - if (eosr) return Status::OK(); + // Parse the file schema into an internal representation for schema resolution. + ParquetSchemaResolver schema_resolver(*scan_node_->hdfs_table(), + state_->query_options().parquet_fallback_schema_resolution); + RETURN_IF_ERROR(schema_resolver.Init(&file_metadata_, filename())); + // We've processed the metadata and there are columns that need to be materialized. - RETURN_IF_ERROR(CreateColumnReaders(*scan_node_->tuple_desc(), &column_readers_)); + RETURN_IF_ERROR( + CreateColumnReaders(*scan_node_->tuple_desc(), schema_resolver, &column_readers_)); COUNTER_SET(num_cols_counter_, static_cast<int64_t>(CountScalarColumns(column_readers_))); // Set top-level template tuple. @@ -1895,9 +292,11 @@ Status HdfsParquetScanner::ProcessSplit() { const parquet::RowGroup& row_group = file_metadata_.row_groups[i]; if (row_group.num_rows == 0) continue; - const DiskIoMgr::ScanRange* split_range = - reinterpret_cast<ScanRangeMetadata*>(metadata_range_->meta_data())->original_split; - RETURN_IF_ERROR(ValidateColumnOffsets(row_group)); + const DiskIoMgr::ScanRange* split_range = reinterpret_cast<ScanRangeMetadata*>( + metadata_range_->meta_data())->original_split; + HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename()); + RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumnOffsets( + file_desc->filename, file_desc->file_length, row_group)); int64_t row_group_mid_pos = GetRowGroupMidOffset(row_group); int64_t split_offset = split_range->offset(); @@ -1919,7 +318,7 @@ Status HdfsParquetScanner::ProcessSplit() { // Prepare column readers for first read bool continue_execution = true; - for (ColumnReader* col_reader: column_readers_) { + for (ParquetColumnReader* col_reader: column_readers_) { // Seed collection and boolean column readers with NextLevel(). // The ScalarColumnReaders use an optimized ReadValueBatch() that // should not be seeded. @@ -1950,7 +349,7 @@ Status HdfsParquetScanner::ProcessSplit() { // with parse_status_. 
RETURN_IF_ERROR(state_->GetQueryStatus()); if (UNLIKELY(!parse_status_.ok())) { - RETURN_IF_ERROR(LogOrReturnError(parse_status_.msg())); + RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg())); } if (scan_node_->ReachedLimit()) return Status::OK(); if (context_->cancelled()) return Status::OK(); @@ -2078,7 +477,7 @@ bool HdfsParquetScanner::EvalRuntimeFilters(TupleRow* row) { /// difficult to maintain a maximum memory footprint without throwing away at least /// some work. This point needs further experimentation and thought. bool HdfsParquetScanner::AssembleRows( - const vector<ColumnReader*>& column_readers, int row_group_idx, bool* filters_pass) { + const vector<ParquetColumnReader*>& column_readers, int row_group_idx, bool* filters_pass) { DCHECK(!column_readers.empty()); DCHECK(scratch_batch_ != NULL); @@ -2111,7 +510,7 @@ bool HdfsParquetScanner::AssembleRows( int last_num_tuples = -1; int num_col_readers = column_readers.size(); for (int c = 0; c < num_col_readers; ++c) { - ColumnReader* col_reader = column_readers[c]; + ParquetColumnReader* col_reader = column_readers[c]; if (col_reader->max_rep_level() > 0) { continue_execution = col_reader->ReadValueBatch( scratch_batch_->mem_pool(), scratch_capacity, tuple_byte_size_, @@ -2150,7 +549,7 @@ bool HdfsParquetScanner::AssembleRows( } bool HdfsParquetScanner::AssembleCollection( - const vector<ColumnReader*>& column_readers, int new_collection_rep_level, + const vector<ParquetColumnReader*>& column_readers, int new_collection_rep_level, CollectionValueBuilder* coll_value_builder) { DCHECK(!column_readers.empty()); DCHECK_GE(new_collection_rep_level, 0); @@ -2229,13 +628,13 @@ bool HdfsParquetScanner::AssembleCollection( } inline bool HdfsParquetScanner::ReadCollectionItem( - const vector<ColumnReader*>& column_readers, + const vector<ParquetColumnReader*>& column_readers, bool materialize_tuple, MemPool* pool, Tuple* tuple) const { DCHECK(!column_readers.empty()); bool continue_execution = true; int size = column_readers.size(); for (int c = 0; c < size; ++c) { - ColumnReader* col_reader = column_readers[c]; + ParquetColumnReader* col_reader = column_readers[c]; if (materialize_tuple) { // All column readers for this tuple should a value to materialize. FILE_CHECK_GE(col_reader->def_level(), @@ -2364,9 +763,11 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) { status.GetDetail())); } - RETURN_IF_ERROR(ValidateFileMetadata()); - // Parse file schema - RETURN_IF_ERROR(CreateSchemaTree(file_metadata_.schema, &schema_)); + RETURN_IF_ERROR(ParquetMetadataUtils::ValidateFileVersion(file_metadata_, filename())); + // Parse out the created by application version string + if (file_metadata_.__isset.created_by) { + file_version_ = ParquetFileVersion(file_metadata_.created_by); + } if (scan_node_->IsZeroSlotTableScan()) { // There are no materialized slots, e.g. count(*) over the table. We can serve @@ -2398,301 +799,12 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) { return Status( Substitute("Invalid file. This file: $0 has no row groups", filename())); } - if (schema_.children.empty()) { - return Status(Substitute("Invalid file: '$0' has no columns.", filename())); - } - return Status::OK(); -} - -Status HdfsParquetScanner::ResolvePath(const SchemaPath& path, SchemaNode** node, - bool* pos_field, bool* missing_field) { - *missing_field = false; - // First try two-level array encoding. 
- bool missing_field_two_level; - Status status_two_level = - ResolvePathHelper(TWO_LEVEL, path, node, pos_field, &missing_field_two_level); - if (missing_field_two_level) DCHECK(status_two_level.ok()); - if (status_two_level.ok() && !missing_field_two_level) return Status::OK(); - // The two-level resolution failed or reported a missing field, try three-level array - // encoding. - bool missing_field_three_level; - Status status_three_level = - ResolvePathHelper(THREE_LEVEL, path, node, pos_field, &missing_field_three_level); - if (missing_field_three_level) DCHECK(status_three_level.ok()); - if (status_three_level.ok() && !missing_field_three_level) return Status::OK(); - // The three-level resolution failed or reported a missing field, try one-level array - // encoding. - bool missing_field_one_level; - Status status_one_level = - ResolvePathHelper(ONE_LEVEL, path, node, pos_field, &missing_field_one_level); - if (missing_field_one_level) DCHECK(status_one_level.ok()); - if (status_one_level.ok() && !missing_field_one_level) return Status::OK(); - // None of resolutions yielded a node. Set *missing_field to true if any of the - // resolutions reported a missing a field. - if (missing_field_one_level || missing_field_two_level || missing_field_three_level) { - *node = NULL; - *missing_field = true; - return Status::OK(); - } - // All resolutions failed. Log and return the status from the three-level resolution - // (which is technically the standard). - DCHECK(!status_one_level.ok() && !status_two_level.ok() && !status_three_level.ok()); - *node = NULL; - VLOG_QUERY << status_three_level.msg().msg() << "\n" << GetStackTrace(); - return status_three_level; -} - -Status HdfsParquetScanner::ResolvePathHelper(ArrayEncoding array_encoding, - const SchemaPath& path, SchemaNode** node, bool* pos_field, bool* missing_field) { - DCHECK(schema_.element != NULL) - << "schema_ must be initialized before calling ResolvePath()"; - - *pos_field = false; - *missing_field = false; - *node = &schema_; - const ColumnType* col_type = NULL; - - // Traverse 'path' and resolve 'node' to the corresponding SchemaNode in 'schema_' (by - // ordinal), or set 'node' to NULL if 'path' doesn't exist in this file's schema. - for (int i = 0; i < path.size(); ++i) { - // Advance '*node' if necessary - if (i == 0 || col_type->type != TYPE_ARRAY || array_encoding == THREE_LEVEL) { - *node = NextSchemaNode(col_type, path, i, *node, missing_field); - if (*missing_field) return Status::OK(); - } else { - // We just resolved an array, meaning *node is set to the repeated field of the - // array. Since we are trying to resolve using one- or two-level array encoding, the - // repeated field represents both the array and the array's item (i.e. there is no - // explict item field), so we don't advance *node in this case. - DCHECK(col_type != NULL); - DCHECK_EQ(col_type->type, TYPE_ARRAY); - DCHECK(array_encoding == ONE_LEVEL || array_encoding == TWO_LEVEL); - DCHECK((*node)->is_repeated()); - } - - // Advance 'col_type' - int table_idx = path[i]; - col_type = i == 0 ? 
&scan_node_->hdfs_table()->col_descs()[table_idx].type() - : &col_type->children[table_idx]; - - // Resolve path[i] - if (col_type->type == TYPE_ARRAY) { - DCHECK_EQ(col_type->children.size(), 1); - RETURN_IF_ERROR( - ResolveArray(array_encoding, path, i, node, pos_field, missing_field)); - if (*missing_field || *pos_field) return Status::OK(); - } else if (col_type->type == TYPE_MAP) { - DCHECK_EQ(col_type->children.size(), 2); - RETURN_IF_ERROR(ResolveMap(path, i, node, missing_field)); - if (*missing_field) return Status::OK(); - } else if (col_type->type == TYPE_STRUCT) { - DCHECK_GT(col_type->children.size(), 0); - // Nothing to do for structs - } else { - DCHECK(!col_type->IsComplexType()); - DCHECK_EQ(i, path.size() - 1); - RETURN_IF_ERROR(ValidateScalarNode(**node, *col_type, path, i)); - } - } - DCHECK(*node != NULL); - return Status::OK(); -} - -HdfsParquetScanner::SchemaNode* HdfsParquetScanner::NextSchemaNode( - const ColumnType* col_type, const SchemaPath& path, int next_idx, SchemaNode* node, - bool* missing_field) { - DCHECK_LT(next_idx, path.size()); - if (next_idx != 0) DCHECK(col_type != NULL); - - int file_idx; - int table_idx = path[next_idx]; - bool resolve_by_name = state_->query_options().parquet_fallback_schema_resolution == - TParquetFallbackSchemaResolution::NAME; - if (resolve_by_name) { - if (next_idx == 0) { - // Resolve top-level table column by name. - DCHECK_LT(table_idx, scan_node_->hdfs_table()->col_descs().size()); - const string& name = scan_node_->hdfs_table()->col_descs()[table_idx].name(); - file_idx = FindChildWithName(node, name); - } else if (col_type->type == TYPE_STRUCT) { - // Resolve struct field by name. - DCHECK_LT(table_idx, col_type->field_names.size()); - const string& name = col_type->field_names[table_idx]; - file_idx = FindChildWithName(node, name); - } else if (col_type->type == TYPE_ARRAY) { - // Arrays have only one child in the file. - DCHECK_EQ(table_idx, SchemaPathConstants::ARRAY_ITEM); - file_idx = table_idx; - } else { - DCHECK_EQ(col_type->type, TYPE_MAP); - // Maps have two values, "key" and "value". These are supposed to be ordered and may - // not have the right field names, but try to resolve by name in case they're - // switched and otherwise use the order. See - // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for - // more details. - DCHECK(table_idx == SchemaPathConstants::MAP_KEY || - table_idx == SchemaPathConstants::MAP_VALUE); - const string& name = table_idx == SchemaPathConstants::MAP_KEY ? "key" : "value"; - file_idx = FindChildWithName(node, name); - if (file_idx >= node->children.size()) { - // Couldn't resolve by name, fall back to resolution by position. - file_idx = table_idx; - } - } - } else { - // Resolution by position. - DCHECK_EQ(state_->query_options().parquet_fallback_schema_resolution, - TParquetFallbackSchemaResolution::POSITION); - if (next_idx == 0) { - // For top-level columns, the first index in a path includes the table's partition - // keys. - file_idx = table_idx - scan_node_->num_partition_keys(); - } else { - file_idx = table_idx; - } - } - - if (file_idx >= node->children.size()) { - VLOG_FILE << Substitute( - "File '$0' does not contain path '$1' (resolving by $2)", filename(), - PrintPath(path), resolve_by_name ? 
"name" : "position"); - *missing_field = true; - return NULL; - } - return &node->children[file_idx]; -} - -int HdfsParquetScanner::FindChildWithName(HdfsParquetScanner::SchemaNode* node, - const string& name) { - int idx; - for (idx = 0; idx < node->children.size(); ++idx) { - if (node->children[idx].element->name == name) break; - } - return idx; -} - -// There are three types of array encodings: -// -// 1. One-level encoding -// A bare repeated field. This is interpreted as a required array of required -// items. -// Example: -// repeated <item-type> item; -// -// 2. Two-level encoding -// A group containing a single repeated field. This is interpreted as a -// <list-repetition> array of required items (<list-repetition> is either -// optional or required). -// Example: -// <list-repetition> group <name> { -// repeated <item-type> item; -// } -// -// 3. Three-level encoding -// The "official" encoding according to the parquet spec. A group containing a -// single repeated group containing the item field. This is interpreted as a -// <list-repetition> array of <item-repetition> items (<list-repetition> and -// <item-repetition> are each either optional or required). -// Example: -// <list-repetition> group <name> { -// repeated group list { -// <item-repetition> <item-type> item; -// } -// } -// -// We ignore any field annotations or names, making us more permissive than the -// Parquet spec dictates. Note that in any of the encodings, <item-type> may be a -// group containing more fields, which corresponds to a complex item type. See -// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists for -// more details and examples. -// -// This function resolves the array at '*node' assuming one-, two-, or three-level -// encoding, determined by 'array_encoding'. '*node' is set to the repeated field for all -// three encodings (unless '*pos_field' or '*missing_field' are set to true). -Status HdfsParquetScanner::ResolveArray(ArrayEncoding array_encoding, - const SchemaPath& path, int idx, SchemaNode** node, bool* pos_field, - bool* missing_field) { - if (array_encoding == ONE_LEVEL) { - if (!(*node)->is_repeated()) { - ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename(), - PrintPath(path, idx), "array", (*node)->DebugString()); - return Status::Expected(msg); - } - } else { - // In the multi-level case, we always expect the outer group to contain a single - // repeated field - if ((*node)->children.size() != 1 || !(*node)->children[0].is_repeated()) { - ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename(), - PrintPath(path, idx), "array", (*node)->DebugString()); - return Status::Expected(msg); - } - // Set *node to the repeated field - *node = &(*node)->children[0]; - } - DCHECK((*node)->is_repeated()); - - if (idx + 1 < path.size()) { - if (path[idx + 1] == SchemaPathConstants::ARRAY_POS) { - // The next index in 'path' is the artifical position field. - DCHECK_EQ(path.size(), idx + 2) << "position field cannot have children!"; - *pos_field = true; - *node = NULL; - return Status::OK(); - } else { - // The n
<TRUNCATED>
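For reference, the level-decoding setup removed above follows a simple rule: RLE-encoded definition/repetition levels use a bit width of Log2Ceiling(max_level + 1), and BIT_PACKED levels occupy one bit per buffered value, rounded up to whole bytes. Below is a minimal standalone sketch of those two calculations; the helper names are illustrative and are not Impala's Bits/BitUtil APIs.

#include <cstdio>

// Bits needed to encode any level in [0, max_level]; mirrors the
// Bits::Log2Ceiling64(max_level + 1) call in the removed RLE branch.
static int LevelBitWidth(int max_level) {
  int width = 0;
  while ((1 << width) < max_level + 1) ++width;
  return width;
}

// Bytes occupied by BIT_PACKED level data (one bit per value); mirrors
// BitUtil::Ceil(num_buffered_values, 8) in the removed BIT_PACKED branch.
static int BitPackedLevelBytes(int num_buffered_values) {
  return (num_buffered_values + 7) / 8;
}

int main() {
  // A column nested inside collections might have max_def_level = 3.
  std::printf("RLE bit width for max_level=3: %d\n", LevelBitWidth(3));          // 2
  std::printf("BIT_PACKED bytes for 1000 values: %d\n", BitPackedLevelBytes(1000));  // 125
  return 0;
}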

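Similarly, the removed LevelDecoder::FillCache() alternates between repeated RLE runs (expanded in bulk with memset) and literal runs (decoded value by value), and treats any level above max_level as a corrupt page. The following is a simplified sketch of that control flow only, reading from plain vectors instead of Impala's RleDecoder/BitReader; all names here are illustrative.

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// One decoded RLE run: either 'value' repeated 'count' times, or an
// explicit list of literal values.
struct Run {
  bool is_literal = false;
  int count = 0;
  uint8_t value = 0;
  std::vector<uint8_t> literals;
};

// Fills 'cache' with at most batch_size levels. Returns false if a level
// exceeds max_level, the condition FillCache() reports as a corrupt page.
static bool FillLevelCache(const std::vector<Run>& runs, int max_level,
                           int batch_size, std::vector<uint8_t>* cache) {
  cache->clear();
  for (const Run& run : runs) {
    int room = batch_size - static_cast<int>(cache->size());
    if (room == 0) break;
    if (!run.is_literal) {
      // Repeated run: validate the single value once, then expand in bulk.
      if (run.value > max_level) return false;
      cache->insert(cache->end(), std::min(room, run.count), run.value);
    } else {
      // Literal run: each value must be decoded and validated individually.
      int n = std::min<int>(room, run.literals.size());
      for (int i = 0; i < n; ++i) {
        if (run.literals[i] > max_level) return false;
        cache->push_back(run.literals[i]);
      }
    }
  }
  return true;
}

int main() {
  std::vector<Run> runs = {
      {false, 5, 1, {}},        // five values of level 1 (repeated run)
      {true, 0, 0, {0, 2, 2}},  // three literal levels
  };
  std::vector<uint8_t> cache;
  bool ok = FillLevelCache(runs, /*max_level=*/2, /*batch_size=*/8, &cache);
  assert(ok && cache.size() == 8);
  return 0;
}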