http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet-column-readers.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h deleted file mode 100644 index d689aed..0000000 --- a/be/src/exec/parquet-column-readers.h +++ /dev/null @@ -1,607 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef IMPALA_PARQUET_COLUMN_READERS_H -#define IMPALA_PARQUET_COLUMN_READERS_H - -#include <boost/scoped_ptr.hpp> - -#include "exec/hdfs-parquet-scanner.h" -#include "util/codec.h" -#include "util/bit-stream-utils.h" -#include "util/dict-encoding.h" -#include "util/rle-encoding.h" - -namespace impala { - -class Tuple; -class MemPool; - -/// Decoder for all supported Parquet level encodings. Optionally reads, decodes, and -/// caches level values in batches. -/// Level values are unsigned 8-bit integers because we support a maximum nesting -/// depth of 100, as enforced by the FE. Using a small type saves memory and speeds up -/// populating the level cache (e.g., with RLE we can memset() repeated values). -class ParquetLevelDecoder { - public: - ParquetLevelDecoder(bool is_def_level_decoder) - : decoding_error_code_(is_def_level_decoder ? TErrorCode::PARQUET_DEF_LEVEL_ERROR : - TErrorCode::PARQUET_REP_LEVEL_ERROR) {} - - /// Initialize the LevelDecoder. Reads and advances the provided data buffer if the - /// encoding requires reading metadata from the page header. 'cache_size' will be - /// rounded up to a multiple of 32 internally. - Status Init(const string& filename, parquet::Encoding::type encoding, - MemPool* cache_pool, int cache_size, int max_level, uint8_t** data, int* data_size); - - /// Returns the next level or INVALID_LEVEL if there was an error. Not as efficient - /// as batched methods. - inline int16_t ReadLevel(); - - /// If the next value is part of a repeated run and is not cached, return the length - /// of the repeated run. A max level of 0 is treated as an arbitrarily long run of - /// zeroes, so this returns numeric_limits<int32_t>::max(). Otherwise return 0. - int32_t NextRepeatedRunLength(); - - /// Get the value of the repeated run (if NextRepeatedRunLength() > 0) and consume - /// 'num_to_consume' items in the run. Not valid to call if there are cached levels - /// that have not been consumed. - uint8_t GetRepeatedValue(uint32_t num_to_consume); - - /// Decodes and caches the next batch of levels given that there are 'vals_remaining' - /// values left to decode in the page. Resets members associated with the cache. - /// Returns a non-ok status if there was a problem decoding a level, if a level was - /// encountered with a value greater than max_level_, or if fewer than - /// min(CacheSize(), vals_remaining) levels could be read, which indicates that the - /// input did not have the expected number of values. Only valid to call when - /// the cache has been exhausted, i.e. CacheHasNext() is false. - Status CacheNextBatch(int vals_remaining); - - /// Functions for working with the level cache. - inline bool CacheHasNext() const { return cached_level_idx_ < num_cached_levels_; } - inline uint8_t CacheGetNext() { - DCHECK_LT(cached_level_idx_, num_cached_levels_); - return cached_levels_[cached_level_idx_++]; - } - inline void CacheSkipLevels(int num_levels) { - DCHECK_LE(cached_level_idx_ + num_levels, num_cached_levels_); - cached_level_idx_ += num_levels; - } - inline int CacheSize() const { return num_cached_levels_; } - inline int CacheRemaining() const { return num_cached_levels_ - cached_level_idx_; } - inline int CacheCurrIdx() const { return cached_level_idx_; } - private: - /// Initializes members associated with the level cache. Allocates memory for - /// the cache from pool, if necessary. - Status InitCache(MemPool* pool, int cache_size); - - /// Decodes and writes a batch of levels into the cache. Returns true and sets - /// the number of values written to the cache via *num_cached_levels if no errors - /// are encountered. *num_cached_levels is < 'batch_size' in this case iff the - /// end of input was hit without any other errors. Returns false if there was an - /// error decoding a level or if there was an invalid level value greater than - /// 'max_level_'. Only valid to call when the cache has been exhausted, i.e. - /// CacheHasNext() is false. - bool FillCache(int batch_size, int* num_cached_levels); - - /// RLE decoder, used if 'encoding_' is RLE and max_level_ > 0. - RleBatchDecoder<uint8_t> rle_decoder_; - - /// Buffer for a batch of levels. The memory is allocated and owned by a pool passed - /// in Init(). - uint8_t* cached_levels_ = nullptr; - - /// Number of valid level values in the cache. - int num_cached_levels_ = 0; - - /// Current index into cached_levels_. - int cached_level_idx_ = 0; - - /// The parquet encoding used for the levels. Only RLE is supported for now. - parquet::Encoding::type encoding_ = parquet::Encoding::PLAIN; - - /// For error checking and reporting. - int max_level_ = 0; - - /// Number of level values cached_levels_ has memory allocated for. Always - /// a multiple of 32 to allow reading directly from 'bit_reader_' in batches. - int cache_size_ = 0; - - /// Name of the parquet file. Used for reporting level decoding errors. - string filename_; - - /// Error code to use when reporting level decoding errors. - TErrorCode::type decoding_error_code_; -}; - -/// Base class for reading a Parquet column. Reads a logical column, not necessarily a -/// column materialized in the file (e.g. collections). The two subclasses are -/// BaseScalarColumnReader and CollectionColumnReader. Column readers read one def and rep -/// level pair at a time. The current def and rep level are exposed to the user, and the -/// corresponding value (if defined) can optionally be copied into a slot via -/// ReadValue(). Can also write position slots. -/// -/// The constructor adds the object to the obj_pool of the parent HdfsParquetScanner. -class ParquetColumnReader { - public: - /// Creates a column reader for 'node' and associates it with the given parent scanner. - /// The constructor of column readers add the new object to the parent's object pool. - /// 'slot_desc' may be NULL, in which case the returned column reader can only be used - /// to read def/rep levels. - /// 'is_collection_field' should be set to true if the returned reader is reading a - /// collection. This cannot be determined purely by 'node' because a repeated scalar - /// node represents both an array and the array's items (in this case - /// 'is_collection_field' should be true if the reader reads one value per array, and - /// false if it reads one value per item). The reader is added to the runtime state's - /// object pool. Does not create child readers for collection readers; these must be - /// added by the caller. - /// - /// It supports the following primitive type widening that does not have any loss of - /// precision. - /// - tinyint (INT32) -> smallint (INT32), int (INT32), bigint (INT64), double (DOUBLE) - /// - smallint (INT32) -> int (INT32), bigint (INT64), double (DOUBLE) - /// - int (INT32) -> bigint (INT64), double (DOUBLE) - /// - float (FLOAT) -> double (DOUBLE) - static ParquetColumnReader* Create(const SchemaNode& node, bool is_collection_field, - const SlotDescriptor* slot_desc, HdfsParquetScanner* parent); - - static ParquetColumnReader* CreateTimestampColumnReader(const SchemaNode& node, - const SlotDescriptor* slot_desc, HdfsParquetScanner* parent); - - virtual ~ParquetColumnReader() { } - - int def_level() const { return def_level_; } - int rep_level() const { return rep_level_; } - - const SlotDescriptor* slot_desc() const { return slot_desc_; } - const parquet::SchemaElement& schema_element() const { return *node_.element; } - int16_t max_def_level() const { return max_def_level_; } - int16_t max_rep_level() const { return max_rep_level_; } - int def_level_of_immediate_repeated_ancestor() const { - return node_.def_level_of_immediate_repeated_ancestor; - } - const SlotDescriptor* pos_slot_desc() const { return pos_slot_desc_; } - void set_pos_slot_desc(const SlotDescriptor* pos_slot_desc) { - DCHECK(pos_slot_desc_ == NULL); - pos_slot_desc_ = pos_slot_desc; - } - - /// Returns true if this reader materializes collections (i.e. CollectionValues). - virtual bool IsCollectionReader() const { return false; } - - const char* filename() const { return parent_->filename(); } - - /// Read the current value (or null) into 'tuple' for this column. This should only be - /// called when a value is defined, i.e., def_level() >= - /// def_level_of_immediate_repeated_ancestor() (since empty or NULL collections produce - /// no output values), otherwise NextLevels() should be called instead. - /// - /// Advances this column reader to the next value (i.e. NextLevels() doesn't need to be - /// called after calling ReadValue()). - /// - /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is - /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns - /// true. - /// - /// NextLevels() must be called on this reader before calling ReadValue() for the first - /// time. This is to initialize the current value that ReadValue() will read. - /// - /// TODO: this is the function that needs to be codegen'd (e.g. CodegenReadValue()) - /// The codegened functions from all the materialized cols will then be combined - /// into one function. - /// TODO: another option is to materialize col by col for the entire row batch in - /// one call. e.g. MaterializeCol would write out 1024 values. Our row batches - /// are currently dense so we'll need to figure out something there. - virtual bool ReadValue(MemPool* pool, Tuple* tuple) = 0; - - /// Same as ReadValue() but does not advance repetition level. Only valid for columns - /// not in collections. - virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) = 0; - - /// Batched version of ReadValue() that reads up to max_values at once and materializes - /// them into tuples in tuple_mem. Returns the number of values actually materialized - /// in *num_values. The return value, error behavior and state changes are generally - /// the same as in ReadValue(). For example, if an error occurs in the middle of - /// materializing a batch then false is returned, and num_values, tuple_mem, as well as - /// this column reader are left in an undefined state, assuming that the caller will - /// immediately abort execution. NextLevels() does *not* need to be called before - /// ReadValueBatch(), unlike ReadValue(). - virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size, - uint8_t* tuple_mem, int* num_values); - - /// Batched version of ReadNonRepeatedValue() that reads up to max_values at once and - /// materializes them into tuples in tuple_mem. - /// The return value and error behavior are the same as in ReadValueBatch(). - virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size, - uint8_t* tuple_mem, int* num_values); - - /// Advances this column reader's def and rep levels to the next logical value, i.e. to - /// the next scalar value or the beginning of the next collection, without attempting to - /// read the value. This is used to skip past def/rep levels that don't materialize a - /// value, such as the def/rep levels corresponding to an empty containing collection. - /// - /// NextLevels() must be called on this reader before calling ReadValue() for the first - /// time. This is to initialize the current value that ReadValue() will read. - /// - /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is - /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns - /// true. - virtual bool NextLevels() = 0; - - /// Writes pos_current_value_ (i.e. "reads" the synthetic position field of the - /// parent collection) to 'pos' and increments pos_current_value_. Only valid to - /// call when doing non-batched reading, i.e. NextLevels() must have been called - /// before each call to this function to advance to the next element in the - /// collection. - void ReadPositionNonBatched(int64_t* pos); - - /// Returns true if this column reader has reached the end of the row group. - inline bool RowGroupAtEnd() { - DCHECK_EQ(rep_level_ == HdfsParquetScanner::ROW_GROUP_END, - def_level_ == HdfsParquetScanner::ROW_GROUP_END); - return rep_level_ == HdfsParquetScanner::ROW_GROUP_END; - } - - /// If 'row_batch' is non-NULL, transfers the remaining resources backing tuples to it, - /// and frees up other resources. If 'row_batch' is NULL frees all resources instead. - virtual void Close(RowBatch* row_batch) = 0; - - protected: - HdfsParquetScanner* parent_; - const SchemaNode& node_; - const SlotDescriptor* const slot_desc_; - - /// The slot descriptor for the position field of the tuple, if there is one. NULL if - /// there's not. Only one column reader for a given tuple desc will have this set. - const SlotDescriptor* pos_slot_desc_; - - /// The next value to write into the position slot, if there is one. 64-bit int because - /// the pos slot is always a BIGINT Set to INVALID_POS when this column reader does not - /// have a current rep and def level (i.e. before the first NextLevels() call or after - /// the last value in the column has been read). - int64_t pos_current_value_; - - /// The current repetition and definition levels of this reader. Advanced via - /// ReadValue() and NextLevels(). Set to INVALID_LEVEL before the first NextLevels() - /// call for a row group or if an error is encountered decoding a level. Set to - /// ROW_GROUP_END after the last value in the column has been read). If this is not - /// inside a collection, rep_level_ is always 0, INVALID_LEVEL or ROW_GROUP_END. - /// int16_t is large enough to hold the valid levels 0-255 and negative sentinel values - /// INVALID_LEVEL and ROW_GROUP_END. The maximum values are cached here because they - /// are accessed in inner loops. - int16_t rep_level_; - const int16_t max_rep_level_; - int16_t def_level_; - const int16_t max_def_level_; - - // Cache frequently accessed members of slot_desc_ for perf. - - /// slot_desc_->tuple_offset(). -1 if slot_desc_ is NULL. - const int tuple_offset_; - - /// slot_desc_->null_indicator_offset(). Invalid if slot_desc_ is NULL. - const NullIndicatorOffset null_indicator_offset_; - - ParquetColumnReader(HdfsParquetScanner* parent, const SchemaNode& node, - const SlotDescriptor* slot_desc) - : parent_(parent), - node_(node), - slot_desc_(slot_desc), - pos_slot_desc_(NULL), - pos_current_value_(HdfsParquetScanner::INVALID_POS), - rep_level_(HdfsParquetScanner::INVALID_LEVEL), - max_rep_level_(node_.max_rep_level), - def_level_(HdfsParquetScanner::INVALID_LEVEL), - max_def_level_(node_.max_def_level), - tuple_offset_(slot_desc == NULL ? -1 : slot_desc->tuple_offset()), - null_indicator_offset_(slot_desc == NULL ? NullIndicatorOffset() : - slot_desc->null_indicator_offset()) { - DCHECK(parent != nullptr); - parent->obj_pool_.Add(this); - - DCHECK_GE(node_.max_rep_level, 0); - DCHECK_LE(node_.max_rep_level, std::numeric_limits<int16_t>::max()); - DCHECK_GE(node_.max_def_level, 0); - DCHECK_LE(node_.max_def_level, std::numeric_limits<int16_t>::max()); - // rep_level_ is always valid and equal to 0 if col not in collection. - if (max_rep_level() == 0) rep_level_ = 0; - } - - /// Called in the middle of creating a scratch tuple batch to simulate failures - /// such as exceeding memory limit or cancellation. Returns false if the debug - /// action deems that the parquet column reader should halt execution. 'val_count' - /// is the counter which the column reader uses to track the number of tuples - /// produced so far. If the column reader should halt execution, 'parse_status_' - /// is updated with the error status and 'val_count' is set to 0. - bool ColReaderDebugAction(int* val_count); -}; - -/// Reader for a single column from the parquet file. It's associated with a -/// ScannerContext::Stream and is responsible for decoding the data. Super class for -/// per-type column readers. This contains most of the logic, the type specific functions -/// must be implemented in the subclass. -class BaseScalarColumnReader : public ParquetColumnReader { - public: - BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node, - const SlotDescriptor* slot_desc) - : ParquetColumnReader(parent, node, slot_desc), - data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())) { - DCHECK_GE(node_.col_idx, 0) << node_.DebugString(); - } - - virtual ~BaseScalarColumnReader() { } - - /// Resets the reader for each row group in the file and creates the scan - /// range for the column, but does not start it. To start scanning, - /// set_io_reservation() must be called to assign reservation to this - /// column, followed by StartScan(). - Status Reset(const HdfsFileDesc& file_desc, const parquet::ColumnChunk& col_chunk, - int row_group_idx); - - /// Starts the column scan range. The reader must be Reset() and have a - /// reservation assigned via set_io_reservation(). This must be called - /// before any of the column data can be read (including dictionary and - /// data pages). Returns an error status if there was an error starting the - /// scan or allocating buffers for it. - Status StartScan(); - - /// Helper to start scans for multiple columns at once. - static Status StartScans(const std::vector<BaseScalarColumnReader*> readers) { - for (BaseScalarColumnReader* reader : readers) RETURN_IF_ERROR(reader->StartScan()); - return Status::OK(); - } - - virtual void Close(RowBatch* row_batch) { - if (row_batch != nullptr && PageContainsTupleData(page_encoding_)) { - row_batch->tuple_data_pool()->AcquireData(data_page_pool_.get(), false); - } else { - data_page_pool_->FreeAll(); - } - if (decompressor_ != nullptr) decompressor_->Close(); - DictDecoderBase* dict_decoder = GetDictionaryDecoder(); - if (dict_decoder != nullptr) dict_decoder->Close(); - } - - io::ScanRange* scan_range() const { return scan_range_; } - int64_t total_len() const { return metadata_->total_compressed_size; } - int col_idx() const { return node_.col_idx; } - THdfsCompression::type codec() const { - if (metadata_ == NULL) return THdfsCompression::NONE; - return ConvertParquetToImpalaCodec(metadata_->codec); - } - void set_io_reservation(int bytes) { io_reservation_ = bytes; } - - /// Reads the next definition and repetition levels for this column. Initializes the - /// next data page if necessary. - virtual bool NextLevels() { return NextLevels<true>(); } - - /// Check the data stream to see if there is a dictionary page. If there is, - /// use that page to initialize dict_decoder_ and advance the data stream - /// past the dictionary page. - Status InitDictionary(); - - /// Convenience function to initialize multiple dictionaries. - static Status InitDictionaries(const std::vector<BaseScalarColumnReader*> readers); - - // Returns the dictionary or NULL if the dictionary doesn't exist - virtual DictDecoderBase* GetDictionaryDecoder() { return nullptr; } - - // Returns whether the datatype for this column requires conversion from the on-disk - // format for correctness. For example, timestamps can require an offset to be - // applied. - virtual bool NeedsConversion() { return false; } - - // Returns whether the datatype for this column requires validation. For example, - // the timestamp format has certain bit combinations that are invalid, and these - // need to be validated when read from disk. - virtual bool NeedsValidation() { return false; } - - // TODO: Some encodings might benefit a lot from a SkipValues(int num_rows) if - // we know this row can be skipped. This could be very useful with stats and big - // sections can be skipped. Implement that when we can benefit from it. - - protected: - // Friend parent scanner so it can perform validation (e.g. ValidateEndOfRowGroup()) - friend class HdfsParquetScanner; - - // Class members that are accessed for every column should be included up here so they - // fit in as few cache lines as possible. - - /// Pointer to start of next value in data page - uint8_t* data_ = nullptr; - - /// End of the data page. - const uint8_t* data_end_ = nullptr; - - /// Decoder for definition levels. - ParquetLevelDecoder def_levels_{true}; - - /// Decoder for repetition levels. - ParquetLevelDecoder rep_levels_{false}; - - /// Page encoding for values of the current data page. Cached here for perf. Set in - /// InitDataPage(). - parquet::Encoding::type page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY; - - /// Num values remaining in the current data page - int num_buffered_values_ = 0; - - // Less frequently used members that are not accessed in inner loop should go below - // here so they do not occupy precious cache line space. - - /// The number of values seen so far. Updated per data page. - int64_t num_values_read_ = 0; - - /// Metadata for the column for the current row group. - const parquet::ColumnMetaData* metadata_ = nullptr; - - boost::scoped_ptr<Codec> decompressor_; - - /// The scan range for the column's data. Initialized for each row group by Reset(). - io::ScanRange* scan_range_ = nullptr; - - // Stream used to read data from 'scan_range_'. Initialized by StartScan(). - ScannerContext::Stream* stream_ = nullptr; - - /// Reservation in bytes to use for I/O buffers in 'scan_range_'/'stream_'. Must be set - /// with set_io_reservation() before 'stream_' is initialized. Reset for each row group - /// by Reset(). - int64_t io_reservation_ = 0; - - /// Pool to allocate storage for data pages from - either decompression buffers for - /// compressed data pages or copies of the data page with var-len data to attach to - /// batches. - boost::scoped_ptr<MemPool> data_page_pool_; - - /// Header for current data page. - parquet::PageHeader current_page_header_; - - /// Reads the next page header into next_page_header/next_header_size. - /// If the stream reaches the end before reading a complete page header, - /// eos is set to true. If peek is false, the stream position is advanced - /// past the page header. If peek is true, the stream position is not moved. - /// Returns an error status if the next page header could not be read. - Status ReadPageHeader(bool peek, parquet::PageHeader* next_page_header, - uint32_t* next_header_size, bool* eos); - - /// Read the next data page. If a dictionary page is encountered, that will be read and - /// this function will continue reading the next data page. - Status ReadDataPage(); - - /// Try to move the the next page and buffer more values. Return false and sets rep_level_, - /// def_level_ and pos_current_value_ to -1 if no more pages or an error encountered. - bool NextPage(); - - /// Implementation for NextLevels(). - template <bool ADVANCE_REP_LEVEL> - bool NextLevels(); - - /// Creates a dictionary decoder from values/size. 'decoder' is set to point to a - /// dictionary decoder stored in this object. Subclass must implement this. Returns - /// an error status if the dictionary values could not be decoded successfully. - virtual Status CreateDictionaryDecoder(uint8_t* values, int size, - DictDecoderBase** decoder) = 0; - - /// Return true if the column has a dictionary decoder. Subclass must implement this. - virtual bool HasDictionaryDecoder() = 0; - - /// Clear the dictionary decoder so HasDictionaryDecoder() will return false. Subclass - /// must implement this. - virtual void ClearDictionaryDecoder() = 0; - - /// Initializes the reader with the data contents. This is the content for the entire - /// decompressed data page. Decoders can initialize state from here. The caller must - /// validate the input such that 'size' is non-negative and that 'data' has at least - /// 'size' bytes remaining. - virtual Status InitDataPage(uint8_t* data, int size) = 0; - - /// Allocate memory for the uncompressed contents of a data page of 'size' bytes from - /// 'data_page_pool_'. 'err_ctx' provides context for error messages. On success, 'buffer' - /// points to the allocated memory. Otherwise an error status is returned. - Status AllocateUncompressedDataPage( - int64_t size, const char* err_ctx, uint8_t** buffer); - - /// Returns true if a data page for this column with the specified 'encoding' may - /// contain strings referenced by returned batches. Cases where this is not true are: - /// * Dictionary-compressed pages, where any string data lives in 'dictionary_pool_'. - /// * Fixed-length slots, where there is no string data. - bool PageContainsTupleData(parquet::Encoding::type page_encoding) { - return page_encoding != parquet::Encoding::PLAIN_DICTIONARY - && slot_desc_ != nullptr && slot_desc_->type().IsVarLenStringType(); - } - - /// Slow-path status construction code for def/rep decoding errors. 'level_name' is - /// either "rep" or "def", 'decoded_level' is the value returned from - /// ParquetLevelDecoder::ReadLevel() and 'max_level' is the maximum allowed value. - void __attribute__((noinline)) SetLevelDecodeError(const char* level_name, - int decoded_level, int max_level); - - // Returns a detailed error message about unsupported encoding. - Status GetUnsupportedDecodingError(); -}; - -/// Collections are not materialized directly in parquet files; only scalar values appear -/// in the file. CollectionColumnReader uses the definition and repetition levels of child -/// column readers to figure out the boundaries of each collection in this column. -class CollectionColumnReader : public ParquetColumnReader { - public: - CollectionColumnReader(HdfsParquetScanner* parent, const SchemaNode& node, - const SlotDescriptor* slot_desc) - : ParquetColumnReader(parent, node, slot_desc) { - DCHECK(node_.is_repeated()); - if (slot_desc != NULL) DCHECK(slot_desc->type().IsCollectionType()); - } - - virtual ~CollectionColumnReader() { } - - vector<ParquetColumnReader*>* children() { return &children_; } - - virtual bool IsCollectionReader() const { return true; } - - /// The repetition level indicating that the current value is the first in a new - /// collection (meaning the last value read was the final item in the previous - /// collection). - int new_collection_rep_level() const { return max_rep_level() - 1; } - - /// Materializes CollectionValue into tuple slot (if materializing) and advances to next - /// value. - virtual bool ReadValue(MemPool* pool, Tuple* tuple); - - /// Same as ReadValue but does not advance repetition level. Only valid for columns not - /// in collections. - virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple); - - /// Advances all child readers to the beginning of the next collection and updates this - /// reader's state. - virtual bool NextLevels(); - - /// This is called once for each row group in the file. - void Reset() { - def_level_ = HdfsParquetScanner::INVALID_LEVEL; - rep_level_ = HdfsParquetScanner::INVALID_LEVEL; - pos_current_value_ = HdfsParquetScanner::INVALID_POS; - } - - virtual void Close(RowBatch* row_batch) { - for (ParquetColumnReader* child_reader: children_) { - child_reader->Close(row_batch); - } - } - - private: - /// Column readers of fields contained within this collection. There is at least one - /// child reader per collection reader. Child readers either materialize slots in the - /// collection item tuples, or there is a single child reader that does not materialize - /// any slot and is only used by this reader to read def and rep levels. - vector<ParquetColumnReader*> children_; - - /// Updates this reader's def_level_, rep_level_, and pos_current_value_ based on child - /// reader's state. - void UpdateDerivedState(); - - /// Recursively reads from children_ to assemble a single CollectionValue into - /// 'slot'. Also advances rep_level_ and def_level_ via NextLevels(). - /// - /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is - /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns - /// true. - inline bool ReadSlot(CollectionValue* slot, MemPool* pool); -}; - -} - -#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet-column-stats.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-stats.cc b/be/src/exec/parquet-column-stats.cc deleted file mode 100644 index 478bba4..0000000 --- a/be/src/exec/parquet-column-stats.cc +++ /dev/null @@ -1,193 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "parquet-column-stats.inline.h" - -#include <algorithm> -#include <cmath> -#include <limits> - -#include "common/names.h" - -namespace impala { - -bool ColumnStatsReader::ReadFromThrift(StatsField stats_field, void* slot) const { - if (!(col_chunk_.__isset.meta_data && col_chunk_.meta_data.__isset.statistics)) { - return false; - } - const parquet::Statistics& stats = col_chunk_.meta_data.statistics; - - // Try to read the requested stats field. If it is not set, we may fall back to reading - // the old stats, based on the column type. - const string* stat_value = nullptr; - switch (stats_field) { - case StatsField::MIN: - if (stats.__isset.min_value && CanUseStats()) { - stat_value = &stats.min_value; - break; - } - if (stats.__isset.min && CanUseDeprecatedStats()) { - stat_value = &stats.min; - } - break; - case StatsField::MAX: - if (stats.__isset.max_value && CanUseStats()) { - stat_value = &stats.max_value; - break; - } - if (stats.__isset.max && CanUseDeprecatedStats()) { - stat_value = &stats.max; - } - break; - default: - DCHECK(false) << "Unsupported statistics field requested"; - } - if (stat_value == nullptr) return false; - - switch (col_type_.type) { - case TYPE_BOOLEAN: - return ColumnStats<bool>::DecodePlainValue(*stat_value, slot, - parquet::Type::BOOLEAN); - case TYPE_TINYINT: { - // parquet::Statistics encodes INT_8 values using 4 bytes. - int32_t col_stats; - bool ret = ColumnStats<int32_t>::DecodePlainValue(*stat_value, &col_stats, - parquet::Type::INT32); - if (!ret || col_stats < std::numeric_limits<int8_t>::min() || - col_stats > std::numeric_limits<int8_t>::max()) { - return false; - } - *static_cast<int8_t*>(slot) = col_stats; - return true; - } - case TYPE_SMALLINT: { - // parquet::Statistics encodes INT_16 values using 4 bytes. - int32_t col_stats; - bool ret = ColumnStats<int32_t>::DecodePlainValue(*stat_value, &col_stats, - parquet::Type::INT32); - if (!ret || col_stats < std::numeric_limits<int16_t>::min() || - col_stats > std::numeric_limits<int16_t>::max()) { - return false; - } - *static_cast<int16_t*>(slot) = col_stats; - return true; - } - case TYPE_INT: - return ColumnStats<int32_t>::DecodePlainValue(*stat_value, slot, element_.type); - case TYPE_BIGINT: - return ColumnStats<int64_t>::DecodePlainValue(*stat_value, slot, element_.type); - case TYPE_FLOAT: - // IMPALA-6527, IMPALA-6538: ignore min/max stats if NaN - return ColumnStats<float>::DecodePlainValue(*stat_value, slot, element_.type) - && !std::isnan(*reinterpret_cast<float*>(slot)); - case TYPE_DOUBLE: - // IMPALA-6527, IMPALA-6538: ignore min/max stats if NaN - return ColumnStats<double>::DecodePlainValue(*stat_value, slot, element_.type) - && !std::isnan(*reinterpret_cast<double*>(slot)); - case TYPE_TIMESTAMP: - return DecodeTimestamp(*stat_value, stats_field, - static_cast<TimestampValue*>(slot)); - case TYPE_STRING: - case TYPE_VARCHAR: - return ColumnStats<StringValue>::DecodePlainValue(*stat_value, slot, element_.type); - case TYPE_CHAR: - /// We don't read statistics for CHAR columns, since CHAR support is broken in - /// Impala (IMPALA-1652). - return false; - case TYPE_DECIMAL: - switch (col_type_.GetByteSize()) { - case 4: - return ColumnStats<Decimal4Value>::DecodePlainValue(*stat_value, slot, - element_.type); - case 8: - return ColumnStats<Decimal8Value>::DecodePlainValue(*stat_value, slot, - element_.type); - case 16: - return ColumnStats<Decimal16Value>::DecodePlainValue(*stat_value, slot, - element_.type); - } - DCHECK(false) << "Unknown decimal byte size: " << col_type_.GetByteSize(); - default: - DCHECK(false) << col_type_.DebugString(); - } - return false; -} - -bool ColumnStatsReader::DecodeTimestamp(const std::string& stat_value, - ColumnStatsReader::StatsField stats_field, TimestampValue* slot) const { - bool stats_read = false; - if (element_.type == parquet::Type::INT96) { - stats_read = - ColumnStats<TimestampValue>::DecodePlainValue(stat_value, slot, element_.type); - } else if (element_.type == parquet::Type::INT64) { - int64_t tmp; - stats_read = ColumnStats<int64_t>::DecodePlainValue(stat_value, &tmp, element_.type); - if (stats_read) *slot = timestamp_decoder_.Int64ToTimestampValue(tmp); - } else { - DCHECK(false) << element_.name; - return false; - } - - if (stats_read && timestamp_decoder_.NeedsConversion()) { - if (stats_field == ColumnStatsReader::StatsField::MIN) { - timestamp_decoder_.ConvertMinStatToLocalTime(slot); - } else { - timestamp_decoder_.ConvertMaxStatToLocalTime(slot); - } - } - return stats_read && slot->HasDateAndTime(); -} - -bool ColumnStatsReader::ReadNullCountStat(int64_t* null_count) const { - if (!(col_chunk_.__isset.meta_data && col_chunk_.meta_data.__isset.statistics)) { - return false; - } - const parquet::Statistics& stats = col_chunk_.meta_data.statistics; - if (stats.__isset.null_count) { - *null_count = stats.null_count; - return true; - } - return false; -} - -Status ColumnStatsBase::CopyToBuffer(StringBuffer* buffer, StringValue* value) { - if (value->ptr == buffer->buffer()) return Status::OK(); - buffer->Clear(); - RETURN_IF_ERROR(buffer->Append(value->ptr, value->len)); - value->ptr = buffer->buffer(); - return Status::OK(); -} - -bool ColumnStatsReader::CanUseStats() const { - // If column order is not set, only statistics for numeric types can be trusted. - if (col_order_ == nullptr) { - return col_type_.IsBooleanType() || col_type_.IsIntegerType() - || col_type_.IsFloatingPointType(); - } - // Stats can be used if the column order is TypeDefinedOrder (see parquet.thrift). - return col_order_->__isset.TYPE_ORDER; -} - -bool ColumnStatsReader::CanUseDeprecatedStats() const { - // If column order is set to something other than TypeDefinedOrder, we shall not use the - // stats (see parquet.thrift). - if (col_order_ != nullptr && !col_order_->__isset.TYPE_ORDER) return false; - return col_type_.IsBooleanType() || col_type_.IsIntegerType() - || col_type_.IsFloatingPointType(); -} - -} // end ns impala http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet-column-stats.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-stats.h b/be/src/exec/parquet-column-stats.h deleted file mode 100644 index fc880f9..0000000 --- a/be/src/exec/parquet-column-stats.h +++ /dev/null @@ -1,299 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef IMPALA_EXEC_PARQUET_COLUMN_STATS_H -#define IMPALA_EXEC_PARQUET_COLUMN_STATS_H - -#include <string> -#include <type_traits> - -#include "exec/parquet-common.h" -#include "runtime/decimal-value.h" -#include "runtime/string-buffer.h" -#include "runtime/timestamp-value.h" -#include "runtime/types.h" - -#include "gen-cpp/parquet_types.h" - -namespace impala { - -/// This class, together with its derivatives, is used to update column statistics when -/// writing parquet files. It provides an interface to populate a parquet::Statistics -/// object and attach it to an object supplied by the caller. It can also be used to -/// decode parquet::Statistics into slots. -/// -/// We currently support writing the 'min_value' and 'max_value' fields in -/// parquet::Statistics. The other two statistical values - 'null_count' and -/// 'distinct_count' - are not tracked or populated. We do not populate the deprecated -/// 'min' and 'max' fields. -/// -/// Regarding the ordering of values, we follow the parquet-format specification for -/// logical types (LogicalTypes.md in parquet-format): -/// -/// - Numeric values (BOOLEAN, INT, FLOAT, DOUBLE, DECIMAL) are ordered by their numeric -/// value (as opposed to their binary representation). -/// -/// - Strings are ordered using bytewise, unsigned comparison. -/// -/// - Timestamps are compared by numerically comparing the points in time they represent. -/// -/// NULL values are not considered for min/max statistics, and if a column consists only -/// of NULL values, then no min/max statistics are written. -/// -/// Updating the statistics is handled in derived classes to alleviate the need for -/// virtual function calls. -/// -/// TODO: Populate null_count and distinct_count. -class ColumnStatsBase { - public: - /// min and max functions for types that are not floating point numbers - template <typename T, typename Enable = void> - struct MinMaxTrait { - static decltype(auto) MinValue(const T& a, const T& b) { return std::min(a, b); } - static decltype(auto) MaxValue(const T& a, const T& b) { return std::max(a, b); } - static int Compare(const T& a, const T& b) { - if (a < b) return -1; - if (a > b) return 1; - return 0; - } - }; - - /// min and max functions for floating point types - template <typename T> - struct MinMaxTrait<T, std::enable_if_t<std::is_floating_point<T>::value>> { - static decltype(auto) MinValue(const T& a, const T& b) { return std::fmin(a, b); } - static decltype(auto) MaxValue(const T& a, const T& b) { return std::fmax(a, b); } - static int Compare(const T& a, const T& b) { - //TODO: Should be aligned with PARQUET-1222, once resolved - if (a == b) return 0; - if (std::isnan(a) && std::isnan(b)) return 0; - if (MaxValue(a, b) == a) return 1; - return -1; - } - }; - - ColumnStatsBase() : has_min_max_values_(false), null_count_(0) {} - virtual ~ColumnStatsBase() {} - - /// Merges this statistics object with values from 'other'. If other has not been - /// initialized, then this object will not be changed. It maintains internal state that - /// tracks whether the min/max values are ordered. - virtual void Merge(const ColumnStatsBase& other) = 0; - - /// Copies the contents of this object's statistics values to internal buffers. Some - /// data types (e.g. StringValue) need to be copied at the end of processing a row - /// batch, since the batch memory will be released. Overwrite this method in derived - /// classes to provide the functionality. - virtual Status MaterializeStringValuesToInternalBuffers() WARN_UNUSED_RESULT { - return Status::OK(); - } - - /// Returns the number of bytes needed to encode the current statistics into a - /// parquet::Statistics object. - virtual int64_t BytesNeeded() const = 0; - - /// Encodes the current values into a Statistics thrift message. - virtual void EncodeToThrift(parquet::Statistics* out) const = 0; - - /// Resets the state of this object. - void Reset(); - - /// Update the statistics by incrementing the null_count. It is called each time a null - /// value is appended to the column or the statistics are merged. - void IncrementNullCount(int64_t count) { null_count_ += count; } - - /// Returns the boundary order of the pages. That is, whether the lists of min/max - /// elements inside the ColumnIndex are ordered and if so, in which direction. - /// If both 'ascending_boundary_order_' and 'descending_boundary_order_' is true, - /// it means all elements are equal, we choose ascending order in this case. - /// If only one flag is true, or both of them is false, then we return the identified - /// ordering, or unordered. - parquet::BoundaryOrder::type GetBoundaryOrder() const { - if (ascending_boundary_order_) return parquet::BoundaryOrder::ASCENDING; - if (descending_boundary_order_) return parquet::BoundaryOrder::DESCENDING; - return parquet::BoundaryOrder::UNORDERED; - } - - protected: - // Copies the memory of 'value' into 'buffer' and make 'value' point to 'buffer'. - // 'buffer' is reset before making the copy. - static Status CopyToBuffer(StringBuffer* buffer, StringValue* value) WARN_UNUSED_RESULT; - - /// Stores whether the min and max values of the current object have been initialized. - bool has_min_max_values_; - - // Number of null values since the last call to Reset(). - int64_t null_count_; - - // If true, min/max values are ascending. - // We assume the values are ascending, so start with true and only make it false when - // we find a descending value. If not all values are equal, then at least one of - // 'ascending_boundary_order_' and 'descending_boundary_order_' will be false. - bool ascending_boundary_order_ = true; - - // If true, min/max values are descending. - // See description of 'ascending_boundary_order_'. - bool descending_boundary_order_ = true; -}; - -/// This class contains behavior specific to our in-memory formats for different types. -template <typename T> -class ColumnStats : public ColumnStatsBase { - friend class ColumnStatsBase; - // We explicitly require types to be listed here in order to support column statistics. - // When adding a type here, users of this class need to ensure that the statistics - // follow the ordering semantics of parquet's min/max statistics for the new type. - // Details on how the values should be ordered can be found in the 'parquet-format' - // project in 'parquet.thrift' and 'LogicalTypes.md'. - using value_type = typename std::enable_if< - std::is_arithmetic<T>::value - || std::is_same<bool, T>::value - || std::is_same<StringValue, T>::value - || std::is_same<TimestampValue, T>::value - || std::is_same<Decimal4Value, T>::value - || std::is_same<Decimal8Value, T>::value - || std::is_same<Decimal16Value, T>::value, - T>::type; - - public: - /// 'mem_pool' is used to materialize string values so that the user of this class can - /// free the memory of the original values. - /// 'plain_encoded_value_size' specifies the size of each encoded value in plain - /// encoding, -1 if the type is variable-length. - ColumnStats(MemPool* mem_pool, int plain_encoded_value_size) - : ColumnStatsBase(), - plain_encoded_value_size_(plain_encoded_value_size), - mem_pool_(mem_pool), - min_buffer_(mem_pool), - max_buffer_(mem_pool), - prev_page_min_buffer_(mem_pool), - prev_page_max_buffer_(mem_pool) {} - - /// Updates the statistics based on the values min_value and max_value. If necessary, - /// initializes the statistics. It may keep a reference to either value until - /// MaterializeStringValuesToInternalBuffers() gets called. - void Update(const T& min_value, const T& max_value); - - /// Wrapper to call the Update function which takes in the min_value and max_value. - void Update(const T& v) { Update(v, v); } - - virtual void Merge(const ColumnStatsBase& other) override; - virtual Status MaterializeStringValuesToInternalBuffers() override { - return Status::OK(); - } - - virtual int64_t BytesNeeded() const override; - virtual void EncodeToThrift(parquet::Statistics* out) const override; - - /// Decodes the plain encoded stats value from 'buffer' and writes the result into the - /// buffer pointed to by 'slot'. Returns true if decoding was successful, false - /// otherwise. For timestamps, an additional validation will be performed. - static bool DecodePlainValue(const std::string& buffer, void* slot, - parquet::Type::type parquet_type); - - protected: - /// Encodes a single value using parquet's plain encoding and stores it into the binary - /// string 'out'. String values are stored without additional encoding. 'bytes_needed' - /// must be positive. - static void EncodePlainValue(const T& v, int64_t bytes_needed, std::string* out); - - /// Returns the number of bytes needed to encode value 'v'. - int64_t BytesNeeded(const T& v) const; - - // Size of each encoded value in plain encoding, -1 if the type is variable-length. - int plain_encoded_value_size_; - - // Minimum value since the last call to Reset(). - T min_value_; - - // Maximum value since the last call to Reset(). - T max_value_; - - // Minimum value of the previous page. Need to store that to calculate boundary order. - T prev_page_min_value_; - - // Maximum value of the previous page. Need to store that to calculate boundary order. - T prev_page_max_value_; - - // Memory pool to allocate from when making copies of the statistics data. - MemPool* mem_pool_; - - // Local buffers to copy statistics data into. - StringBuffer min_buffer_; - StringBuffer max_buffer_; - StringBuffer prev_page_min_buffer_; - StringBuffer prev_page_max_buffer_; -}; - -/// Class that handles the decoding of Parquet stats (min/max/null_count) for a given -/// column chunk. -class ColumnStatsReader { -public: - /// Enum to select whether to read minimum or maximum statistics. Values do not - /// correspond to fields in parquet::Statistics, but instead select between retrieving - /// the minimum or maximum value. - enum class StatsField { MIN, MAX }; - - ColumnStatsReader(const parquet::ColumnChunk& col_chunk, const ColumnType& col_type, - const parquet::ColumnOrder* col_order, const parquet::SchemaElement& element) - : col_chunk_(col_chunk), - col_type_(col_type), - col_order_(col_order), - element_(element) {} - - /// Sets extra information that is only needed for decoding TIMESTAMP stats. - void SetTimestampDecoder(ParquetTimestampDecoder timestamp_decoder) { - timestamp_decoder_ = timestamp_decoder; - } - - /// Decodes the parquet::Statistics from 'col_chunk_' and writes the value selected by - /// 'stats_field' into the buffer pointed to by 'slot', based on 'col_type_'. Returns - /// true if reading statistics for columns of type 'col_type_' is supported and decoding - /// was successful, false otherwise. - bool ReadFromThrift(StatsField stats_field, void* slot) const; - - // Gets the null_count statistics from the column chunk's metadata and returns - // it via an output parameter. - // Returns true if the null_count stats were read successfully, false otherwise. - bool ReadNullCountStat(int64_t* null_count) const; - -private: - /// Returns true if we support reading statistics stored in the fields 'min_value' and - /// 'max_value' in parquet::Statistics for the type 'col_type_' and the column order - /// 'col_order_'. Otherwise, returns false. If 'col_order_' is nullptr, only primitive - /// numeric types are supported. - bool CanUseStats() const; - - /// Returns true if we consider statistics stored in the deprecated fields 'min' and - /// 'max' in parquet::Statistics to be correct for the type 'col_type_' and the column - /// order 'col_order_'. Otherwise, returns false. - bool CanUseDeprecatedStats() const; - - /// Decodes 'stat_value' and does INT64->TimestampValue and timezone conversions if - /// necessary. Returns true if the decoding and conversions were successful. - bool DecodeTimestamp(const std::string& stat_value, - ColumnStatsReader::StatsField stats_field, - TimestampValue* slot) const; - - const parquet::ColumnChunk& col_chunk_; - const ColumnType& col_type_; - const parquet::ColumnOrder* col_order_; - const parquet::SchemaElement& element_; - ParquetTimestampDecoder timestamp_decoder_; -}; -} // end ns impala -#endif http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet-column-stats.inline.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-stats.inline.h b/be/src/exec/parquet-column-stats.inline.h deleted file mode 100644 index 6e78b82..0000000 --- a/be/src/exec/parquet-column-stats.inline.h +++ /dev/null @@ -1,254 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef IMPALA_EXEC_PARQUET_COLUMN_STATS_INLINE_H -#define IMPALA_EXEC_PARQUET_COLUMN_STATS_INLINE_H - -#include "exec/parquet-common.h" -#include "gen-cpp/parquet_types.h" -#include "parquet-column-stats.h" -#include "runtime/string-value.inline.h" - -namespace impala { - -inline void ColumnStatsBase::Reset() { - has_min_max_values_ = false; - null_count_ = 0; - ascending_boundary_order_ = true; - descending_boundary_order_ = true; -} - -template <typename T> -inline void ColumnStats<T>::Update(const T& min_value, const T& max_value) { - if (!has_min_max_values_) { - has_min_max_values_ = true; - min_value_ = min_value; - max_value_ = max_value; - } else { - min_value_ = MinMaxTrait<T>::MinValue(min_value_, min_value); - max_value_ = MinMaxTrait<T>::MaxValue(max_value_, max_value); - } -} - -template <typename T> -inline void ColumnStats<T>::Merge(const ColumnStatsBase& other) { - DCHECK(dynamic_cast<const ColumnStats<T>*>(&other)); - const ColumnStats<T>* cs = static_cast<const ColumnStats<T>*>(&other); - if (cs->has_min_max_values_) { - if (has_min_max_values_) { - if (ascending_boundary_order_) { - if (MinMaxTrait<T>::Compare(prev_page_max_value_, cs->max_value_) > 0 || - MinMaxTrait<T>::Compare(prev_page_min_value_, cs->min_value_) > 0) { - ascending_boundary_order_ = false; - } - } - if (descending_boundary_order_) { - if (MinMaxTrait<T>::Compare(prev_page_max_value_, cs->max_value_) < 0 || - MinMaxTrait<T>::Compare(prev_page_min_value_, cs->min_value_) < 0) { - descending_boundary_order_ = false; - } - } - } - Update(cs->min_value_, cs->max_value_); - prev_page_min_value_ = cs->min_value_; - prev_page_max_value_ = cs->max_value_; - } - IncrementNullCount(cs->null_count_); -} - -template <typename T> -inline int64_t ColumnStats<T>::BytesNeeded() const { - return BytesNeeded(min_value_) + BytesNeeded(max_value_) - + ParquetPlainEncoder::ByteSize(null_count_); -} - -template <typename T> -inline void ColumnStats<T>::EncodeToThrift(parquet::Statistics* out) const { - if (has_min_max_values_) { - std::string min_str; - EncodePlainValue(min_value_, BytesNeeded(min_value_), &min_str); - out->__set_min_value(move(min_str)); - std::string max_str; - EncodePlainValue(max_value_, BytesNeeded(max_value_), &max_str); - out->__set_max_value(move(max_str)); - } - out->__set_null_count(null_count_); -} - -template <typename T> -inline void ColumnStats<T>::EncodePlainValue( - const T& v, int64_t bytes_needed, std::string* out) { - DCHECK_GT(bytes_needed, 0); - out->resize(bytes_needed); - const int64_t bytes_written = ParquetPlainEncoder::Encode( - v, bytes_needed, reinterpret_cast<uint8_t*>(&(*out)[0])); - DCHECK_EQ(bytes_needed, bytes_written); -} - -template <typename T> -inline bool ColumnStats<T>::DecodePlainValue(const std::string& buffer, void* slot, - parquet::Type::type parquet_type) { - T* result = reinterpret_cast<T*>(slot); - int size = buffer.size(); - const uint8_t* data = reinterpret_cast<const uint8_t*>(buffer.data()); - if (ParquetPlainEncoder::DecodeByParquetType<T>(data, data + size, size, result, - parquet_type) == -1) { - return false; - } - return true; -} - -template <typename T> -inline int64_t ColumnStats<T>::BytesNeeded(const T& v) const { - return plain_encoded_value_size_ < 0 ? ParquetPlainEncoder::ByteSize<T>(v) : - plain_encoded_value_size_; -} - -/// Plain encoding for Boolean values is not handled by the ParquetPlainEncoder and thus -/// needs special handling here. -template <> -inline void ColumnStats<bool>::EncodePlainValue( - const bool& v, int64_t bytes_needed, std::string* out) { - char c = v; - out->assign(1, c); -} - -template <> -inline bool ColumnStats<bool>::DecodePlainValue(const std::string& buffer, void* slot, - parquet::Type::type parquet_type) { - bool* result = reinterpret_cast<bool*>(slot); - DCHECK(buffer.size() == 1); - *result = (buffer[0] != 0); - return true; -} - -template <> -inline int64_t ColumnStats<bool>::BytesNeeded(const bool& v) const { - return 1; -} - -/// Timestamp values need validation. -template <> -inline bool ColumnStats<TimestampValue>::DecodePlainValue( - const std::string& buffer, void* slot, parquet::Type::type parquet_type) { - TimestampValue* result = reinterpret_cast<TimestampValue*>(slot); - int size = buffer.size(); - const uint8_t* data = reinterpret_cast<const uint8_t*>(buffer.data()); - if (parquet_type == parquet::Type::INT96) { - if (ParquetPlainEncoder::Decode<TimestampValue, parquet::Type::INT96>(data, - data + size, size, result) == -1) { - return false; - } - } else { - DCHECK(false); - return false; - } - - // We don't need to convert the value here, because it is done by the caller. - // If this function were not static, then it would be possible to store the information - // needed for timezone conversion in the object and do the conversion here. - return TimestampValue::IsValidDate(result->date()); -} - -/// parquet::Statistics stores string values directly and does not use plain encoding. -template <> -inline void ColumnStats<StringValue>::EncodePlainValue( - const StringValue& v, int64_t bytes_needed, string* out) { - out->assign(v.ptr, v.len); -} - -template <> -inline bool ColumnStats<StringValue>::DecodePlainValue( - const std::string& buffer, void* slot, parquet::Type::type parquet_type) { - StringValue* result = reinterpret_cast<StringValue*>(slot); - result->ptr = const_cast<char*>(buffer.data()); - result->len = buffer.size(); - return true; -} - -template <> -inline void ColumnStats<StringValue>::Update( - const StringValue& min_value, const StringValue& max_value) { - if (!has_min_max_values_) { - has_min_max_values_ = true; - min_value_ = min_value; - min_buffer_.Clear(); - max_value_ = max_value; - max_buffer_.Clear(); - } else { - if (min_value < min_value_) { - min_value_ = min_value; - min_buffer_.Clear(); - } - if (max_value > max_value_) { - max_value_ = max_value; - max_buffer_.Clear(); - } - } -} - -template <> -inline void ColumnStats<StringValue>::Merge(const ColumnStatsBase& other) { - DCHECK(dynamic_cast<const ColumnStats<StringValue>*>(&other)); - const ColumnStats<StringValue>* cs = static_cast< - const ColumnStats<StringValue>*>(&other); - if (cs->has_min_max_values_) { - if (has_min_max_values_) { - // Make sure that we copied the previous page's min/max values to their own buffer. - DCHECK_NE(static_cast<void*>(prev_page_min_value_.ptr), - static_cast<void*>(cs->min_value_.ptr)); - DCHECK_NE(static_cast<void*>(prev_page_max_value_.ptr), - static_cast<void*>(cs->max_value_.ptr)); - if (ascending_boundary_order_) { - if (prev_page_max_value_ > cs->max_value_ || - prev_page_min_value_ > cs->min_value_) { - ascending_boundary_order_ = false; - } - } - if (descending_boundary_order_) { - if (prev_page_max_value_ < cs->max_value_ || - prev_page_min_value_ < cs->min_value_) { - descending_boundary_order_ = false; - } - } - } - Update(cs->min_value_, cs->max_value_); - prev_page_min_value_ = cs->min_value_; - prev_page_max_value_ = cs->max_value_; - prev_page_min_buffer_.Clear(); - prev_page_max_buffer_.Clear(); - } - IncrementNullCount(cs->null_count_); -} - -// StringValues need to be copied at the end of processing a row batch, since the batch -// memory will be released. -template <> -inline Status ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers() { - if (min_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&min_buffer_, &min_value_)); - if (max_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&max_buffer_, &max_value_)); - if (prev_page_min_buffer_.IsEmpty()) { - RETURN_IF_ERROR(CopyToBuffer(&prev_page_min_buffer_, &prev_page_min_value_)); - } - if (prev_page_max_buffer_.IsEmpty()) { - RETURN_IF_ERROR(CopyToBuffer(&prev_page_max_buffer_, &prev_page_max_value_)); - } - return Status::OK(); -} - -} // end ns impala -#endif http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet-common.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-common.cc b/be/src/exec/parquet-common.cc deleted file mode 100644 index f967f9f..0000000 --- a/be/src/exec/parquet-common.cc +++ /dev/null @@ -1,132 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "exec/parquet-common.h" - -namespace impala { - -/// Mapping of impala's internal types to parquet storage types. This is indexed by -/// PrimitiveType enum -const parquet::Type::type INTERNAL_TO_PARQUET_TYPES[] = { - parquet::Type::BOOLEAN, // Invalid - parquet::Type::BOOLEAN, // NULL type - parquet::Type::BOOLEAN, - parquet::Type::INT32, - parquet::Type::INT32, - parquet::Type::INT32, - parquet::Type::INT64, - parquet::Type::FLOAT, - parquet::Type::DOUBLE, - parquet::Type::INT96, // Timestamp - parquet::Type::BYTE_ARRAY, // String - parquet::Type::BYTE_ARRAY, // Date, NYI - parquet::Type::BYTE_ARRAY, // DateTime, NYI - parquet::Type::BYTE_ARRAY, // Binary NYI - parquet::Type::FIXED_LEN_BYTE_ARRAY, // Decimal - parquet::Type::BYTE_ARRAY, // VARCHAR(N) - parquet::Type::BYTE_ARRAY, // CHAR(N) -}; - -const int INTERNAL_TO_PARQUET_TYPES_SIZE = - sizeof(INTERNAL_TO_PARQUET_TYPES) / sizeof(INTERNAL_TO_PARQUET_TYPES[0]); - -/// Mapping of Parquet codec enums to Impala enums -const THdfsCompression::type PARQUET_TO_IMPALA_CODEC[] = { - THdfsCompression::NONE, - THdfsCompression::SNAPPY, - THdfsCompression::GZIP, - THdfsCompression::LZO -}; - -const int PARQUET_TO_IMPALA_CODEC_SIZE = - sizeof(PARQUET_TO_IMPALA_CODEC) / sizeof(PARQUET_TO_IMPALA_CODEC[0]); - -/// Mapping of Impala codec enums to Parquet enums -const parquet::CompressionCodec::type IMPALA_TO_PARQUET_CODEC[] = { - parquet::CompressionCodec::UNCOMPRESSED, - parquet::CompressionCodec::SNAPPY, // DEFAULT - parquet::CompressionCodec::GZIP, // GZIP - parquet::CompressionCodec::GZIP, // DEFLATE - parquet::CompressionCodec::SNAPPY, - parquet::CompressionCodec::SNAPPY, // SNAPPY_BLOCKED - parquet::CompressionCodec::LZO, -}; - -const int IMPALA_TO_PARQUET_CODEC_SIZE = - sizeof(IMPALA_TO_PARQUET_CODEC) / sizeof(IMPALA_TO_PARQUET_CODEC[0]); - -parquet::Type::type ConvertInternalToParquetType(PrimitiveType type) { - DCHECK_GE(type, 0); - DCHECK_LT(type, INTERNAL_TO_PARQUET_TYPES_SIZE); - return INTERNAL_TO_PARQUET_TYPES[type]; -} - -THdfsCompression::type ConvertParquetToImpalaCodec( - parquet::CompressionCodec::type codec) { - DCHECK_GE(codec, 0); - DCHECK_LT(codec, PARQUET_TO_IMPALA_CODEC_SIZE); - return PARQUET_TO_IMPALA_CODEC[codec]; -} - -parquet::CompressionCodec::type ConvertImpalaToParquetCodec( - THdfsCompression::type codec) { - DCHECK_GE(codec, 0); - DCHECK_LT(codec, IMPALA_TO_PARQUET_CODEC_SIZE); - return IMPALA_TO_PARQUET_CODEC[codec]; -} - -ParquetTimestampDecoder::ParquetTimestampDecoder(const parquet::SchemaElement& e, - const Timezone* timezone, bool convert_int96_timestamps) { - bool needs_conversion = false; - if (e.__isset.logicalType) { - DCHECK(e.logicalType.__isset.TIMESTAMP); - needs_conversion = e.logicalType.TIMESTAMP.isAdjustedToUTC; - precision_ = e.logicalType.TIMESTAMP.unit.__isset.MILLIS - ? ParquetTimestampDecoder::MILLI : ParquetTimestampDecoder::MICRO; - } else { - if (e.__isset.converted_type) { - // Timestamp with converted type but without logical type are/were never written - // by Impala, so it is assumed that the writer is Parquet-mr and that timezone - // conversion is needed. - needs_conversion = true; - precision_ = e.converted_type == parquet::ConvertedType::TIMESTAMP_MILLIS - ? ParquetTimestampDecoder::MILLI : ParquetTimestampDecoder::MICRO; - } else { - // INT96 timestamps needs conversion depending on the writer. - needs_conversion = convert_int96_timestamps; - precision_ = ParquetTimestampDecoder::NANO; - } - } - if (needs_conversion) timezone_ = timezone; -} - -void ParquetTimestampDecoder::ConvertMinStatToLocalTime(TimestampValue* v) const { - DCHECK(timezone_ != nullptr); - if (!v->HasDateAndTime()) return; - TimestampValue repeated_period_start; - v->UtcToLocal(*timezone_, &repeated_period_start); - if (repeated_period_start.HasDateAndTime()) *v = repeated_period_start; -} - -void ParquetTimestampDecoder::ConvertMaxStatToLocalTime(TimestampValue* v) const { - DCHECK(timezone_ != nullptr); - if (!v->HasDateAndTime()) return; - TimestampValue repeated_period_end; - v->UtcToLocal(*timezone_, nullptr, &repeated_period_end); - if (repeated_period_end.HasDateAndTime()) *v = repeated_period_end; -} -} http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet-common.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-common.h b/be/src/exec/parquet-common.h deleted file mode 100644 index 806db45..0000000 --- a/be/src/exec/parquet-common.h +++ /dev/null @@ -1,535 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - - -#ifndef IMPALA_EXEC_PARQUET_COMMON_H -#define IMPALA_EXEC_PARQUET_COMMON_H - -#include "common/compiler-util.h" -#include "gen-cpp/Descriptors_types.h" -#include "gen-cpp/parquet_types.h" -#include "runtime/decimal-value.h" -#include "runtime/string-value.h" -#include "runtime/timestamp-value.inline.h" -#include "util/bit-util.h" -#include "util/decimal-util.h" -#include "util/mem-util.h" - -/// This file contains common elements between the parquet Writer and Scanner. -namespace impala { - -const uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'}; -const uint32_t PARQUET_CURRENT_VERSION = 1; - -/// Return the Parquet type corresponding to Impala's internal type. The caller must -/// validate that the type is valid, otherwise this will DCHECK. -parquet::Type::type ConvertInternalToParquetType(PrimitiveType type); - -/// Return the Impala compression type for the given Parquet codec. The caller must -/// validate that the codec is a supported one, otherwise this will DCHECK. -THdfsCompression::type ConvertParquetToImpalaCodec(parquet::CompressionCodec::type codec); - -/// Return the Parquet code for the given Impala compression type. The caller must -/// validate that the codec is a supported one, otherwise this will DCHECK. -parquet::CompressionCodec::type ConvertImpalaToParquetCodec( - THdfsCompression::type codec); - -/// The plain encoding does not maintain any state so all these functions -/// are static helpers. -/// TODO: we are using templates to provide a generic interface (over the -/// types) to avoid performance penalties. This makes the code more complex -/// and should be removed when we have codegen support to inline virtual -/// calls. -class ParquetPlainEncoder { - public: - /// Returns the byte size of 'v' where InternalType is the datatype that Impala uses - /// internally to store tuple data. Used in some template function implementations to - /// determine the encoded byte size for fixed-length types. - template <typename InternalType> - static int ByteSize(const InternalType& v) { return sizeof(InternalType); } - - /// Returns the encoded size of values of type t. Returns -1 if it is variable - /// length. This can be different than the slot size of the types. - static int EncodedByteSize(const ColumnType& t) { - switch (t.type) { - case TYPE_STRING: - case TYPE_VARCHAR: - case TYPE_CHAR: - // CHAR is varlen here because we don't write the padding to the file - return -1; - case TYPE_TINYINT: - case TYPE_SMALLINT: - case TYPE_INT: - case TYPE_FLOAT: - return 4; - case TYPE_BIGINT: - case TYPE_DOUBLE: - return 8; - case TYPE_TIMESTAMP: - return 12; - case TYPE_DECIMAL: - return DecimalSize(t); - case TYPE_NULL: - case TYPE_BOOLEAN: // These types are not plain encoded. - default: - DCHECK(false); - return -1; - } - } - - /// The minimum byte size to store decimals of with precision t.precision. - static int DecimalSize(const ColumnType& t) { - DCHECK(t.type == TYPE_DECIMAL); - // Numbers in the comment is the max positive value that can be represented - // with those number of bits (max negative is -(X + 1)). - // TODO: use closed form for this? - switch (t.precision) { - case 1: case 2: - return 1; // 127 - case 3: case 4: - return 2; // 32,767 - case 5: case 6: - return 3; // 8,388,607 - case 7: case 8: case 9: - return 4; // 2,147,483,427 - case 10: case 11: - return 5; // 549,755,813,887 - case 12: case 13: case 14: - return 6; // 140,737,488,355,327 - case 15: case 16: - return 7; // 36,028,797,018,963,967 - case 17: case 18: - return 8; // 9,223,372,036,854,775,807 - case 19: case 20: case 21: - return 9; // 2,361,183,241,434,822,606,847 - case 22: case 23: - return 10; // 604,462,909,807,314,587,353,087 - case 24: case 25: case 26: - return 11; // 154,742,504,910,672,534,362,390,527 - case 27: case 28: - return 12; // 39,614,081,257,132,168,796,771,975,167 - case 29: case 30: case 31: - return 13; // 10,141,204,801,825,835,211,973,625,643,007 - case 32: case 33: - return 14; // 2,596,148,429,267,413,814,265,248,164,610,047 - case 34: case 35: - return 15; // 664,613,997,892,457,936,451,903,530,140,172,287 - case 36: case 37: case 38: - return 16; // 170,141,183,460,469,231,731,687,303,715,884,105,727 - default: - DCHECK(false); - break; - } - return -1; - } - - /// Encodes t into buffer. Returns the number of bytes added. buffer must - /// be preallocated and big enough. Buffer need not be aligned. - /// 'fixed_len_size' is only applicable for data encoded using FIXED_LEN_BYTE_ARRAY and - /// is the number of bytes the plain encoder should use. - template <typename InternalType> - static int Encode(const InternalType& t, int fixed_len_size, uint8_t* buffer) { - memcpy(buffer, &t, ByteSize(t)); - return ByteSize(t); - } - - template <typename InternalType> - static int DecodeByParquetType(const uint8_t* buffer, const uint8_t* buffer_end, - int fixed_len_size, InternalType* v, parquet::Type::type parquet_type) { - switch (parquet_type) { - case parquet::Type::BOOLEAN: - return ParquetPlainEncoder::Decode<InternalType, parquet::Type::BOOLEAN>(buffer, - buffer_end, fixed_len_size, v); - case parquet::Type::INT32: - return ParquetPlainEncoder::Decode<InternalType, parquet::Type::INT32>(buffer, - buffer_end, fixed_len_size, v); - case parquet::Type::INT64: - return ParquetPlainEncoder::Decode<InternalType, parquet::Type::INT64>(buffer, - buffer_end, fixed_len_size, v); - case parquet::Type::INT96: - return ParquetPlainEncoder::Decode<InternalType, parquet::Type::INT96>(buffer, - buffer_end, fixed_len_size, v); - case parquet::Type::FLOAT: - return ParquetPlainEncoder::Decode<InternalType, parquet::Type::FLOAT>(buffer, - buffer_end, fixed_len_size, v); - case parquet::Type::DOUBLE: - return ParquetPlainEncoder::Decode<InternalType, parquet::Type::DOUBLE>(buffer, - buffer_end, fixed_len_size, v); - case parquet::Type::BYTE_ARRAY: - return ParquetPlainEncoder::Decode<InternalType, - parquet::Type::BYTE_ARRAY>(buffer, buffer_end, fixed_len_size, v); - case parquet::Type::FIXED_LEN_BYTE_ARRAY: - return ParquetPlainEncoder::Decode<InternalType, - parquet::Type::FIXED_LEN_BYTE_ARRAY>(buffer, buffer_end, fixed_len_size, v); - default: - DCHECK(false) << "Unexpected physical type"; - return -1; - } - } - - /// Decodes t from 'buffer', reading up to the byte before 'buffer_end'. 'buffer' - /// need not be aligned. If PARQUET_TYPE is FIXED_LEN_BYTE_ARRAY then 'fixed_len_size' - /// is the size of the object. Otherwise, it is unused. - /// Returns the number of bytes read or -1 if the value was not decoded successfully. - /// This generic template function is used with the following types: - /// ============================= - /// InternalType | PARQUET_TYPE - /// ============================= - /// int32_t | INT32 - /// int64_t | INT64 - /// float | FLOAT - /// double | DOUBLE - /// Decimal4Value | INT32 - /// Decimal8Value | INT64 - /// TimestampValue | INT96 - template <typename InternalType, parquet::Type::type PARQUET_TYPE> - static int Decode(const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, - InternalType* v) { - int byte_size = ByteSize(*v); - if (UNLIKELY(buffer_end - buffer < byte_size)) return -1; - memcpy(v, buffer, byte_size); - return byte_size; - } - - /// Batched version of Decode() that tries to decode 'num_values' values from the memory - /// range [buffer, buffer_end) and writes them to 'v' with a stride of 'stride' bytes. - /// Returns the number of bytes read from 'buffer' or -1 if there was an error - /// decoding, e.g. invalid data or running out of input data before reading - /// 'num_values'. - template <typename InternalType, parquet::Type::type PARQUET_TYPE> - static int64_t DecodeBatch(const uint8_t* buffer, const uint8_t* buffer_end, - int fixed_len_size, int64_t num_values, int64_t stride, InternalType* v); -}; - -/// Calling this with arguments of type ColumnType is certainly a programmer error, so we -/// disallow it. -template <> int ParquetPlainEncoder::ByteSize(const ColumnType& t); - -/// Disable for bools. Plain encoding is not used for booleans. -template <> int ParquetPlainEncoder::ByteSize(const bool& b); -template <> int ParquetPlainEncoder::Encode(const bool&, int fixed_len_size, uint8_t*); -template <> int ParquetPlainEncoder::Decode<bool, parquet::Type::BOOLEAN>(const uint8_t*, - const uint8_t*, int fixed_len_size, bool* v); - -template <> -inline int ParquetPlainEncoder::ByteSize(const Decimal4Value&) { - // Only used when the decimal is stored as INT32. - return sizeof(Decimal4Value::StorageType); -} -template <> -inline int ParquetPlainEncoder::ByteSize(const Decimal8Value&) { - // Only used when the decimal is stored as INT64. - return sizeof(Decimal8Value::StorageType); -} -template <> -inline int ParquetPlainEncoder::ByteSize(const Decimal16Value&) { - // Not used, since such big decimals can only be stored as BYTE_ARRAY or - // FIXED_LEN_BYTE_ARRAY. - DCHECK(false); - return -1; -} - -/// Parquet doesn't have 8-bit or 16-bit ints. They are converted to 32-bit. -template <> -inline int ParquetPlainEncoder::ByteSize(const int8_t& v) { return sizeof(int32_t); } -template <> -inline int ParquetPlainEncoder::ByteSize(const int16_t& v) { return sizeof(int32_t); } - -template <> -inline int ParquetPlainEncoder::ByteSize(const StringValue& v) { - return sizeof(int32_t) + v.len; -} - -template <> -inline int ParquetPlainEncoder::ByteSize(const TimestampValue& v) { - return 12; -} - -template <typename From, typename To> -inline int DecodeWithConversion(const uint8_t* buffer, const uint8_t* buffer_end, To* v) { - int byte_size = sizeof(From); - if (UNLIKELY(buffer_end - buffer < byte_size)) return -1; - From dest; - memcpy(&dest, buffer, byte_size); - *v = dest; - return byte_size; -} - -template <> -inline int ParquetPlainEncoder::Decode<int64_t, parquet::Type::INT32>( - const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, int64_t* v) { - return DecodeWithConversion<int32_t, int64_t>(buffer, buffer_end, v); -} - -template <> -inline int ParquetPlainEncoder::Decode<double, parquet::Type::INT32>( - const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, double* v) { - return DecodeWithConversion<int32_t, double>(buffer, buffer_end, v); -} - -template <> -inline int ParquetPlainEncoder::Decode<double, parquet::Type::FLOAT>( - const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, double* v) { - return DecodeWithConversion<float, double>(buffer, buffer_end, v); -} - -template <> -inline int ParquetPlainEncoder::Decode<int8_t, parquet::Type::INT32>( - const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, int8_t* v) { - int byte_size = ByteSize(*v); - if (UNLIKELY(buffer_end - buffer < byte_size)) return -1; - *v = *buffer; - return byte_size; -} -template <> -inline int ParquetPlainEncoder::Decode<int16_t, parquet::Type::INT32>( - const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, int16_t* v) { - int byte_size = ByteSize(*v); - if (UNLIKELY(buffer_end - buffer < byte_size)) return -1; - memcpy(v, buffer, sizeof(int16_t)); - return byte_size; -} - -template<typename T> -inline int EncodeToInt32(const T& v, int fixed_len_size, uint8_t* buffer) { - int32_t val = v; - memcpy(buffer, &val, sizeof(int32_t)); - return ParquetPlainEncoder::ByteSize(v); -} - -template <> -inline int ParquetPlainEncoder::Encode( - const int8_t& v, int fixed_len_size, uint8_t* buffer) { - return EncodeToInt32(v, fixed_len_size, buffer); -} - -template <> -inline int ParquetPlainEncoder::Encode( - const int16_t& v, int fixed_len_size, uint8_t* buffer) { - return EncodeToInt32(v, fixed_len_size, buffer); -} - -template <> -inline int ParquetPlainEncoder::Encode( - const StringValue& v, int fixed_len_size, uint8_t* buffer) { - memcpy(buffer, &v.len, sizeof(int32_t)); - memcpy(buffer + sizeof(int32_t), v.ptr, v.len); - return ByteSize(v); -} - -template <> -inline int ParquetPlainEncoder::Decode<StringValue, parquet::Type::BYTE_ARRAY>( - const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, - StringValue* v) { - if (UNLIKELY(buffer_end - buffer < sizeof(int32_t))) return -1; - memcpy(&v->len, buffer, sizeof(int32_t)); - int byte_size = ByteSize(*v); - if (UNLIKELY(v->len < 0 || buffer_end - buffer < byte_size)) return -1; - v->ptr = reinterpret_cast<char*>(const_cast<uint8_t*>(buffer)) + sizeof(int32_t); - if (fixed_len_size > 0) v->len = std::min(v->len, fixed_len_size); - // we still read byte_size bytes, even if we truncate - return byte_size; -} - -/// Write decimals as big endian (byte comparable) to benefit from common prefixes. -/// fixed_len_size can be less than sizeof(Decimal*Value) for space savings. This means -/// that the value in the in-memory format has leading zeros or negative 1's. -/// For example, precision 2 fits in 1 byte. All decimals stored as Decimal4Value -/// will have 3 bytes of leading zeros, we will only store the interesting byte. -template<typename T> -inline int EncodeDecimal(const T& v, int fixed_len_size, uint8_t* buffer) { - DecimalUtil::EncodeToFixedLenByteArray(buffer, fixed_len_size, v); - return fixed_len_size; -} - -template <> -inline int ParquetPlainEncoder::Encode( - const Decimal4Value& v, int fixed_len_size, uint8_t* buffer) { - return EncodeDecimal(v, fixed_len_size, buffer); -} - -template <> -inline int ParquetPlainEncoder::Encode( - const Decimal8Value& v, int fixed_len_size, uint8_t* buffer) { - return EncodeDecimal(v, fixed_len_size, buffer); -} - -template <> -inline int ParquetPlainEncoder::Encode( - const Decimal16Value& v, int fixed_len_size, uint8_t* buffer) { - return EncodeDecimal(v, fixed_len_size, buffer); -} - -template <typename InternalType, parquet::Type::type PARQUET_TYPE> -inline int64_t ParquetPlainEncoder::DecodeBatch(const uint8_t* buffer, - const uint8_t* buffer_end, int fixed_len_size, int64_t num_values, int64_t stride, - InternalType* v) { - const uint8_t* buffer_pos = buffer; - StrideWriter<InternalType> out(v, stride); - for (int64_t i = 0; i < num_values; ++i) { - int encoded_len = Decode<InternalType, PARQUET_TYPE>( - buffer_pos, buffer_end, fixed_len_size, out.Advance()); - if (UNLIKELY(encoded_len < 0)) return -1; - buffer_pos += encoded_len; - } - return buffer_pos - buffer; -} - -template <typename T> -inline int DecodeDecimalFixedLen( - const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, T* v) { - if (UNLIKELY(buffer_end - buffer < fixed_len_size)) return -1; - DecimalUtil::DecodeFromFixedLenByteArray(buffer, fixed_len_size, v); - return fixed_len_size; -} - -template <> -inline int ParquetPlainEncoder:: -Decode<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(const uint8_t* buffer, - const uint8_t* buffer_end, int fixed_len_size, Decimal4Value* v) { - return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v); -} - -template <> -inline int ParquetPlainEncoder:: -Decode<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(const uint8_t* buffer, - const uint8_t* buffer_end, int fixed_len_size, Decimal8Value* v) { - return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v); -} - -template <> -inline int ParquetPlainEncoder:: -Decode<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(const uint8_t* buffer, - const uint8_t* buffer_end, int fixed_len_size, Decimal16Value* v) { - return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v); -} - -/// Helper method to decode Decimal type stored as variable length byte array. -template<typename T> -inline int DecodeDecimalByteArray(const uint8_t* buffer, const uint8_t* buffer_end, - int fixed_len_size, T* v) { - if (UNLIKELY(buffer_end - buffer < sizeof(int32_t))) return -1; - int encoded_byte_size; - memcpy(&encoded_byte_size, buffer, sizeof(int32_t)); - int byte_size = sizeof(int32_t) + encoded_byte_size; - if (UNLIKELY(encoded_byte_size < 0 || buffer_end - buffer < byte_size)) return -1; - uint8_t* val_ptr = const_cast<uint8_t*>(buffer) + sizeof(int32_t); - DecimalUtil::DecodeFromFixedLenByteArray(val_ptr, encoded_byte_size, v); - return byte_size; -} - -template <> -inline int ParquetPlainEncoder::Decode<Decimal4Value, parquet::Type::BYTE_ARRAY>( - const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, - Decimal4Value* v) { - return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v); -} - -template <> -inline int ParquetPlainEncoder::Decode<Decimal8Value, parquet::Type::BYTE_ARRAY>( - const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, - Decimal8Value* v) { - return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v); -} - -template <> -inline int ParquetPlainEncoder::Decode<Decimal16Value, parquet::Type::BYTE_ARRAY>( - const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, - Decimal16Value* v) { - return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v); -} - -/// Helper class that contains the parameters needed for Timestamp decoding. -/// Can be safely passed by value. -class ParquetTimestampDecoder { -public: - ParquetTimestampDecoder() {} - - ParquetTimestampDecoder( const parquet::SchemaElement& e, const Timezone* timezone, - bool convert_int96_timestamps); - - bool NeedsConversion() const { return timezone_ != nullptr; } - - /// Decodes next PARQUET_TYPE from 'buffer', reading up to the byte before 'buffer_end' - /// and converts it TimestampValue. 'buffer' need not be aligned. - template <parquet::Type::type PARQUET_TYPE> - int Decode(const uint8_t* buffer, const uint8_t* buffer_end, TimestampValue* v) const; - - TimestampValue Int64ToTimestampValue(int64_t unix_time) const { - DCHECK(precision_ == MILLI || precision_ == MICRO); - return precision_ == MILLI - ? TimestampValue::UtcFromUnixTimeMillis(unix_time) - : TimestampValue::UtcFromUnixTimeMicros(unix_time); - } - - void ConvertToLocalTime(TimestampValue* v) const { - DCHECK(timezone_ != nullptr); - if (v->HasDateAndTime()) v->UtcToLocal(*timezone_); - } - - /// Timezone conversion of min/max stats need some extra logic because UTC->local - /// conversion can change ordering near timezone rule changes. The max value is - /// increased and min value is decreased to avoid incorrectly dropping column chunks - /// (or pages once IMPALA-5843 is ready). - - /// If timestamp t >= v before conversion, then this function converts v in such a - /// way that the same will be true after t is converted. - void ConvertMinStatToLocalTime(TimestampValue* v) const; - - /// If timestamp t <= v before conversion, then this function converts v in such a - /// way that the same will be true after t is converted. - void ConvertMaxStatToLocalTime(TimestampValue* v) const; - -private: - enum Precision { MILLI, MICRO, NANO }; - - /// Timezone used for UTC->Local conversions. If nullptr, no conversion is needed. - const Timezone* timezone_ = nullptr; - - /// Unit of the encoded timestamp. Used to decide between milli and microseconds during - /// INT64 decoding. INT64 with nanosecond precision (and reduced range) is also planned - /// to be implemented once it is added in Parquet (PARQUET-1387). - Precision precision_ = NANO; -}; - -template <> -inline int ParquetTimestampDecoder::Decode<parquet::Type::INT64>( - const uint8_t* buffer, const uint8_t* buffer_end, TimestampValue* v) const { - DCHECK(precision_ == MILLI || precision_ == MICRO); - int64_t unix_time; - int bytes_read = ParquetPlainEncoder::Decode<int64_t, parquet::Type::INT64>( - buffer, buffer_end, 0, &unix_time); - if (UNLIKELY(bytes_read < 0)) { - return bytes_read; - } - *v = Int64ToTimestampValue(unix_time); - // TODO: It would be more efficient to do the timezone conversion in the same step - // as the int64_t -> TimestampValue conversion. This would be also needed to - // move conversion/validation to dictionary construction (IMPALA-4994) and to - // implement dictionary filtering for TimestampValues. - return bytes_read; -} - -template <> -inline int ParquetTimestampDecoder::Decode<parquet::Type::INT96>( - const uint8_t* buffer, const uint8_t* buffer_end, TimestampValue* v) const { - DCHECK_EQ(precision_, NANO); - return ParquetPlainEncoder::Decode<TimestampValue, parquet::Type::INT96>( - buffer, buffer_end, 0, v); -} -} -#endif
